본문 바로가기
AI System/OpenAI API와 바이브 코딩으로 배우는 AI 서비스 개발

d02 - 06. Audio - 04. voice agent - step4. full interrupt

by Toddler_AD 2026. 6. 3.

step4_full_interrupt.py

"""
================================================================================
Step 4: 사용자 인터럽트 기능 추가
================================================================================

이 단계에서 배우는 것:
- 사용자가 말하면 에이전트 응답 즉시 중단
- 중지 명령어 인식 ("멈춰", "그만" 등)
- 상태 관리 (agent_speaking, should_suppress_response)

실행 방법:
    python step4_full_interrupt.py

예상 동작:
    1. Step 3에서 전송한 사용자 음성을 전사하고, 응답을 생성해 음성으로 재생
    2. 에이전트가 말하는 중에 사용자가 끼어들면 즉시 중단
    3. "멈춰", "그만" 같은 명령으로 응답 차단
"""

import asyncio
import base64
from difflib import SequenceMatcher
import json
import os
import queue
import re
import threading
import time
from dotenv import load_dotenv
import websockets
import sounddevice as sd


load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

if not OPENAI_API_KEY:
    print("❌ OPENAI_API_KEY를 .env 파일에 설정하세요")
    exit(1)


# 오디오 입출력 설정:
# - SAMPLE_RATE는 1초를 몇 개의 샘플(sample)로 표현할지 결정합니다.
# - CHANNELS=1은 모노(mono) 입력/출력이라 한 채널 오디오만 사용한다는 뜻입니다.
# - DTYPE="int16"은 PCM16 형식으로 16비트 정수 샘플을 사용한다는 뜻입니다.
# - BLOCKSIZE는 한 번의 콜백(callback)에서 받을 오디오 프레임(frame) 크기입니다.
SAMPLE_RATE = 24000   # Realtime 예제에서 사용하는 24kHz 입출력 샘플레이트입니다.
CHANNELS = 1          # 음성을 한 채널로만 받아 전송하고 재생합니다.
DTYPE = "int16"      # 서버와 주고받는 PCM16 오디오 형식과 맞춥니다.
BLOCKSIZE = 960       # 약 40ms 분량(960 / 24000초)의 오디오를 한 번에 처리합니다.


async def open_websocket(url: str, headers: dict):
    """websockets 버전 차이에 따른 헤더 파라미터 호환 처리"""
    # 실행 흐름 요약:
    # 1. 최신 websockets 시그니처를 먼저 시도합니다.
    # 2. 구버전이면 fallback 경로로 연결합니다.
    # 3. 아래 실습 코드는 버전 차이를 의식하지 않습니다.
    try:
        # 최신 websockets 버전용 인자를 우선 사용합니다.
        return await websockets.connect(url, additional_headers=headers)
    except TypeError:
        # 구버전 호환 경로입니다.
        return await websockets.connect(url, extra_headers=headers)


