본문 바로가기
AI System/OpenAI API와 바이브 코딩으로 배우는 AI 서비스 개발

d01 - 02. function calling - 04. streaming funtion calls v2

by Toddler_AD 2026. 6. 1.

04. 스트리밍 함수 호출 v2

이 노트북은 stream=True로 함수 호출 인자 JSON이 조각 단위로 도착하는 흐름을 다룹니다. 스트리밍 이벤트를 직접 순회하면서 함수 이름, call_id, 인자 델타를 누적하는 방법을 확인합니다.

# Colab 또는 로컬 노트북 실행 환경을 구분하기 위해 sys를 가져옵니다.
import sys
# 패키지 설치 명령을 현재 Python 커널에서 실행하기 위해 subprocess를 가져옵니다.
import subprocess
# 패키지 설치 여부를 확인하기 위해 importlib.util을 가져옵니다.
import importlib.util
# 환경 변수에서 API 키를 읽고 설정하기 위해 os를 가져옵니다.
import os
# API 키를 화면에 노출하지 않고 입력받기 위해 getpass를 가져옵니다.
import getpass
# .env 파일 위치를 다루기 위해 pathlib의 Path를 가져옵니다.
from pathlib import Path

# google.colab 모듈이 있으면 현재 런타임이 Google Colab이라고 판단합니다.
IN_COLAB = "google.colab" in sys.modules
# 노트북에서 사용하는 import 이름과 pip 패키지 이름을 짝지어 둡니다.
REQUIRED_PACKAGES = {
    "openai": "openai>=2.26.0",
    "dotenv": "python-dotenv>=1.2.2",
    "pydantic": "pydantic>=2.11.0",
    "PIL": "pillow>=12.1.1",
    "requests": "requests>=2.32.5",
    "numpy": "numpy>=2.3.3",
    "websockets": "websockets>=15.0.1",
    "websocket": "websocket-client>=1.8.0",
    "nest_asyncio": "nest-asyncio>=1.6.0",
}

# 현재 커널에서 import할 수 없는 패키지만 설치하는 함수입니다.
def ensure_package(import_name: str, package_name: str) -> None:
    # 이미 import 가능한 패키지는 설치를 건너뜁니다.
    if importlib.util.find_spec(import_name) is not None:
        # 설치가 필요 없음을 호출자에게 조용히 알리고 돌아갑니다.
        return
    # Colab에서는 현재 노트북 커널의 Python에 패키지를 설치해야 합니다.
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", package_name])

# Colab 기본 런타임에는 일부 OpenAI 실습 패키지가 없을 수 있으므로 먼저 준비합니다.
if IN_COLAB:
    # 실습 전체에서 필요한 패키지를 하나씩 확인하고 부족한 것만 설치합니다.
    for import_name, package_name in REQUIRED_PACKAGES.items():
        # 누락된 패키지를 현재 Colab 런타임에 설치합니다.
        ensure_package(import_name, package_name)

# 패키지 설치 이후 .env 로딩 기능을 사용할 수 있도록 load_dotenv를 가져옵니다.
from dotenv import load_dotenv

# 현재 작업 폴더의 .env 파일이 있으면 로컬 실행처럼 환경 변수로 올립니다.
load_dotenv(Path.cwd() / ".env")
# OPENAI_API_KEY가 이미 있으면 그대로 사용하고, 없으면 Colab Secrets 또는 입력으로 보완합니다.
if not os.getenv("OPENAI_API_KEY"):
    # Colab Secrets에서 OPENAI_API_KEY를 읽어 볼 변수를 준비합니다.
    secret_key = None
    # Colab이 아닌 로컬 환경에서는 google.colab import가 실패할 수 있으므로 예외를 허용합니다.
    try:
        # Colab Secrets의 userdata API를 가져옵니다.
        from google.colab import userdata
        # Secrets에 저장된 OPENAI_API_KEY 값을 읽습니다.
        secret_key = userdata.get("OPENAI_API_KEY")
    except Exception:
        # Colab Secrets를 사용할 수 없으면 이후 수동 입력 단계로 넘어갑니다.
        secret_key = None
    # Secrets에서 키를 찾았다면 현재 런타임 환경 변수로 설정합니다.
    if secret_key:
        # OpenAI SDK가 자동으로 읽을 수 있도록 표준 환경 변수 이름에 저장합니다.
        os.environ["OPENAI_API_KEY"] = secret_key
    else:
        # 키가 없으면 노트북 실행자가 직접 입력하도록 요청합니다.
        entered_key = getpass.getpass("OPENAI_API_KEY를 입력하세요: ").strip()
        # 빈 문자열이 아닌 값을 입력한 경우에만 환경 변수로 등록합니다.
        if entered_key:
            # OpenAI SDK가 자동으로 읽을 수 있도록 표준 환경 변수 이름에 저장합니다.
            os.environ["OPENAI_API_KEY"] = entered_key

