본문 바로가기
AI System/OpenAI API와 바이브 코딩으로 배우는 AI 서비스 개발

d01 - 01. text_generation - 04. Streaming_v2

by Toddler_AD 2026. 6. 1.

04. Responses API 스트리밍 v2

이 노트북은 stream=True로 스트리밍 응답을 받습니다. 스트리밍 이벤트를 반복문으로 하나씩 읽고, 텍스트 델타를 누적합니다.

# Colab 또는 로컬 노트북 실행 환경을 구분하기 위해 sys를 가져옵니다.
import sys
# 패키지 설치 명령을 현재 Python 커널에서 실행하기 위해 subprocess를 가져옵니다.
import subprocess
# 패키지 설치 여부를 확인하기 위해 importlib.util을 가져옵니다.
import importlib.util
# 환경 변수에서 API 키를 읽고 설정하기 위해 os를 가져옵니다.
import os
# API 키를 화면에 노출하지 않고 입력받기 위해 getpass를 가져옵니다.
import getpass
# .env 파일 위치를 다루기 위해 pathlib의 Path를 가져옵니다.
from pathlib import Path

# google.colab 모듈이 있으면 현재 런타임이 Google Colab이라고 판단합니다.
IN_COLAB = "google.colab" in sys.modules
# 노트북에서 사용하는 import 이름과 pip 패키지 이름을 짝지어 둡니다.
REQUIRED_PACKAGES = {
    "openai": "openai>=2.26.0",
    "dotenv": "python-dotenv>=1.2.2",
    "pydantic": "pydantic>=2.11.0",
    "PIL": "pillow>=12.1.1",
    "requests": "requests>=2.32.5",
    "numpy": "numpy>=2.3.3",
    "websockets": "websockets>=15.0.1",
    "websocket": "websocket-client>=1.8.0",
    "nest_asyncio": "nest-asyncio>=1.6.0",
}

# 현재 커널에서 import할 수 없는 패키지만 설치하는 함수입니다.
def ensure_package(import_name: str, package_name: str) -> None:
    # 이미 import 가능한 패키지는 설치를 건너뜁니다.
    if importlib.util.find_spec(import_name) is not None:
        # 설치가 필요 없음을 호출자에게 조용히 알리고 돌아갑니다.
        return
    # Colab에서는 현재 노트북 커널의 Python에 패키지를 설치해야 합니다.
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", package_name])

# Colab 기본 런타임에는 일부 OpenAI 실습 패키지가 없을 수 있으므로 먼저 준비합니다.
if IN_COLAB:
    # 실습 전체에서 필요한 패키지를 하나씩 확인하고 부족한 것만 설치합니다.
    for import_name, package_name in REQUIRED_PACKAGES.items():
        # 누락된 패키지를 현재 Colab 런타임에 설치합니다.
        ensure_package(import_name, package_name)

# 패키지 설치 이후 .env 로딩 기능을 사용할 수 있도록 load_dotenv를 가져옵니다.
from dotenv import load_dotenv

# 현재 작업 폴더의 .env 파일이 있으면 로컬 실행처럼 환경 변수로 올립니다.
load_dotenv(Path.cwd() / ".env")
# OPENAI_API_KEY가 이미 있으면 그대로 사용하고, 없으면 Colab Secrets 또는 입력으로 보완합니다.
if not os.getenv("OPENAI_API_KEY"):
    # Colab Secrets에서 OPENAI_API_KEY를 읽어 볼 변수를 준비합니다.
    secret_key = None
    # Colab이 아닌 로컬 환경에서는 google.colab import가 실패할 수 있으므로 예외를 허용합니다.
    try:
        # Colab Secrets의 userdata API를 가져옵니다.
        from google.colab import userdata
        # Secrets에 저장된 OPENAI_API_KEY 값을 읽습니다.
        secret_key = userdata.get("OPENAI_API_KEY")
    except Exception:
        # Colab Secrets를 사용할 수 없으면 이후 수동 입력 단계로 넘어갑니다.
        secret_key = None
    # Secrets에서 키를 찾았다면 현재 런타임 환경 변수로 설정합니다.
    if secret_key:
        # OpenAI SDK가 자동으로 읽을 수 있도록 표준 환경 변수 이름에 저장합니다.
        os.environ["OPENAI_API_KEY"] = secret_key
    else:
        # 키가 없으면 노트북 실행자가 직접 입력하도록 요청합니다.
        entered_key = getpass.getpass("OPENAI_API_KEY를 입력하세요: ").strip()
        # 빈 문자열이 아닌 값을 입력한 경우에만 환경 변수로 등록합니다.
        if entered_key:
            # OpenAI SDK가 자동으로 읽을 수 있도록 표준 환경 변수 이름에 저장합니다.
            os.environ["OPENAI_API_KEY"] = entered_key

