본문 바로가기
AI System/OpenAI API와 바이브 코딩으로 배우는 AI 서비스 개발

d01 - 03. Structed Output - 05. multistep structured pipeline v2

by Toddler_AD 2026. 6. 2.

05. 다단계 Structured Outputs 파이프라인 v2

이 노트북은 원문 장애 기록을 1단계 구조화, 2단계 대응 계획, 3단계 경영진 브리프로 이어지는 파이프라인으로 처리합니다. 앞 단계의 JSON을 다음 단계 입력으로 넘기는 방식은 실무 자동화에서 매우 자주 쓰입니다.

# Colab 또는 로컬 노트북 실행 환경을 구분하기 위해 sys를 가져옵니다.
import sys
# 패키지 설치 명령을 현재 Python 커널에서 실행하기 위해 subprocess를 가져옵니다.
import subprocess
# 패키지 설치 여부를 확인하기 위해 importlib.util을 가져옵니다.
import importlib.util
# 환경 변수에서 API 키를 읽고 설정하기 위해 os를 가져옵니다.
import os
# API 키를 화면에 노출하지 않고 입력받기 위해 getpass를 가져옵니다.
import getpass
# .env 파일 위치를 다루기 위해 pathlib의 Path를 가져옵니다.
from pathlib import Path

# google.colab 모듈이 있으면 현재 런타임이 Google Colab이라고 판단합니다.
IN_COLAB = "google.colab" in sys.modules
# 노트북에서 사용하는 import 이름과 pip 패키지 이름을 짝지어 둡니다.
REQUIRED_PACKAGES = {
    "openai": "openai>=2.26.0",
    "dotenv": "python-dotenv>=1.2.2",
    "pydantic": "pydantic>=2.11.0",
    "PIL": "pillow>=12.1.1",
    "requests": "requests>=2.32.5",
    "numpy": "numpy>=2.3.3",
    "websockets": "websockets>=15.0.1",
    "websocket": "websocket-client>=1.8.0",
    "nest_asyncio": "nest-asyncio>=1.6.0",
}

# 현재 커널에서 import할 수 없는 패키지만 설치하는 함수입니다.
def ensure_package(import_name: str, package_name: str) -> None:
    # 이미 import 가능한 패키지는 설치를 건너뜁니다.
    if importlib.util.find_spec(import_name) is not None:
        # 설치가 필요 없음을 호출자에게 조용히 알리고 돌아갑니다.
        return
    # Colab에서는 현재 노트북 커널의 Python에 패키지를 설치해야 합니다.
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", package_name])

# Colab 기본 런타임에는 일부 OpenAI 실습 패키지가 없을 수 있으므로 먼저 준비합니다.
if IN_COLAB:
    # 실습 전체에서 필요한 패키지를 하나씩 확인하고 부족한 것만 설치합니다.
    for import_name, package_name in REQUIRED_PACKAGES.items():
        # 누락된 패키지를 현재 Colab 런타임에 설치합니다.
        ensure_package(import_name, package_name)

# 패키지 설치 이후 .env 로딩 기능을 사용할 수 있도록 load_dotenv를 가져옵니다.
from dotenv import load_dotenv

