본문으로 건너뛰기

[vLLM] AsyncLLM: 비동기 엔진의 최상위 객체

들어가며

vLLM의 서빙 파이프라인에서 가장 바깥쪽에 위치하는 객체가 AsyncLLM이다. OpenAI-호환 API 서버가 요청을 받으면 최종적으로 AsyncLLM.generate()를 호출하게 되며, 이 클래스가 입력 전처리, EngineCore(별도 프로세스)와의 IPC 통신, 출력 후처리를 모두 오케스트레이션한다.

소스 경로: vllm/v1/engine/async_llm.py

핵심 구조/코드 분석

클래스 선언과 초기화

AsyncLLMEngineClient 프로토콜을 구현한다.

class AsyncLLM(EngineClient):
    def __init__(
        self,
        vllm_config: VllmConfig,
        executor_class: type[Executor],
        log_stats: bool,
        ...
    ) -> None:
        self.renderer = renderer_from_config(self.vllm_config)
        self.input_processor = InputProcessor(self.vllm_config, renderer)
        self.output_processor = OutputProcessor(
            renderer.tokenizer,
            log_stats=self.log_stats,
            ...
        )
        # EngineCore를 별도 프로세스로 시작
        self.engine_core = EngineCoreClient.make_async_mp_client(
            vllm_config=vllm_config,
            executor_class=executor_class,
            ...
        )

핵심 구성요소 세 가지가 여기서 생성된다:

  • InputProcessor: 사용자 프롬프트를 EngineCoreRequest로 변환
  • OutputProcessor: EngineCore 출력을 RequestOutput으로 변환 (디토크나이저 포함)
  • EngineCoreClient: 멀티프로세스 IPC 클라이언트 (ZMQ 기반)

generate() - 비동기 생성 루프

async def generate(
    self,
    prompt: PromptType | ...,
    sampling_params: SamplingParams,
    request_id: str,
    ...
) -> AsyncGenerator[RequestOutput, None]:
    q = await self.add_request(request_id, prompt, sampling_params, ...)

    finished = False
    while not finished:
        out = q.get_nowait() or await q.get()
        assert isinstance(out, RequestOutput)
        finished = out.finished
        if out is not STREAM_FINISHED:
            yield out

generate()는 AsyncGenerator이다. 요청을 추가하면 RequestOutputCollector 큐를 받고, 백그라운드 output_handler 태스크가 EngineCore로부터 결과를 받아 큐에 넣는다. 호출자는 이 큐에서 yield를 통해 스트리밍 토큰을 하나씩 가져간다.

add_request() - 요청 등록

async def add_request(self, request_id, prompt, params, ...) -> RequestOutputCollector:
    request = self.input_processor.process_inputs(
        request_id, prompt, params,
        supported_tasks=await self.get_supported_tasks(), ...
    )
    # OutputProcessor에 등록 (이 프로세스)
    # EngineCore에 등록 (별도 프로세스)
    await self.engine_core.add_request_async(request)

n > 1인 경우(같은 프롬프트에서 여러 샘플 생성) ParentRequest를 생성하고 자식 요청으로 fan-out한다. 이는 OpenAI API의 n 파라미터를 지원하기 위함이다.

스트리밍 입력 지원

async def _add_streaming_input_request(self, request_id, input_stream, ...):
    async for input_chunk in input_stream:
        req = self.input_processor.process_inputs(
            request_id=internal_req_id, prompt=input_chunk.prompt, ...
        )
        await self._add_request(req, prompt_text, None, 0, queue)

토큰이 점진적으로 도착하는 스트리밍 입력도 지원한다. AsyncGenerator로 받은 입력 스트림을 순회하면서 청크 단위로 요청을 업데이트한다.

왜 이 설계인가

  1. 프로세스 분리: EngineCore(스케줄러 + GPU 실행)는 별도 프로세스에서 돌아간다. Python GIL 영향 없이 API 서버가 수천 개의 동시 연결을 처리할 수 있다. ZMQ 기반 IPC로 통신한다.

  2. 큐 기반 스트리밍: RequestOutputCollector가 비동기 큐 역할을 하면서 get_nowait()로 busy-wait 없이 출력을 전달한다. 이 패턴은 서버 레이턴시를 줄이는 핵심이다.

  3. InputProcessor/OutputProcessor 분리: 토크나이징과 디토크나이징이 EngineCore와 같은 프로세스에 있으면 GPU 실행을 블로킹할 수 있다. 이를 AsyncLLM 프로세스에서 처리하여 파이프라인 병목을 제거한다.

참고

댓글

관련 포스트

vLLM 의 다른글