# API 키가 끝까지 없으면 다음 OpenAI API 호출이 실패하므로 명확한 오류를 냅니다.
if not os.getenv("OPENAI_API_KEY"):
    # Colab에서는 Secrets 또는 입력, 로컬에서는 .env 또는 환경 변수를 설정해야 함을 알려 줍니다.
    raise RuntimeError("OPENAI_API_KEY가 없습니다. Colab Secrets, 수동 입력, 또는 .env 파일로 설정해 주세요.")

# 이후 셀에서 Colab 여부를 참고할 수 있도록 간단히 출력합니다.
print(f"개발환경: {'Colab' if IN_COLAB else '로컬'}")
# API 키를 직접 출력하지 않고 준비 완료 여부만 알려 줍니다.
print("OPENAI_API_KEY 준비 완료")

 

 

1단계. 최소 실행 환경 준비

이 단계에서는 API 호출에 필요한 기본 준비를 합니다. import, .env 로드, OpenAI() 클라이언트 생성, output 폴더 생성을 차례대로 확인합니다.

아래 셀에서 특히 client = OpenAI() 줄을 확인하세요. 이후 모든 API 호출은 이 client 객체에서 시작됩니다.

# 운영체제 환경 변수와 .env 값을 읽기 위해 os를 사용합니다.
import os
# JSON 메타데이터를 파일로 저장하기 위해 json을 사용합니다.
import json
# 실행 시간을 비교하는 예제에서 사용할 시간 측정 모듈입니다.
import time
# 노트북 실행 위치와 output 폴더 경로를 안전하게 다루기 위한 Path입니다.
from pathlib import Path

# .env 파일에 저장된 OPENAI_API_KEY를 현재 Python 환경으로 불러옵니다.
from dotenv import load_dotenv
# OpenAI API를 직접 호출하기 위한 공식 SDK 클라이언트입니다.
from openai import OpenAI

# 현재 노트북이 워크스페이스 루트에서 실행되는지, 01.text_generation 폴더에서 실행되는지 확인합니다.
CURRENT_DIR = Path.cwd().resolve()
# 워크스페이스 루트에서 실행 중이면 01.text_generation 폴더를 실습 루트로 사용합니다.
if (CURRENT_DIR / "01.text_generation").exists():
    TEXT_GENERATION_DIR = CURRENT_DIR / "01.text_generation"
# 이미 01.text_generation 폴더 안에서 실행 중이면 현재 폴더를 실습 루트로 사용합니다.
else:
    TEXT_GENERATION_DIR = CURRENT_DIR

# 워크스페이스 루트는 01.text_generation 폴더의 부모 폴더입니다.
WORKSPACE_ROOT = TEXT_GENERATION_DIR.parent
# 실행 결과를 저장할 output 폴더 경로입니다.
OUTPUT_DIR = TEXT_GENERATION_DIR / "output"
# output 폴더가 없으면 새로 만들고, 이미 있으면 그대로 둡니다.
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# 워크스페이스 루트의 .env 파일을 읽습니다.
load_dotenv(WORKSPACE_ROOT / ".env")

# OpenAI API 키가 설정되어 있는지 먼저 확인합니다.
if not os.getenv("OPENAI_API_KEY"):
    raise RuntimeError("OPENAI_API_KEY가 없습니다. 워크스페이스 루트의 .env 파일을 확인해 주세요.")

# OpenAI API를 호출할 클라이언트를 생성합니다. 이 줄이 모든 API 호출의 출발점입니다.
client = OpenAI()

# 이후 셀에서 저장 경로를 확인할 수 있도록 출력합니다.
print(f"실습 폴더: {TEXT_GENERATION_DIR}")
print(f"결과 저장 폴더: {OUTPUT_DIR}")
실습 폴더: C:\Users\ai4nu\2026-ai-tech\AS.1.1_0601\01.text_generation
결과 저장 폴더: C:\Users\ai4nu\2026-ai-tech\AS.1.1_0601\01.text_generation\output
 