# 현재 작업 폴더의 .env 파일이 있으면 로컬 실행처럼 환경 변수로 올립니다.
load_dotenv(Path.cwd() / ".env")
# OPENAI_API_KEY가 이미 있으면 그대로 사용하고, 없으면 Colab Secrets 또는 입력으로 보완합니다.
if not os.getenv("OPENAI_API_KEY"):
    # Colab Secrets에서 OPENAI_API_KEY를 읽어 볼 변수를 준비합니다.
    secret_key = None
    # Colab이 아닌 로컬 환경에서는 google.colab import가 실패할 수 있으므로 예외를 허용합니다.
    try:
        # Colab Secrets의 userdata API를 가져옵니다.
        from google.colab import userdata
        # Secrets에 저장된 OPENAI_API_KEY 값을 읽습니다.
        secret_key = userdata.get("OPENAI_API_KEY")
    except Exception:
        # Colab Secrets를 사용할 수 없으면 이후 수동 입력 단계로 넘어갑니다.
        secret_key = None
    # Secrets에서 키를 찾았다면 현재 런타임 환경 변수로 설정합니다.
    if secret_key:
        # OpenAI SDK가 자동으로 읽을 수 있도록 표준 환경 변수 이름에 저장합니다.
        os.environ["OPENAI_API_KEY"] = secret_key
    else:
        # 키가 없으면 노트북 실행자가 직접 입력하도록 요청합니다.
        entered_key = getpass.getpass("OPENAI_API_KEY를 입력하세요: ").strip()
        # 빈 문자열이 아닌 값을 입력한 경우에만 환경 변수로 등록합니다.
        if entered_key:
            # OpenAI SDK가 자동으로 읽을 수 있도록 표준 환경 변수 이름에 저장합니다.
            os.environ["OPENAI_API_KEY"] = entered_key

# API 키가 끝까지 없으면 다음 OpenAI API 호출이 실패하므로 명확한 오류를 냅니다.
if not os.getenv("OPENAI_API_KEY"):
    # Colab에서는 Secrets 또는 입력, 로컬에서는 .env 또는 환경 변수를 설정해야 함을 알려 줍니다.
    raise RuntimeError("OPENAI_API_KEY가 없습니다. Colab Secrets, 수동 입력, 또는 .env 파일로 설정해 주세요.")

# 이후 셀에서 Colab 여부를 참고할 수 있도록 간단히 출력합니다.
print(f"개발환경: {'Colab' if IN_COLAB else '로컬'}")
# API 키를 직접 출력하지 않고 준비 완료 여부만 알려 줍니다.
print("OPENAI_API_KEY 준비 완료")

 

 

 

1단계. 실행 환경 준비

API 키, 모델명, 저장 폴더를 먼저 준비합니다. 구조화 출력은 결과 저장과 검증까지 이어지므로 경로와 환경을 초기에 확정하는 것이 중요합니다.

# 환경 변수와 모델명을 읽기 위해 os를 가져옵니다.
import os
# 구조화 결과와 메타데이터를 JSON으로 저장하기 위해 json을 가져옵니다.
import json
# API 호출 시간을 측정하기 위해 time을 가져옵니다.
import time
# 파일 경로를 안전하게 다루기 위해 Path를 가져옵니다.
from pathlib import Path
# 일부 함수와 응답 객체의 느슨한 타입을 표현하기 위해 Any를 가져옵니다.
from typing import Any, Literal
# .env 파일에서 OPENAI_API_KEY를 로드하기 위해 load_dotenv를 가져옵니다.
from dotenv import load_dotenv
# OpenAI API를 호출하는 공식 SDK 클라이언트입니다.
from openai import OpenAI
# Structured Outputs에서 Python 클래스로 스키마를 정의하기 위해 BaseModel과 Field를 가져옵니다.
from pydantic import BaseModel, Field

# 현재 실행 위치를 확인합니다.
CURRENT_DIR = Path.cwd()
# 루트에서 실행 중이면 03.structured_output 폴더를 선택합니다.
if (CURRENT_DIR / "03.structured_output").exists():
    CURRICULUM_ROOT = CURRENT_DIR / "03.structured_output"
# 실습 폴더 안에서 실행 중이면 현재 위치를 사용합니다.
elif CURRENT_DIR.name == "03.structured_output":
    CURRICULUM_ROOT = CURRENT_DIR
# 그 밖의 경우 상대 경로를 해석합니다.
else:
    CURRICULUM_ROOT = Path("03.structured_output").resolve()
# 워크스페이스 루트는 실습 폴더의 상위 폴더입니다.
WORKSPACE_ROOT = CURRICULUM_ROOT.parent
# 결과 저장 폴더 경로입니다.
OUTPUT_DIR = CURRICULUM_ROOT / "output"
# 결과 저장 폴더가 없으면 생성합니다.
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
# .env 파일에서 환경 변수를 읽습니다.
load_dotenv(WORKSPACE_ROOT / ".env")
# API 키가 없으면 명확한 메시지로 중단합니다.
if not os.getenv("OPENAI_API_KEY"):
    raise RuntimeError("OPENAI_API_KEY가 없습니다. 워크스페이스 루트의 .env 파일을 확인해 주세요.")