# API 키가 끝까지 없으면 다음 OpenAI API 호출이 실패하므로 명확한 오류를 냅니다.
if not os.getenv("OPENAI_API_KEY"):
    # Colab에서는 Secrets 또는 입력, 로컬에서는 .env 또는 환경 변수를 설정해야 함을 알려 줍니다.
    raise RuntimeError("OPENAI_API_KEY가 없습니다. Colab Secrets, 수동 입력, 또는 .env 파일로 설정해 주세요.")

# 이후 셀에서 Colab 여부를 참고할 수 있도록 간단히 출력합니다.
print(f"개발환경: {'Colab' if IN_COLAB else '로컬'}")
# API 키를 직접 출력하지 않고 준비 완료 여부만 알려 줍니다.
print("OPENAI_API_KEY 준비 완료")

 

1단계. 실행 환경 준비

스트리밍은 한 번에 완성된 응답 객체를 받는 방식과 다릅니다. 이벤트를 순서대로 읽고 누적해야 하므로, 환경 준비와 도구 정의를 먼저 분리해 확인합니다.

# 운영체제 환경 변수에서 모델명과 API 키를 읽기 위해 os를 가져옵니다.
import os
# 함수 호출 인자와 실행 결과를 JSON 문자열로 다루기 위해 json을 가져옵니다.
import json
# API 호출 시간을 측정하기 위해 time을 가져옵니다.
import time
# 실행 결과에 현재 시각을 남기기 위해 datetime을 가져옵니다.
from datetime import datetime
# 노트북 실행 위치와 저장 폴더를 안전하게 다루기 위해 Path를 가져옵니다.
from pathlib import Path
# 다양한 응답 객체를 느슨하게 다루기 위해 Any 타입을 가져옵니다.
from typing import Any
# 워크스페이스 루트의 .env 파일을 읽기 위해 load_dotenv를 가져옵니다.
from dotenv import load_dotenv
# OpenAI API를 호출하는 공식 SDK 클라이언트입니다.
from openai import OpenAI

# 현재 노트북이 실행되는 위치를 기준으로 경로를 계산합니다.
CURRENT_DIR = Path.cwd()
# VS Code에서 루트 폴더에서 실행하는 경우 02.function_calling 폴더를 찾습니다.
if (CURRENT_DIR / "02.function_calling").exists():
    CURRICULUM_ROOT = CURRENT_DIR / "02.function_calling"
# 노트북 파일이 들어 있는 폴더에서 직접 실행하는 경우 현재 위치를 사용합니다.
elif CURRENT_DIR.name == "02.function_calling":
    CURRICULUM_ROOT = CURRENT_DIR
# 그 밖의 위치에서는 상대 경로를 해석해 실습 폴더를 찾습니다.
else:
    CURRICULUM_ROOT = Path("02.function_calling").resolve()
# 워크스페이스 루트는 실습 폴더의 상위 폴더입니다.
WORKSPACE_ROOT = CURRICULUM_ROOT.parent
# API 응답과 실행 추적 파일을 저장할 output 폴더 경로입니다.
OUTPUT_DIR = CURRICULUM_ROOT / "output"
# output 폴더가 없으면 생성하고, 이미 있으면 그대로 둡니다.
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# 워크스페이스 루트의 .env 파일에서 OPENAI_API_KEY를 로드합니다.
load_dotenv(WORKSPACE_ROOT / ".env")
# API 키가 없으면 뒤 단계에서 모호하게 실패하지 않도록 즉시 중단합니다.
if not os.getenv("OPENAI_API_KEY"):
    raise RuntimeError("OPENAI_API_KEY가 없습니다. 워크스페이스 루트의 .env 파일을 확인해 주세요.")

# OpenAI SDK 클라이언트를 생성합니다. 이후 모든 Responses API 호출은 이 client에서 시작합니다.
client = OpenAI()
# Function Calling 실습에 사용할 모델입니다. 환경 변수가 있으면 그 값을 우선 사용합니다.
FUNCTION_MODEL = os.getenv("OPENAI_FUNCTION_MODEL", "gpt-5-mini")
# 주 모델 접근이 어려울 때 사용할 보조 모델입니다.
FALLBACK_MODEL = os.getenv("OPENAI_FUNCTION_FALLBACK_MODEL", "gpt-4.1-mini")

# 절대 경로 대신 워크스페이스 기준 상대 경로를 보기 좋게 출력하기 위한 작은 변환입니다.
def workspace_path(path: Path) -> str:
    # resolve()로 경로를 정규화한 뒤 as_posix()로 Windows에서도 / 표기를 사용합니다.
    return path.resolve().relative_to(WORKSPACE_ROOT.resolve()).as_posix()

# OpenAI SDK의 usage 객체를 JSON 저장 가능한 dict로 바꿉니다.
def usage_to_dict(usage: Any) -> dict[str, Any]:
    # usage가 없으면 빈 dict를 반환해 저장 코드에서 None 처리를 줄입니다.
    if usage is None:
        return {}
    # Pydantic 객체는 model_dump()로 dict로 변환할 수 있습니다.
    if hasattr(usage, "model_dump"):
        return usage.model_dump()
    # 이미 dict라면 그대로 사용합니다.
    if isinstance(usage, dict):
        return usage
    # 예상 밖 형태는 빈 dict로 정리합니다.
    return {}

