본문 바로가기
AI System/OpenAI API와 바이브 코딩으로 배우는 AI 서비스 개발

d02 - 06. Audio - 04. voice agent - step2. audio output

by Toddler_AD 2026. 6. 3.

step2_audio_output.py

"""
================================================================================
Step 2: 음성 출력 추가하기
================================================================================

이 단계에서 배우는 것:
- 오디오 스트리밍 받기
- PCM16 오디오 재생
- base64 디코딩

실행 방법:
    python step2_audio_output.py

예상 동작:
    1. Step 1의 기능 + 음성 출력
    2. 에이전트의 음성 응답을 스피커로 재생
"""

import asyncio
import base64
import json
import os
import queue
import threading
from dotenv import load_dotenv
import websockets
import sounddevice as sd


# .env 파일에서 API 키 로드
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

if not OPENAI_API_KEY:
    print("❌ OPENAI_API_KEY를 .env 파일에 설정하세요")
    exit(1)


async def open_websocket(url: str, headers: dict):
    """websockets 버전 차이에 따른 헤더 파라미터 호환 처리"""
    # 실행 흐름 요약:
    # 1. 최신 websockets 시그니처로 연결을 시도합니다.
    # 2. 실패 시 구버전 시그니처로 연결합니다.
    # 3. 호출부는 버전 차이를 신경 쓰지 않아도 됩니다.
    try:
        # 최신 websockets 버전은 additional_headers를 사용합니다.
        return await websockets.connect(url, additional_headers=headers)
    except TypeError:
        # 구버전에서는 extra_headers를 사용하므로 예외 시 이 경로로 재시도합니다.
        return await websockets.connect(url, extra_headers=headers)


# 오디오 출력 설정:
# - SAMPLE_RATE는 1초를 몇 개의 샘플(sample)로 표현할지 결정합니다.
# - CHANNELS=1은 모노(mono) 출력이라 스피커로 한 채널 오디오를 재생한다는 뜻입니다.
# - DTYPE="int16"은 PCM16 형식으로 16비트 정수 샘플을 사용한다는 뜻입니다.
SAMPLE_RATE = 24000   # Realtime 예제에서 사용하는 24kHz 출력 샘플레이트입니다.
CHANNELS = 1          # 음성을 한 채널로만 재생합니다.
DTYPE = "int16"      # 서버에서 받은 PCM16 오디오 형식과 맞춥니다.