class AudioPlayer:
    """오디오 재생"""
    
    def __init__(self, sample_rate: int):
        # 실행 흐름 요약:
        # 1. 서버 오디오 청크를 큐에 저장합니다.
        # 2. 워커 스레드가 큐를 소비하며 로컬 스피커에 재생합니다.
        # 3. 인터럽트 시 clear()로 큐를 비워 즉시 멈춘 듯한 체감을 만듭니다.
        # 출력 오디오 포맷과 같은 샘플레이트로 재생해야 음성이 정상 속도로 들립니다.
        self.sample_rate = sample_rate
        # 네트워크 수신과 스피커 출력을 분리하기 위한 큐입니다.
        self.audio_queue = queue.Queue(maxsize=100)
        # 실제 스피커 출력은 별도 스레드에서 처리합니다.
        self.worker = threading.Thread(target=self._play_audio, daemon=True)
        self.started = False
        self._is_playing = False
        self._state_lock = threading.Lock()
    
    def start(self):
        # 스피커 출력 워커를 한 번만 켜서 이후 재생 요청을 계속 처리합니다.
        if not self.started:
            self.started = True
            # 재생 워커를 한 번만 시작합니다.
            self.worker.start()
    
    def add_audio(self, audio_bytes: bytes):
        # 네트워크에서 받은 오디오 청크를 재생 큐에 넣습니다.
        if self.audio_queue.full():
            try:
                # 큐가 꽉 차면 가장 오래된 오디오를 버려 지연 누적을 줄입니다.
                self.audio_queue.get_nowait()
            except queue.Empty:
                pass
        try:
            # 새 PCM 청크를 큐에 추가합니다.
            self.audio_queue.put_nowait(audio_bytes)
        except queue.Full:
            pass
    
    def clear(self):
        """큐의 모든 오디오 제거 (응답 중단 시 사용)"""
        # 인터럽트 시 이미 받아 둔 오디오 청크를 모두 버려야 체감상 즉시 멈춥니다.
        while True:
            try:
                self.audio_queue.get_nowait()
            except queue.Empty:
                break

    def has_pending_audio(self) -> bool:
        """현재 재생 중이거나 큐에 남은 오디오가 있으면 True"""
        # Step 4에서도 서버 응답 종료와 로컬 스피커 재생 완료를 분리해서 봅니다.
        with self._state_lock:
            currently_playing = self._is_playing
        return currently_playing or (not self.audio_queue.empty())
    
    def _play_audio(self):
        # 실행 흐름 요약:
        # 1. 오디오 출력 스트림을 엽니다.
        # 2. 큐에서 청크를 꺼내 순서대로 씁니다.
        # 3. None 센티널을 받으면 워커를 끝냅니다.
        try:
            # 큐에서 받은 PCM 청크를 출력 스트림에 순서대로 씁니다.
            with sd.RawOutputStream(
                samplerate=self.sample_rate,
                channels=CHANNELS,
                dtype=DTYPE,
            ) as stream:
                while True:
                    audio_data = self.audio_queue.get()
                    if audio_data is None:
                        break
                    with self._state_lock:
                        # 현재 로컬 스피커 출력이 진행 중임을 표시합니다.
                        self._is_playing = True
                    try:
                        stream.write(audio_data)
                    finally:
                        with self._state_lock:
                            # 한 청크 재생이 끝나면 상태를 내립니다.
                            self._is_playing = False
        except Exception as e:
            print(f"[❌] 오디오 재생 오류: {e}")
    
    def close(self):
        # 종료 시에는 워커 스레드가 빠져나오도록 센티널을 넣고 join 합니다.
        try:
            # None을 종료 센티널(sentinel)로 사용해 워커를 빠져나오게 합니다.
            self.audio_queue.put_nowait(None)
        except queue.Full:
            pass
        self.worker.join(timeout=1.0)
        with self._state_lock:
            self._is_playing = False


