본문 바로가기
AI System/OpenAI API와 바이브 코딩으로 배우는 AI 서비스 개발

d02 - 06. Audio - 04. voice agent - step3. audio input

by Toddler_AD 2026. 6. 3.

step3_audio_input.py

"""
================================================================================
Step 3: 오디오 입력(전사)만 다루기
================================================================================

이 단계에서 배우는 것:
- 마이크에서 오디오 캡처
- Server VAD로 발화 구간 감지
- 사용자 발화를 텍스트로 전사해서 출력

실행 방법:
    python step3_audio_input.py

예상 동작:
    1. 마이크 음성을 서버로 실시간 전송
    2. 발화 시작/종료 이벤트 표시
    3. 사용자 전사 텍스트 출력

주의:
    - 이 Step은 에이전트 응답 생성/재생을 하지 않습니다.
    - 전체 대화/인터럽트는 Step 4에서 다룹니다.
"""

import asyncio
import base64
import json
import os

from dotenv import load_dotenv
import sounddevice as sd
import websockets


load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

if not OPENAI_API_KEY:
    print("❌ OPENAI_API_KEY를 .env 파일에 설정하세요")
    raise SystemExit(1)


# 오디오 입력 설정:
# - SAMPLE_RATE는 1초를 몇 개의 샘플(sample)로 표현할지 결정합니다.
# - CHANNELS=1은 모노(mono) 입력이라 마이크 한 채널만 사용한다는 뜻입니다.
# - DTYPE="int16"은 PCM16 형식으로 16비트 정수 샘플을 사용한다는 뜻입니다.
# - BLOCKSIZE는 한 번의 콜백(callback)에서 받을 오디오 프레임(frame) 크기입니다.
SAMPLE_RATE = 24000   # Realtime 예제에서 사용하는 24kHz 입력 샘플레이트입니다.
CHANNELS = 1          # 음성을 한 채널로만 받아 전송합니다.
DTYPE = "int16"      # 서버에 보낼 PCM16 오디오 형식과 맞춥니다.
BLOCKSIZE = 960       # 약 40ms 분량(960 / 24000초)의 오디오를 한 번에 처리합니다.


async def open_websocket(url: str, headers: dict):
    """websockets 버전 차이에 따른 헤더 파라미터 호환 처리"""
    # 실행 흐름 요약:
    # 1. 최신 websockets 연결 시그니처를 우선 시도합니다.
    # 2. 구버전이면 대체 시그니처로 연결합니다.
    # 3. 나머지 코드는 버전 차이를 몰라도 됩니다.
    try:
        # 최신 버전용 헤더 인자를 먼저 시도합니다.
        return await websockets.connect(url, additional_headers=headers)
    except TypeError:
        # 구버전 호환 경로입니다.
        return await websockets.connect(url, extra_headers=headers)


class MicrophoneCapture:
    """마이크 오디오를 캡처하여 asyncio 큐로 전달"""

    def __init__(self, loop: asyncio.AbstractEventLoop, sample_rate: int):
        # 실행 흐름 요약:
        # 1. 오디오 콜백은 sounddevice 스레드에서 실행됩니다.
        # 2. 그 결과를 asyncio.Queue로 넘겨 비동기 태스크가 소비합니다.
        # 3. 이렇게 해야 마이크 입력과 WebSocket 전송을 자연스럽게 분리할 수 있습니다.
        self.loop = loop
        self.sample_rate = sample_rate
        # 오디오 콜백 스레드와 asyncio 루프 사이를 이어 주는 버퍼입니다.
        self.audio_queue = asyncio.Queue(maxsize=100)

    def _callback(self, indata, frames, time_info, status):
        # 이 함수는 오디오 장치가 새 프레임을 줄 때마다 매우 자주 호출됩니다.
        if status:
            print(f"[⚠️] 마이크 상태: {status}")

        # RawInputStream이 준 바이트를 그대로 서버 전송용 큐에 넘깁니다.
        audio_bytes = bytes(indata)
        self.loop.call_soon_threadsafe(self._enqueue, audio_bytes)

    def _enqueue(self, audio_bytes: bytes):
        # 이 함수는 이벤트 루프 스레드에서 실행되므로 asyncio.Queue에 안전하게 넣을 수 있습니다.
        if self.audio_queue.full():
            try:
                # 큐가 꽉 차면 가장 오래된 프레임을 버려 실시간성을 유지합니다.
                self.audio_queue.get_nowait()
            except asyncio.QueueEmpty:
                pass

        try:
            # 최신 마이크 프레임을 큐에 적재합니다.
            self.audio_queue.put_nowait(audio_bytes)
        except asyncio.QueueFull:
            pass

    async def start_capture(self, stop_event: asyncio.Event):
        # 실행 흐름 요약:
        # 1. 입력 스트림을 열어 마이크 콜백을 활성화합니다.
        # 2. 실제 오디오 적재는 콜백이 담당합니다.
        # 3. 이 코루틴은 종료 신호를 감시하며 컨텍스트를 유지합니다.
        # 사용자에게 마이크 입력이 시작됐음을 안내합니다.
        print("[✅] 마이크 캡처 시작")
        print("    💡 말하면 발화 이벤트/전사가 표시됩니다\n")

        # sounddevice 입력 스트림은 별도 오디오 콜백으로 계속 데이터를 전달합니다.
        with sd.RawInputStream(
            samplerate=self.sample_rate,
            channels=CHANNELS,
            dtype=DTYPE,
            blocksize=BLOCKSIZE,
            callback=self._callback,
        ):
            while not stop_event.is_set():
                # 오디오 캡처 자체는 콜백이 담당하므로 이 루프는 종료 신호만 감시합니다.
                await asyncio.sleep(0.1)