# 긴 텍스트를 콘솔에서 보기 좋은 미리보기로 줄입니다.
def preview_text(value: str, limit: int = 220) -> str:
    # 줄바꿈과 여러 공백을 하나의 공백으로 접어 한 줄 미리보기를 만듭니다.
    compact = " ".join(str(value).split())
    # 길이가 충분히 짧으면 원문을 그대로 반환합니다.
    if len(compact) <= limit:
        return compact
    # 너무 길면 말줄임표를 붙여 앞부분만 보여 줍니다.
    return compact[: limit - 3] + "..."

# 요청 모델 정보를 출력해 노트북 실행 조건을 먼저 확인합니다.
print(f"실습 폴더: {CURRICULUM_ROOT}")
# 결과 저장 위치도 함께 출력해 학습자가 파일을 찾기 쉽게 합니다.
print(f"결과 저장 폴더: {OUTPUT_DIR}")
# 실제 요청에 사용할 주 모델명을 출력합니다.
print(f"요청 모델: {FUNCTION_MODEL}")
실습 폴더: c:\Users\ai4nu\2026-ai-tech\AS.1.1_0601\02.function_calling
결과 저장 폴더: c:\Users\ai4nu\2026-ai-tech\AS.1.1_0601\02.function_calling\output
요청 모델: gpt-5-mini
 

2단계. 날씨 도구와 이벤트 해석 함수 준비

모델에게 날씨 조회 도구 스키마를 전달하고, 실제 함수는 Python 코드가 실행합니다. 스트리밍 이벤트는 다양한 타입으로 오기 때문에 필요한 필드를 안전하게 읽는 작은 함수를 함께 준비합니다.

# get_weather 도구가 사용할 교육용 날씨 데이터입니다.
WEATHER_DATA = {
    # 서울의 예시 날씨입니다.
    "서울": {"celsius": "22°C", "fahrenheit": "72°F", "condition": "맑음"},
    # 부산의 예시 날씨입니다.
    "부산": {"celsius": "19°C", "fahrenheit": "66°F", "condition": "흐림"},
    # 도쿄의 예시 날씨입니다.
    "도쿄": {"celsius": "21°C", "fahrenheit": "70°F", "condition": "비"},
    # 런던의 예시 날씨입니다.
    "런던": {"celsius": "13°C", "fahrenheit": "55°F", "condition": "흐림"},
}

# search_orders 도구가 사용할 교육용 주문 데이터입니다.
ORDER_DATA = [
    # 배송 예정 주문 1건입니다.
    {"order_id": "ORD-1001", "customer": "김민수", "status": "배송 예정", "amount": 42000},
    # 배송 완료 주문 1건입니다.
    {"order_id": "ORD-1002", "customer": "박서연", "status": "배송 완료", "amount": 18000},
    # 배송 예정 주문 1건입니다.
    {"order_id": "ORD-1003", "customer": "이지훈", "status": "배송 예정", "amount": 75000},
    # 준비 중 주문 1건입니다.
    {"order_id": "ORD-1004", "customer": "최나래", "status": "준비 중", "amount": 36000},
    # 배송 예정 주문 1건입니다.
    {"order_id": "ORD-1005", "customer": "홍유진", "status": "배송 예정", "amount": 51000},
]

# 별자리별 운세 문구를 담은 교육용 데이터입니다.
HOROSCOPE_DATA = {
    # 물병자리 예시 응답입니다.
    "물병자리": "새로운 아이디어를 실행하면 작은 성과가 빠르게 보입니다.",
    # 물고기자리 예시 응답입니다.
    "물고기자리": "직관이 또렷해져 중요한 결정을 차분히 내릴 수 있습니다.",
    # 양자리 예시 응답입니다.
    "양자리": "빠른 실행력이 강점이 되는 날입니다. 우선순위부터 정리하세요.",
    # 황소자리 예시 응답입니다.
    "황소자리": "꾸준함이 성과로 이어집니다. 계획한 루틴을 지켜보세요.",
}

# 모델이 호출할 수 있는 get_horoscope 함수의 스키마입니다.
HOROSCOPE_TOOL = {
    # Responses API 도구 항목이 함수임을 나타냅니다.
    "type": "function",
    # 모델 응답의 function_call.name에 들어갈 함수 이름입니다.
    "name": "get_horoscope",
    # 모델이 언제 이 함수를 써야 하는지 이해하도록 설명합니다.
    "description": "오늘의 별자리 운세를 조회합니다.",
    # 함수 인자의 JSON Schema입니다.
    "parameters": {
        # 인자는 객체 형태로 받습니다.
        "type": "object",
        # sign이라는 문자열 필드를 허용합니다.
        "properties": {"sign": {"type": "string", "description": "별자리 이름 (예: 물병자리)"}},
        # sign은 함수 실행에 반드시 필요합니다.
        "required": ["sign"],
        # 스키마에 없는 필드는 받지 않습니다.
        "additionalProperties": False,
    },
    # strict=True는 모델 인자가 스키마를 더 엄격히 따르도록 합니다.
    "strict": True,
}