class MicrophoneCapture:
    """마이크 캡처"""
    
    def __init__(self, loop: asyncio.AbstractEventLoop, sample_rate: int):
        # 실행 흐름 요약:
        # 1. 마이크 콜백이 PCM 프레임을 받습니다.
        # 2. 이벤트 루프에 안전하게 넘겨 asyncio.Queue에 저장합니다.
        # 3. send_audio 코루틴이 그 큐를 서버로 밀어냅니다.
        self.loop = loop
        self.sample_rate = sample_rate
        # 오디오 콜백에서 받은 PCM 프레임을 asyncio 태스크가 소비할 큐입니다.
        self.audio_queue = asyncio.Queue(maxsize=100)
    
    def _callback(self, indata, frames, time_info, status):
        # 이 콜백은 매우 자주 호출되므로, 무거운 로직 대신 큐 적재만 빠르게 처리합니다.
        # RawInputStream이 준 프레임을 bytes로 바꿔 이벤트 루프 쪽 큐에 넘깁니다.
        audio_bytes = bytes(indata)
        self.loop.call_soon_threadsafe(self._enqueue, audio_bytes)
    
    def _enqueue(self, audio_bytes: bytes):
        # 오래된 프레임보다 최신 프레임이 더 중요하므로 큐가 가득 차면 앞쪽을 버립니다.
        if self.audio_queue.full():
            try:
                # 오래된 프레임을 버리고 최신 프레임을 유지합니다.
                self.audio_queue.get_nowait()
            except asyncio.QueueEmpty:
                pass
        try:
            self.audio_queue.put_nowait(audio_bytes)
        except asyncio.QueueFull:
            pass
    
    async def start_capture(self, stop_event: asyncio.Event):
        # 실행 흐름 요약:
        # 1. RawInputStream 컨텍스트를 열어 마이크 캡처를 시작합니다.
        # 2. 실제 적재는 _callback이 담당합니다.
        # 3. 이 코루틴은 stop_event가 세워질 때까지 컨텍스트를 유지합니다.
        # 마이크는 콜백 기반이므로 이 컨텍스트가 살아 있는 동안 계속 캡처됩니다.
        with sd.RawInputStream(
            samplerate=self.sample_rate,
            channels=CHANNELS,
            dtype=DTYPE,
            blocksize=BLOCKSIZE,
            callback=self._callback,
        ):
            while not stop_event.is_set():
                await asyncio.sleep(0.1)