# OpenAI 클라이언트를 생성합니다.
client = OpenAI()
# Structured Outputs 실습에 사용할 모델입니다.
STRUCTURED_MODEL = os.getenv("OPENAI_STRUCTURED_MODEL", "gpt-5-mini")
# 필요할 때 사용할 보조 모델명입니다.
FALLBACK_MODEL = os.getenv("OPENAI_STRUCTURED_FALLBACK_MODEL", "gpt-4.1-mini")

# 워크스페이스 기준 상대 경로를 반환합니다.
def workspace_path(path: Path) -> str:
    # Windows에서도 읽기 쉬운 / 구분자 문자열로 변환합니다.
    return path.resolve().relative_to(WORKSPACE_ROOT.resolve()).as_posix()

# Pydantic 객체, dict, list를 JSON 저장 가능한 형태로 바꿉니다.
def normalize_data(value: Any) -> Any:
    # Pydantic 모델은 model_dump()로 dict가 됩니다.
    if hasattr(value, "model_dump"):
        return value.model_dump()
    # dict는 값들을 재귀적으로 변환합니다.
    if isinstance(value, dict):
        return {key: normalize_data(item) for key, item in value.items()}
    # list는 각 원소를 재귀적으로 변환합니다.
    if isinstance(value, list):
        return [normalize_data(item) for item in value]
    # 그 밖의 값은 그대로 반환합니다.
    return value

# usage 객체를 dict로 변환합니다.
def usage_to_dict(usage: Any) -> dict[str, Any]:
    # usage가 없으면 빈 dict를 반환합니다.
    if usage is None:
        return {}
    # Pydantic 객체면 model_dump()로 변환합니다.
    if hasattr(usage, "model_dump"):
        return usage.model_dump()
    # 이미 dict면 그대로 반환합니다.
    if isinstance(usage, dict):
        return usage
    # 예상 밖 형태는 빈 dict로 처리합니다.
    return {}

# 긴 텍스트를 한 줄 미리보기로 줄입니다.
def preview_text(value: str, limit: int = 220) -> str:
    # 공백과 줄바꿈을 정리합니다.
    compact = " ".join(str(value).split())
    # 길이가 짧으면 그대로 반환합니다.
    if len(compact) <= limit:
        return compact
    # 길면 말줄임표와 함께 앞부분만 반환합니다.
    return compact[: limit - 3] + "..."

# JSON 텍스트를 안전하게 파싱합니다.
def parse_json_text(raw_text: str) -> tuple[Any, str | None]:
    # 문자열 앞뒤 공백을 제거합니다.
    text = str(raw_text or "").strip()
    # 비어 있으면 오류 메시지를 반환합니다.
    if not text:
        return None, "빈 문자열입니다."
    # Markdown 코드블록으로 감싸진 JSON도 처리합니다.
    if text.startswith("```"):
        lines = text.splitlines()
        text = "\n".join(lines[1:-1]).strip() if len(lines) >= 3 else text
    # json.loads로 파싱을 시도합니다.
    try:
        return json.loads(text), None
    # 실패하면 예외를 밖으로 던지지 않고 메시지로 반환합니다.
    except Exception as exc:
        return None, str(exc)

