본문으로 건너뛰기

[pydantic-ai] 스트리밍에서 중복 도구 호출 방지 및 결과 캐싱 추가

PR 링크: pydantic/pydantic-ai#3848 상태: Merged | 변경: +263 / -39

들어가며

pydantic-ai의 스트리밍 API에서 stream_output(), stream_text(), get_output()을 여러 번 호출하면 output validator나 output function이 매번 다시 실행되는 문제가 있었습니다. 특히 output function에 부작용(side effect)이 있는 경우, 결과를 두 번째로 가져올 때 중복 실행이 발생했습니다. 이 PR은 AgentStream에 결과 캐시를 도입하여 이를 해결합니다.

핵심 코드 분석

_cached_output 필드 추가

Before:

@dataclass
class AgentStream(Generic[AgentDepsT, OutputDataT]):
    _agent_stream_iterator: AsyncIterator[ModelResponseStreamEvent] | None = field(default=None, init=False)
    _initial_run_ctx_usage: RunUsage = field(init=False)

After:

@dataclass
class AgentStream(Generic[AgentDepsT, OutputDataT]):
    _agent_stream_iterator: AsyncIterator[ModelResponseStreamEvent] | None = field(default=None, init=False)
    _initial_run_ctx_usage: RunUsage = field(init=False)
    _cached_output: OutputDataT | None = field(default=None, init=False)

stream_output에서 캐시 활용

Before:

async def stream_output(self, *, debounce_by=0.1):
    # ... 스트리밍 ...
    response = self.response
    if self._raw_stream_response.final_result_event is None or (
        last_response and response.parts == last_response.parts
    ):
        return
    yield await self.validate_response_output(response)

After:

async def stream_output(self, *, debounce_by=0.1):
    if self._cached_output is not None:
        yield deepcopy(self._cached_output)
        return

    # ... 스트리밍 ...
    response = self.response
    if self._raw_stream_response.final_result_event is not None and (
        not last_response or response.parts != last_response.parts
    ):
        self._cached_output = await self.validate_response_output(response, allow_partial=False)
        yield deepcopy(self._cached_output)

첫 번째 호출 시 최종 검증 결과를 _cached_output에 저장하고, 이후 호출에서는 deepcopy로 반환합니다. deepcopy를 사용하여 호출자가 반환값을 수정해도 캐시가 오염되지 않습니다.

get_output에서도 캐시 활용

async def get_output(self) -> OutputDataT:
    if self._cached_output is not None:
        return deepcopy(self._cached_output)

    async for _ in self:
        pass

    self._cached_output = await self.validate_response_output(self.response)
    return deepcopy(self._cached_output)

왜 이게 좋은가

Output function은 사용자 정의 로직(데이터 변환, 외부 서비스 호출 등)을 포함할 수 있으며, 멱등하지 않을 수 있습니다. 예를 들어 "결과를 데이터베이스에 저장하는" output function이 두 번 호출되면 중복 데이터가 생성됩니다. 캐싱으로 이를 방지하면서, deepcopy로 캐시 안전성도 보장합니다. stream_text(delta=True) 후 재호출 시에는 캐시된 전체 텍스트를 반환하여 일관된 동작을 제공합니다.

정리

항목 내용
문제 스트리밍 API 반복 호출 시 output function 중복 실행
해결 _cached_output 필드로 결과 캐싱
안전장치 deepcopy로 캐시 변조 방지

참고 자료

⚠️ 알림: 이 분석은 AI가 실제 코드 diff를 기반으로 작성했습니다.

댓글

관련 포스트

PR Analysis 의 다른글