step3_audio_input.py
"""
================================================================================
Step 3: 오디오 입력(전사)만 다루기
================================================================================
이 단계에서 배우는 것:
- 마이크에서 오디오 캡처
- Server VAD로 발화 구간 감지
- 사용자 발화를 텍스트로 전사해서 출력
실행 방법:
python step3_audio_input.py
예상 동작:
1. 마이크 음성을 서버로 실시간 전송
2. 발화 시작/종료 이벤트 표시
3. 사용자 전사 텍스트 출력
주의:
- 이 Step은 에이전트 응답 생성/재생을 하지 않습니다.
- 전체 대화/인터럽트는 Step 4에서 다룹니다.
"""
import asyncio
import base64
import json
import os
from dotenv import load_dotenv
import sounddevice as sd
import websockets
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
if not OPENAI_API_KEY:
print("❌ OPENAI_API_KEY를 .env 파일에 설정하세요")
raise SystemExit(1)
# 오디오 입력 설정:
# - SAMPLE_RATE는 1초를 몇 개의 샘플(sample)로 표현할지 결정합니다.
# - CHANNELS=1은 모노(mono) 입력이라 마이크 한 채널만 사용한다는 뜻입니다.
# - DTYPE="int16"은 PCM16 형식으로 16비트 정수 샘플을 사용한다는 뜻입니다.
# - BLOCKSIZE는 한 번의 콜백(callback)에서 받을 오디오 프레임(frame) 크기입니다.
SAMPLE_RATE = 24000 # Realtime 예제에서 사용하는 24kHz 입력 샘플레이트입니다.
CHANNELS = 1 # 음성을 한 채널로만 받아 전송합니다.
DTYPE = "int16" # 서버에 보낼 PCM16 오디오 형식과 맞춥니다.
BLOCKSIZE = 960 # 약 40ms 분량(960 / 24000초)의 오디오를 한 번에 처리합니다.
async def open_websocket(url: str, headers: dict):
"""websockets 버전 차이에 따른 헤더 파라미터 호환 처리"""
# 실행 흐름 요약:
# 1. 최신 websockets 연결 시그니처를 우선 시도합니다.
# 2. 구버전이면 대체 시그니처로 연결합니다.
# 3. 나머지 코드는 버전 차이를 몰라도 됩니다.
try:
# 최신 버전용 헤더 인자를 먼저 시도합니다.
return await websockets.connect(url, additional_headers=headers)
except TypeError:
# 구버전 호환 경로입니다.
return await websockets.connect(url, extra_headers=headers)
class MicrophoneCapture:
"""마이크 오디오를 캡처하여 asyncio 큐로 전달"""
def __init__(self, loop: asyncio.AbstractEventLoop, sample_rate: int):
# 실행 흐름 요약:
# 1. 오디오 콜백은 sounddevice 스레드에서 실행됩니다.
# 2. 그 결과를 asyncio.Queue로 넘겨 비동기 태스크가 소비합니다.
# 3. 이렇게 해야 마이크 입력과 WebSocket 전송을 자연스럽게 분리할 수 있습니다.
self.loop = loop
self.sample_rate = sample_rate
# 오디오 콜백 스레드와 asyncio 루프 사이를 이어 주는 버퍼입니다.
self.audio_queue = asyncio.Queue(maxsize=100)
def _callback(self, indata, frames, time_info, status):
# 이 함수는 오디오 장치가 새 프레임을 줄 때마다 매우 자주 호출됩니다.
if status:
print(f"[⚠️] 마이크 상태: {status}")
# RawInputStream이 준 바이트를 그대로 서버 전송용 큐에 넘깁니다.
audio_bytes = bytes(indata)
self.loop.call_soon_threadsafe(self._enqueue, audio_bytes)
def _enqueue(self, audio_bytes: bytes):
# 이 함수는 이벤트 루프 스레드에서 실행되므로 asyncio.Queue에 안전하게 넣을 수 있습니다.
if self.audio_queue.full():
try:
# 큐가 꽉 차면 가장 오래된 프레임을 버려 실시간성을 유지합니다.
self.audio_queue.get_nowait()
except asyncio.QueueEmpty:
pass
try:
# 최신 마이크 프레임을 큐에 적재합니다.
self.audio_queue.put_nowait(audio_bytes)
except asyncio.QueueFull:
pass
async def start_capture(self, stop_event: asyncio.Event):
# 실행 흐름 요약:
# 1. 입력 스트림을 열어 마이크 콜백을 활성화합니다.
# 2. 실제 오디오 적재는 콜백이 담당합니다.
# 3. 이 코루틴은 종료 신호를 감시하며 컨텍스트를 유지합니다.
# 사용자에게 마이크 입력이 시작됐음을 안내합니다.
print("[✅] 마이크 캡처 시작")
print(" 💡 말하면 발화 이벤트/전사가 표시됩니다\n")
# sounddevice 입력 스트림은 별도 오디오 콜백으로 계속 데이터를 전달합니다.
with sd.RawInputStream(
samplerate=self.sample_rate,
channels=CHANNELS,
dtype=DTYPE,
blocksize=BLOCKSIZE,
callback=self._callback,
):
while not stop_event.is_set():
# 오디오 캡처 자체는 콜백이 담당하므로 이 루프는 종료 신호만 감시합니다.
await asyncio.sleep(0.1)
async def send_microphone_to_server(ws, mic_capture: MicrophoneCapture, stop_event: asyncio.Event):
"""마이크 오디오를 input_audio_buffer.append 이벤트로 서버 전송"""
# 실행 흐름 요약:
# 1. 마이크 큐에서 PCM 프레임을 꺼냅니다.
# 2. base64 문자열로 인코딩합니다.
# 3. input_audio_buffer.append 이벤트로 서버에 계속 보냅니다.
while not stop_event.is_set():
try:
# 짧은 타임아웃으로 큐를 확인해 종료 이벤트에 유연하게 반응합니다.
audio_bytes = await asyncio.wait_for(mic_capture.audio_queue.get(), timeout=0.2)
except asyncio.TimeoutError:
continue
# 서버가 요구하는 형태에 맞게 PCM 바이트를 base64 문자열로 인코딩합니다.
message = {
"type": "input_audio_buffer.append",
"audio": base64.b64encode(audio_bytes).decode("utf-8"),
}
await ws.send(json.dumps(message))
async def receive_transcription_events(ws, stop_event: asyncio.Event):
"""발화 시작/종료 및 전사 이벤트만 출력"""
# 실행 흐름 요약:
# 1. 서버가 보낸 이벤트를 계속 수신합니다.
# 2. speech_started / speech_stopped / transcription.completed만 출력합니다.
# 3. 오류가 오면 stop_event를 세우고 루프를 종료합니다.
async for message in ws:
# WebSocket 메시지를 JSON 이벤트로 변환해 종류별로 처리합니다.
event = json.loads(message)
event_type = event.get("type")
if event_type == "input_audio_buffer.speech_started":
print("\n[🎤 사용자] 발화 시작")
elif event_type == "input_audio_buffer.speech_stopped":
print("[🎤 사용자] 발화 종료")
elif event_type == "conversation.item.input_audio_transcription.completed":
transcript = event.get("transcript", "").strip()
if transcript:
# 전사가 비어 있지 않을 때만 화면에 보여줍니다.
print(f"[💬 사용자] {transcript}\n")
elif event_type == "error":
error_msg = event.get("error", {}).get("message", "알 수 없는 오류")
print(f"\n[❌] 에러: {error_msg}")
stop_event.set()
if stop_event.is_set():
# 종료 신호가 생기면 이벤트 수신 루프도 빠져나옵니다.
break
async def main():
# 실행 흐름 요약:
# 1. Realtime 세션을 "오디오 입력/전사 전용"으로 설정합니다.
# 2. Server VAD(Voice Activity Detection)와 Whisper 전사를 켭니다.
# 3. 마이크 캡처, 서버 전송, 이벤트 수신을 동시에 실행합니다.
# 4. 이 단계에서는 create_response=False로 두어 에이전트 응답은 만들지 않습니다.
# 단계 제목을 출력해 지금 실행 중인 실습을 구분합니다.
print("\n" + "=" * 60)
print(" Step 3: 오디오 입력(전사)만 다루기")
print("=" * 60 + "\n")
url = "wss://api.openai.com/v1/realtime?model=gpt-realtime-mini"
headers = {
"Authorization": f"Bearer {OPENAI_API_KEY}",
}
print("[1/3] WebSocket 연결 중...")
async with (await open_websocket(url, headers)) as ws:
print("[✅] 연결 완료\n")
# 세션 생성 이벤트는 연결 직후 한 번 소비합니다.
await ws.recv()
print("[2/3] 세션 생성 완료\n")
print("[3/3] 세션 설정 (오디오 입력/전사 전용)...")
session_update = {
"type": "session.update",
"session": {
"type": "realtime",
# Step 3은 전사 확인용 실습이라는 점을 명시합니다.
"instructions": "이 세션은 사용자 음성 입력 전사 확인용입니다.",
# 모델 텍스트 응답은 만들지 않지만 전사 텍스트 이벤트는 받기 위해
# 출력 모달리티(modality)는 text로 유지합니다.
"output_modalities": ["text"],
"audio": {
"input": {
"format": {
"type": "audio/pcm",
"rate": SAMPLE_RATE,
},
"transcription": {
# 사용자 음성을 Whisper로 전사합니다.
"model": "whisper-1",
"language": "ko",
},
"noise_reduction": {
# 원거리(far field) 환경을 가정한 노이즈 감소를 사용합니다.
"type": "far_field",
},
"turn_detection": {
# 서버 VAD(Voice Activity Detection)가 발화 시작/끝을 판단합니다.
"type": "server_vad",
"threshold": 0.7,
"silence_duration_ms": 900,
"prefix_padding_ms": 300,
# Step 3은 전사만 확인하므로 에이전트 응답 생성은 끕니다.
"create_response": False,
"interrupt_response": False,
},
},
},
},
}
await ws.send(json.dumps(session_update))
while True:
response = await ws.recv()
event = json.loads(response)
event_type = event.get("type", "")
if event_type == "session.updated":
print("[✅] 설정 완료\n")
break
if event_type == "error":
error_msg = event.get("error", {}).get("message", "알 수 없는 오류")
print(f"[❌] 설정 에러: {error_msg}\n")
return
print("=" * 60)
print(" 🎤 마이크로 말하세요!")
print(" 💡 이 단계에서는 사용자 전사만 출력합니다")
print(" ⏹️ 종료: Ctrl+C")
print("=" * 60 + "\n")
# 이후 세 개의 동시 작업이 함께 돌아갑니다.
stop_event = asyncio.Event()
loop = asyncio.get_running_loop()
mic_capture = MicrophoneCapture(loop, SAMPLE_RATE)
# 마이크 캡처, 서버 전송, 서버 이벤트 수신을 병렬로 실행합니다.
mic_task = asyncio.create_task(mic_capture.start_capture(stop_event))
send_task = asyncio.create_task(send_microphone_to_server(ws, mic_capture, stop_event))
recv_task = asyncio.create_task(receive_transcription_events(ws, stop_event))
try:
await asyncio.gather(mic_task, send_task, recv_task)
except KeyboardInterrupt:
# Ctrl+C가 들어오면 세 태스크에 모두 종료 신호를 전파합니다.
print("\n\n[⏹️] 종료 중...")
stop_event.set()
mic_task.cancel()
send_task.cancel()
recv_task.cancel()
await asyncio.gather(mic_task, send_task, recv_task, return_exceptions=True)
print("\n" + "=" * 60)
print(" 연결 종료")
print("=" * 60 + "\n")
if __name__ == "__main__":
try:
# asyncio 이벤트 루프에서 메인 코루틴을 실행합니다.
asyncio.run(main())
except KeyboardInterrupt:
print("\n[⏹️] 프로그램을 종료했습니다.")'AI System > OpenAI API와 바이브 코딩으로 배우는 AI 서비스 개발' 카테고리의 다른 글
| d02 - 06. Audio - 04. voice agent - step4. full interrupt (0) | 2026.06.03 |
|---|---|
| d02 - 06. Audio - 04. voice agent - step2. audio output (0) | 2026.06.03 |
| d02 - 06. Audio - 04. voice agent - step1. basic connection (0) | 2026.06.02 |
| d02 - 06. Audio - 04. voice agent - README.md (0) | 2026.06.02 |
| d02 - 06. Audio - 03. audio in llm v2 (0) | 2026.06.02 |