# 모델이 호출할 수 있는 get_weather 함수의 스키마입니다.
WEATHER_TOOL = {
    # 도구 타입은 function입니다.
    "type": "function",
    # 실제 Python 함수 이름과 맞춘 도구 이름입니다.
    "name": "get_weather",
    # 모델이 날씨 조회 상황을 인식하도록 설명합니다.
    "description": "지정한 도시의 현재 날씨를 조회합니다.",
    # 함수 인자 구조를 정의합니다.
    "parameters": {
        # 인자는 객체입니다.
        "type": "object",
        # location과 units 두 필드를 허용합니다.
        "properties": {
            # location은 도시 이름입니다.
            "location": {"type": "string", "description": "도시 이름 (예: 서울)"},
            # units는 온도 단위이며 두 값 중 하나여야 합니다.
            "units": {"type": "string", "enum": ["celsius", "fahrenheit"], "description": "온도 단위"},
        },
        # 두 필드 모두 필수입니다.
        "required": ["location", "units"],
        # 예상하지 못한 필드는 거부합니다.
        "additionalProperties": False,
    },
    # strict 모드로 인자 흔들림을 줄입니다.
    "strict": True,
}

# 주문 조회 함수의 도구 스키마입니다.
SEARCH_ORDERS_TOOL = {
    # 도구 타입은 function입니다.
    "type": "function",
    # 모델이 반환할 함수 이름입니다.
    "name": "search_orders",
    # 주문 상태별 조회 기능임을 설명합니다.
    "description": "주문 데이터에서 상태별 주문을 조회합니다.",
    # 함수 인자 스키마입니다.
    "parameters": {
        # 인자는 객체입니다.
        "type": "object",
        # status와 limit 필드를 정의합니다.
        "properties": {
            # status는 조회할 주문 상태입니다.
            "status": {"type": "string", "description": "조회할 주문 상태 (예: 배송 예정)"},
            # limit은 최대 조회 개수입니다.
            "limit": {"type": "integer", "description": "최대 조회 개수"},
        },
        # status와 limit은 모두 필요합니다.
        "required": ["status", "limit"],
        # 추가 필드를 금지해 함수 입력을 예측 가능하게 만듭니다.
        "additionalProperties": False,
    },
    # strict 모드로 서버 측 스키마 검증을 강화합니다.
    "strict": True,
}

# 안내 메일 발송 함수의 도구 스키마입니다.
SEND_NOTIFICATION_EMAIL_TOOL = {
    # 도구 타입은 function입니다.
    "type": "function",
    # 모델 응답에서 사용할 함수 이름입니다.
    "name": "send_notification_email",
    # 메일 발송 기능임을 모델에 알려 줍니다.
    "description": "안내 메일을 발송합니다.",
    # 함수 인자 스키마입니다.
    "parameters": {
        # 인자는 객체입니다.
        "type": "object",
        # 수신자, 제목, 본문 필드를 정의합니다.
        "properties": {
            # to는 수신자 이메일 주소입니다.
            "to": {"type": "string", "description": "수신자 이메일 주소"},
            # subject는 메일 제목입니다.
            "subject": {"type": "string", "description": "메일 제목"},
            # body는 메일 본문입니다.
            "body": {"type": "string", "description": "메일 본문"},
        },
        # 메일 발송에는 세 필드가 모두 필요합니다.
        "required": ["to", "subject", "body"],
        # 추가 필드는 받지 않습니다.
        "additionalProperties": False,
    },
    # strict 모드로 인자 구조를 고정합니다.
    "strict": True,
}

# 계산 함수의 도구 스키마입니다.
CALCULATE_TOOL = {
    # 도구 타입은 function입니다.
    "type": "function",
    # 모델 응답에서 사용할 함수 이름입니다.
    "name": "calculate",
    # 간단한 사칙연산을 수행한다고 설명합니다.
    "description": "간단한 수식을 계산합니다.",
    # 함수 인자 스키마입니다.
    "parameters": {
        # 인자는 객체입니다.
        "type": "object",
        # expression 필드 하나를 받습니다.
        "properties": {"expression": {"type": "string", "description": "계산할 수식 (예: 2 * 3 + 4)"}},
        # expression은 필수입니다.
        "required": ["expression"],
        # 추가 필드는 받지 않습니다.
        "additionalProperties": False,
    },
    # strict 모드로 수식 필드만 받도록 합니다.
    "strict": True,
}

