본문으로 건너뛰기

[SGLang] TokenizerManager: 비동기 토큰화 파이프라인의 설계와 구현

들어가며

LLM serving에서 tokenization은 단순한 전처리가 아니다. 텍스트를 토큰으로 변환하는 것은 물론, 요청 정규화, 입력 검증, 멀티모달 데이터 처리, sampling parameter 파싱, LoRA 어댑터 관리까지 포함하는 복잡한 파이프라인이다. SGLang의 TokenizerManager는 이 모든 작업을 비동기로 처리하면서, ZMQ IPC를 통해 Scheduler에 토큰화된 요청을 전달하고, DetokenizerManager로부터 결과를 수신하여 클라이언트에게 응답한다.

이 글에서는 python/sglang/srt/managers/tokenizer_manager.py를 중심으로 TokenizerManager의 설계를 분석한다.

전체 구조

TokenizerManager가 처리하는 요청-응답 파이프라인의 전체 흐름은 다음과 같다.

  Client (HTTP/gRPC)
       │
       ▼
┌──────────────────────────────────────────────────────┐
│              TokenizerManager (Main Process)          │
│                                                      │
│  1. normalize_batch_and_arguments()                  │
│  2. _tokenize_one_request() / _batch_tokenize()      │
│  3. _create_tokenized_object()                       │
│  4. _send_one_request()  ──── ZMQ PUSH ────┐         │
│                                            │         │
│  7. _handle_batch_output() ◄── ZMQ PULL ───┼──┐      │
│  8. _wait_one_response()  ──▶ yield        │  │      │
│                                            │  │      │
└────────────────────────────────────────────┼──┼──────┘
                                             │  │
                         ZMQ IPC             │  │
                                             ▼  │
                                     ┌───────────────┐
                                     │   Scheduler    │
                                     │  (subprocess)  │
                                     └───────┬───────┘
                                             │
                                             ▼
                                  ┌────────────────────┐
                                  │ DetokenizerManager  │
                                  │   (subprocess)      │
                                  └────────────────────┘

핵심은 TokenizerManager가 메인 프로세스에서 asyncio 이벤트 루프를 실행한다는 점이다. HTTP 서버와 같은 프로세스에서 동작하므로, 요청 수신부터 응답 반환까지 비동기적으로 처리된다.

핵심 코드 분석

클래스 구조와 초기화

TokenizerManager는 두 개의 mixin을 상속받는다. TokenizerCommunicatorMixin은 DetokenizerManager로부터 결과를 수신하는 핸들 루프를, TokenizerManagerScoreMixin은 scoring 관련 로직을 제공한다.

class TokenizerManager(TokenizerCommunicatorMixin, TokenizerManagerScoreMixin):
    """TokenizerManager is a process that tokenizes the text."""

    def __init__(
        self,
        server_args: ServerArgs,
        port_args: PortArgs,
    ):
        # Parse args
        self.server_args = server_args
        set_global_server_args_for_tokenizer(server_args)

        # Init model config
        self.init_model_config()

        # Initialize tokenizer and multimodalprocessor
        self.init_tokenizer_and_processor()

        # Init inter-process communication
        self.init_ipc_channels(port_args)

        # Init running status
        self.init_running_status()

초기화는 7단계로 나뉜다. 모델 설정, tokenizer/processor 초기화, IPC 채널 설정, 실행 상태 초기화, 로깅, weight update, LoRA, disaggregation 순으로 진행된다. 각 단계는 독립적인 메서드로 분리되어 있어 유지보수가 용이하다.

ZMQ IPC 채널 설정

TokenizerManager는 두 개의 ZMQ 소켓을 관리한다. DetokenizerManager로부터 결과를 받는 PULL 소켓과, Scheduler에 요청을 보내는 PUSH 소켓이다.

def init_ipc_channels(self, port_args: PortArgs):
    context = zmq.asyncio.Context(2)
    self.recv_from_detokenizer = get_zmq_socket(
        context, zmq.PULL, port_args.tokenizer_ipc_name, True
    )
    if self.server_args.tokenizer_worker_num == 1:
        self.send_to_scheduler = get_zmq_socket(
            context, zmq.PUSH, port_args.scheduler_input_ipc_name, True
        )
    else:
        from sglang.srt.managers.multi_tokenizer_mixin import SenderWrapper
        send_to_scheduler = get_zmq_socket(
            context, zmq.PUSH, port_args.tokenizer_worker_ipc_name, False
        )
        self.send_to_scheduler = SenderWrapper(port_args, send_to_scheduler)

zmq.asyncio.Context를 사용하여 asyncio 이벤트 루프와 호환되는 비동기 ZMQ 소켓을 생성한다. multi-tokenizer 모드(tokenizer_worker_num > 1)에서는 SenderWrapper로 감싸서 각 요청에 응답 라우팅을 위한 IPC 이름을 자동 첨부한다.

요청 처리 파이프라인: generate_request

generate_request는 TokenizerManager의 핵심 메서드다. 클라이언트 요청을 받아 토큰화하고, Scheduler에 전달한 뒤 응답을 기다린다.

async def generate_request(
    self,
    obj: Union[GenerateReqInput, EmbeddingReqInput],
    request: Optional[fastapi.Request] = None,
):
    self.auto_create_handle_loop()

    # Normalize the request
    obj.normalize_batch_and_arguments()
    self._set_default_priority(obj)
    self._validate_rid_not_in_flight(obj)

    # ...

    async with self.model_update_lock.reader_lock:
        await self._validate_and_resolve_lora(obj)

        if obj.is_single:
            tokenized_obj = await self._tokenize_one_request(obj)
            state = self.rid_to_state[obj.rid]
            self._send_one_request(tokenized_obj)
            async for response in self._wait_one_response(obj, state, request):
                yield response
        else:
            async for response in self._handle_batch_request(obj, request):
                yield response

