[SGLang] TokenizerManager: 비동기 토큰화 파이프라인의 설계와 구현
들어가며
LLM serving에서 tokenization은 단순한 전처리가 아니다. 텍스트를 토큰으로 변환하는 것은 물론, 요청 정규화, 입력 검증, 멀티모달 데이터 처리, sampling parameter 파싱, LoRA 어댑터 관리까지 포함하는 복잡한 파이프라인이다. SGLang의 TokenizerManager는 이 모든 작업을 비동기로 처리하면서, ZMQ IPC를 통해 Scheduler에 토큰화된 요청을 전달하고, DetokenizerManager로부터 결과를 수신하여 클라이언트에게 응답한다.
이 글에서는 python/sglang/srt/managers/tokenizer_manager.py를 중심으로 TokenizerManager의 설계를 분석한다.
전체 구조
TokenizerManager가 처리하는 요청-응답 파이프라인의 전체 흐름은 다음과 같다.
Client (HTTP/gRPC)
│
▼
┌──────────────────────────────────────────────────────┐
│ TokenizerManager (Main Process) │
│ │
│ 1. normalize_batch_and_arguments() │
│ 2. _tokenize_one_request() / _batch_tokenize() │
│ 3. _create_tokenized_object() │
│ 4. _send_one_request() ──── ZMQ PUSH ────┐ │
│ │ │
│ 7. _handle_batch_output() ◄── ZMQ PULL ───┼──┐ │
│ 8. _wait_one_response() ──▶ yield │ │ │
│ │ │ │
└────────────────────────────────────────────┼──┼──────┘
│ │
ZMQ IPC │ │
▼ │
┌───────────────┐
│ Scheduler │
│ (subprocess) │
└───────┬───────┘
│
▼
┌────────────────────┐
│ DetokenizerManager │
│ (subprocess) │
└────────────────────┘
핵심은 TokenizerManager가 메인 프로세스에서 asyncio 이벤트 루프를 실행한다는 점이다. HTTP 서버와 같은 프로세스에서 동작하므로, 요청 수신부터 응답 반환까지 비동기적으로 처리된다.
핵심 코드 분석
클래스 구조와 초기화
TokenizerManager는 두 개의 mixin을 상속받는다. TokenizerCommunicatorMixin은 DetokenizerManager로부터 결과를 수신하는 핸들 루프를, TokenizerManagerScoreMixin은 scoring 관련 로직을 제공한다.
class TokenizerManager(TokenizerCommunicatorMixin, TokenizerManagerScoreMixin):
"""TokenizerManager is a process that tokenizes the text."""
def __init__(
self,
server_args: ServerArgs,
port_args: PortArgs,
):
# Parse args
self.server_args = server_args
set_global_server_args_for_tokenizer(server_args)
# Init model config
self.init_model_config()
# Initialize tokenizer and multimodalprocessor
self.init_tokenizer_and_processor()
# Init inter-process communication
self.init_ipc_channels(port_args)
# Init running status
self.init_running_status()
초기화는 7단계로 나뉜다. 모델 설정, tokenizer/processor 초기화, IPC 채널 설정, 실행 상태 초기화, 로깅, weight update, LoRA, disaggregation 순으로 진행된다. 각 단계는 독립적인 메서드로 분리되어 있어 유지보수가 용이하다.
ZMQ IPC 채널 설정
TokenizerManager는 두 개의 ZMQ 소켓을 관리한다. DetokenizerManager로부터 결과를 받는 PULL 소켓과, Scheduler에 요청을 보내는 PUSH 소켓이다.
def init_ipc_channels(self, port_args: PortArgs):
context = zmq.asyncio.Context(2)
self.recv_from_detokenizer = get_zmq_socket(
context, zmq.PULL, port_args.tokenizer_ipc_name, True
)
if self.server_args.tokenizer_worker_num == 1:
self.send_to_scheduler = get_zmq_socket(
context, zmq.PUSH, port_args.scheduler_input_ipc_name, True
)
else:
from sglang.srt.managers.multi_tokenizer_mixin import SenderWrapper
send_to_scheduler = get_zmq_socket(
context, zmq.PUSH, port_args.tokenizer_worker_ipc_name, False
)
self.send_to_scheduler = SenderWrapper(port_args, send_to_scheduler)
zmq.asyncio.Context를 사용하여 asyncio 이벤트 루프와 호환되는 비동기 ZMQ 소켓을 생성한다. multi-tokenizer 모드(tokenizer_worker_num > 1)에서는 SenderWrapper로 감싸서 각 요청에 응답 라우팅을 위한 IPC 이름을 자동 첨부한다.
요청 처리 파이프라인: generate_request
generate_request는 TokenizerManager의 핵심 메서드다. 클라이언트 요청을 받아 토큰화하고, Scheduler에 전달한 뒤 응답을 기다린다.
async def generate_request(
self,
obj: Union[GenerateReqInput, EmbeddingReqInput],
request: Optional[fastapi.Request] = None,
):
self.auto_create_handle_loop()
# Normalize the request
obj.normalize_batch_and_arguments()
self._set_default_priority(obj)
self._validate_rid_not_in_flight(obj)
# ...
async with self.model_update_lock.reader_lock:
await self._validate_and_resolve_lora(obj)
if obj.is_single:
tokenized_obj = await self._tokenize_one_request(obj)
state = self.rid_to_state[obj.rid]
self._send_one_request(tokenized_obj)
async for response in self._wait_one_response(obj, state, request):
yield response
else:
async for response in self._handle_batch_request(obj, request):
yield response
이 메서드는 async generator다. 스트리밍 모드에서는 토큰이 생성될 때마다 yield하고, 비스트리밍 모드에서는 완료 시 한 번 yield한다. model_update_lock의 reader lock을 사용하여 weight update 중에는 새 요청을 차단한다.
토큰화 전략: 단일 vs 배치
TokenizerManager는 입력 유형에 따라 세 가지 토큰화 전략을 제공한다.
async def _tokenize_texts(
self, texts: Union[str, List[str]], is_cross_encoder: bool = False
) -> Union[
Tuple[List[int], Optional[List[int]]],
Tuple[List[List[int]], Optional[List[List[int]]]],
]:
input_format = self._detect_input_format(texts, is_cross_encoder)
tokenizer_input = self._prepare_tokenizer_input(texts, input_format)
use_async_tokenizer = (
self.async_dynamic_batch_tokenizer is not None
and input_format == InputFormat.SINGLE_STRING
)
if use_async_tokenizer:
result = await self.async_dynamic_batch_tokenizer.encode(
tokenizer_input[0], **tokenizer_kwargs
)
input_ids = [result["input_ids"]]
else:
encoded = self.tokenizer(tokenizer_input, **tokenizer_kwargs)
input_ids = encoded["input_ids"]
InputFormat enum으로 입력 유형을 구분한다. SINGLE_STRING일 때는 AsyncDynamicbatchTokenizer를 사용하여 여러 요청의 토큰화를 동적으로 배칭할 수 있고, BATCH_STRINGS나 CROSS_ENCODER_PAIRS일 때는 일반 tokenizer를 사용한다.
요청 상태 관리: ReqState
각 요청의 상태는 ReqState dataclass로 관리된다. 이 객체는 비동기 이벤트 기반의 요청-응답 매칭을 담당한다.
@dataclasses.dataclass
class ReqState:
"""Store the state a request."""
out_list: List[Dict[Any, Any]]
finished: bool
event: asyncio.Event
obj: Union[GenerateReqInput, EmbeddingReqInput]
# For performance metrics
time_stats: APIServerReqTimeStats
last_completion_tokens: int = 1
# For streaming output
last_output_offset: int = 0
last_text_offset: int = 0
# For incremental state update.
text: str = ""
output_ids: List[int] = dataclasses.field(default_factory=list)
event는 asyncio.Event로, DetokenizerManager로부터 결과가 도착하면 set()되고, _wait_one_response에서 wait()한다. out_list에 결과가 누적되므로 스트리밍 중간 결과도 손실 없이 전달된다. last_output_offset과 last_text_offset은 증분 스트리밍 출력을 위한 오프셋으로, 이전에 전송한 부분을 추적하여 중복 없이 delta만 전송한다.
결과 수신과 디스패칭
DetokenizerManager로부터 받은 결과는 TypeBasedDispatcher를 통해 유형별로 라우팅된다.
def init_request_dispatcher(self):
self._result_dispatcher = TypeBasedDispatcher(
[
(
(BatchStrOutput, BatchEmbeddingOutput, BatchTokenIDOutput),
self._handle_batch_output,
),
(AbortReq, self._handle_abort_req),
(OpenSessionReqOutput, self._handle_open_session_req_output),
(UpdateWeightFromDiskReqOutput,
self._handle_update_weights_from_disk_req_output),
(FreezeGCReq, lambda x: None),
(HealthCheckOutput, lambda x: None),
(ActiveRanksOutput, self.update_active_ranks),
]
)
TypeBasedDispatcher는 수신 객체의 타입에 따라 적절한 핸들러를 호출하는 패턴이다. 가장 빈번한 BatchStrOutput은 _handle_batch_output으로 라우팅되어, 각 요청의 ReqState.event를 set()하고, 대기 중인 _wait_one_response 코루틴을 깨운다.
왜 이 설계인가
비동기 파이프라인: TokenizerManager는 메인 프로세스에서 asyncio로 동작한다. 이는 CPU-bound인 토큰화 작업이 GPU-bound인 inference 작업을 blocking하지 않도록 보장한다. ZMQ IPC의 zero-copy 메시징으로 프로세스 간 데이터 전달 오버헤드도 최소화한다.
유연한 토큰화 전략: 단일 요청에는 AsyncDynamicbatchTokenizer로 여러 요청을 동적 배칭하고, 배치 요청에는 HuggingFace tokenizer의 배치 인코딩을 직접 사용한다. cross-encoder 같은 특수 모델도 InputFormat 구분으로 자연스럽게 지원한다.
안전한 동시성: model_update_lock(RWLock)으로 weight update와 inference를 안전하게 분리하고, lora_update_lock으로 LoRA 로딩과 inference를 overlap할 수 있게 한다. 요청 ID 충돌 검증(_validate_rid_not_in_flight)도 race condition을 방지한다.
증분 스트리밍: ReqState에 delta 오프셋을 관리하여 스트리밍 시 전체 텍스트가 아닌 새로 생성된 부분만 전송한다. 다수의 스트리밍 청크가 쌓이면 병합(_merge_incremental_stream_meta_info)하여 네트워크 오버헤드를 줄인다.
관련 포스트
- SGLang Engine: 멀티프로세스 오케스트레이터의 설계와 구현 - TokenizerManager를 생성하고 관리하는 Engine 분석
- SGLang DetokenizerManager: 스트리밍 디토큰화와 증분 출력 - TokenizerManager가 결과를 수신하는 상대편 분석
- SGLang IO 데이터 구조: 요청에서 응답까지의 직렬화 설계 - TokenizerManager가 주고받는 데이터 구조 분석
- SGLang Multi-Tokenizer: 다중 모델 토크나이저 동시 관리 - multi-worker 모드에서의 확장 분석
참고
- SGLang GitHub Repository
python/sglang/srt/managers/tokenizer_manager.py- TokenizerManager 구현python/sglang/srt/managers/tokenizer_communicator_mixin.py- 결과 수신 핸들 루프python/sglang/srt/managers/async_dynamic_batch_tokenizer.py- 동적 배치 토큰화
관련 포스트
- [SGLang] Engine: 멀티프로세스 오케스트레이터의 설계와 구현
- [sglang] DeepSeek-V4의 Latency 최적화: Fused mHC Post/Pre Kernel 도입
- [sglang] sglang ROCm MXFP4 어텐션에서 불필요한 contiguous copy 제거를 통한 성능 최적화
- [sglang] sglang의 torch.compile 활용: Advanced Indexing Gather 최적화로 LLM 추론 가속화
- [sglang] sglang diffusion 모델 성능 향상: Cache-DiT와 torch.compile의 최적화된 적용 순서
SGLang 의 다른글
- 이전글 [SGLang] Chat Template 관리: Jinja 템플릿과 모델별 대화 포맷
- 현재글 : [SGLang] TokenizerManager: 비동기 토큰화 파이프라인의 설계와 구현
- 다음글 [SGLang] DetokenizerManager: 스트리밍 디토큰화와 증분 출력
댓글