# Responses parse 응답에서 구조화 파싱 결과를 찾습니다.
def extract_parsed_response(response: Any) -> Any:
    # SDK 버전에 따라 최상위 output_parsed에 파싱 결과가 들어올 수 있습니다.
    direct = getattr(response, "output_parsed", None)
    # 최상위 값이 있으면 그대로 반환합니다.
    if direct is not None:
        return direct
    # 최상위에 없으면 output 배열의 content block을 확인합니다.
    for item in getattr(response, "output", []) or []:
        # content 속성을 안전하게 읽습니다.
        content = getattr(item, "content", None)
        # content가 list가 아니면 다음 항목으로 넘어갑니다.
        if not isinstance(content, list):
            continue
        # content block을 하나씩 확인합니다.
        for block in content:
            # block 안의 parsed 속성을 읽습니다.
            parsed = getattr(block, "parsed", None)
            # parsed 값이 있으면 반환합니다.
            if parsed is not None:
                return parsed
    # 어느 위치에서도 찾지 못하면 None을 반환합니다.
    return None

# 필수 필드 누락 여부를 확인합니다.
def missing_fields(payload: dict[str, Any], required: list[str]) -> list[str]:
    # 누락된 필드명을 담을 리스트입니다.
    missing: list[str] = []
    # 필수 필드를 하나씩 검사합니다.
    for key in required:
        # 키가 없으면 누락으로 기록합니다.
        if key not in payload:
            missing.append(key)
            continue
        # 값이 None이면 누락으로 봅니다.
        if payload[key] is None:
            missing.append(key)
        # 문자열이 공백뿐이면 누락으로 봅니다.
        elif isinstance(payload[key], str) and not payload[key].strip():
            missing.append(key)
        # 리스트나 dict가 비어 있으면 누락으로 봅니다.
        elif isinstance(payload[key], (list, dict)) and not payload[key]:
            missing.append(key)
    # 누락된 필드명 목록을 반환합니다.
    return missing

# 실행 위치와 모델 정보를 출력합니다.
print(f"실습 폴더: {CURRICULUM_ROOT}")
# 저장 폴더를 출력합니다.
print(f"결과 저장 폴더: {OUTPUT_DIR}")
# 요청 모델을 출력합니다.
print(f"요청 모델: {STRUCTURED_MODEL}")
실습 폴더: c:\Users\ai4nu\2026-ai-tech\AS.1.1_0601\03.structured_output
결과 저장 폴더: c:\Users\ai4nu\2026-ai-tech\AS.1.1_0601\03.structured_output\output
요청 모델: gpt-5-mini

 

 

2단계. 출력 스키마와 입력 준비

각 단계의 산출물은 다음 단계의 입력이 됩니다. 그래서 단계마다 별도의 Pydantic 스키마를 두어 책임을 분리합니다.

# 1단계 장애 기록 스키마입니다.
class IncidentRecord(BaseModel):
    # 장애 제목입니다.
    incident_title: str = Field(description="장애 제목")
    # 발생 시각입니다.
    occurrence_time: str = Field(description="발생 시각")
    # 영향 범위입니다.
    impact_scope: str = Field(description="영향 범위")
    # 핵심 사실 목록입니다.
    key_facts: list[str] = Field(description="핵심 사실")
    # 추정 원인 목록입니다.
    suspected_causes: list[str] = Field(description="추정 원인")
    # 임시 조치 목록입니다.
    temporary_mitigation: list[str] = Field(description="임시 조치")

# 2단계 실행 항목 스키마입니다.
class ActionItem(BaseModel):
    # 우선순위입니다.
    priority: Literal["P1", "P2", "P3"] = Field(description="우선순위")
    # 담당자입니다.
    owner: str = Field(description="담당자")
    # 실행할 조치입니다.
    action: str = Field(description="실행할 조치")
    # 완료 목표 시각입니다.
    deadline: str = Field(description="완료 목표 시각")

# 2단계 대응 계획 스키마입니다.
class ResponsePlan(BaseModel):
    # 현재 리스크 수준입니다.
    risk_level: Literal["낮음", "보통", "높음", "심각"] = Field(description="현재 리스크 수준")
    # 고객 공지 문안입니다.
    external_message: str = Field(description="고객 공지 문안")
    # 실행 액션 목록입니다.
    actions: list[ActionItem] = Field(description="실행 액션 목록")
    # 추적할 모니터링 지표입니다.
    monitoring_metrics: list[str] = Field(description="추적 지표")
    # 경영진 보고 필요 여부입니다.
    requires_exec_report: bool = Field(description="경영진 보고 필요 여부")