class VoiceAgent:
    """
    음성 에이전트 (인터럽트 지원)
    """
    
    def __init__(self):
        # 실행 흐름 요약:
        # 1. 사용자/에이전트 발화 상태를 추적합니다.
        # 2. 최근 응답 텍스트를 저장해 에코를 판별합니다.
        # 3. 인터럽트와 중지 명령의 중복 취소를 제어합니다.
        # 상태 플래그
        self.agent_speaking = False      # 에이전트가 말하는 중인가?
        self.user_speaking = False       # 사용자가 말하는 중인가?
        self.should_suppress_response = False  # 응답을 억제해야 하는가?
        self.assistant_text_open = False  # 콘솔에서 에이전트 텍스트 줄이 열려있는가?
        
        # 에코(스피커 재유입) 억제를 위한 최근 응답 추적
        self.current_assistant_transcript = ""
        self.last_assistant_transcript = ""
        self.last_assistant_response_ended_at = 0.0
        self.echo_suppression_window_sec = 2.5
        self.echo_similarity_threshold = 0.72
        self.last_cancel_at = 0.0
        self.cancel_debounce_sec = 0.35
        self.interrupt_cancelled_in_turn = False

    @staticmethod
    def normalize_text(text: str) -> str:
        """유사도 비교를 위해 공백/문장부호를 제거한 정규화 문자열 반환"""
        # 에코 비교 시 띄어쓰기와 문장부호 차이로 인한 오탐을 줄입니다.
        return re.sub(r"[^\w]", "", text.lower(), flags=re.UNICODE)

    def close_assistant_line(self):
        """열려 있는 에이전트 텍스트 줄을 안전하게 닫습니다."""
        # 텍스트 스트리밍은 한 줄로 이어서 출력하므로 이벤트 경계에서 줄 정리가 필요합니다.
        if self.assistant_text_open:
            print()
            self.assistant_text_open = False

    def has_active_or_pending_response(self, player: AudioPlayer) -> bool:
        """서버 응답 스트림 또는 로컬 재생 버퍼가 남아있는지 확인"""
        # 서버가 끝났더라도 스피커 큐가 남아 있으면 아직 응답이 체감상 끝난 것이 아닙니다.
        return self.agent_speaking or player.has_pending_audio()

    def is_probable_echo(self, transcript: str) -> bool:
        """
        최근 에이전트 발화와 사용자 전사가 지나치게 유사하면
        스피커 재유입(에코)으로 판단합니다.
        """
        # 실행 흐름 요약:
        # 1. 최근 응답이 있는지와 시간 창(window) 안인지 확인합니다.
        # 2. 문자열을 정규화해 비교 가능한 형태로 만듭니다.
        # 3. 포함 관계 또는 유사도 비율로 에코 여부를 판단합니다.
        if not transcript or not self.last_assistant_transcript:
            return False

        elapsed = time.monotonic() - self.last_assistant_response_ended_at
        if elapsed > self.echo_suppression_window_sec:
            return False

        user_norm = self.normalize_text(transcript)
        assistant_norm = self.normalize_text(self.last_assistant_transcript)
        if len(user_norm) < 4 or len(assistant_norm) < 4:
            return False

        # 한 문장이 다른 문장에 포함되는 경우(부분 재수음) 우선 차단
        if user_norm in assistant_norm or assistant_norm in user_norm:
            return True

        # 포함 관계가 아니더라도 전체 문자열 유사도가 높으면 에코로 판단합니다.
        similarity = SequenceMatcher(None, user_norm, assistant_norm).ratio()
        return similarity >= self.echo_similarity_threshold
    
    def is_stop_command(self, text: str) -> bool:
        """
        중지 명령어인지 확인
        
        Args:
            text: 사용자 발화 텍스트
            
        Returns:
            bool: 중지 명령어이면 True
        """
        # 강의 관점에서는 "정지 명령어 라우팅" 예제로 이해하면 좋습니다.
        text_lower = text.lower().strip()
        stop_keywords = (
            "멈춰", "멈추", "그만", "스탑", "stop", 
            "잠깐만", "조용히", "닥쳐"
        )
        return any(keyword in text_lower for keyword in stop_keywords)
    
    async def cancel_response(self, ws, player: AudioPlayer, reason: str = "[🛑 인터럽트] 응답 중단"):
        """
        현재 응답 취소
        
        Args:
            ws: WebSocket 연결
            player: 오디오 플레이어
        """
        # 실행 흐름 요약:
        # 1. debounce로 중복 취소를 막습니다.
        # 2. response.cancel을 서버에 보냅니다.
        # 3. 로컬 오디오 큐도 함께 비웁니다.
        # 4. 콘솔 줄 상태를 정리하고 필요하면 이유를 출력합니다.
        # 짧은 시간 내 중복 취소 요청을 방지
        now = time.monotonic()
        if (now - self.last_cancel_at) < self.cancel_debounce_sec:
            # 같은 발화에서 취소 이벤트가 연속 발생해도 서버에 과하게 보내지 않습니다.
            return
        self.last_cancel_at = now

        # 응답 취소 요청
        cancel_message = {"type": "response.cancel"}
        await ws.send(json.dumps(cancel_message))
        
        # 재생 큐 비우기
        player.clear()

        # 텍스트 스트리밍 중이었다면 줄바꿈 정리
        self.close_assistant_line()
        
        if reason:
            print(reason)

    async def request_response(self, ws):
        """사용자 전사 완료 후 수동으로 응답 생성을 요청합니다."""
        # Step 4는 create_response=False이므로 전사 완료 후 직접 응답 생성을 트리거합니다.
        await ws.send(json.dumps({"type": "response.create"}))
    
    async def send_audio(self, ws, mic_capture: MicrophoneCapture, stop_event: asyncio.Event):
        """마이크 오디오를 서버로 전송"""
        # 실행 흐름 요약:
        # 1. 마이크 큐에서 PCM 프레임을 받습니다.
        # 2. base64로 인코딩합니다.
        # 3. input_audio_buffer.append 이벤트로 서버에 연속 전송합니다.
        while not stop_event.is_set():
            # 마이크 큐에서 최신 PCM 프레임을 하나 꺼냅니다.
            audio_bytes = await mic_capture.audio_queue.get()
            audio_base64 = base64.b64encode(audio_bytes).decode("utf-8")
            
            message = {
                "type": "input_audio_buffer.append",
                "audio": audio_base64
            }
            # 서버는 input_audio_buffer.append 이벤트를 연속으로 받아 사용자 음성을 누적합니다.
            await ws.send(json.dumps(message))
    
    async def receive_events(self, ws, player: AudioPlayer, stop_event: asyncio.Event):
        """서버 이벤트 수신 및 처리"""
        # 실행 흐름 요약:
        # 1. 사용자 발화 이벤트를 받아 인터럽트 여부를 판단합니다.
        # 2. 전사 완료 후 에코/정지 명령/일반 질문을 분기합니다.
        # 3. 에이전트 텍스트와 오디오 스트림을 동시에 처리합니다.
        # 4. response.done에서 상태를 정리해 다음 턴으로 넘어갑니다.
        async for message in ws:
            # 서버 이벤트를 JSON으로 풀어 어떤 종류인지 먼저 확인합니다.
            event = json.loads(message)
            event_type = event.get("type")
            
            # ===== 사용자 발화 이벤트 =====
            if event_type == "input_audio_buffer.speech_started":
                self.user_speaking = True
                self.interrupt_cancelled_in_turn = False
                self.close_assistant_line()
                print("\n[🎤 사용자] 발화 시작")
                
                # 서버 스트림이 끝났더라도 로컬 재생 버퍼가 남아있을 수 있어
                # 재생 중/버퍼 잔량까지 포함해 인터럽트를 처리합니다.
                if self.has_active_or_pending_response(player):
                    # 사용자가 다시 말하기 시작하면 현재 응답을 즉시 끊습니다.
                    await self.cancel_response(ws, player, reason="")
                    self.interrupt_cancelled_in_turn = True
                    # 새로운 질문이므로 억제 해제
                    self.should_suppress_response = False
            
            elif event_type == "input_audio_buffer.speech_stopped":
                self.user_speaking = False
                self.close_assistant_line()
                print("[🎤 사용자] 발화 종료\n")
            
            elif event_type == "conversation.item.input_audio_transcription.completed":
                transcript = event.get("transcript", "")
                self.close_assistant_line()
                # 사용자가 실제로 무엇을 말했는지 먼저 콘솔에 보여줍니다.
                print(f"[💬 사용자] {transcript}\n")

                # 최근 에이전트 응답과 지나치게 유사하면 에코로 간주
                if self.is_probable_echo(transcript):
                    self.should_suppress_response = True
                    if self.has_active_or_pending_response(player):
                        await self.cancel_response(ws, player, reason="")
                    print("[🔇 에코 감지] 스피커 재유입 음성으로 판단하여 무시\n")
                    self.interrupt_cancelled_in_turn = False
                    continue
                
                # 중지 명령어 체크
                if self.is_stop_command(transcript):
                    # 현재 응답 중단 + 다음 응답 억제
                    self.should_suppress_response = True
                    self.agent_speaking = False
                    if self.has_active_or_pending_response(player):
                        await self.cancel_response(ws, player, reason="")
                    print("[⏹️  중지 명령] 응답 생성 차단\n")
                    self.interrupt_cancelled_in_turn = False
                else:
                    # 일반 발화 - 새로운 응답 허용
                    self.should_suppress_response = False

                    # 일반 끼어들기일 때만 인터럽트 로그를 한 번 출력
                    if self.interrupt_cancelled_in_turn:
                        print("[🛑 인터럽트] 응답 중단")
                    self.interrupt_cancelled_in_turn = False

                    # 자동 응답(create_response=False)을 사용하지 않으므로
                    # 전사 완료 시점에 수동으로 응답을 생성합니다.
                    # 이 설계 덕분에 "사용자 전사 먼저, 에이전트 응답 나중" 흐름이 유지됩니다.
                    if transcript.strip():
                        await self.request_response(ws)
            
            # ===== 에이전트 응답 이벤트 =====
            elif event_type in {
                "response.output_text.delta",
                "response.text.delta",
                "response.output_audio_transcript.delta",
                "response.audio_transcript.delta",
            }:
                # 억제 모드면 취소
                if self.should_suppress_response:
                    if self.has_active_or_pending_response(player):
                        await self.cancel_response(ws, player, reason="")
                    continue
                
                delta = event.get("delta", "")
                if delta and not self.assistant_text_open:
                    # 첫 텍스트 조각에서만 에이전트 접두어를 엽니다.
                    print("[🤖 에이전트] ", end="", flush=True)
                    self.assistant_text_open = True

                if delta and event_type in {
                    "response.output_audio_transcript.delta",
                    "response.audio_transcript.delta",
                    "response.output_text.delta",
                    "response.text.delta",
                }:
                    # 나중에 에코 비교를 할 수 있도록 이번 응답 텍스트를 누적합니다.
                    self.current_assistant_transcript += delta
                print(delta, end="", flush=True)
            
            elif event_type in {
                "response.output_text.done",
                "response.text.done",
                "response.output_audio_transcript.done",
                "response.audio_transcript.done",
            }:
                self.close_assistant_line()
            
            elif event_type in {"response.output_audio.delta", "response.audio.delta"}:
                # 억제 모드면 취소
                if self.should_suppress_response:
                    if self.has_active_or_pending_response(player):
                        await self.cancel_response(ws, player, reason="")
                    continue
                
                audio_base64 = event.get("delta", "")
                if audio_base64:
                    # 오디오 청크가 도착하면 현재 에이전트가 말하는 중이라고 표시합니다.
                    self.agent_speaking = True
                    audio_bytes = base64.b64decode(audio_base64)
                    player.add_audio(audio_bytes)
            
            elif event_type in {"response.output_audio.done", "response.audio.done"}:
                # 서버 기준 오디오 스트림이 끝난 시점입니다.
                self.agent_speaking = False
            
            elif event_type == "response.done":
                self.agent_speaking = False
                self.close_assistant_line()

                if self.current_assistant_transcript.strip():
                    # 이번 응답 전문을 저장해 다음 사용자 전사와의 에코 비교에 사용합니다.
                    self.last_assistant_transcript = self.current_assistant_transcript.strip()
                self.current_assistant_transcript = ""
                self.last_assistant_response_ended_at = time.monotonic()
                
                if self.should_suppress_response:
                    self.should_suppress_response = False
                    print("[⏹️  응답 차단 완료]\n")
                else:
                    # response.done은 서버 기준으로 한 턴의 응답 스트리밍이 끝났다는 뜻입니다.
                    print("[✅ 에이전트] 응답 완료\n")
            
            # ===== 에러 처리 =====
            elif event_type == "error":
                error_msg = event.get("error", {}).get("message", "")
                # 무해한 취소 오류는 무시
                if "no active response" in error_msg:
                    continue
                self.close_assistant_line()
                print(f"\n[❌] 에러: {error_msg}")
                stop_event.set()
            
            if stop_event.is_set():
                break