class AudioPlayer:
    """
    오디오를 재생하는 간단한 클래스
    
    동작 원리:
    1. 네트워크에서 받은 오디오 데이터를 큐에 저장
    2. 별도 스레드에서 큐의 데이터를 꺼내 스피커로 재생
    """
    
    def __init__(self, sample_rate: int):
        # 실행 흐름 요약:
        # 1. 네트워크 스레드 대신 큐를 통해 오디오 청크(chunk)를 받습니다.
        # 2. 워커 스레드가 큐에서 청크를 꺼내 순서대로 재생합니다.
        # 3. 종료 시에는 stop_event와 None 센티널(sentinel)로 워커를 정리합니다.
        # 서버가 보내는 PCM16 오디오를 같은 샘플레이트로 재생해야 속도와 음정이 맞습니다.
        self.sample_rate = sample_rate
        # 네트워크 수신 속도와 스피커 재생 속도를 분리하기 위해 큐를 둡니다.
        self.audio_queue = queue.Queue(maxsize=100)
        # 실제 스피커 출력은 백그라운드 스레드에서 계속 처리합니다.
        self.worker = threading.Thread(target=self._play_audio, daemon=True)
        self.started = False
        self.stop_event = threading.Event()
        # 현재 stream.write 중인지 추적해 종료 시점을 더 정확히 판단합니다.
        self._is_playing = False
        self._state_lock = threading.Lock()
    
    def start(self):
        """재생 시작"""
        # 이 메서드는 "출력 장치를 미리 준비하고 워커를 켠다"는 의미입니다.
        if not self.started:
            self.started = True
            # 워커 스레드를 한 번만 시작해 이후 add_audio 호출을 처리하게 합니다.
            self.worker.start()
    
    def add_audio(self, audio_bytes: bytes):
        """재생할 오디오 추가"""
        # 이 메서드는 네트워크에서 받은 오디오 청크를 로컬 재생 버퍼에 쌓는 역할입니다.
        if self.stop_event.is_set():
            # 종료 절차가 시작된 뒤에는 새 오디오를 받지 않습니다.
            return

        if self.audio_queue.full():
            try:
                # 큐가 가득 차면 가장 오래된 청크를 버려 전체 지연이 무한히 늘지 않게 합니다.
                self.audio_queue.get_nowait()
            except queue.Empty:
                pass
        
        try:
            # 재생할 PCM 바이트를 큐에 밀어 넣습니다.
            self.audio_queue.put_nowait(audio_bytes)
        except queue.Full:
            pass
    
    def _play_audio(self):
        """백그라운드 스레드에서 오디오 재생"""
        # 실행 흐름 요약:
        # 1. 출력 스트림을 연 뒤 큐에서 청크를 기다립니다.
        # 2. 청크가 오면 stream.write로 즉시 재생합니다.
        # 3. None 또는 stop_event가 오면 루프를 끝냅니다.
        try:
            # sounddevice 출력 스트림을 열어 큐에 쌓인 PCM 청크를 순서대로 씁니다.
            with sd.RawOutputStream(
                samplerate=self.sample_rate,
                channels=CHANNELS,
                dtype=DTYPE,
            ) as stream:
                while True:
                    # 큐에서 다음 오디오 청크를 기다립니다.
                    audio_data = self.audio_queue.get()
                    if audio_data is None or self.stop_event.is_set():  # 종료 신호
                        break
                    try:
                        with self._state_lock:
                            # 지금부터 실제 스피커 출력이 진행 중임을 기록합니다.
                            self._is_playing = True
                        stream.write(audio_data)
                    except Exception as e:
                        # 종료 직전 스트림 상태 경쟁으로 생기는 경고는 무시
                        if self.stop_event.is_set() and "Stream is stopped" in str(e):
                            break
                        raise
                    finally:
                        with self._state_lock:
                            # 한 청크 재생이 끝나면 재생 중 플래그를 내립니다.
                            self._is_playing = False
        except Exception as e:
            if not self.stop_event.is_set():
                print(f"[❌] 오디오 재생 오류: {e}")

    def has_pending_audio(self) -> bool:
        """현재 재생 중이거나 큐에 남은 오디오가 있으면 True"""
        # Step 2의 핵심은 response.done과 로컬 재생 완료를 구분하는 것입니다.
        # 그래서 "지금 쓰는 중인지"와 "큐에 남아 있는지"를 함께 봅니다.
        with self._state_lock:
            currently_playing = self._is_playing
        # 워커가 실제로 쓰는 중이거나 아직 큐에 남은 청크가 있으면 끝난 것이 아닙니다.
        return currently_playing or (not self.audio_queue.empty())
    
    def close(self):
        """재생 종료"""
        # 실행 흐름 요약:
        # 1. 새 오디오 수신을 막습니다.
        # 2. 남은 오디오를 정리합니다.
        # 3. 워커가 빠져나오도록 종료 신호를 넣습니다.
        # 4. 스레드 종료를 기다립니다.
        if not self.started:
            return

        # 더 이상 재생하지 않도록 종료 플래그를 먼저 세웁니다.
        self.stop_event.set()

        # 남아 있는 오디오를 비워 종료 시 write 경쟁을 줄임
        while True:
            try:
                self.audio_queue.get_nowait()
            except queue.Empty:
                break

        # 워커가 반드시 빠져나오도록 종료 신호를 큐에 넣음
        try:
            self.audio_queue.put_nowait(None)
        except queue.Full:
            try:
                # 큐가 꽉 찼다면 하나를 비운 뒤 종료 신호를 넣습니다.
                self.audio_queue.get_nowait()
                self.audio_queue.put_nowait(None)
            except (queue.Empty, queue.Full):
                pass

        # 워커가 정리될 시간을 조금 주고 종료합니다.
        self.worker.join(timeout=1.5)
        self.started = False