# 3단계 경영진 브리프 스키마입니다.
class ExecutiveBrief(BaseModel):
    # 한 줄 제목입니다.
    headline: str = Field(description="한 줄 제목")
    # 3줄 요약입니다.
    three_line_summary: list[str] = Field(description="3줄 요약")
    # 핵심 실행 항목입니다.
    critical_actions: list[str] = Field(description="핵심 실행 항목")

# 구조화 파이프라인의 원문 입력입니다.
RAW_INCIDENT_TEXT = "3월 10일 오후 2시, 결제 API 응답 지연으로 주문 실패율이 급증했습니다. 영향 범위는 모바일 앱 사용자 약 12%이며, CS 문의가 40분 동안 180건 접수되었습니다. 원인 후보는 DB 커넥션 풀 포화와 캐시 재시작 실패로 추정됩니다. 현재 임시 조치로 트래픽 우회와 재시도 정책을 적용했습니다."

 

 

 

3단계. Responses API 직접 호출

세 번의 client.responses.parse(...) 호출이 순서대로 실행됩니다. 각 호출은 이전 단계의 구조화 JSON을 입력으로 받아 더 높은 수준의 구조를 만듭니다.

# 각 단계의 실행 기록을 저장할 리스트입니다.
stage_trace: list[dict[str, Any]] = []
# gpt-5 계열 모델인지 확인합니다.
uses_gpt5 = STRUCTURED_MODEL.lower().startswith("gpt-5")
# 1단계 호출 시작 시간을 기록합니다.
stage1_started = time.perf_counter()
# 1단계 API 호출에 전달할 payload를 구성합니다.
stage1_payload = {"model": STRUCTURED_MODEL, "instructions": "당신은 장애 관제 분석가입니다. 입력 내용을 장애 사실 중심으로 구조화하세요.", "input": [{"role": "user", "content": RAW_INCIDENT_TEXT}], "text_format": IncidentRecord, "max_output_tokens": 1100}
# gpt-5 계열 모델이면 추론 강도를 낮게 지정해 구조화 출력에 집중시킵니다.
if uses_gpt5:
    stage1_payload["reasoning"] = {"effort": "low"}
# 원문 장애 내용을 IncidentRecord 스키마로 구조화합니다.
stage1_response = client.responses.parse(**stage1_payload)
# 1단계 소요 시간을 계산합니다.
stage1_elapsed = time.perf_counter() - stage1_started
# 1단계 파싱 결과를 dict로 정규화합니다.
incident = normalize_data(extract_parsed_response(stage1_response))
# 1단계 필수 필드 누락을 검사합니다.
incident_missing = missing_fields(incident, ["incident_title", "occurrence_time", "impact_scope", "key_facts", "suspected_causes", "temporary_mitigation"])
# 1단계 기록을 저장합니다.
stage_trace.append({"stage": 1, "name": "IncidentRecord", "actual_model": stage1_response.model, "elapsed_seconds": round(stage1_elapsed, 4), "missing_fields": incident_missing})
# 2단계 입력으로 1단계 JSON을 문자열화합니다.
stage2_prompt = "아래 IncidentRecord JSON을 바탕으로 대응 계획을 작성하세요. 액션은 가장 중요한 3개 이내로 간결하게 작성하세요.\n" + json.dumps(incident, ensure_ascii=False, indent=2)
# 2단계 호출 시작 시간을 기록합니다.
stage2_started = time.perf_counter()
# 2단계 API 호출에 전달할 payload를 구성합니다.
stage2_payload = {"model": STRUCTURED_MODEL, "instructions": "당신은 장애 대응 리더입니다. 실행 가능한 계획 중심으로 구조화하세요. 각 문자열은 짧게 쓰고 액션은 3개 이내로 제한하세요.", "input": [{"role": "user", "content": stage2_prompt}], "text_format": ResponsePlan, "max_output_tokens": 1800}
# gpt-5 계열 모델이면 2단계 호출에도 reasoning 설정을 추가합니다.
if uses_gpt5:
    stage2_payload["reasoning"] = {"effort": "low"}
