본문으로 건너뛰기

[vLLM] OutputProcessor: 출력 후처리 및 디토크나이징

들어가며

LLM 엔진이 토큰 ID를 출력하면 끝이 아니다. 토큰을 텍스트로 변환(디토크나이징)하고, 스트리밍 출력을 관리하고, logprobs를 계산하고, 완료된 요청을 정리하는 후처리 단계가 필요하다. vLLM의 OutputProcessor(vllm/v1/engine/output_processor.py)가 이 모든 것을 담당한다.

핵심 구조/코드 분석

RequestOutputCollector: 비동기 출력 수집

스트리밍 출력에서 프로듀서(엔진)와 컨슈머(API 핸들러) 사이의 버퍼 역할을 한다.

class RequestOutputCollector:
    def __init__(self, output_kind: RequestOutputKind, request_id: str):
        self.aggregate = output_kind == RequestOutputKind.DELTA
        self.output: RequestOutput | PoolingRequestOutput | Exception | None = None
        self.ready = asyncio.Event()

    def put(self, output: RequestOutput | PoolingRequestOutput | Exception) -> None:
        if self.output is None or isinstance(output, Exception):
            self.output = output
            self.ready.set()
        elif isinstance(self.output, RequestOutput) and isinstance(output, RequestOutput):
            self.output.add(output, aggregate=self.aggregate)

DELTA 모드에서 프로듀서가 컨슈머보다 빠르면, 중간 출력을 병합(aggregate)한다. 이는 스트리밍에서 불필요한 중간 응답 전송을 방지한다.

RequestState: 요청별 상태 관리

각 요청의 전체 생명주기를 추적하는 상태 객체이다.

class RequestState:
    def __init__(self, request_id, external_req_id, parent_req,
                 request_index, lora_request, output_kind, prompt,
                 prompt_token_ids, prompt_embeds, logprobs_processor,
                 detokenizer, max_tokens_param, arrival_time, queue, ...):
        self.is_prefilling = True
        self.num_cached_tokens = 0
        self.stream_interval = stream_interval
        self.sent_tokens_offset = 0

is_prefilling 플래그로 프리필 단계와 디코딩 단계를 구분하고, sent_tokens_offset으로 스트리밍에서 이미 전송한 토큰 위치를 추적한다.

make_request_output: 출력 생성 핵심 로직

def make_request_output(self, new_token_ids, pooling_output,
                        finish_reason, stop_reason, ...):
    finished = finish_reason is not None
    final_only = self.output_kind == RequestOutputKind.FINAL_ONLY

    if not finished and final_only:
        return None  # FINAL_ONLY 모드에서는 완료 시에만 출력

    if self.stream_interval > 1:
        if not (finished or self.sent_tokens_offset == 0
                or self.detokenizer.num_output_tokens() - self.sent_tokens_offset
                >= self.stream_interval):
            return None

출력 모드에 따른 3가지 동작:

  • FINAL_ONLY: 생성 완료 시에만 결과 반환
  • DELTA: 매 토큰(또는 stream_interval마다) 증분 결과 반환
  • CUMULATIVE: 매번 전체 결과 반환

stream_interval은 스트리밍 응답 빈도를 제어한다. 매 토큰마다 응답하면 오버헤드가 클 수 있으므로, 여러 토큰을 모아 전송할 수 있다.

OutputProcessor: 전체 조율

class OutputProcessor:
    def __init__(self, tokenizer, *, log_stats, stream_interval=1, tracing_enabled=False):
        self.request_states: dict[str, RequestState] = {}
        self.parent_requests: dict[str, ParentRequest] = {}
        self.external_req_ids: defaultdict[str, list[str]] = defaultdict(list)
        self.lora_states = LoRARequestStates(log_stats)

요청 상태를 내부 ID와 외부 ID 양방향으로 매핑한다. Parallel sampling(n > 1)에서 하나의 외부 요청이 여러 내부 요청을 생성하므로, external_req_ids로 이 관계를 추적한다.

요청 중단 처리

def abort_requests(self, request_ids, internal):
    for request_id in request_ids:
        if internal:
            internal_req_ids.append(request_id)
        elif internal_ids := self.external_req_ids.pop(request_id, []):
            internal_req_ids.extend(internal_ids)

    for request_id in internal_req_ids:
        req_state = self.request_states.pop(request_id, None)
        if req_state is not None:
            self.lora_states.request_finished(request_id, req_state.lora_name)
            if req_state.queue is not None:
                request_output = req_state.make_request_output(
                    new_token_ids=[],
                    finish_reason=FinishReason.ABORT, ...)
                req_state.queue.put(request_output)

중단된 요청에도 FinishReason.ABORT와 함께 최종 출력을 생성하여 컨슈머에게 전달한다. 이를 통해 클라이언트가 무한 대기에 빠지는 것을 방지한다.

Streaming Input 지원

def apply_streaming_update(self, update: StreamingUpdate) -> None:
    self.streaming_input = not update.final
    if update.prompt:
        self.prompt = (
            (self.prompt + update.prompt) if self.prompt else update.prompt
        )
    if self.prompt_token_ids:
        self.prompt_token_ids.extend(update.prompt_token_ids or ())
    self.is_prefilling = True

스트리밍 입력 기능은 프롬프트를 청크 단위로 전송하면서 동시에 생성을 시작할 수 있게 한다. 새로운 입력 청크가 도착하면 프리필 상태로 되돌아간다.

왜 이 설계인가

  1. 프로듀서-컨슈머 디커플링: RequestOutputCollector의 asyncio.Event 기반 설계로, 엔진과 API 핸들러의 속도 차이를 자연스럽게 흡수한다. 중간 결과 병합으로 불필요한 전송을 줄인다.

  2. Parallel Sampling 지원: n > 1일 때 ParentRequest가 여러 자식 요청의 출력을 조율한다. 모든 자식의 완료를 추적하고 병합된 결과를 생성한다.

  3. ABORT 시 정상 종료: 요청 중단도 정상적인 생명주기 이벤트로 처리한다. 클라이언트가 연결을 끊어도 리소스 누수나 데드락이 발생하지 않는다.

  4. stream_interval: 매 토큰 스트리밍은 SSE/WebSocket 오버헤드가 크다. 간격을 두고 배치 전송하면 처리량이 개선된다.

정리

OutputProcessor는 vLLM 엔진의 출구이다. 토큰 ID를 텍스트로, 내부 상태를 사용자 응답으로 변환하며, 스트리밍, parallel sampling, 요청 중단 등 복잡한 시나리오를 안정적으로 처리한다. InputProcessor와 함께 엔진의 양 끝을 구성하는 핵심 컴포넌트이다.

댓글

관련 포스트

vLLM 의 다른글