본문으로 건너뛰기

[pydantic-ai] 클라이언트 연결 해제 시 StopAsyncIteration 방지를 위한 aclosing 적용

PR 링크: pydantic/pydantic-ai#4205 상태: Merged | 변경: +118 / -71

들어가며

비동기 스트리밍에서 클라이언트가 중간에 연결을 끊으면, 내부의 비동기 제너레이터가 제대로 정리(cleanup)되지 않아 StopAsyncIteration이 예상치 못한 곳에서 전파될 수 있습니다. 이 PR은 contextlib.aclosing을 적용하고, group_by_temporal 유틸리티의 정리 로직을 개선하여 이 문제를 해결합니다.

핵심 코드 분석

1. aclosing으로 제너레이터 정리 보장

Before:

async def _stream_text_deltas() -> AsyncIterator[str]:
    async with _utils.group_by_temporal(...) as group_iter:
        async for items in group_iter:
            yield ''.join([content for content, _ in items])

if delta:
    async for text in _stream_text_deltas():
        yield text
else:
    deltas: list[str] = []
    async for text in _stream_text_deltas():
        deltas.append(text)
        yield ''.join(deltas)

After:

async def _stream_text_deltas() -> AsyncGenerator[str, None]:
    async with _utils.group_by_temporal(...) as group_iter:
        async for items in group_iter:
            yield ''.join([content for content, _ in items])

async with aclosing(_stream_text_deltas()) as deltas_iter:
    if delta:
        async for text in deltas_iter:
            yield text
    else:
        deltas: list[str] = []
        async for text in deltas_iter:
            deltas.append(text)
            yield ''.join(deltas)

aclosing 컨텍스트 매니저가 스코프를 벗어날 때 aclose()를 호출하여 제너레이터의 finally 블록이 실행됩니다. 반환 타입도 AsyncIterator에서 AsyncGenerator로 변경되어 타입 안전성이 향상되었습니다.

2. group_by_temporal 정리 로직 개선

Before:

try:
    yield async_iter_groups_noop()
    return
# ... 또는
try:
    yield async_iter_groups()
finally:  # pragma: no cover
    if task:
        task.cancel('Cancelling due to error in iterator')
        with suppress(asyncio.CancelledError):
            await task

soft_max_interval=None 경로에는 정리 로직이 없었고, 다른 경로의 정리는 no cover 주석이 붙어 있었습니다.

After:

async def _cleanup_temporal_group(task, aiterator):
    if task:
        task.cancel('Cancelling group_by_temporal pending task')
        with suppress(asyncio.CancelledError, StopAsyncIteration):
            await task
    aclose = getattr(aiterator, 'aclose', None)
    if aclose is not None:
        await aclose()

try:
    yield async_iter_groups()
finally:
    await _cleanup_temporal_group(task, aiterator)

두 경로 모두 동일한 _cleanup_temporal_group 함수를 사용하며, 태스크 취소뿐 아니라 비동기 이터레이터의 aclose()도 호출합니다. StopAsyncIteration도 suppress 대상에 추가되었습니다.

왜 이게 좋은가

비동기 제너레이터는 소비자가 끝까지 순회하지 않으면 내부 cleanup 코드가 실행되지 않는 것이 Python의 알려진 함정입니다. 웹 서버에서 SSE(Server-Sent Events)나 WebSocket 스트리밍 중 클라이언트가 연결을 끊는 것은 정상적인 시나리오이므로, 리소스 정리가 반드시 보장되어야 합니다. aclosing은 이를 위한 표준 패턴입니다.

정리

항목 내용
문제 클라이언트 중단 시 비동기 제너레이터 미정리
해결 1 aclosing으로 _stream_text_deltas 감싸기
해결 2 _cleanup_temporal_group으로 통합 정리

참고 자료

⚠️ 알림: 이 분석은 AI가 실제 코드 diff를 기반으로 작성했습니다.

댓글

관련 포스트

PR Analysis 의 다른글