2단계. 스트리밍 요청 준비하기

# 스트리밍 요청에 사용할 모델입니다.
RESPONSES_MODEL = os.getenv("OPENAI_RESPONSES_MODEL", "gpt-4.1-mini")
# 스트리밍으로 받을 답변의 주제와 형식입니다.
STREAM_PROMPT = "Responses API가 왜 중요한지 초급 교육생 시점으로 설명해 주세요. 형식: 1) 핵심 포인트 3개, 2) 실무에서 편한 이유 2개, 3) 한 줄 요약."

print(f"요청 모델: {RESPONSES_MODEL}")
print(STREAM_PROMPT)
요청 모델: gpt-4.1-mini
Responses API가 왜 중요한지 초급 교육생 시점으로 설명해 주세요. 형식: 1) 핵심 포인트 3개, 2) 실무에서 편한 이유 2개, 3) 한 줄 요약.
 

3단계. stream=True로 Responses API 직접 호출하기

아래 셀의 핵심은 stream=True와 for event in stream:입니다. 이벤트 타입이 response.output_text.delta일 때 텍스트 조각을 누적합니다.

# 스트리밍 시작 시간을 기록합니다.
stream_started = time.perf_counter()
# 첫 텍스트 조각이 도착한 시간을 저장할 변수입니다.
first_delta_seconds = None
# 이벤트 타입별 발생 횟수를 저장할 딕셔너리입니다.
event_counts = {}
# 텍스트 델타를 순서대로 모을 리스트입니다.
chunks = []

# Responses API를 스트리밍 모드로 직접 호출합니다.
stream = client.responses.create(
    # 사용할 모델입니다.
    model=RESPONSES_MODEL,
    # 실제 사용자 요청입니다.
    input=STREAM_PROMPT,
    # 최대 출력 토큰 수입니다.
    max_output_tokens=420,
    # 스트리밍 모드를 켭니다.
    stream=True,
)

# 스트리밍 이벤트를 하나씩 읽습니다.
for event in stream:
    # 이벤트 타입을 문자열로 읽습니다.
    event_type = event.type
    # 이벤트 타입별 개수를 누적합니다.
    event_counts[event_type] = event_counts.get(event_type, 0) + 1
    # 텍스트 델타 이벤트인지 확인합니다.
    if event_type == "response.output_text.delta":
        # 첫 델타라면 도착 시간을 기록합니다.
        if first_delta_seconds is None:
            first_delta_seconds = time.perf_counter() - stream_started
        # event.delta에 들어 있는 텍스트 조각을 저장합니다.
        chunks.append(event.delta)

# 텍스트 조각을 하나로 합칩니다.
streaming_text = "".join(chunks).strip()
# 전체 실행 시간을 계산합니다.
total_seconds = time.perf_counter() - stream_started

print("[스트리밍 결과]")
print(streaming_text)
print(f"첫 델타 도착 시간: {first_delta_seconds}")
print(f"전체 실행 시간: {total_seconds:.4f}초")
print(f"이벤트 타입별 개수: {event_counts}")
[스트리밍 결과]
1) 핵심 포인트 3개  
- **자동화와 효율성:** Responses API를 사용하면 사람의 개입 없이 자동으로 답변을 주고받을 수 있어 작업 속도가 빨라집니다.  
- **일관된 대화 품질 유지:** API를 통해 동일한 기준과 방식으로 응답을 제공할 수 있어, 사용자들이 항상 안정적인 경험을 할 수 있습니다.  
- **다양한 시스템 연동 가능:** 여러 프로그램이나 서비스와 쉽게 연결할 수 있어, 업무에 맞게 맞춤형 응답 시스템을 만들 수 있습니다.  

2) 실무에서 편한 이유 2개  
- **시간 절약:** 반복적인 답변 작업을 자동으로 처리해 주기 때문에 직원들이 더 중요한 업무에 집중할 수 있습니다.  
- **관리와 모니터링 용이:** API를 통해 응답 상태를 쉽게 추적·분석할 수 있어서 문제를 빠르게 발견하고 개선할 수 있습니다.  