# IncidentRecord를 ResponsePlan으로 확장합니다.
stage2_response = client.responses.parse(**stage2_payload)
# 2단계 소요 시간을 계산합니다.
stage2_elapsed = time.perf_counter() - stage2_started
# 2단계 파싱 결과를 dict로 정규화합니다.
plan = normalize_data(extract_parsed_response(stage2_response))
# 2단계 필수 필드 누락을 검사합니다.
plan_missing = missing_fields(plan, ["risk_level", "external_message", "actions", "monitoring_metrics", "requires_exec_report"])
# 2단계 기록을 저장합니다.
stage_trace.append({"stage": 2, "name": "ResponsePlan", "actual_model": stage2_response.model, "elapsed_seconds": round(stage2_elapsed, 4), "missing_fields": plan_missing, "action_count": len(plan.get("actions", []))})
# 3단계 입력으로 대응 계획 JSON을 문자열화합니다.
stage3_prompt = "아래 ResponsePlan JSON을 경영진 공유용 브리프로 압축하세요. 세 줄 요약은 정확히 3개 항목으로 작성하세요.\n" + json.dumps(plan, ensure_ascii=False, indent=2)
# 3단계 호출 시작 시간을 기록합니다.
stage3_started = time.perf_counter()
# 3단계 API 호출에 전달할 payload를 구성합니다.
stage3_payload = {"model": STRUCTURED_MODEL, "instructions": "당신은 CTO 보좌 역할입니다. 짧고 명확하게 핵심만 구조화하세요.", "input": [{"role": "user", "content": stage3_prompt}], "text_format": ExecutiveBrief, "max_output_tokens": 900}
# gpt-5 계열 모델이면 마지막 단계에도 reasoning 설정을 추가합니다.
if uses_gpt5:
    stage3_payload["reasoning"] = {"effort": "low"}
# ResponsePlan을 ExecutiveBrief로 요약합니다.
stage3_response = client.responses.parse(**stage3_payload)
# 3단계 소요 시간을 계산합니다.
stage3_elapsed = time.perf_counter() - stage3_started
# 3단계 파싱 결과를 dict로 정규화합니다.
brief = normalize_data(extract_parsed_response(stage3_response))
# 3단계 필수 필드 누락을 검사합니다.
brief_missing = missing_fields(brief, ["headline", "three_line_summary", "critical_actions"])
# 3단계 기록을 저장합니다.
stage_trace.append({"stage": 3, "name": "ExecutiveBrief", "actual_model": stage3_response.model, "elapsed_seconds": round(stage3_elapsed, 4), "missing_fields": brief_missing, "summary_line_count": len(brief.get("three_line_summary", []))})
# 최종 브리프를 출력합니다.
print(json.dumps(brief, ensure_ascii=False, indent=2))
# 단계별 누락 필드 수를 출력합니다.
print(f"누락 필드 수: {[len(item['missing_fields']) for item in stage_trace]}")
{
  "headline": "결제 처리 지연(높음) — 즉시 대응 필요",
  "three_line_summary": [
    "현상: 결제 처리 지연으로 주문 실패 가능성 발생(리스크: 높음) — 고객에 재시도 안내 중.",
    "즉시조치: DB 커넥션 풀 확장·정리(P1, DB팀, 즉시 30분), 캐시 복구·무결성 검사(P1, SRE, 즉시 45분), 고객공지·CS 템플릿 배포(P2, 고객지원/운영, 1시간).",
    "모니터링·보고: 결제 지연·주문 성공률·DB/캐시 지표 집중 관찰, 경영진 보고 필요."
  ],
  "critical_actions": [
    "P1 — DB팀: DB 커넥션 풀 긴급 확장 및 유휴/장기 연결 정리 (데드라인: 즉시, 30분)",
    "P1 — SRE: 캐시 재시작 실패 원인 복구 및 캐시 무결성 검사 (데드라인: 즉시, 45분)",
    "P2 — 고객지원/운영: 고객 공지 및 CS 템플릿 배포, 재시도 안내 (데드라인: 1시간)"
  ]
}
누락 필드 수: [0, 0, 0]

 

 

 
 