async def send_microphone_to_server(ws, mic_capture: MicrophoneCapture, stop_event: asyncio.Event):
    """마이크 오디오를 input_audio_buffer.append 이벤트로 서버 전송"""
    # 실행 흐름 요약:
    # 1. 마이크 큐에서 PCM 프레임을 꺼냅니다.
    # 2. base64 문자열로 인코딩합니다.
    # 3. input_audio_buffer.append 이벤트로 서버에 계속 보냅니다.
    while not stop_event.is_set():
        try:
            # 짧은 타임아웃으로 큐를 확인해 종료 이벤트에 유연하게 반응합니다.
            audio_bytes = await asyncio.wait_for(mic_capture.audio_queue.get(), timeout=0.2)
        except asyncio.TimeoutError:
            continue

        # 서버가 요구하는 형태에 맞게 PCM 바이트를 base64 문자열로 인코딩합니다.
        message = {
            "type": "input_audio_buffer.append",
            "audio": base64.b64encode(audio_bytes).decode("utf-8"),
        }
        await ws.send(json.dumps(message))


async def receive_transcription_events(ws, stop_event: asyncio.Event):
    """발화 시작/종료 및 전사 이벤트만 출력"""
    # 실행 흐름 요약:
    # 1. 서버가 보낸 이벤트를 계속 수신합니다.
    # 2. speech_started / speech_stopped / transcription.completed만 출력합니다.
    # 3. 오류가 오면 stop_event를 세우고 루프를 종료합니다.
    async for message in ws:
        # WebSocket 메시지를 JSON 이벤트로 변환해 종류별로 처리합니다.
        event = json.loads(message)
        event_type = event.get("type")

        if event_type == "input_audio_buffer.speech_started":
            print("\n[🎤 사용자] 발화 시작")

        elif event_type == "input_audio_buffer.speech_stopped":
            print("[🎤 사용자] 발화 종료")

        elif event_type == "conversation.item.input_audio_transcription.completed":
            transcript = event.get("transcript", "").strip()
            if transcript:
                # 전사가 비어 있지 않을 때만 화면에 보여줍니다.
                print(f"[💬 사용자] {transcript}\n")

        elif event_type == "error":
            error_msg = event.get("error", {}).get("message", "알 수 없는 오류")
            print(f"\n[❌] 에러: {error_msg}")
            stop_event.set()

        if stop_event.is_set():
            # 종료 신호가 생기면 이벤트 수신 루프도 빠져나옵니다.
            break