async def main():
    """메인 함수"""
    # 실행 흐름 요약:
    # 1. AudioPlayer와 VoiceAgent 상태 객체를 초기화합니다.
    # 2. Realtime 세션을 입력/출력 오디오 통합 모드로 설정합니다.
    # 3. 마이크 캡처, 서버 전송, 서버 이벤트 수신을 동시에 실행합니다.
    # 4. Step 4는 인터럽트와 중지 명령을 이벤트 루프 안에서 상태로 제어합니다.
    # 실행 중인 실습 단계를 콘솔에 먼저 보여줍니다.
    print("\n" + "=" * 70)
    print("  Step 4: 사용자 인터럽트 기능 추가")
    print("=" * 70 + "\n")
    
    url = "wss://api.openai.com/v1/realtime?model=gpt-realtime-mini"
    headers = {
        "Authorization": f"Bearer {OPENAI_API_KEY}",
    }
    
    print("[1/4] 초기화...")
    player = AudioPlayer(SAMPLE_RATE)
    player.start()
    agent = VoiceAgent()
    print("[✅] 완료\n")
    
    print("[2/4] WebSocket 연결...")
    async with (await open_websocket(url, headers)) as ws:
        print("[✅] 연결 완료\n")
        
        # 세션 생성
        # 연결 직후 오는 초기 이벤트는 이 단계에서 session.created 확인 용도로 한 번 소비합니다.
        await ws.recv()
        print("[3/4] 세션 생성 완료\n")
        
        # 입력 오디오 전사와 출력 오디오 재생을 모두 켜는 통합 세션을 설정합니다.
        print("[4/4] 세션 설정 (인터럽트 응답 활성화)...")
        session_update = {
            "type": "session.update",
            "session": {
                "type": "realtime",
                "instructions": "당신은 친절한 한국어 AI 비서입니다.",
                "output_modalities": ["audio"],
                "audio": {
                    "input": {
                        "format": {
                            "type": "audio/pcm",
                            "rate": SAMPLE_RATE,
                        },
                        "noise_reduction": {
                            "type": "far_field"
                        },
                        "transcription": {
                            "model": "whisper-1",
                            "language": "ko",
                        },
                        "turn_detection": {
                            # 발화 구간은 서버 VAD(Voice Activity Detection)가 판단하고,
                            # 응답 생성은 코드가 수동 제어합니다.
                            "type": "server_vad",
                            "threshold": 0.7,
                            "silence_duration_ms": 900,
                            "prefix_padding_ms": 300,
                            "create_response": False,
                            # 사용자가 끼어들 때 서버 측 인터럽트 보조 기능도 활성화합니다.
                            "interrupt_response": True,
                        },
                    },
                    "output": {
                        "format": {
                            "type": "audio/pcm",
                            "rate": SAMPLE_RATE,
                        },
                        "voice": "alloy",
                    },
                },
            }
        }
        await ws.send(json.dumps(session_update))
        while True:
            response = await ws.recv()
            event = json.loads(response)
            event_type = event.get("type", "")

            if event_type == "session.updated":
                print("[✅] 설정 완료\n")
                break

            if event_type == "error":
                error_msg = event.get("error", {}).get("message", "알 수 없는 오류")
                print(f"[❌] 설정 에러: {error_msg}\n")
                player.close()
                return
        
        print("=" * 70)
        print("  🎤 마이크로 말하세요!")
        print("  💡 에이전트가 말하는 중에 끼어들어보세요 (즉시 중단됩니다)")
        print("  🎧 스피커 재유입 방지를 위해 이어폰/헤드셋 사용을 권장합니다")
        print("  🛑 '멈춰', '그만' 같은 명령으로 응답 차단 가능")
        print("  ⏹️  종료: Ctrl+C")
        print("=" * 70 + "\n")
        
        stop_event = asyncio.Event()
        # 현재 이벤트 루프를 마이크 콜백 브리지에 전달합니다.
        loop = asyncio.get_event_loop()
        mic_capture = MicrophoneCapture(loop, SAMPLE_RATE)
        
        # 3개 태스크 실행
        # 이 세 태스크가 동시에 돌아야 "듣기-보내기-받기"가 끊기지 않습니다.
        mic_task = asyncio.create_task(mic_capture.start_capture(stop_event))
        send_task = asyncio.create_task(agent.send_audio(ws, mic_capture, stop_event))
        recv_task = asyncio.create_task(agent.receive_events(ws, player, stop_event))
        
        try:
            # 입력 캡처, 서버 전송, 서버 이벤트 수신이 동시에 돌아야 자연스러운 음성 대화가 됩니다.
            await asyncio.gather(mic_task, send_task, recv_task)
        except KeyboardInterrupt:
            print("\n\n[⏹️] 종료 중...")
            stop_event.set()
            mic_task.cancel()
            send_task.cancel()
            recv_task.cancel()
            await asyncio.gather(mic_task, send_task, recv_task, return_exceptions=True)
        
        print("\n" + "=" * 70)
        print("  연결 종료")
        print("=" * 70 + "\n")
        
        # 종료 시 플레이어 스레드도 함께 정리합니다.
        player.close()


if __name__ == "__main__":
    try:
        # asyncio 진입점입니다.
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n[⏹️] 프로그램을 종료했습니다.")