[pydantic-ai] 클라이언트 연결 해제 시 StopAsyncIteration 방지를 위한 aclosing 적용
PR 링크: pydantic/pydantic-ai#4205 상태: Merged | 변경: +118 / -71
들어가며
비동기 스트리밍에서 클라이언트가 중간에 연결을 끊으면, 내부의 비동기 제너레이터가 제대로 정리(cleanup)되지 않아 StopAsyncIteration이 예상치 못한 곳에서 전파될 수 있습니다. 이 PR은 contextlib.aclosing을 적용하고, group_by_temporal 유틸리티의 정리 로직을 개선하여 이 문제를 해결합니다.
핵심 코드 분석
1. aclosing으로 제너레이터 정리 보장
Before:
async def _stream_text_deltas() -> AsyncIterator[str]:
async with _utils.group_by_temporal(...) as group_iter:
async for items in group_iter:
yield ''.join([content for content, _ in items])
if delta:
async for text in _stream_text_deltas():
yield text
else:
deltas: list[str] = []
async for text in _stream_text_deltas():
deltas.append(text)
yield ''.join(deltas)
After:
async def _stream_text_deltas() -> AsyncGenerator[str, None]:
async with _utils.group_by_temporal(...) as group_iter:
async for items in group_iter:
yield ''.join([content for content, _ in items])
async with aclosing(_stream_text_deltas()) as deltas_iter:
if delta:
async for text in deltas_iter:
yield text
else:
deltas: list[str] = []
async for text in deltas_iter:
deltas.append(text)
yield ''.join(deltas)
aclosing 컨텍스트 매니저가 스코프를 벗어날 때 aclose()를 호출하여 제너레이터의 finally 블록이 실행됩니다. 반환 타입도 AsyncIterator에서 AsyncGenerator로 변경되어 타입 안전성이 향상되었습니다.
2. group_by_temporal 정리 로직 개선
Before:
try:
yield async_iter_groups_noop()
return
# ... 또는
try:
yield async_iter_groups()
finally: # pragma: no cover
if task:
task.cancel('Cancelling due to error in iterator')
with suppress(asyncio.CancelledError):
await task
soft_max_interval=None 경로에는 정리 로직이 없었고, 다른 경로의 정리는 no cover 주석이 붙어 있었습니다.
After:
async def _cleanup_temporal_group(task, aiterator):
if task:
task.cancel('Cancelling group_by_temporal pending task')
with suppress(asyncio.CancelledError, StopAsyncIteration):
await task
aclose = getattr(aiterator, 'aclose', None)
if aclose is not None:
await aclose()
try:
yield async_iter_groups()
finally:
await _cleanup_temporal_group(task, aiterator)
두 경로 모두 동일한 _cleanup_temporal_group 함수를 사용하며, 태스크 취소뿐 아니라 비동기 이터레이터의 aclose()도 호출합니다. StopAsyncIteration도 suppress 대상에 추가되었습니다.
왜 이게 좋은가
비동기 제너레이터는 소비자가 끝까지 순회하지 않으면 내부 cleanup 코드가 실행되지 않는 것이 Python의 알려진 함정입니다. 웹 서버에서 SSE(Server-Sent Events)나 WebSocket 스트리밍 중 클라이언트가 연결을 끊는 것은 정상적인 시나리오이므로, 리소스 정리가 반드시 보장되어야 합니다. aclosing은 이를 위한 표준 패턴입니다.
정리
| 항목 | 내용 |
|---|---|
| 문제 | 클라이언트 중단 시 비동기 제너레이터 미정리 |
| 해결 1 | aclosing으로 _stream_text_deltas 감싸기 |
| 해결 2 | _cleanup_temporal_group으로 통합 정리 |
참고 자료
⚠️ 알림: 이 분석은 AI가 실제 코드 diff를 기반으로 작성했습니다.
관련 포스트
PR Analysis 의 다른글
- 이전글 [pydantic-ai] xAI 프로바이더에서 gRPC 이벤트 루프 불일치 버그 수정
- 현재글 : [pydantic-ai] 클라이언트 연결 해제 시 StopAsyncIteration 방지를 위한 aclosing 적용
- 다음글 [pytorch] Inductor: CycleGAN CPU 벤치마크 expected accuracy 상태 업데이트
댓글