본문 바로가기
생성형 AI

LangGraph의 Reducer vs React의 useReducer - 3

by Toddler_AD 2025. 11. 20.

13. LangGraph: 한 상태(State)에 여러 리듀서 쓰는 패턴

13.1 왜 여러 리듀서가 필요할까?

이전까지는 LangGraph 상태를 이렇게 단순하게 썼죠.

class AgentState(TypedDict, total=False):
    messages: Annotated[list[AnyMessage], add_messages]

하지만 실제로는 한 세션에서 갖고 싶은 상태가 훨씬 많습니다. 예를 들어:

  • messages: 대화 히스토리 → 기존처럼 add_messages로 누적
  • retry_count: 같은 요청을 몇 번 재시도했는지 → 새 값으로 덮어쓰는 게 아니라 합산하고 싶음
  • last_tool: 마지막으로 사용한 툴 이름 → 가장 최근 호출 값으로 덮어씀
  • metadata: 여러 노드에서 key-value 형태의 부가 정보를 추가 → 딕셔너리 병합

이걸 전부 “마지막 값이 이긴다(last write wins)”로 처리하면:

  • 동시 실행(branch)에서 서로의 업데이트가 덮어 씌워져서 날아가는 문제가 생깁니다.

그래서 필드별로 다른 병합 규칙(리듀서)를 지정하는 게 중요합니다.


13.2 예시: 복합 상태 정의

아래는 예시로 쓸 AgentState입니다.

# src/graph/state.py

from typing import Annotated, Dict, Any, Optional
from typing_extensions import TypedDict
from langchain_core.messages import AnyMessage
from langgraph.graph.message import add_messages

# 1) State 정의 ---------------------------------------------------
class AgentState(TypedDict, total=False):
    # ① 대화 메시지: add_messages로 "누적"되는 리스트
    messages: Annotated[list[AnyMessage], add_messages]

    # ② 재시도 횟수: 여러 노드에서 +1씩 증가 → 합산(reduce)하고 싶음
    retry_count: int

    # ③ 마지막으로 호출한 툴 이름: 항상 "가장 최근 값"을 쓰면 됨
    last_tool: Optional[str]

    # ④ 메타데이터: 여러 노드에서 key-value를 추가 → dict 병합
    metadata: Dict[str, Any]

여기서 messages만 Annotated[..., add_messages]로 되어 있고
나머지는 일단 일반 타입입니다.

LangGraph는 명시된 리듀서가 없으면 기본 병합 전략을 사용합니다(보통 “덮어쓰기”에 가까운 동작).
하지만 우리는 retry_count는 합산, metadata는 병합, last_tool은 마지막 값 유지 등, 좀 더 의도적인 병합을 원합니다.

그래서 커스텀 리듀서 함수를 정의하고, 이들을 필드에 바인딩해 사용하는 패턴으로 확장할 수 있습니다.


13.3 커스텀 리듀서 함수 설계 아이디어

13.3.1 공통 아이디어

리듀서는 기본적으로 이 형태입니다.

def reducer(old_value, new_value):
    # old_value: 현재 state에 있던 값
    # new_value: 노드에서 이번에 업데이트한 값
    return merged_value

이걸 필드별로 설계하면 됩니다.

13.3.2 retry_count: 합산 리듀서

def sum_reducer(old: Optional[int], new: Optional[int]) -> int:
    old_val = old or 0
    new_val = new or 0
    return old_val + new_val
  • 여러 노드가 {"retry_count": 1}씩 내놓으면,
    최종 state에서는 기존 + 1 + 1 + ... 형태로 합산됩니다.
  • 특히 병렬(branch) 실행에서 각자 +1 하더라도, 최종적으로 잘 합쳐지게 됩니다.

13.3.3 last_tool: 마지막 값 우선

def last_value_reducer(old, new):
    # new가 None이 아니면 신규 값을 우선
    if new is not None:
        return new
    return old
  • 보통 마지막으로 툴을 호출한 노드에서만 {"last_tool": "get_weather"} 같은 값을 넣도록 하고,
  • 이후에 새로운 값이 들어오면 그것으로 덮어씌우는 식으로 동작시킬 수 있습니다.

13.3.4 metadata: 딕셔너리 병합

from typing import Dict, Any, Optional

