さー AI祭りです

もっともっと、OCIでAIを使い倒して課金されまくりましょうね。

ということで、定番の音声ファイルから文字起こしにチャレンジします。

OCI AI 文字起こし なんて検索しても、普通にコンソールで文字起こししてるサイトばっかですが、そんなものエンドユーザーに提供しにくいですよね。

なので、定番のstreamlit で、良い感じのGUIで提供してしまいましょう。

改造すれば、SSO認証なんかも簡単に出来るので、会社内専用 文字起こし サイトが作れますよ。


流石に この手のサービスは、有料が多いのですが、OCIでも出来ちゃいます。

ちょっと、課金されるし、結果は、ムズイJSONで出力されるので、整形する必要がありますね。

でも、そこは、さらに AI になんとかしてもらって 良い感じの結果にしてもらっちゃう、AIだのみの方法です。

OCI 音声 AIは、文字列から音声にするのと、音声から文字列にするサービスとなっていて、今回は、音声から文字列にするサービスを使います。

日本語に対応していないのかと思っていたのですが、Open AI の Whisperなるモデルを利用出来るので、マルチリンガルだそうです。

ということで、実装したのが、これです。

音声ファイルをドロップして、待つこと数分から数十分

どうですか。って、元音声ファイルを聞かないとわかりませんよね。

適当にネットに落ちてたサンプルで試したんですが、めっちゃ良い結果でした。

98% ぐらいの完成度でしたね。

しかも、結果をAIに解釈させたので、元の音声よりまともな文章になっています。

これ使えますよマジで

ということで、ソースです。

pipで、ffmpeg-pythonとか、oci等の依存ライブラリを入れてね

 

whisper3.py

import streamlit as st
 
import time
import json
import ffmpeg
import tempfile
import os
 
import oci
from oci.config import from_file
from oci.object_storage import ObjectStorageClient
from oci.ai_speech import AIServiceSpeechClient
from oci.ai_speech.models import CreateTranscriptionJobDetails, ObjectListInlineInputLocation, ObjectLocation, OutputLocation, TranscriptionModelDetails
from oci.generative_ai_inference import GenerativeAiInferenceClient
from oci.generative_ai_inference.models import (
    ChatDetails,
    CohereChatRequest,
    OnDemandServingMode
)
from datetime import datetime
 
config = oci.config.from_file("~/.oci/config", "DEFAULT")
 
object_storage = ObjectStorageClient(config)
namespace = object_storage.get_namespace().data
COMPARTMENT_ID = "ocid1.compartment.oc1..aaaaaaaamgmw22hogecwqnunirb3urhoger4ihdgoilkdjkv2sabokaq5svc"
TOKENBLOCKSIZE = 2000  # トークンのブロックサイズ
 
config = oci.config.from_file("~/.oci/config", "DEFAULT")
 
# 音声情報用バケット関連
object_storage = ObjectStorageClient(config)
namespace = object_storage.get_namespace().data
bucketnm="whisper"
 
# AIサービス音声クライアントを作成
speech_client = AIServiceSpeechClient(config)
 
# Generative AIクライアントを作成
aiclient = GenerativeAiInferenceClient(config=config)
MODEL_DEF = "cohere.command-a-03-2025"
 
st.title('文字起こし V.3')
 
uploaded_file = st.file_uploader(
    "音声/動画ファイルをドラッグ&ドロップまたは選択してください", type=['mp3', 'wav', 'mp4', 'm4a', 'aac', 'oga', 'wma', 'wmv', 'ogg', 'mov', '3gp', '3g2'], label_visibility="collapsed")
 
status_placeholder = st.empty()
 
# ファイルアップロード
def upload_contents_to_bucket(file_content,object_name):
 
    try:
        res:oci.response.Response = object_storage.put_object(namespace_name=namespace, bucket_name=bucketnm,
                                object_name=object_name, put_object_body=file_content)
        if res.status == 200:
            print(f"File {object_name} uploaded successfully.")
            return True
        else:
            print(f"Failed to upload file {object_name}. Status code: {res.status}")
            return False
    except Exception as e:
        print(f"An error occurred while uploading file {object_name}: {e}")
        return False    
        
# ファイル削除
def delete_file_from_bucket(object_name):
 
    res:oci.response.Response = object_storage.delete_object(namespace_name=namespace, bucket_name=bucketnm, object_name=object_name)
 
    if res.status == 204:
        print(f"Object {object_name} deleted successfully.")
        return True
    else:
        print(f"Failed to delete object {object_name}. Status code: {res.status}")
        return False
 
