[pydantic-ai] 스트리밍에서 중복 도구 호출 방지 및 결과 캐싱 추가
PR 링크: pydantic/pydantic-ai#3848 상태: Merged | 변경: +263 / -39
들어가며
pydantic-ai의 스트리밍 API에서 stream_output(), stream_text(), get_output()을 여러 번 호출하면 output validator나 output function이 매번 다시 실행되는 문제가 있었습니다. 특히 output function에 부작용(side effect)이 있는 경우, 결과를 두 번째로 가져올 때 중복 실행이 발생했습니다. 이 PR은 AgentStream에 결과 캐시를 도입하여 이를 해결합니다.
핵심 코드 분석
_cached_output 필드 추가
Before:
@dataclass
class AgentStream(Generic[AgentDepsT, OutputDataT]):
_agent_stream_iterator: AsyncIterator[ModelResponseStreamEvent] | None = field(default=None, init=False)
_initial_run_ctx_usage: RunUsage = field(init=False)
After:
@dataclass
class AgentStream(Generic[AgentDepsT, OutputDataT]):
_agent_stream_iterator: AsyncIterator[ModelResponseStreamEvent] | None = field(default=None, init=False)
_initial_run_ctx_usage: RunUsage = field(init=False)
_cached_output: OutputDataT | None = field(default=None, init=False)
stream_output에서 캐시 활용
Before:
async def stream_output(self, *, debounce_by=0.1):
# ... 스트리밍 ...
response = self.response
if self._raw_stream_response.final_result_event is None or (
last_response and response.parts == last_response.parts
):
return
yield await self.validate_response_output(response)
After:
async def stream_output(self, *, debounce_by=0.1):
if self._cached_output is not None:
yield deepcopy(self._cached_output)
return
# ... 스트리밍 ...
response = self.response
if self._raw_stream_response.final_result_event is not None and (
not last_response or response.parts != last_response.parts
):
self._cached_output = await self.validate_response_output(response, allow_partial=False)
yield deepcopy(self._cached_output)
첫 번째 호출 시 최종 검증 결과를 _cached_output에 저장하고, 이후 호출에서는 deepcopy로 반환합니다. deepcopy를 사용하여 호출자가 반환값을 수정해도 캐시가 오염되지 않습니다.
get_output에서도 캐시 활용
async def get_output(self) -> OutputDataT:
if self._cached_output is not None:
return deepcopy(self._cached_output)
async for _ in self:
pass
self._cached_output = await self.validate_response_output(self.response)
return deepcopy(self._cached_output)
왜 이게 좋은가
Output function은 사용자 정의 로직(데이터 변환, 외부 서비스 호출 등)을 포함할 수 있으며, 멱등하지 않을 수 있습니다. 예를 들어 "결과를 데이터베이스에 저장하는" output function이 두 번 호출되면 중복 데이터가 생성됩니다. 캐싱으로 이를 방지하면서, deepcopy로 캐시 안전성도 보장합니다. stream_text(delta=True) 후 재호출 시에는 캐시된 전체 텍스트를 반환하여 일관된 동작을 제공합니다.
정리
| 항목 | 내용 |
|---|---|
| 문제 | 스트리밍 API 반복 호출 시 output function 중복 실행 |
| 해결 | _cached_output 필드로 결과 캐싱 |
| 안전장치 | deepcopy로 캐시 변조 방지 |
참고 자료
⚠️ 알림: 이 분석은 AI가 실제 코드 diff를 기반으로 작성했습니다.
관련 포스트
PR Analysis 의 다른글
- 이전글 [pytest] actions/cache v4에서 v5로 업그레이드
- 현재글 : [pydantic-ai] 스트리밍에서 중복 도구 호출 방지 및 결과 캐싱 추가
- 다음글 [pydantic-ai] 테스트 스위트에서 불필요한 asyncio.sleep 제거
댓글