def dict_merge_reducer(old: Optional[Dict[str, Any]],
                       new: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    # None 방지
    old = old or {}
    new = new or {}
    # 간단히 "오른쪽 우선" 딕셔너리 병합
    merged = {**old, **new}
    return merged

 

  • 예:
    • 노드 A: {"metadata": {"intent": "search"}}
    • 노드 B: {"metadata": {"score": 0.9}}
  • 병합 결과:
    • {"metadata": {"intent": "search", "score": 0.9}}

13.4 LangGraph에 커스텀 리듀서 연결하기 

구체적인 API는 사용 중인 LangGraph 버전/언어 바인딩에 따라 조금씩 달라질 수 있지만, 개념적으로는 다음과 같이 “상태 스키마 + 리듀서 매핑”을 넘기게 됩니다.

# 개념 코드 (버전에 따라 다를 수 있음)

from langgraph.graph import StateGraph

# 1) 그래프 빌더 생성
builder = StateGraph(AgentState)

# 2) 필드별 리듀서 설정 (개념적 예시)
builder.add_state_reducer("retry_count", sum_reducer)
builder.add_state_reducer("last_tool", last_value_reducer)
builder.add_state_reducer("metadata", dict_merge_reducer)

# messages는 Annotated[..., add_messages] 로 이미 설정되어 있다고 가정

 

이제 그래프 내의 각 노드에서 다음처럼 partial state를 내놓으면:

def some_node(state: AgentState) -> AgentState:
    return {
        "messages": [... 어떤 메시지들 ...],
        "retry_count": 1,
        "metadata": {"step": "some_node"}
    }

LangGraph 엔진이 상태를 병합할 때:

  • messages → add_messages로 누적
  • retry_count → sum_reducer로 합산
  • metadata → dict_merge_reducer로 병합

이렇게 동작하게 됩니다.


13.5 병렬 실행 시나리오 예시

상태 초기값:

state = {
    "messages": [],
    "retry_count": 0,
    "last_tool": None,
    "metadata": {},
}

브랜치 A, B가 동시에 실행:

  • 브랜치 A 결과:
{
    "messages": [AIMessage("브랜치 A 분석 로그")],
    "retry_count": 1,
    "metadata": {"branch_A": True}
}
  • 브랜치 B 결과:
{
    "messages": [AIMessage("브랜치 B 분석 로그")],
    "retry_count": 1,
    "metadata": {"branch_B": True}
}

병합 후:

  • messages: add_messages
    → ["브랜치 A 로그", "브랜치 B 로그"] (순서는 실행/머지 순서에 따라 달라질 수 있음)
  • retry_count: sum_reducer
    → 0(초기) + 1(A) + 1(B) = 2
  • metadata: dict_merge_reducer
    → {"branch_A": True, "branch_B": True}

이렇게 각 필드의 의미에 맞는 병합이 가능해집니다.
이게 바로 LangGraph reducer를 “여러 개, 목적에 맞게” 쓰는 기본 패턴입니다.


14. React: useReducer + useEffect로 요청 생명주기 관리하기

이번에는 프론트 쪽 고급 패턴입니다.

핵심 철학은 하나입니다.

리듀서는 “순수”하게 두고,
네트워크 요청/취소/중복 방지 같은 IO는 useEffect와 AbortController로 처리하자.

14.1 상태와 액션 설계

조금만 더 정교하게 상태를 쪼개 보겠습니다.

// src/components/chat/ChatWindow.tsx

// 1) 상태 -----------------------------------
type RequestStatus = 'idle' | 'pending' | 'success' | 'error' | 'cancelled'

type ChatMessage = {
  id: string
  role: 'user' | 'assistant'
  content: string
}

type ChatState = {
  messages: ChatMessage[]
  input: string
  status: RequestStatus      // 요청 상태
  error: string | null
  // 요청 구분용 ID (요청이 바뀔 때마다 증가)
  currentRequestId: number | null
  // 사용자가 타이핑 중인지 여부
  isTyping: boolean
}

// 2) 액션 -----------------------------------
type ChatAction =
  | { type: 'CHANGE_INPUT'; payload: string }
  | { type: 'SET_TYPING'; payload: boolean }
  | { type: 'SEND_REQUEST' }
  | { type: 'REQUEST_SUCCESS'; payload: { messages: ChatMessage[] } }
  | { type: 'REQUEST_ERROR'; payload: string }
  | { type: 'REQUEST_CANCELLED' }

상태 해석

  • status:
    • 'pending'이면 버튼 비활성화, 스피너 표시 등 UI에 활용
  • currentRequestId:
    • 요청이 발생할 때마다 +1 해서,
    • useEffect에서 “가장 최신 요청만 유효”하게 관리
  • isTyping:
    • 타이핑 인디케이터(“입력 중...”)에 활용

14.2 리듀서 구현 (IO 없이 상태만)

// src/components/chat/ChatWindow.tsx

function chatReducer(state: ChatState, action: ChatAction): ChatState {
  switch (action.type) {
    case 'CHANGE_INPUT': {
      return {
        ...state,
        input: action.payload,
        isTyping: action.payload.length > 0,
      }
    }

    case 'SET_TYPING': {
      return {
        ...state,
        isTyping: action.payload,
      }
    }

    case 'SEND_REQUEST': {
      const trimmed = state.input.trim()
      if (!trimmed) return state

      const userMessage: ChatMessage = {
        id: `user-${Date.now()}`,
        role: 'user',
        content: trimmed,
      }

      return {
        ...state,
        input: '',
        isTyping: false,
        status: 'pending',
        error: null,
        // 새 요청 ID 부여
        currentRequestId:
          state.currentRequestId === null
            ? 1
            : state.currentRequestId + 1,
        messages: [...state.messages, userMessage],
      }
    }

    case 'REQUEST_SUCCESS': {
      return {
        ...state,
        status: 'success',
        error: null,
        messages: action.payload.messages,
      }
    }

    case 'REQUEST_ERROR': {
      return {
        ...state,
        status: 'error',
        error: action.payload,
      }
    }

    case 'REQUEST_CANCELLED': {
      return {
        ...state,
        status: 'cancelled',
      }
    }

    default:
      return state
  }
}

const initialState: ChatState = {
  messages: [],
  input: '',
  status: 'idle',
  error: null,
  currentRequestId: null,
  isTyping: false,
}

여기까지는 오직 상태 계산만 담당합니다.
연결된 네트워크 IO는 전혀 없습니다.


14.3 useEffect + AbortController로 “최신 요청만 유효”하게 만들기

이제 요청/취소/중복 방지를 useEffect에서 담당하게 합니다.

아이디어는 이렇습니다.

  1. SEND_REQUEST 액션이 디스패치되면,
    • status: 'pending'
    • currentRequestId 증가
  2. useEffect에서 status === 'pending' & currentRequestId !== null일 때
    • AbortController를 하나 만들고
    • fetch 요청을 날림
  3. 응답이 돌아오면:
    • 아직도 이 effect가 “최신 요청”인지 확인
      (requestId를 클로저에 캡처한 뒤, 현재 state의 currentRequestId와 비교)
    • 최신이 아니면 결과를 버림 → 중복 요청으로 인한 꼬임 방지
  4. 컴포넌트 unmount 또는 currentRequestId 변화 시
    • 이전 AbortController로 요청 취소 → 취소 처리

코드로 보면:

// src/components/chat/ChatWindow.tsx

import { useReducer, useEffect, FormEvent, useRef } from 'react'
import { sendChat, ServerMessage } from '@/lib/api/chat'

export function ChatWindow() {
  const [state, dispatch] = useReducer(chatReducer, initialState)

  // 현재 AbortController를 기억할 ref
  const abortRef = useRef<AbortController | null>(null)

  // 1) 폼 전송 시에는 SEND_REQUEST만 날리고, 실제 요청은 useEffect에서 처리
  const handleSubmit = (e: FormEvent) => {
    e.preventDefault()
    dispatch({ type: 'SEND_REQUEST' })
  }

  // 2) 요청 생명주기 관리용 useEffect
  useEffect(() => {
    // pending 상태가 아니면 아무 것도 안 함
    if (state.status !== 'pending' || state.currentRequestId === null) return

    const requestId = state.currentRequestId
    const controller = new AbortController()
    abortRef.current = controller

    const run = async () => {
      try {
        // 서버에 보낼 메시지 목록 구성
        const serverMessages: ServerMessage[] = state.messages.map((m) => ({
          role: m.role,
          content: m.content,
        }))

        const result = await sendChat(serverMessages)

        // 요청이 끝나는 시점에, 이 요청이 여전히 "최신"인지 확인
        // (중간에 다른 요청이 발생했다면 currentRequestId가 바뀌었을 것)
        if (requestId !== state.currentRequestId) {
          // 최신 요청이 아니므로 결과를 무시
          return
        }

        const uiMessages = result.map((m, index) => ({
          id: `${m.role}-${index}-${Date.now()}`,
          role: m.role,
          content: m.content,
        }))

        dispatch({
          type: 'REQUEST_SUCCESS',
          payload: { messages: uiMessages },
        })
      } catch (err: any) {
        if (controller.signal.aborted) {
          // 취소된 요청이면 CANCELLED 액션
          dispatch({ type: 'REQUEST_CANCELLED' })
        } else {
          dispatch({
            type: 'REQUEST_ERROR',
            payload: err.message ?? '알 수 없는 오류입니다.',
          })
        }
      }
    }

    run()

    // cleanup: 요청 취소
    return () => {
      controller.abort()
    }
  }, [state.status, state.currentRequestId, state.messages])

  // 렌더 부분은 이전 예제랑 거의 동일 (생략 가능)
  // ...
}

이 패턴의 장점:

  • 리듀서가 100% 순수하게 유지됩니다.
  • 네트워크 요청/취소/중복 방지 로직은 useEffect + AbortController에서 처리.
  • 최신 요청만 유효하게 받아들이기 때문에,
    사용자가 연달아 여러 번 전송 버튼을 눌러도 상태 꼬임 위험이 줄어듭니다.

14.4 타이핑 인디케이터 & 디바운스 아이디어

14.4.1 isTyping 플래그는 이미 리듀서에 존재

  • CHANGE_INPUT 액션에서 payload.length > 0이면 isTyping: true
  • 입력창이 비워지면 false

이걸 그대로 UI에 활용하면:

{state.isTyping && (
  <div className="text-xs opacity-60 px-4 pb-2">
    입력 중...
  </div>
)}

처럼 간단히 타이핑 상태를 보여줄 수 있습니다.

14.4.2 “타이핑 끝난 뒤 X초 지나면 자동으로 false” 같은 패턴

여기에 디바운스를 추가하고 싶다면, useEffect에서 딱 1초짜리 타이머를 두고 이렇게 할 수 있습니다.

// 입력값이 바뀔 때마다 "1초 뒤에 isTyping을 false로 바꾸는" 디바운스
useEffect(() => {
  if (!state.isTyping) return

  const timer = setTimeout(() => {
    dispatch({ type: 'SET_TYPING', payload: false })
  }, 1000)

  return () => clearTimeout(timer)
}, [state.input])
  • 사용자가 계속 타이핑하면 input이 계속 바뀌기 때문에 타이머가 리셋됩니다.
  • 입력이 멈춘 후 1초 지나면 isTyping: false가 됩니다.
  • 이걸 이용하면 “상대방에게 표시되는 타이핑 상태”를 좀 더 자연스럽게 제어할 수 있습니다.

15. 마무리: 두 레이어의 “리듀서 사고방식” 연결

지금까지 내용을 한 번에 정리하면:

  1. LangGraph 쪽
    •   상태는 TypedDict + Annotated로 정의
    • 필드마다 의미가 다르기 때문에,
      •   messages: add_messages로 누적
      •   retry_count: 합산 리듀서
      •   last_tool: 마지막 값 우선
      •   metadata: dict 병합
        처럼 각자 다른 리듀서(병합 규칙)를 부여
    •   그래프 내 여러 노드/브랜치가 동시에 상태를 업데이트해도,
      → 최종 전역 상태가 의미 있게 병합
  2. React 쪽
    •   useReducer는 오직 UI 상태에 집중
    •   리듀서 안에서는 IO(네트워크, 타이머) 하지 않기
    •   네트워크 요청/취소/중복 방지는 useEffect + AbortController에서 처리
    •   isTyping 같은 플래그는 리듀서에서 관리,
        디바운스/타이머는 useEffect에서
  3. 공통 관통 포인트
    •   둘 다 “(state, something) → newState”라는 리듀서 패턴을 공유
    • 하지만
      •   LangGraph는 분산/병렬 워크플로우 전역 상태 오케스트레이션
      •   React는 단일 컴포넌트(또는 뷰 계층)의 로컬 상태 머신
    •   이 차이를 이해하고 나면, “reducer”라는 단어가 나올 때
        → “아, 여기서는 어떤 스케일의 상태를, 어떤 규칙으로 병합/전이시키는지”를 먼저 보게 됩니다.