# 音声からテキストへ変換
def speech2text(object_name, poolingwait=10):
 
    start_time = datetime.now()
 
    # ジョブ詳細定義
    create_transcription_job_details : CreateTranscriptionJobDetails = CreateTranscriptionJobDetails(
        compartment_id=COMPARTMENT_ID,
        display_name="PythonSDKSampleTranscriptionJob",
        description="Transcription job created by Python SDK",
        input_location=ObjectListInlineInputLocation(
            location_type="OBJECT_LIST_INLINE_INPUT_LOCATION",
            object_locations=[ObjectLocation(
                namespace_name=namespace,
                bucket_name=bucketnm,
                object_names=[object_name])]),
        output_location=OutputLocation(
            namespace_name=namespace, bucket_name=bucketnm),
        model_details=TranscriptionModelDetails(
            model_type="WHISPER_MEDIUM", language_code="ja")
    )
 
    # ジョブ作成
    res:oci.response.Response = speech_client.create_transcription_job( create_transcription_job_details)
    tjob : oci.ai_speech.models.TranscriptionJob = res.data
 
    st.session_state.job_id = tjob.id
 
    status_message = ""
 
    try:
 
        # ジョブ完了ホーリング
        while True:
            time.sleep(poolingwait)
 
            res = speech_client.get_transcription_job(tjob.id)
            tjob = res.data
 
            current_time = datetime.now()
            processing_time = current_time - start_time
            processing_time_str = str(processing_time)
 
            new_status_message = f"処理状況: {tjob.lifecycle_state}  経過時間: {processing_time_str}"
            if new_status_message != status_message:
                status_placeholder.write("更新中...")
                time.sleep(1)
                status_placeholder.empty()
 
            status_placeholder.write(new_status_message)
            status_message = new_status_message
 
            if tjob.lifecycle_state in ["SUCCEEDED"]:
                # 完了
                print("Transcription job finished.")
                res = speech_client.list_transcription_tasks(tjob.id)
                ttasklist: oci.ai_speech.models.TranscriptionTaskCollection = res.data
                tasksummary : oci.ai_speech.models.TranscriptionTaskSummary
 
                result = []
                for tasksummary in ttasklist.items:
                    res = speech_client.get_transcription_task(tjob.id,tasksummary.id)
                    ttask: oci.ai_speech.models.TranscriptionTask = res.data
                    outputlocation = ttask.output_location
                    outbucketname = outputlocation.bucket_name
                    outnamespace = outputlocation.namespace_name
                    outobjects = outputlocation.object_names
                    for outobject in outobjects:
                        print(f"Output object: {outobject} in bucket {outbucketname} namespace {outnamespace}")
 
                        res = object_storage.get_object(namespace_name=outnamespace, bucket_name=outbucketname, object_name=outobject)
                        responce : oci.response.Response = res
                        if( responce.status != 200 ):
                            print(f"Failed to get object {outobject}. Status code: {responce.status}")
                            continue
                        outputresut = json.load(responce.data.raw)
                        #結果を削除
                        object_storage.delete_object(namespace_name=outnamespace, bucket_name=outbucketname, object_name=outobject)
 
                        #結果出力
                        result.append(outputresut)
 
                return result
 
            else :
                if tjob.lifecycle_state in ["FAILED", "CANCELING", "CANCELED"]:
                    # 失敗
                    print(f"Transcription job failed with status: {tjob.lifecycle_state}")
                    return None
 
        return None
    finally:
        print("Transcription job ended.")
        speech_client.delete_transcription_job(tjob.id)
 
# 音声からのテキスト変換結果を解析
def analyzetalk( textjson, blocksize=500 ):
 
    # トークンの間を区切る
    talkarray = []
    rootnode = textjson[0]
    if( rootnode["status"] == "SUCCESS" ) :
        for node in rootnode["transcriptions"]: 
 
            lastEndTime = 0.0
            resulttalk = ""
 
            for token in node["tokens"]:
                ttype = token["type"]
                if ttype != "WORD":
                    continue
                word = token["token"]
                confidence = float(token["confidence"])
                if( confidence < 0.1 ):
                    word = "?"
 
                startTime = float(token["startTime"].rstrip("s"))
                endTime = float(token["endTime"].rstrip("s"))
 
                if( lastEndTime > 0.0 and lastEndTime < startTime ):
                    #間がある場合、次のトークに分ける
                    talkarray.append(resulttalk)
                    resulttalk = ""
                # 次のトークンへ更新
                lastEndTime = endTime
                resulttalk += word
 
    # LLMに渡すトークンに分割する
    talkblocks = []
    block = []
    for talk in talkarray:
        #現在のブロックサイズ計算
        blocklength = 0
        for intalk in block:
            blocklength += len(intalk)
 
        if( blocklength > blocksize ) :
            # ブロックが500文字を超えたら、ブロックを保存して新しいブロックを開始
            talkblocks.append(block)
            block = []
 
        # ブロックにトークを追加
        block.append(talk)
 
    # 最後のブロックを追加
    if block:
        talkblocks.append(block)
 
    return talkblocks
 