async def main():
    """
    Realtime API에 연결하여 음성 대화를 수행합니다.
    """
    # 실행 흐름 요약:
    # 1. AudioPlayer를 먼저 시작합니다.
    # 2. Realtime 세션을 audio 출력 모달리티(modality)로 설정합니다.
    # 3. 사용자 텍스트를 보내고 response.create를 호출합니다.
    # 4. 텍스트 델타(delta)와 오디오 델타를 동시에 처리합니다.
    # 5. response.done 이후에도 로컬 재생 큐가 빌 때까지 기다립니다.
    # 단계 제목을 먼저 출력해 실습 맥락을 보여줍니다.
    print("\n" + "=" * 60)
    print("  Step 2: 음성 출력 추가")
    print("=" * 60 + "\n")
    
    # WebSocket URL 설정
    url = "wss://api.openai.com/v1/realtime?model=gpt-realtime-mini"
    
    # 헤더에 API 키 포함
    headers = {
        "Authorization": f"Bearer {OPENAI_API_KEY}",
    }

    print("[1/6] 오디오 플레이어 시작...")

    # 네트워크 수신과 독립적으로 동작할 오디오 플레이어를 미리 켭니다.
    player = AudioPlayer(SAMPLE_RATE)
    player.start()
    print("\n[✅] 오디오 플레이어 시작\n")

    print("[2/6] WebSocket 연결 중...")
    
    # Realtime 세션을 열고 이후 이벤트를 같은 WebSocket으로 주고받습니다.
    async with (await open_websocket(url, headers)) as ws:
        print("[✅] WebSocket 연결 완료\n")
        
        # 세션 생성 이벤트 받기
        # 이 단계에서는 session.created가 왔는지만 확인하면 충분합니다.
        response = await ws.recv()
        event = json.loads(response)
        print(f"[3/6] 세션 생성: {event['type']}\n")
        
        # 이번 단계는 오디오 출력을 써야 하므로 output_modalities를 audio로 설정합니다.
        print("[4/6] 세션 설정 전송 (오디오 모드)...")
        session_update = {
            "type": "session.update",
            "session": {
                "type": "realtime",
                "instructions": "당신은 친절한 한국어 AI 비서입니다.",
                "output_modalities": ["audio"],
                "audio": {
                    "output": {
                        "format": {
                            # 서버가 PCM16 24kHz로 내려주도록 지정합니다.
                            "type": "audio/pcm",
                            "rate": SAMPLE_RATE,
                        },
                        # 예제에서 사용할 음성 보이스를 선택합니다.
                        "voice": "alloy",
                    },
                },
            }
        }
        await ws.send(json.dumps(session_update))
        
        # 설정이 반영되기 전에는 바로 대화를 시작하지 않고 확인 응답을 기다립니다.
        # 특히 오디오 포맷과 보이스(voice)가 반영되었는지 확인한 뒤에 질문을 보내야 합니다.
        while True:
            response = await ws.recv()
            event = json.loads(response)
            event_type = event.get("type", "")

            if event_type == "session.updated":
                print(f"\n[✅] 세션 설정 완료: {event_type}\n")
                break

            if event_type == "error":
                # 오디오 세션 설정 실패 시 플레이어를 닫고 종료합니다.
                print(f"[❌] 세션 설정 에러: {event.get('error')}\n")
                player.close()
                return
        
        # 대화 시작
        print("[5/6] 대화 시작")
        print("-" * 60)
        
        # Step 2에서는 텍스트 질문을 보내고 음성 응답을 받아 재생합니다.
        user_message = "안녕하세요! 간단하게 자기소개 해주세요."
        print(f"\n👤 사용자: {user_message}")
        
        # 사용자 메시지를 대화 아이템으로 추가합니다.
        conversation_item = {
            "type": "conversation.item.create",
            "item": {
                "type": "message",
                "role": "user",
                "content": [
                    {
                        "type": "input_text",
                        "text": user_message
                    }
                ]
            }
        }
        await ws.send(json.dumps(conversation_item))
        
        # 메시지 등록 후 별도 response.create를 보내 실제 응답 생성을 시작합니다.
        response_create = {"type": "response.create"}
        await ws.send(json.dumps(response_create))

        # 이후 로그 정리를 위한 상태 값들입니다.
        # audio_started: 실제 오디오 청크를 한 번이라도 받았는가?
        # audio_stream_finished: 서버 기준 오디오 스트림이 끝났는가?
        # assistant_line_open: 콘솔에서 한 줄 출력이 열려 있는가?
        # saw_audio_transcript: 오디오 전사 텍스트를 우선 출력했는가?
        audio_started = False
        audio_stream_finished = False
        assistant_line_open = False
        saw_audio_transcript = False
        
        # 응답 받기
        print("\n[6/6] 에이전트 응답 중... (음성 재생)\n")

        def close_assistant_line():
            # 텍스트를 한 줄로 이어서 출력하다가 이벤트 경계에서 줄을 닫습니다.
            nonlocal assistant_line_open
            if assistant_line_open:
                print()
                assistant_line_open = False
        
        while True:
            # 서버가 보내는 각 이벤트를 하나씩 해석합니다.
            response = await ws.recv()
            event = json.loads(response)
            event_type = event.get("type")
            
            # 텍스트 응답 (참고용)
            if event_type in {
                "response.output_audio_transcript.delta",
                "response.audio_transcript.delta",
            }:
                delta = event.get("delta", "")
                if delta and not assistant_line_open:
                    # 첫 텍스트 조각이 오면 에이전트 접두어를 한 번만 출력합니다.
                    print("[🤖 에이전트] ", end="", flush=True)
                    assistant_line_open = True
                print(delta, end="", flush=True)
                # 오디오 전사 텍스트가 있으면 이것을 우선 화면에 보여줍니다.
                saw_audio_transcript = True

            elif event_type in {"response.output_text.delta", "response.text.delta"}:
                delta = event.get("delta", "")
                # 오디오 전사가 오지 않는 경우에만 텍스트 델타를 출력
                if delta and not saw_audio_transcript:
                    if not assistant_line_open:
                        print("[🤖 에이전트] ", end="", flush=True)
                        assistant_line_open = True
                    print(delta, end="", flush=True)
            
            elif event_type in {"response.output_text.done", "response.text.done"}:
                close_assistant_line()

            elif event_type in {
                "response.output_audio_transcript.done",
                "response.audio_transcript.done",
            }:
                close_assistant_line()
            
            # 오디오 응답 (실제 재생)
            elif event_type in {"response.output_audio.delta", "response.audio.delta"}:
                # base64로 인코딩된 오디오 데이터
                audio_base64 = event.get("delta", "")
                if audio_base64:
                    audio_started = True
                    # base64 디코딩
                    audio_bytes = base64.b64decode(audio_base64)
                    # 디코딩한 PCM 바이트를 플레이어 큐에 넣으면 워커 스레드가 재생합니다.
                    player.add_audio(audio_bytes)
            
            elif event_type in {"response.output_audio.done", "response.audio.done"}:
                # 서버가 더 이상 오디오 청크를 보내지 않는 시점만 먼저 기록합니다.
                audio_stream_finished = True
            
            # 응답 완료
            elif event_type == "response.done":
                close_assistant_line()
                # response.done은 서버 응답 스트리밍 완료이지, 로컬 재생 완료와는 별개입니다.
                print("[✅] 응답 완료\n")
                break
            
            # 에러 처리
            elif event_type == "error":
                close_assistant_line()
                print(f"\n[❌] 에러: {event.get('error')}")
                break
        
        # 서버 전송 완료와 로컬 재생 완료는 다를 수 있으므로 큐가 빌 때까지 대기합니다.
        # 이 대기가 없으면 스피커에서 아직 재생 중인데 프로그램이 먼저 끝나 버릴 수 있습니다.
        while player.has_pending_audio():
            await asyncio.sleep(0.05)

        if audio_started and audio_stream_finished:
            # 서버 오디오 스트림 종료 후 로컬 스피커까지 모두 재생된 시점입니다.
            print("[✅] 오디오 재생 완료")
        
        print("=" * 60)
        print("  연결 종료")
        print("=" * 60 + "\n")
        
        # 마지막으로 플레이어 스레드를 정리합니다.
        player.close()


if __name__ == "__main__":
    # asyncio 진입점에서 메인 코루틴을 실행합니다.
    asyncio.run(main())