async def main():
    # 실행 흐름 요약:
    # 1. Realtime 세션을 "오디오 입력/전사 전용"으로 설정합니다.
    # 2. Server VAD(Voice Activity Detection)와 Whisper 전사를 켭니다.
    # 3. 마이크 캡처, 서버 전송, 이벤트 수신을 동시에 실행합니다.
    # 4. 이 단계에서는 create_response=False로 두어 에이전트 응답은 만들지 않습니다.
    # 단계 제목을 출력해 지금 실행 중인 실습을 구분합니다.
    print("\n" + "=" * 60)
    print("  Step 3: 오디오 입력(전사)만 다루기")
    print("=" * 60 + "\n")

    url = "wss://api.openai.com/v1/realtime?model=gpt-realtime-mini"
    headers = {
        "Authorization": f"Bearer {OPENAI_API_KEY}",
    }

    print("[1/3] WebSocket 연결 중...")
    async with (await open_websocket(url, headers)) as ws:
        print("[✅] 연결 완료\n")

        # 세션 생성 이벤트는 연결 직후 한 번 소비합니다.
        await ws.recv()
        print("[2/3] 세션 생성 완료\n")

        print("[3/3] 세션 설정 (오디오 입력/전사 전용)...")
        session_update = {
            "type": "session.update",
            "session": {
                "type": "realtime",
                # Step 3은 전사 확인용 실습이라는 점을 명시합니다.
                "instructions": "이 세션은 사용자 음성 입력 전사 확인용입니다.",
                # 모델 텍스트 응답은 만들지 않지만 전사 텍스트 이벤트는 받기 위해
                # 출력 모달리티(modality)는 text로 유지합니다.
                "output_modalities": ["text"],
                "audio": {
                    "input": {
                        "format": {
                            "type": "audio/pcm",
                            "rate": SAMPLE_RATE,
                        },
                        "transcription": {
                            # 사용자 음성을 Whisper로 전사합니다.
                            "model": "whisper-1",
                            "language": "ko",
                        },
                        "noise_reduction": {
                            # 원거리(far field) 환경을 가정한 노이즈 감소를 사용합니다.
                            "type": "far_field",
                        },
                        "turn_detection": {
                            # 서버 VAD(Voice Activity Detection)가 발화 시작/끝을 판단합니다.
                            "type": "server_vad",
                            "threshold": 0.7,
                            "silence_duration_ms": 900,
                            "prefix_padding_ms": 300,
                            # Step 3은 전사만 확인하므로 에이전트 응답 생성은 끕니다.
                            "create_response": False,
                            "interrupt_response": False,
                        },
                    },
                },
            },
        }
        await ws.send(json.dumps(session_update))

        while True:
            response = await ws.recv()
            event = json.loads(response)
            event_type = event.get("type", "")

            if event_type == "session.updated":
                print("[✅] 설정 완료\n")
                break

            if event_type == "error":
                error_msg = event.get("error", {}).get("message", "알 수 없는 오류")
                print(f"[❌] 설정 에러: {error_msg}\n")
                return

        print("=" * 60)
        print("  🎤 마이크로 말하세요!")
        print("  💡 이 단계에서는 사용자 전사만 출력합니다")
        print("  ⏹️  종료: Ctrl+C")
        print("=" * 60 + "\n")

        # 이후 세 개의 동시 작업이 함께 돌아갑니다.
        stop_event = asyncio.Event()
        loop = asyncio.get_running_loop()
        mic_capture = MicrophoneCapture(loop, SAMPLE_RATE)

        # 마이크 캡처, 서버 전송, 서버 이벤트 수신을 병렬로 실행합니다.
        mic_task = asyncio.create_task(mic_capture.start_capture(stop_event))
        send_task = asyncio.create_task(send_microphone_to_server(ws, mic_capture, stop_event))
        recv_task = asyncio.create_task(receive_transcription_events(ws, stop_event))

        try:
            await asyncio.gather(mic_task, send_task, recv_task)
        except KeyboardInterrupt:
            # Ctrl+C가 들어오면 세 태스크에 모두 종료 신호를 전파합니다.
            print("\n\n[⏹️] 종료 중...")
            stop_event.set()
            mic_task.cancel()
            send_task.cancel()
            recv_task.cancel()
            await asyncio.gather(mic_task, send_task, recv_task, return_exceptions=True)

        print("\n" + "=" * 60)
        print("  연결 종료")
        print("=" * 60 + "\n")


if __name__ == "__main__":
    try:
        # asyncio 이벤트 루프에서 메인 코루틴을 실행합니다.
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n[⏹️] 프로그램을 종료했습니다.")