# 도시 날씨를 조회하는 실제 Python 함수입니다.
def get_weather(location: str, units: str = "celsius") -> str:
    # 요청 도시가 데이터에 없으면 기본값을 사용합니다.
    selected = WEATHER_DATA.get(location, {"celsius": "20°C", "fahrenheit": "68°F", "condition": "정보 없음"})
    # 함수 실행 결과를 모델에 전달하기 좋게 JSON 문자열로 반환합니다.
    return json.dumps({"location": location, "temperature": selected.get(units, selected["celsius"]), "condition": selected["condition"], "units": units, "retrieved_at": datetime.now().isoformat(timespec="seconds")}, ensure_ascii=False)

# 주문 데이터를 상태와 개수 기준으로 조회하는 실제 Python 함수입니다.
def search_orders(status: str, limit: int = 5) -> str:
    # 요청한 status와 같은 주문만 골라냅니다.
    filtered = [order for order in ORDER_DATA if order["status"] == status]
    # limit이 너무 작아도 최소 1개 범위로 다루기 위해 max를 사용합니다.
    limited = filtered[: max(1, int(limit))]
    # 조회 결과를 JSON 문자열로 반환합니다.
    return json.dumps({"status": status, "count": len(limited), "orders": limited}, ensure_ascii=False)

# 메일 발송을 흉내 내는 실제 Python 함수입니다.
def send_notification_email(to: str, subject: str, body: str) -> str:
    # 실제 메일 발송 대신 발송 성공 형태의 JSON을 만듭니다.
    return json.dumps({"status": "success", "to": to, "subject": subject, "body": body, "sent_at": datetime.now().isoformat(timespec="seconds")}, ensure_ascii=False)

# 간단한 사칙연산을 수행하는 실제 Python 함수입니다.
def calculate(expression: str) -> str:
    # 허용할 문자만 남겨 임의 코드 실행 위험을 낮춥니다.
    allowed_chars = set("0123456789+-*/()., ")
    # 허용되지 않은 문자가 있으면 계산하지 않고 오류 JSON을 반환합니다.
    if not all(char in allowed_chars for char in expression):
        return json.dumps({"error": "허용되지 않은 문자가 포함되어 계산할 수 없습니다."}, ensure_ascii=False)
    # 수식 계산은 교육용 예제이므로 제한된 입력에서만 eval을 사용합니다.
    try:
        # 계산 결과를 얻습니다.
        result = eval(expression)  # noqa: S307
    # 계산 중 오류가 나면 오류 메시지를 JSON으로 반환합니다.
    except Exception as exc:
        return json.dumps({"error": f"계산 중 오류가 발생했습니다: {exc}"}, ensure_ascii=False)
    # 정상 계산 결과를 JSON 문자열로 반환합니다.
    return json.dumps({"expression": expression, "result": result}, ensure_ascii=False)

# 별자리 운세를 조회하는 실제 Python 함수입니다.
def get_horoscope(sign: str) -> str:
    # 요청한 별자리가 없으면 일반 메시지를 기본값으로 사용합니다.
    horoscope = HOROSCOPE_DATA.get(sign, "긍정적인 태도를 유지하면 좋은 기회를 잡을 수 있습니다.")
    # 모델에 다시 전달할 함수 실행 결과를 JSON 문자열로 만듭니다.
    return json.dumps({"sign": sign, "horoscope": horoscope, "date": datetime.now().strftime("%Y-%m-%d")}, ensure_ascii=False)

# function_call.name 값을 실제 Python 함수로 연결하는 라우팅 표입니다.
FUNCTION_ROUTER = {
    # 날씨 조회 함수 연결입니다.
    "get_weather": get_weather,
    # 주문 조회 함수 연결입니다.
    "search_orders": search_orders,
    # 메일 발송 함수 연결입니다.
    "send_notification_email": send_notification_email,
    # 계산 함수 연결입니다.
    "calculate": calculate,
    # 별자리 운세 함수 연결입니다.
    "get_horoscope": get_horoscope,
}

# Responses API 응답의 output 배열에서 function_call 항목을 찾아내는 함수입니다.
def extract_function_calls(response: Any) -> list[dict[str, Any]]:
    # 응답 객체에서 output 속성을 읽습니다.
    output_items = getattr(response, "output", [])
    # 추출한 함수 호출들을 담을 리스트입니다.
    calls: list[dict[str, Any]] = []
    # output 배열은 메시지, 함수 호출 등 여러 종류의 항목을 포함할 수 있습니다.
    for item in output_items:
        # 항목의 type이 function_call인지 확인합니다.
        if getattr(item, "type", "") != "function_call":
            continue
        # arguments는 JSON 문자열로 들어오므로 원문을 먼저 보관합니다.
        raw_arguments = getattr(item, "arguments", "") or "{}"
        # JSON 문자열을 Python dict로 파싱합니다.
        try:
            arguments_dict = json.loads(raw_arguments)
            arguments_error = None
        # 파싱 실패 시 함수 실행 대신 오류 정보를 남길 수 있게 합니다.
        except Exception as exc:
            arguments_dict = {}
            arguments_error = str(exc)
        # 하나의 함수 호출 정보를 dict로 정리합니다.
        calls.append({"name": getattr(item, "name", ""), "call_id": getattr(item, "call_id", ""), "arguments": raw_arguments, "arguments_dict": arguments_dict, "arguments_error": arguments_error})
    # 수집한 함수 호출 목록을 반환합니다.
    return calls

