본문으로 건너뛰기

[pydantic-ai] DBOSAgent에서 병렬 도구 실행 지원 및 실행 모드 API 추가

PR 링크: pydantic/pydantic-ai#4077 상태: Merged | 변경: +261 / -483

들어가며

Durable execution 환경(DBOS)에서 에이전트 도구 실행은 결정론적 리플레이가 보장되어야 합니다. 기존에는 이를 위해 도구를 순차 실행했지만, 독립적인 도구 호출은 병렬로 실행해도 리플레이 안전성을 유지할 수 있습니다. 이 PR은 parallel_execution_mode API를 새로 도입하고 DBOSAgent에서 parallel_ordered_events 모드를 기본값으로 설정합니다.

핵심 코드 분석

새로운 ParallelExecutionMode 타입

Before:

_sequential_tool_calls_ctx_var: ContextVar[bool] = ContextVar('sequential_tool_calls', default=False)

def should_call_sequentially(self, calls: list[ToolCallPart]) -> bool:
    return _sequential_tool_calls_ctx_var.get() or any(
        tool_def.sequential for call in calls if (tool_def := self.get_tool_def(call.tool_name))
    )

After:

ParallelExecutionMode = Literal['parallel', 'sequential', 'parallel_ordered_events']

_parallel_execution_mode_ctx_var: ContextVar[ParallelExecutionMode] = ContextVar(
    'parallel_execution_mode', default='parallel'
)

def get_parallel_execution_mode(self, calls: list[ToolCallPart]) -> ParallelExecutionMode:
    if any(tool_def.sequential for call in calls if (tool_def := self.get_tool_def(call.tool_name))):
        return 'sequential'
    return _parallel_execution_mode_ctx_var.get()

3가지 실행 모드를 명시적으로 정의합니다: parallel(기존 기본값), sequential(순차), parallel_ordered_events(병렬 실행 + 순서 보장 이벤트).

_agent_graph에서 모드별 분기

parallel_execution_mode = tool_manager.get_parallel_execution_mode(tool_calls)
if parallel_execution_mode == 'sequential':
    for index, call in enumerate(tool_calls):
        # 순차 실행
elif parallel_execution_mode == 'parallel_ordered_events':
    # 모든 태스크 완료 대기 후 순서대로 이벤트 발행
    await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
    for index, task in enumerate(tasks):
        if event := await handle_call_or_result(coro_or_task=task, index=index):
            yield event
else:
    # 기존 방식: 완료되는 순서대로 이벤트 발행
    pending = set(tasks)
    while pending:
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)

parallel_ordered_events 모드는 ALL_COMPLETED로 전체 대기 후 원래 순서로 이벤트를 yield합니다. 실행은 병렬이지만 이벤트 순서가 결정론적이므로 DBOS 리플레이에 안전합니다.

기존 API 하위 호환

@classmethod
@contextmanager
@deprecated('Use `parallel_execution_mode("sequential")` instead.')
def sequential_tool_calls(cls) -> Iterator[None]:
    with cls.parallel_execution_mode('sequential'):
        yield

기존 sequential_tool_calls()는 deprecated로 마킹하되 동작은 유지합니다.

왜 이게 좋은가

이 설계는 성능(병렬 실행)과 안정성(결정론적 리플레이)을 동시에 달성합니다. parallel_ordered_events는 모든 도구가 완료될 때까지 기다린 후 원래 순서로 결과를 내보내므로, 리플레이 시 동일한 이벤트 순서가 보장됩니다. Boolean 플래그 대신 Literal union으로 모드를 표현하여 확장성과 가독성도 개선했습니다.

정리

모드 실행 이벤트 순서 사용처
parallel 병렬 완료 순 기본값
sequential 순차 호출 순 순서 의존 도구
parallel_ordered_events 병렬 호출 순 DBOS (기본값)

참고 자료

⚠️ 알림: 이 분석은 AI가 실제 코드 diff를 기반으로 작성했습니다.

댓글

관련 포스트

PR Analysis 의 다른글