4단계. 결과 저장과 검증 요약

구조화 출력은 이후 코드가 바로 사용할 데이터가 됩니다. 텍스트, JSON, 메타데이터를 함께 저장하면 사람이 읽는 결과와 프로그램이 처리하는 결과를 모두 확인할 수 있습니다.

# 최종 보고서 행을 구성합니다.
report_lines = ["# 05 예제 실행 보고서", "", "## 1단계 - IncidentRecord", f"- 제목: {incident.get('incident_title', '')}", f"- 발생 시각: {incident.get('occurrence_time', '')}", f"- 영향 범위: {incident.get('impact_scope', '')}", "", "## 2단계 - ResponsePlan", f"- 리스크 수준: {plan.get('risk_level', '')}", f"- 경영진 보고 필요: {plan.get('requires_exec_report', False)}", "", "## 3단계 - ExecutiveBrief", f"- 헤드라인: {brief.get('headline', '')}"]
# 3줄 요약을 보고서에 추가합니다.
report_lines.extend([f"- {row}" for row in brief.get("three_line_summary", [])])
# 최종 보고서 텍스트를 만듭니다.
final_text = "\n".join(report_lines) + "\n"
# 최종 텍스트 파일 경로입니다.
output_text_path = OUTPUT_DIR / "05_pipeline_v2_final_output.txt"
# 각 단계 JSON 파일 경로입니다.
incident_path = OUTPUT_DIR / "05_pipeline_v2_incident.json"
plan_path = OUTPUT_DIR / "05_pipeline_v2_plan.json"
brief_path = OUTPUT_DIR / "05_pipeline_v2_brief.json"
# 단계 추적 파일 경로입니다.
trace_path = OUTPUT_DIR / "05_pipeline_v2_trace.json"
# 메타데이터 파일 경로입니다.
metadata_path = OUTPUT_DIR / "05_pipeline_v2_meta.json"
# 최종 보고서를 저장합니다.
output_text_path.write_text(final_text, encoding="utf-8")
# 1단계 JSON을 저장합니다.
incident_path.write_text(json.dumps(incident, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
# 2단계 JSON을 저장합니다.
plan_path.write_text(json.dumps(plan, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
# 3단계 JSON을 저장합니다.
brief_path.write_text(json.dumps(brief, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
# 단계 추적 정보를 저장합니다.
trace_path.write_text(json.dumps({"stages": stage_trace}, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
# 마지막 단계 usage를 dict로 변환합니다.
usage = usage_to_dict(stage3_response.usage)
# 메타데이터를 구성합니다.
metadata = {"example": "05.multistep_structured_pipeline_v2", "requested_model": STRUCTURED_MODEL, "actual_model": stage3_response.model, "stage_count": 3, "incident_missing_count": len(incident_missing), "plan_missing_count": len(plan_missing), "brief_missing_count": len(brief_missing), "total_actions": len(plan.get("actions", [])), "summary_line_count": len(brief.get("three_line_summary", [])), "input_tokens": usage.get("input_tokens"), "output_tokens": usage.get("output_tokens"), "total_tokens": usage.get("total_tokens"), "output_path": workspace_path(output_text_path), "trace_path": workspace_path(trace_path)}
# 메타데이터를 저장합니다.
metadata_path.write_text(json.dumps(metadata, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
# 저장 위치를 출력합니다.
print(f"최종 보고서 저장: {workspace_path(output_text_path)}")
print(f"추적 JSON 저장: {workspace_path(trace_path)}")
print(f"메타데이터 저장: {workspace_path(metadata_path)}")
최종 보고서 저장: 03.structured_output/output/05_pipeline_v2_final_output.txt
추적 JSON 저장: 03.structured_output/output/05_pipeline_v2_trace.json
메타데이터 저장: 03.structured_output/output/05_pipeline_v2_meta.json