# 모델이 요청한 함수 이름과 인자를 받아 실제 Python 함수를 실행하는 함수입니다.
def run_requested_function(name: str, arguments: dict[str, Any]) -> str:
    # 함수 이름으로 실행 대상 함수를 찾습니다.
    target = FUNCTION_ROUTER.get(name)
    # 등록되지 않은 함수명이라면 오류 JSON을 반환합니다.
    if target is None:
        return json.dumps({"error": f"알 수 없는 함수입니다: {name}"}, ensure_ascii=False)
    # 함수 실행 중 오류가 나도 모델에게 전달 가능한 JSON으로 바꿉니다.
    try:
        return str(target(**arguments))
    # 예외가 발생하면 함수명, 인자, 오류 메시지를 함께 기록합니다.
    except Exception as exc:
        return json.dumps({"error": "함수 실행 중 예외가 발생했습니다.", "function": name, "arguments": arguments, "details": str(exc)}, ensure_ascii=False)

# 다음 Responses API 호출에 이전 응답 output을 그대로 이어 붙이는 함수입니다.
def append_response_output(input_items: list[Any], response: Any) -> None:
    # output이 list인 경우에만 누적 입력에 추가합니다.
    if isinstance(getattr(response, "output", None), list):
        input_items.extend(response.output)

# 함수 실행 결과를 Responses API가 이해하는 function_call_output 항목으로 추가합니다.
def append_function_outputs(input_items: list[Any], function_outputs: list[dict[str, str]]) -> None:
    # 실행한 함수 결과를 하나씩 순회합니다.
    for item in function_outputs:
        # call_id는 모델이 만든 function_call과 실행 결과를 연결하는 핵심 식별자입니다.
        input_items.append({"type": "function_call_output", "call_id": item["call_id"], "output": item["output"]})

 

 

3단계. stream=True로 함수 호출 인자 델타 수집

이 단계가 핵심입니다. client.responses.create(..., stream=True)를 호출하면 완성된 response가 아니라 이벤트 iterator가 반환됩니다. 각 이벤트에서 함수 호출 인자 조각을 모아 최종 JSON을 복원합니다.

# 스트리밍으로 함수 호출을 요청할 사용자 프롬프트입니다.
STREAM_PROMPT = "서울의 날씨를 섭씨 단위로 조회해서 핵심만 알려주세요."
# 스트림 처리 시작 시간을 기록합니다.
started = time.perf_counter()
# 이벤트 타입별 개수를 저장할 dict입니다.
event_counts: dict[str, int] = {}
# output_index별로 진행 중인 함수 호출 정보를 저장합니다.
pending_calls: dict[int, dict[str, Any]] = {}
# 첫 인자 델타가 도착한 시간을 저장합니다.
first_argument_delta_seconds = None
# Responses API를 스트리밍 모드로 직접 호출합니다.
stream = client.responses.create(
    # 함수 호출을 생성할 모델입니다.
    model=FUNCTION_MODEL,
    # 사용자 요청입니다.
    input=[{"role": "user", "content": STREAM_PROMPT}],
    # 모델이 사용할 수 있는 날씨 도구입니다.
    tools=[WEATHER_TOOL],
    # 학습 재현성을 위해 get_weather 호출을 강제합니다.
    tool_choice={"type": "function", "name": "get_weather"},
    # 스트리밍 모드를 켭니다.
    stream=True,
)
# 스트리밍 이벤트를 하나씩 순회합니다.
for event in stream:
    # 이벤트 타입 문자열을 읽습니다.
    event_type = str(getattr(event, "type", "unknown"))
    # 이벤트 타입별 개수를 누적합니다.
    event_counts[event_type] = event_counts.get(event_type, 0) + 1
    # 새 output item이 추가되면 function_call인지 확인합니다.
    if event_type == "response.output_item.added":
        # output 배열에서의 위치입니다.
        output_index = int(getattr(event, "output_index", -1))
        # 추가된 item 객체입니다.
        item = getattr(event, "item", None)
        # function_call item이면 pending_calls에 기본 정보를 저장합니다.
        if getattr(item, "type", "") == "function_call":
            pending_calls[output_index] = {"output_index": output_index, "id": getattr(item, "id", None), "call_id": getattr(item, "call_id", ""), "name": getattr(item, "name", ""), "arguments": ""}
    # 함수 인자 조각이 도착하면 누적합니다.
    elif event_type == "response.function_call_arguments.delta":
        # 어느 output item의 인자인지 나타내는 위치입니다.
        output_index = int(getattr(event, "output_index", -1))
        # 이번 이벤트의 JSON 조각입니다.
        delta = str(getattr(event, "delta", ""))
        # 해당 함수 호출이 등록되어 있으면 조각을 붙입니다.
        if output_index in pending_calls:
            if delta and first_argument_delta_seconds is None:
                first_argument_delta_seconds = time.perf_counter() - started
            pending_calls[output_index]["arguments"] += delta
    # 인자 전체가 완료 이벤트로 도착하면 최종 문자열로 교체합니다.
    elif event_type == "response.function_call_arguments.done":
        # 어느 output item의 완료 이벤트인지 읽습니다.
        output_index = int(getattr(event, "output_index", -1))
        # 완성된 arguments 문자열입니다.
        arguments = str(getattr(event, "arguments", ""))
        # 완료 문자열이 있으면 누적 문자열 대신 사용합니다.
        if output_index in pending_calls and arguments:
            pending_calls[output_index]["arguments"] = arguments