# 結果テキストをAIで、自然言語へ変換する
def parse_talking( talkblocks ) :
 
    talkblock = " ".join(talkblocks)  # 最初のブロックを使用
 
    msg = '''
音声から文字列に変換された結果を以下に記載しました。この文字列を自然に言葉に変換して下さい。
結果コンテンツだけでを出力し、コメントや説明は不要です。
'''
#    msg = "音声から文字列に変換された結果を以下に記載しました。この文字列を自然に言葉に変換して下さい。\n"
    msg += talkblock
 
    # チャットリクエストの作成
    chat_request = CohereChatRequest(
        message=msg,
        max_tokens=3000,
        temperature=0.9,
        is_echo=True,
        is_stream=False
    )
 
    # サービングモードの指定(利用モデルIDを正しく指定)
    serving_mode = OnDemandServingMode(
        model_id=MODEL_DEF
    )
 
    # チャット詳細情報をまとめる
    chat_details = ChatDetails(
        compartment_id=COMPARTMENT_ID,
        chat_request=chat_request,
        serving_mode=serving_mode
    )
 
    # チャットAPIを呼び出す
    response = aiclient.chat(chat_details)
 
    # ここに音声からのテキスト変換結果を解析するロジックを追加
    return response.data.chat_response.text
 
# ffmpeg 音声形式情報取得
def audiofile_info(audiofile):
    try:
        probe = ffmpeg.probe(audiofile)
        format_info = probe['format']
        duration = float(format_info['duration'])
        bitrate = format_info['bit_rate']
        return format_info, duration, bitrate
    except ffmpeg.Error as e:
        print(f"Error probing audio file: {e}")
        return None, None
 
# ffmpeg 音声形式対応コーデックに変換
def convert_audio(input_file, output_file, codec='libopus', bitrate='64k'):
    try:
        (
            ffmpeg
            .input(input_file)
            .output(output_file, vn=None, acodec=codec, audio_bitrate=bitrate)
            .run(overwrite_output=True) # 既存の出力ファイルを上書き
        )
        print(f"'{input_file}' を '{output_file}' に変換しました。")
    except ffmpeg.Error as e:
        print(f"エラーが発生しました: {e.stderr.decode('utf8')}")
        raise
    except FileNotFoundError:
        print("FFmpegがシステムPATHに見つかりません。FFmpegをインストールし、PATHを設定してください。")
        raise
    except Exception as e:
        print(f"予期せぬエラーが発生しました: {e}")
        raise
 
 
if uploaded_file is not None:
 
    status_placeholder.write("ファイルを解析しています。")
 
    # 一時ファイルを作成してUploadedFileの内容を書き込む
    temp_file_path = ""
    with tempfile.NamedTemporaryFile(delete=False, suffix="whisper") as tmp_file:
        with st.spinner('処理中です。しばらくお待ちください...'):
            tmp_file.write(uploaded_file.read())
            temp_file_path = tmp_file.name
 
    #音声ファイル情報取得
    convertfile=False
    convertfilename=""
    fomatinfo, duration, bitrate = audiofile_info(temp_file_path)
    if( (fomatinfo['format_name'] in ["mp3","wav","ogg"]) == False ):
        with st.spinner('音声ファイルへ変換中です。'):
            convertfilename = temp_file_path+".ogg"
            convert_audio(temp_file_path, convertfilename)
            convertfile = True
 
    ##一時ファイル削除
    os.remove(temp_file_path)
 
    # 対象音声情報を決定する
    if convertfile == True:
        with open(convertfilename, 'rb') as f:
            file_content = f.read()
        os.remove(convertfilename)
    else:
        file_content = uploaded_file.getvalue()
 
    status_placeholder.write("文字起こしを開始します。")
 
    object_name = uploaded_file.name
    if( upload_contents_to_bucket(file_content,object_name) == False ):
        st.error(f"ファイル '{object_name}' のアップロードに失敗しました。")
    else:
        st.session_state.uploaded_file_name = object_name
 
        with st.spinner('処理中です。しばらくお待ちください...'):
            textjson = speech2text(object_name)
            delete_file_from_bucket(object_name)
 
            if textjson is None:
                # 失敗
                st.error("文字起こしに失敗しました。")
                status_placeholder.write("失敗しました。")
            else:
                talkblocks = analyzetalk(textjson, TOKENBLOCKSIZE)
 
                status_placeholder.write("自然言語へ変換中...")
 
                for block in talkblocks:
                    finalresult = parse_talking(block)
                    st.markdown(finalresult, unsafe_allow_html=True)
 
                status_placeholder.empty()
#                st.session_state.final_result = finalresult
                st.success("文字起こしと自然言語変換が完了しました。結果を表示しています。")
 
else:
    st.write("音声/動画ファイルをアップロードして、処理を開始してください。")
 

 

CompartMentは、自身のに変えて下さい。

オブジェクトストレージを使うので、バケットwhisperを作成しておいて下さい。

で、起動は、

streamlit run whisper3.py --server.port 8501 --server.enableCORS false

です。

当然ながら、streamlit 環境と OCI SDK 環境は、構築して下さい。

Windowsだったら、WSL+Ubuntu24.04 + python3 + OCI CLI + OCI python SDK で、構築出来ますよ。

 

Joomla templates by a4joomla