step2_audio_output.py
"""
================================================================================
Step 2: 음성 출력 추가하기
================================================================================
이 단계에서 배우는 것:
- 오디오 스트리밍 받기
- PCM16 오디오 재생
- base64 디코딩
실행 방법:
python step2_audio_output.py
예상 동작:
1. Step 1의 기능 + 음성 출력
2. 에이전트의 음성 응답을 스피커로 재생
"""
import asyncio
import base64
import json
import os
import queue
import threading
from dotenv import load_dotenv
import websockets
import sounddevice as sd
# .env 파일에서 API 키 로드
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
if not OPENAI_API_KEY:
print("❌ OPENAI_API_KEY를 .env 파일에 설정하세요")
exit(1)
async def open_websocket(url: str, headers: dict):
"""websockets 버전 차이에 따른 헤더 파라미터 호환 처리"""
# 실행 흐름 요약:
# 1. 최신 websockets 시그니처로 연결을 시도합니다.
# 2. 실패 시 구버전 시그니처로 연결합니다.
# 3. 호출부는 버전 차이를 신경 쓰지 않아도 됩니다.
try:
# 최신 websockets 버전은 additional_headers를 사용합니다.
return await websockets.connect(url, additional_headers=headers)
except TypeError:
# 구버전에서는 extra_headers를 사용하므로 예외 시 이 경로로 재시도합니다.
return await websockets.connect(url, extra_headers=headers)
# 오디오 출력 설정:
# - SAMPLE_RATE는 1초를 몇 개의 샘플(sample)로 표현할지 결정합니다.
# - CHANNELS=1은 모노(mono) 출력이라 스피커로 한 채널 오디오를 재생한다는 뜻입니다.
# - DTYPE="int16"은 PCM16 형식으로 16비트 정수 샘플을 사용한다는 뜻입니다.
SAMPLE_RATE = 24000 # Realtime 예제에서 사용하는 24kHz 출력 샘플레이트입니다.
CHANNELS = 1 # 음성을 한 채널로만 재생합니다.
DTYPE = "int16" # 서버에서 받은 PCM16 오디오 형식과 맞춥니다.
class AudioPlayer:
"""
오디오를 재생하는 간단한 클래스
동작 원리:
1. 네트워크에서 받은 오디오 데이터를 큐에 저장
2. 별도 스레드에서 큐의 데이터를 꺼내 스피커로 재생
"""
def __init__(self, sample_rate: int):
# 실행 흐름 요약:
# 1. 네트워크 스레드 대신 큐를 통해 오디오 청크(chunk)를 받습니다.
# 2. 워커 스레드가 큐에서 청크를 꺼내 순서대로 재생합니다.
# 3. 종료 시에는 stop_event와 None 센티널(sentinel)로 워커를 정리합니다.
# 서버가 보내는 PCM16 오디오를 같은 샘플레이트로 재생해야 속도와 음정이 맞습니다.
self.sample_rate = sample_rate
# 네트워크 수신 속도와 스피커 재생 속도를 분리하기 위해 큐를 둡니다.
self.audio_queue = queue.Queue(maxsize=100)
# 실제 스피커 출력은 백그라운드 스레드에서 계속 처리합니다.
self.worker = threading.Thread(target=self._play_audio, daemon=True)
self.started = False
self.stop_event = threading.Event()
# 현재 stream.write 중인지 추적해 종료 시점을 더 정확히 판단합니다.
self._is_playing = False
self._state_lock = threading.Lock()
def start(self):
"""재생 시작"""
# 이 메서드는 "출력 장치를 미리 준비하고 워커를 켠다"는 의미입니다.
if not self.started:
self.started = True
# 워커 스레드를 한 번만 시작해 이후 add_audio 호출을 처리하게 합니다.
self.worker.start()
def add_audio(self, audio_bytes: bytes):
"""재생할 오디오 추가"""
# 이 메서드는 네트워크에서 받은 오디오 청크를 로컬 재생 버퍼에 쌓는 역할입니다.
if self.stop_event.is_set():
# 종료 절차가 시작된 뒤에는 새 오디오를 받지 않습니다.
return
if self.audio_queue.full():
try:
# 큐가 가득 차면 가장 오래된 청크를 버려 전체 지연이 무한히 늘지 않게 합니다.
self.audio_queue.get_nowait()
except queue.Empty:
pass
try:
# 재생할 PCM 바이트를 큐에 밀어 넣습니다.
self.audio_queue.put_nowait(audio_bytes)
except queue.Full:
pass
def _play_audio(self):
"""백그라운드 스레드에서 오디오 재생"""
# 실행 흐름 요약:
# 1. 출력 스트림을 연 뒤 큐에서 청크를 기다립니다.
# 2. 청크가 오면 stream.write로 즉시 재생합니다.
# 3. None 또는 stop_event가 오면 루프를 끝냅니다.
try:
# sounddevice 출력 스트림을 열어 큐에 쌓인 PCM 청크를 순서대로 씁니다.
with sd.RawOutputStream(
samplerate=self.sample_rate,
channels=CHANNELS,
dtype=DTYPE,
) as stream:
while True:
# 큐에서 다음 오디오 청크를 기다립니다.
audio_data = self.audio_queue.get()
if audio_data is None or self.stop_event.is_set(): # 종료 신호
break
try:
with self._state_lock:
# 지금부터 실제 스피커 출력이 진행 중임을 기록합니다.
self._is_playing = True
stream.write(audio_data)
except Exception as e:
# 종료 직전 스트림 상태 경쟁으로 생기는 경고는 무시
if self.stop_event.is_set() and "Stream is stopped" in str(e):
break
raise
finally:
with self._state_lock:
# 한 청크 재생이 끝나면 재생 중 플래그를 내립니다.
self._is_playing = False
except Exception as e:
if not self.stop_event.is_set():
print(f"[❌] 오디오 재생 오류: {e}")
def has_pending_audio(self) -> bool:
"""현재 재생 중이거나 큐에 남은 오디오가 있으면 True"""
# Step 2의 핵심은 response.done과 로컬 재생 완료를 구분하는 것입니다.
# 그래서 "지금 쓰는 중인지"와 "큐에 남아 있는지"를 함께 봅니다.
with self._state_lock:
currently_playing = self._is_playing
# 워커가 실제로 쓰는 중이거나 아직 큐에 남은 청크가 있으면 끝난 것이 아닙니다.
return currently_playing or (not self.audio_queue.empty())
def close(self):
"""재생 종료"""
# 실행 흐름 요약:
# 1. 새 오디오 수신을 막습니다.
# 2. 남은 오디오를 정리합니다.
# 3. 워커가 빠져나오도록 종료 신호를 넣습니다.
# 4. 스레드 종료를 기다립니다.
if not self.started:
return
# 더 이상 재생하지 않도록 종료 플래그를 먼저 세웁니다.
self.stop_event.set()
# 남아 있는 오디오를 비워 종료 시 write 경쟁을 줄임
while True:
try:
self.audio_queue.get_nowait()
except queue.Empty:
break
# 워커가 반드시 빠져나오도록 종료 신호를 큐에 넣음
try:
self.audio_queue.put_nowait(None)
except queue.Full:
try:
# 큐가 꽉 찼다면 하나를 비운 뒤 종료 신호를 넣습니다.
self.audio_queue.get_nowait()
self.audio_queue.put_nowait(None)
except (queue.Empty, queue.Full):
pass
# 워커가 정리될 시간을 조금 주고 종료합니다.
self.worker.join(timeout=1.5)
self.started = False
async def main():
"""
Realtime API에 연결하여 음성 대화를 수행합니다.
"""
# 실행 흐름 요약:
# 1. AudioPlayer를 먼저 시작합니다.
# 2. Realtime 세션을 audio 출력 모달리티(modality)로 설정합니다.
# 3. 사용자 텍스트를 보내고 response.create를 호출합니다.
# 4. 텍스트 델타(delta)와 오디오 델타를 동시에 처리합니다.
# 5. response.done 이후에도 로컬 재생 큐가 빌 때까지 기다립니다.
# 단계 제목을 먼저 출력해 실습 맥락을 보여줍니다.
print("\n" + "=" * 60)
print(" Step 2: 음성 출력 추가")
print("=" * 60 + "\n")
# WebSocket URL 설정
url = "wss://api.openai.com/v1/realtime?model=gpt-realtime-mini"
# 헤더에 API 키 포함
headers = {
"Authorization": f"Bearer {OPENAI_API_KEY}",
}
print("[1/6] 오디오 플레이어 시작...")
# 네트워크 수신과 독립적으로 동작할 오디오 플레이어를 미리 켭니다.
player = AudioPlayer(SAMPLE_RATE)
player.start()
print("\n[✅] 오디오 플레이어 시작\n")
print("[2/6] WebSocket 연결 중...")
# Realtime 세션을 열고 이후 이벤트를 같은 WebSocket으로 주고받습니다.
async with (await open_websocket(url, headers)) as ws:
print("[✅] WebSocket 연결 완료\n")
# 세션 생성 이벤트 받기
# 이 단계에서는 session.created가 왔는지만 확인하면 충분합니다.
response = await ws.recv()
event = json.loads(response)
print(f"[3/6] 세션 생성: {event['type']}\n")
# 이번 단계는 오디오 출력을 써야 하므로 output_modalities를 audio로 설정합니다.
print("[4/6] 세션 설정 전송 (오디오 모드)...")
session_update = {
"type": "session.update",
"session": {
"type": "realtime",
"instructions": "당신은 친절한 한국어 AI 비서입니다.",
"output_modalities": ["audio"],
"audio": {
"output": {
"format": {
# 서버가 PCM16 24kHz로 내려주도록 지정합니다.
"type": "audio/pcm",
"rate": SAMPLE_RATE,
},
# 예제에서 사용할 음성 보이스를 선택합니다.
"voice": "alloy",
},
},
}
}
await ws.send(json.dumps(session_update))
# 설정이 반영되기 전에는 바로 대화를 시작하지 않고 확인 응답을 기다립니다.
# 특히 오디오 포맷과 보이스(voice)가 반영되었는지 확인한 뒤에 질문을 보내야 합니다.
while True:
response = await ws.recv()
event = json.loads(response)
event_type = event.get("type", "")
if event_type == "session.updated":
print(f"\n[✅] 세션 설정 완료: {event_type}\n")
break
if event_type == "error":
# 오디오 세션 설정 실패 시 플레이어를 닫고 종료합니다.
print(f"[❌] 세션 설정 에러: {event.get('error')}\n")
player.close()
return
# 대화 시작
print("[5/6] 대화 시작")
print("-" * 60)
# Step 2에서는 텍스트 질문을 보내고 음성 응답을 받아 재생합니다.
user_message = "안녕하세요! 간단하게 자기소개 해주세요."
print(f"\n👤 사용자: {user_message}")
# 사용자 메시지를 대화 아이템으로 추가합니다.
conversation_item = {
"type": "conversation.item.create",
"item": {
"type": "message",
"role": "user",
"content": [
{
"type": "input_text",
"text": user_message
}
]
}
}
await ws.send(json.dumps(conversation_item))
# 메시지 등록 후 별도 response.create를 보내 실제 응답 생성을 시작합니다.
response_create = {"type": "response.create"}
await ws.send(json.dumps(response_create))
# 이후 로그 정리를 위한 상태 값들입니다.
# audio_started: 실제 오디오 청크를 한 번이라도 받았는가?
# audio_stream_finished: 서버 기준 오디오 스트림이 끝났는가?
# assistant_line_open: 콘솔에서 한 줄 출력이 열려 있는가?
# saw_audio_transcript: 오디오 전사 텍스트를 우선 출력했는가?
audio_started = False
audio_stream_finished = False
assistant_line_open = False
saw_audio_transcript = False
# 응답 받기
print("\n[6/6] 에이전트 응답 중... (음성 재생)\n")
def close_assistant_line():
# 텍스트를 한 줄로 이어서 출력하다가 이벤트 경계에서 줄을 닫습니다.
nonlocal assistant_line_open
if assistant_line_open:
print()
assistant_line_open = False
while True:
# 서버가 보내는 각 이벤트를 하나씩 해석합니다.
response = await ws.recv()
event = json.loads(response)
event_type = event.get("type")
# 텍스트 응답 (참고용)
if event_type in {
"response.output_audio_transcript.delta",
"response.audio_transcript.delta",
}:
delta = event.get("delta", "")
if delta and not assistant_line_open:
# 첫 텍스트 조각이 오면 에이전트 접두어를 한 번만 출력합니다.
print("[🤖 에이전트] ", end="", flush=True)
assistant_line_open = True
print(delta, end="", flush=True)
# 오디오 전사 텍스트가 있으면 이것을 우선 화면에 보여줍니다.
saw_audio_transcript = True
elif event_type in {"response.output_text.delta", "response.text.delta"}:
delta = event.get("delta", "")
# 오디오 전사가 오지 않는 경우에만 텍스트 델타를 출력
if delta and not saw_audio_transcript:
if not assistant_line_open:
print("[🤖 에이전트] ", end="", flush=True)
assistant_line_open = True
print(delta, end="", flush=True)
elif event_type in {"response.output_text.done", "response.text.done"}:
close_assistant_line()
elif event_type in {
"response.output_audio_transcript.done",
"response.audio_transcript.done",
}:
close_assistant_line()
# 오디오 응답 (실제 재생)
elif event_type in {"response.output_audio.delta", "response.audio.delta"}:
# base64로 인코딩된 오디오 데이터
audio_base64 = event.get("delta", "")
if audio_base64:
audio_started = True
# base64 디코딩
audio_bytes = base64.b64decode(audio_base64)
# 디코딩한 PCM 바이트를 플레이어 큐에 넣으면 워커 스레드가 재생합니다.
player.add_audio(audio_bytes)
elif event_type in {"response.output_audio.done", "response.audio.done"}:
# 서버가 더 이상 오디오 청크를 보내지 않는 시점만 먼저 기록합니다.
audio_stream_finished = True
# 응답 완료
elif event_type == "response.done":
close_assistant_line()
# response.done은 서버 응답 스트리밍 완료이지, 로컬 재생 완료와는 별개입니다.
print("[✅] 응답 완료\n")
break
# 에러 처리
elif event_type == "error":
close_assistant_line()
print(f"\n[❌] 에러: {event.get('error')}")
break
# 서버 전송 완료와 로컬 재생 완료는 다를 수 있으므로 큐가 빌 때까지 대기합니다.
# 이 대기가 없으면 스피커에서 아직 재생 중인데 프로그램이 먼저 끝나 버릴 수 있습니다.
while player.has_pending_audio():
await asyncio.sleep(0.05)
if audio_started and audio_stream_finished:
# 서버 오디오 스트림 종료 후 로컬 스피커까지 모두 재생된 시점입니다.
print("[✅] 오디오 재생 완료")
print("=" * 60)
print(" 연결 종료")
print("=" * 60 + "\n")
# 마지막으로 플레이어 스레드를 정리합니다.
player.close()
if __name__ == "__main__":
# asyncio 진입점에서 메인 코루틴을 실행합니다.
asyncio.run(main())'AI System > OpenAI API와 바이브 코딩으로 배우는 AI 서비스 개발' 카테고리의 다른 글
| d02 - 06. Audio - 04. voice agent - step4. full interrupt (0) | 2026.06.03 |
|---|---|
| d02 - 06. Audio - 04. voice agent - step3. audio input (0) | 2026.06.03 |
| d02 - 06. Audio - 04. voice agent - step1. basic connection (0) | 2026.06.02 |
| d02 - 06. Audio - 04. voice agent - README.md (0) | 2026.06.02 |
| d02 - 06. Audio - 03. audio in llm v2 (0) | 2026.06.02 |