# 전체 스트리밍 시간을 계산합니다.
stream_total_seconds = time.perf_counter() - started
# 복원된 함수 호출 목록을 담습니다.
function_calls: list[dict[str, Any]] = []
# output_index 순서대로 함수 호출을 정리합니다.
for index in sorted(pending_calls.keys()):
    # 하나의 pending call을 가져옵니다.
    call = pending_calls[index]
    # arguments 원문 문자열을 읽습니다.
    raw_arguments = call.get("arguments", "")
    # arguments JSON을 Python dict로 파싱합니다.
    try:
        parsed_args = json.loads(raw_arguments) if raw_arguments else {}
        parse_error = None
    # JSON 파싱에 실패하면 빈 dict와 오류 메시지를 저장합니다.
    except Exception as exc:
        parsed_args = {}
        parse_error = str(exc)
    # 파싱 결과가 dict가 아니면 빈 dict를 사용합니다.
    call["arguments_dict"] = parsed_args if isinstance(parsed_args, dict) else {}
    # 파싱 오류 메시지를 저장합니다.
    call["arguments_error"] = parse_error
    # 정리된 호출을 목록에 추가합니다.
    function_calls.append(call)
# 함수 호출이 없으면 실습을 중단합니다.
if not function_calls:
    raise RuntimeError("스트리밍 응답에서 function_call을 찾지 못했습니다.")
# 이벤트 요약을 출력합니다.
print(f"이벤트 종류 수: {len(event_counts)}")
print(f"함수 호출 수: {len(function_calls)}")
print(f"첫 인자 델타 도착: {first_argument_delta_seconds}")
print(json.dumps(function_calls, ensure_ascii=False, indent=2))
이벤트 종류 수: 7
함수 호출 수: 1
첫 인자 델타 도착: 2.7014724999899045
[
  {
    "output_index": 1,
    "id": "fc_07891eb43dbda9b9006a1c5024d3088198a3f431e537467171",
    "call_id": "call_5xJwmekceZP4nj1Vg0ayBRQO",
    "name": "get_weather",
    "arguments": "{\"location\":\"서울\",\"units\":\"celsius\"}",
    "arguments_dict": {
      "location": "서울",
      "units": "celsius"
    },
    "arguments_error": null
  }
]
 

4단계. 복원한 함수 호출 실행 후 요약 요청

스트리밍으로 받은 인자도 결국 일반 Function Calling과 같은 방식으로 실행합니다. 함수 결과를 만든 뒤, 별도의 Responses API 호출로 학습자가 읽을 최종 요약을 생성합니다.

# 실행한 함수 결과를 담을 리스트입니다.
function_results: list[dict[str, Any]] = []
# 복원한 함수 호출들을 순회합니다.
for call in function_calls:
    # 인자 파싱 오류가 있으면 오류 JSON을 결과로 둡니다.
    if call.get("arguments_error"):
        output = json.dumps({"error": call.get("arguments_error"), "raw_arguments": call.get("arguments")}, ensure_ascii=False)
    # 인자가 정상이라면 실제 Python 함수를 실행합니다.
    else:
        output = run_requested_function(call.get("name", ""), call.get("arguments_dict", {}))
    # 실행 결과를 기록합니다.
    function_results.append({"name": call.get("name"), "call_id": call.get("call_id"), "arguments": call.get("arguments"), "output": output, "arguments_error": call.get("arguments_error")})
# 요약 생성을 위한 프롬프트입니다.
summary_prompt = "아래는 스트리밍으로 수집한 함수 실행 결과입니다.\n" + json.dumps(function_results, ensure_ascii=False, indent=2) + "\n\n학습자에게 보여줄 최종 안내 문장을 3문장으로 작성해 주세요."
# 요약 API 호출 시작 시간을 기록합니다.
summary_started = time.perf_counter()
# 요약 호출에 사용할 요청 payload를 dict로 구성합니다.
summary_payload = {
    # 요약 생성에 사용할 모델입니다.
    "model": FUNCTION_MODEL,
    # 요약 역할 지시문입니다.
    "instructions": "당신은 학습용 요약 도우미입니다. 함수 실행 결과를 초급자가 이해하기 쉬운 3문장으로 요약하세요.",
    # 함수 실행 결과가 포함된 입력입니다.
    "input": summary_prompt,
    # 요약이 잘리지 않도록 출력 토큰 한도를 넉넉히 둡니다.
    "max_output_tokens": 640,
}
# gpt-5 계열 모델이면 reasoning 설정을 함께 전달합니다.
if FUNCTION_MODEL.lower().startswith("gpt-5"):
    summary_payload["reasoning"] = {"effort": "low"}