이 메서드는 async generator다. 스트리밍 모드에서는 토큰이 생성될 때마다 yield하고, 비스트리밍 모드에서는 완료 시 한 번 yield한다. model_update_lock의 reader lock을 사용하여 weight update 중에는 새 요청을 차단한다.

토큰화 전략: 단일 vs 배치

TokenizerManager는 입력 유형에 따라 세 가지 토큰화 전략을 제공한다.

async def _tokenize_texts(
    self, texts: Union[str, List[str]], is_cross_encoder: bool = False
) -> Union[
    Tuple[List[int], Optional[List[int]]],
    Tuple[List[List[int]], Optional[List[List[int]]]],
]:
    input_format = self._detect_input_format(texts, is_cross_encoder)
    tokenizer_input = self._prepare_tokenizer_input(texts, input_format)

    use_async_tokenizer = (
        self.async_dynamic_batch_tokenizer is not None
        and input_format == InputFormat.SINGLE_STRING
    )

    if use_async_tokenizer:
        result = await self.async_dynamic_batch_tokenizer.encode(
            tokenizer_input[0], **tokenizer_kwargs
        )
        input_ids = [result["input_ids"]]
    else:
        encoded = self.tokenizer(tokenizer_input, **tokenizer_kwargs)
        input_ids = encoded["input_ids"]

InputFormat enum으로 입력 유형을 구분한다. SINGLE_STRING일 때는 AsyncDynamicbatchTokenizer를 사용하여 여러 요청의 토큰화를 동적으로 배칭할 수 있고, BATCH_STRINGSCROSS_ENCODER_PAIRS일 때는 일반 tokenizer를 사용한다.

요청 상태 관리: ReqState

각 요청의 상태는 ReqState dataclass로 관리된다. 이 객체는 비동기 이벤트 기반의 요청-응답 매칭을 담당한다.

@dataclasses.dataclass
class ReqState:
    """Store the state a request."""

    out_list: List[Dict[Any, Any]]
    finished: bool
    event: asyncio.Event
    obj: Union[GenerateReqInput, EmbeddingReqInput]

    # For performance metrics
    time_stats: APIServerReqTimeStats
    last_completion_tokens: int = 1

    # For streaming output
    last_output_offset: int = 0
    last_text_offset: int = 0

    # For incremental state update.
    text: str = ""
    output_ids: List[int] = dataclasses.field(default_factory=list)

eventasyncio.Event로, DetokenizerManager로부터 결과가 도착하면 set()되고, _wait_one_response에서 wait()한다. out_list에 결과가 누적되므로 스트리밍 중간 결과도 손실 없이 전달된다. last_output_offsetlast_text_offset은 증분 스트리밍 출력을 위한 오프셋으로, 이전에 전송한 부분을 추적하여 중복 없이 delta만 전송한다.

결과 수신과 디스패칭

DetokenizerManager로부터 받은 결과는 TypeBasedDispatcher를 통해 유형별로 라우팅된다.

def init_request_dispatcher(self):
    self._result_dispatcher = TypeBasedDispatcher(
        [
            (
                (BatchStrOutput, BatchEmbeddingOutput, BatchTokenIDOutput),
                self._handle_batch_output,
            ),
            (AbortReq, self._handle_abort_req),
            (OpenSessionReqOutput, self._handle_open_session_req_output),
            (UpdateWeightFromDiskReqOutput,
             self._handle_update_weights_from_disk_req_output),
            (FreezeGCReq, lambda x: None),
            (HealthCheckOutput, lambda x: None),
            (ActiveRanksOutput, self.update_active_ranks),
        ]
    )

TypeBasedDispatcher는 수신 객체의 타입에 따라 적절한 핸들러를 호출하는 패턴이다. 가장 빈번한 BatchStrOutput_handle_batch_output으로 라우팅되어, 각 요청의 ReqState.eventset()하고, 대기 중인 _wait_one_response 코루틴을 깨운다.

왜 이 설계인가

비동기 파이프라인: TokenizerManager는 메인 프로세스에서 asyncio로 동작한다. 이는 CPU-bound인 토큰화 작업이 GPU-bound인 inference 작업을 blocking하지 않도록 보장한다. ZMQ IPC의 zero-copy 메시징으로 프로세스 간 데이터 전달 오버헤드도 최소화한다.

유연한 토큰화 전략: 단일 요청에는 AsyncDynamicbatchTokenizer로 여러 요청을 동적 배칭하고, 배치 요청에는 HuggingFace tokenizer의 배치 인코딩을 직접 사용한다. cross-encoder 같은 특수 모델도 InputFormat 구분으로 자연스럽게 지원한다.

안전한 동시성: model_update_lock(RWLock)으로 weight update와 inference를 안전하게 분리하고, lora_update_lock으로 LoRA 로딩과 inference를 overlap할 수 있게 한다. 요청 ID 충돌 검증(_validate_rid_not_in_flight)도 race condition을 방지한다.

증분 스트리밍: ReqState에 delta 오프셋을 관리하여 스트리밍 시 전체 텍스트가 아닌 새로 생성된 부분만 전송한다. 다수의 스트리밍 청크가 쌓이면 병합(_merge_incremental_stream_meta_info)하여 네트워크 오버헤드를 줄인다.

관련 포스트

참고

댓글

관련 포스트

SGLang 의 다른글