3) 한 줄 요약  
Responses API는 자동화와 효율성을 높여 일상 업무를 간편하게 만들어 주는 중요한 도구입니다.
첫 델타 도착 시간: 2.302803600032348
전체 실행 시간: 5.0913초
이벤트 타입별 개수: {'response.created': 1, 'response.in_progress': 1, 'response.output_item.added': 1, 'response.content_part.added': 1, 'response.output_text.delta': 241, 'response.output_text.done': 1, 'response.content_part.done': 1, 'response.output_item.done': 1, 'response.completed': 1}
 

4단계. 스트리밍 결과가 비었을 때 직접 일반 호출로 확인하기

스트리밍 이벤트 구조는 SDK 버전에 따라 다르게 보일 수 있습니다. 텍스트가 비었다면 같은 프롬프트를 일반 Responses API 호출로 다시 보내 API 자체가 정상인지 확인합니다.

# 스트리밍 텍스트가 비어 있는지 확인합니다.
if not streaming_text:
    # 일반 호출을 직접 수행합니다.
    fallback_response = client.responses.create(
        # 같은 모델을 사용합니다.
        model=RESPONSES_MODEL,
        # 같은 프롬프트를 사용합니다.
        input=STREAM_PROMPT,
        # 최대 출력 토큰 수입니다.
        max_output_tokens=320,
    )
    # 일반 호출 결과를 최종 텍스트로 사용합니다.
    streaming_text = fallback_response.output_text
    # 일반 호출 보완 여부를 기록합니다.
    fallback_used = True
# 스트리밍으로 텍스트를 얻었다면 보완 호출을 사용하지 않았다고 기록합니다.
else:
    fallback_used = False

print("[최종 텍스트]")
print(streaming_text)
print(f"일반 호출 보완 사용 여부: {fallback_used}")
[최종 텍스트]
1) 핵심 포인트 3개  
- **자동화와 효율성:** Responses API를 사용하면 사람의 개입 없이 자동으로 답변을 주고받을 수 있어 작업 속도가 빨라집니다.  
- **일관된 대화 품질 유지:** API를 통해 동일한 기준과 방식으로 응답을 제공할 수 있어, 사용자들이 항상 안정적인 경험을 할 수 있습니다.  
- **다양한 시스템 연동 가능:** 여러 프로그램이나 서비스와 쉽게 연결할 수 있어, 업무에 맞게 맞춤형 응답 시스템을 만들 수 있습니다.  

2) 실무에서 편한 이유 2개  
- **시간 절약:** 반복적인 답변 작업을 자동으로 처리해 주기 때문에 직원들이 더 중요한 업무에 집중할 수 있습니다.  
- **관리와 모니터링 용이:** API를 통해 응답 상태를 쉽게 추적·분석할 수 있어서 문제를 빠르게 발견하고 개선할 수 있습니다.  

3) 한 줄 요약  
Responses API는 자동화와 효율성을 높여 일상 업무를 간편하게 만들어 주는 중요한 도구입니다.
일반 호출 보완 사용 여부: False
 

5단계. 스트리밍 결과 저장하기

# 저장할 파일 경로를 정합니다.
text_path = OUTPUT_DIR / "04_streaming_v2_output.txt"
events_path = OUTPUT_DIR / "04_streaming_v2_events.json"

# 최종 텍스트를 저장합니다.
text_path.write_text(streaming_text + "\\n", encoding="utf-8")

# 이벤트 정보를 JSON으로 저장합니다.
events_payload = {
    "example": "04.streaming_v2",
    "requested_model": RESPONSES_MODEL,
    "first_delta_seconds": first_delta_seconds,
    "total_seconds": round(total_seconds, 4),
    "event_counts": event_counts,
    "fallback_used": fallback_used,
    "output_path": str(text_path),
}
events_path.write_text(json.dumps(events_payload, ensure_ascii=False, indent=2) + "\\n", encoding="utf-8")

print(f"텍스트 저장: {text_path}")
print(f"이벤트 정보 저장: {events_path}")
텍스트 저장: C:\Users\ai4nu\2026-ai-tech\AS.1.1_0601\01.text_generation\output\04_streaming_v2_output.txt
이벤트 정보 저장: C:\Users\ai4nu\2026-ai-tech\AS.1.1_0601\01.text_generation\output\04_streaming_v2_events.json