본문 바로가기
AI System/OpenAI API와 바이브 코딩으로 배우는 AI 서비스 개발

d02 - 06. Audio - 04. voice agent - step1. basic connection

by Toddler_AD 2026. 6. 2.

step1_basic_connection.py

"""
================================================================================
Step 1: Realtime API 기본 연결하기
================================================================================

이 단계에서 배우는 것:
- WebSocket으로 OpenAI Realtime API 연결
- 세션 설정 전송
- 텍스트로 메시지 주고받기

실행 방법:
    python step1_basic_connection.py

예상 동작:
    1. WebSocket 연결
    2. 세션 설정 전송
    3. "안녕하세요"라는 텍스트 메시지 전송
    4. 에이전트의 텍스트 응답 받아 출력
"""

import asyncio
import json
import os
from dotenv import load_dotenv
import websockets


# .env 파일에서 API 키 로드
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

if not OPENAI_API_KEY:
    print("❌ OPENAI_API_KEY를 .env 파일에 설정하세요")
    exit(1)


async def open_websocket(url: str, headers: dict):
    """websockets 버전 차이에 따른 헤더 파라미터 호환 처리"""
    # 실행 흐름 요약:
    # 1. 최신 websockets 시그니처로 연결을 시도합니다.
    # 2. TypeError가 나면 구버전 시그니처로 한 번 더 연결합니다.
    # 3. 호출부는 라이브러리 버전을 의식하지 않고 같은 함수를 사용합니다.
    try:
        # 최신 websockets 버전은 additional_headers를 사용합니다.
        return await websockets.connect(url, additional_headers=headers)
    except TypeError:
        # 구버전 호환을 위해 extra_headers도 지원합니다.
        return await websockets.connect(url, extra_headers=headers)


async def main():
    """
    Realtime API에 연결하여 기본 텍스트 대화를 수행합니다.
    """
    # 실행 흐름 요약:
    # 1. Realtime API와 WebSocket 연결을 맺습니다.
    # 2. session.update로 텍스트 출력 전용 세션을 설정합니다.
    # 3. conversation.item.create로 사용자 메시지를 추가합니다.
    # 4. response.create로 응답 생성을 요청합니다.
    # 5. response.output_text.delta와 response.done을 순서대로 처리합니다.
    # 실습 단계 제목을 먼저 출력해 지금 어떤 예제를 실행 중인지 분명히 보여줍니다.
    print("\n" + "=" * 60)
    print("  Step 1: Realtime API 기본 연결")
    print("=" * 60 + "\n")
    
    # WebSocket URL 설정
    url = "wss://api.openai.com/v1/realtime?model=gpt-realtime-mini"
    
    # 헤더에 API 키 포함
    headers = {
        "Authorization": f"Bearer {OPENAI_API_KEY}",
    }
    
    print("[1/4] WebSocket 연결 중...")
    
    # Realtime API와 지속 연결을 맺고, 이후 이벤트를 같은 소켓에서 주고받습니다.
    async with (await open_websocket(url, headers)) as ws:
        print("[✅] WebSocket 연결 완료\n")
        
        # 연결 직후 서버가 만들어 준 세션 정보를 한 번 먼저 받습니다.
        # 여기서는 session.created 같은 초기 이벤트가 오는지만 확인하면 충분합니다.
        response = await ws.recv()
        event = json.loads(response)
        print(f"[2/4] 세션 생성: {event['type']}\n")
        
        # 이 세션이 어떤 방식으로 동작할지 설정을 서버에 보냅니다.
        print("[3/4] 세션 설정 전송...")
        session_update = {
            "type": "session.update",
            "session": {
                # Realtime 세션이라는 점을 명시합니다.
                "type": "realtime",
                # 에이전트의 기본 역할과 말투를 지정합니다.
                "instructions": "당신은 친절한 한국어 AI 비서입니다.",
                # Step 1은 텍스트만 다루므로 출력 모달리티(modality)는 text만 사용합니다.
                "output_modalities": ["text"],
            }
        }
        await ws.send(json.dumps(session_update))
        
        # session.updated가 올 때까지 기다려 설정이 실제 반영됐는지 확인합니다.
        # 이 확인 단계를 생략하면 아직 설정이 반영되기 전에 다음 요청을 보내게 될 수 있습니다.
        while True:
            response = await ws.recv()
            event = json.loads(response)
            event_type = event.get("type", "")

            if event_type == "session.updated":
                print(f"[✅] 세션 설정 완료: {event_type}\n")
                break

            if event_type == "error":
                # 설정 단계에서 실패하면 이후 대화를 진행해도 의미가 없으므로 종료합니다.
                print(f"[❌] 세션 설정 에러: {event.get('error')}\n")
                return
        
        # 대화 시작
        print("[4/4] 대화 시작")
        print("-" * 60)
        
        # 이번 턴에서 모델에게 전달할 사용자 입력 텍스트입니다.
        # Step 1은 가장 작은 텍스트 예제이므로 짧고 분명한 질문을 사용합니다.
        user_message = "안녕하세요! 오늘 날씨 어때요?"
        print(f"\n👤 사용자: {user_message}")
        
        # conversation.item.create 이벤트로 대화 히스토리에 사용자 메시지를 추가합니다.
        conversation_item = {
            "type": "conversation.item.create",
            "item": {
                "type": "message",
                "role": "user",
                "content": [
                    {
                        # 텍스트 입력이므로 input_text 타입을 사용합니다.
                        "type": "input_text",
                        "text": user_message
                    }
                ]
            }
        }
        await ws.send(json.dumps(conversation_item))
        
        # 메시지를 기록만 하는 것으로는 답이 생성되지 않으므로 별도로 response.create를 보냅니다.
        # Realtime API에서는 "대화 아이템 추가"와 "응답 생성 요청"이 분리되는 점이 중요합니다.
        response_create = {"type": "response.create"}
        await ws.send(json.dumps(response_create))
        
        # 이후 들어오는 텍스트 델타(delta)를 한 줄에 이어서 보여주기 위해 접두어를 먼저 출력합니다.
        print("\n[🤖 에이전트] ", end="", flush=True)

        while True:
            # 서버는 응답을 여러 이벤트 조각으로 나눠 보내므로 한 개씩 계속 처리합니다.
            # 즉, 여기서는 HTTP 응답 본문 하나를 읽는 것이 아니라 이벤트 스트림을 소비합니다.
            response = await ws.recv()
            event = json.loads(response)
            event_type = event.get("type")
            
            # 텍스트 응답 조각(delta)은 도착하는 즉시 이어 붙여 출력합니다.
            if event_type in {"response.output_text.delta", "response.text.delta"}:
                delta = event.get("delta", "")
                print(delta, end="", flush=True)
            
            # 텍스트 스트림이 끝나면 다음 로그가 보기 좋도록 줄바꿈만 정리합니다.
            elif event_type in {"response.output_text.done", "response.text.done"}:
                print("\n")
            
            # response.done은 한 턴의 전체 응답 처리가 끝났다는 신호입니다.
            elif event_type == "response.done":
                print("\n[✅] 응답 완료")
                break
            
            # 서버 오류가 오면 내용을 보여주고 루프를 종료합니다.
            elif event_type == "error":
                print(f"\n[❌] 에러: {event.get('error')}")
                break
        
        # 정상 종료 로그를 남겨 사용자가 연결이 끝난 시점을 볼 수 있게 합니다.
        print("\n" + "=" * 60)
        print("  연결 종료")
        print("=" * 60 + "\n")


if __name__ == "__main__":
    # asyncio 진입점에서 main 코루틴을 실행합니다.
    asyncio.run(main())