# 함수 실행 결과를 바탕으로 최종 요약을 요청합니다.
summary_response = client.responses.create(**summary_payload)
# 최종 요약 텍스트를 읽습니다.
summary_text = summary_response.output_text
# 주 모델 응답이 비어 있으면 폴백 모델로 한 번 더 직접 호출합니다.
if not summary_text:
    fallback_response = client.responses.create(
        # 보조 모델명입니다.
        model=FALLBACK_MODEL,
        # 같은 요약 역할 지시문입니다.
        instructions=summary_payload["instructions"],
        # 같은 함수 실행 결과 입력입니다.
        input=summary_prompt,
        # 폴백 호출도 충분한 출력 토큰을 사용합니다.
        max_output_tokens=640,
    )
    # 이후 저장 단계에서 사용할 응답 객체를 폴백 응답으로 교체합니다.
    summary_response = fallback_response
    # 폴백 응답의 텍스트를 읽습니다.
    summary_text = summary_response.output_text
# 요약 호출 시간을 계산합니다.
summary_elapsed = time.perf_counter() - summary_started
# 요약 텍스트가 비어 있으면 중단합니다.
if not summary_text:
    raise RuntimeError("요약 응답 텍스트가 비어 있습니다.")
# 최종 요약을 출력합니다.
print(summary_text)
요청하신 서울의 현재 날씨는 맑음이며 기온은 22°C입니다. 이 정보는 섭씨(celsius) 단위로 2026-06-01T00:13:45에 수집되었습니다. 다른 도시나 시간대의 날씨를 보고 싶으면 알려주세요.
 

5단계. 스트리밍 이벤트와 결과 저장

스트리밍은 최종 텍스트뿐 아니라 이벤트 흐름 자체가 학습 자료입니다. 이벤트 타입별 개수, 첫 델타 도착 시간, 복원된 함수 인자를 함께 저장합니다.

# 최종 요약 파일 경로입니다.
output_text_path = OUTPUT_DIR / "04_streaming_v2_summary_output.txt"
# 이벤트 정보 파일 경로입니다.
events_path = OUTPUT_DIR / "04_streaming_v2_events.json"
# 함수 실행 결과 파일 경로입니다.
results_path = OUTPUT_DIR / "04_streaming_v2_function_results.json"
# 메타데이터 파일 경로입니다.
metadata_path = OUTPUT_DIR / "04_streaming_v2_meta.json"
# 최종 요약을 저장합니다.
output_text_path.write_text(summary_text + "\n", encoding="utf-8")
# 이벤트 정보를 저장합니다.
events_payload = {"event_counts": event_counts, "first_argument_delta_seconds": None if first_argument_delta_seconds is None else round(first_argument_delta_seconds, 4), "stream_total_seconds": round(stream_total_seconds, 4)}
# 이벤트 JSON을 파일에 씁니다.
events_path.write_text(json.dumps(events_payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
# 함수 실행 결과를 저장합니다.
results_path.write_text(json.dumps({"function_results": function_results}, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
# usage를 dict로 변환합니다.
usage = usage_to_dict(summary_response.usage)
# 메타데이터를 구성합니다.
metadata = {"example": "04.streaming_function_calls_v2", "requested_model": FUNCTION_MODEL, "summary_actual_model": summary_response.model, "function_call_count": len(function_calls), "event_type_count": len(event_counts), "first_argument_delta_seconds": events_payload["first_argument_delta_seconds"], "stream_total_seconds": events_payload["stream_total_seconds"], "summary_elapsed_seconds": round(summary_elapsed, 4), "summary_input_tokens": usage.get("input_tokens"), "summary_output_tokens": usage.get("output_tokens"), "summary_total_tokens": usage.get("total_tokens"), "summary_output_path": workspace_path(output_text_path), "events_path": workspace_path(events_path), "results_path": workspace_path(results_path)}
# 메타데이터를 저장합니다.
metadata_path.write_text(json.dumps(metadata, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
# 저장 위치를 출력합니다.
print(f"요약 저장: {workspace_path(output_text_path)}")
print(f"이벤트 저장: {workspace_path(events_path)}")
print(f"메타데이터 저장: {workspace_path(metadata_path)}")
요약 저장: 02.function_calling/output/04_streaming_v2_summary_output.txt
이벤트 저장: 02.function_calling/output/04_streaming_v2_events.json
메타데이터 저장: 02.function_calling/output/04_streaming_v2_meta.json