[sglang] SGLang 고성능 서빙: 비동기 알림 배치 처리와 SSE 고속 경로 최적화 분석
PR 링크: sgl-project/sglang#22658 상태: Merged | 변경: +None / -None
들어가며
최근 대규모 언어 모델(LLM) 서빙 시스템은 높은 동시성과 낮은 지연 시간을 동시에 만족시켜야 하는 과제에 직면해 있습니다. 특히, SGLang 프로젝트의 분산(PD disaggregation) 및 스트리밍 환경에서는 수천 개의 동시 요청을 처리할 때 디코딩 측면의 tokenizer_manager가 CPU 병목 현상의 주요 원인이 되곤 합니다. 본 PR은 이러한 문제를 해결하기 위해 두 가지 핵심적인 최적화를 도입했습니다:
- 비동기 알림(Asyncio wakeup storms) 최적화: 각 요청마다 발생하는
event.set()호출이 과도한 컨텍스트 스위칭을 유발하는 문제를 개선합니다. - SSE(Server-Sent Events) 고속 경로(Fast Path) 최적화: 매 토큰마다 발생하는 Pydantic 객체 생성 및 직렬화 오버헤드를 줄여 응답 생성 속도를 높입니다.
이 글에서는 해당 PR의 코드 변경 사항을 상세히 분석하고, 이러한 최적화가 왜 성능 향상에 기여하는지, 그리고 실제 벤치마크 결과를 통해 그 효과를 검증해 보겠습니다.
코드 분석
1. tokenizer_manager.py: 비동기 알림 최적화
기존 tokenizer_manager의 _handle_batch_output 함수는 각 요청의 완료 시마다 state.event.set()을 호출하여 비동기 코루틴을 즉시 깨웠습니다. 수백 개의 요청이 동시에 완료될 경우, 이는 수백 번의 event.set() 호출과 그에 따른 과도한 컨텍스트 스위칭을 유발하여 CPU 사용률을 높이는 주요 원인이었습니다.
이번 PR에서는 이 부분을 개선하기 위해 _handle_batch_output 함수를 async 함수로 변경하고, event.set() 호출을 즉시 수행하는 대신, 완료된 요청들의 알림을 일정 개수(기본값 16개)만큼 모았다가 한 번에 처리하도록 로직을 수정했습니다.
Before:
def _handle_batch_output(
self,
recv_obj: Union[
BatchStrOutput,
BatchEmbeddingOutput,
BatchTokenIDOutput,
],
):
# ... (중략) ...
for state in states:
state.event.set()
# ... (중략) ...
After:
async def _handle_batch_output(
self,
recv_obj: Union[
BatchStrOutput,
BatchEmbeddingOutput,
BatchTokenIDOutput,
],
):
pending_notify: dict[str, ReqState] = {}
batch_notify_size = self.server_args.batch_notify_size
# ... (중략) ...
for state in states:
pending_notify[state.req_id] = state
if len(pending_notify) >= batch_notify_size:
for req_id, req_state in pending_notify.items():
req_state.event.set()
pending_notify.clear()
await asyncio.sleep(0) # Yield control to the event loop
# Final flush to ensure no notifications are lost
for req_id, req_state in pending_notify.items():
req_state.event.set()
# handle_loop awaits next recv immediately
주요 변경점은 다음과 같습니다:
_handle_batch_output함수가async로 변경되었습니다.event.set()호출은pending_notify딕셔너리에 요청 상태를 저장하며 즉시 호출되지 않습니다.pending_notify의 크기가batch_notify_size(기본값 16)에 도달하면, 모아둔 알림들을 한 번에 처리하고await asyncio.sleep(0)을 호출하여 이벤트 루프에 제어권을 양보합니다. 이는event.set()호출 빈도를 크게 줄여 컨텍스트 스위칭 오버헤드를 감소시킵니다.- 루프 종료 후 남은 알림들을 처리하는 최종 플러시 로직이 추가되어 알림 누락을 방지합니다.
리뷰어(alexnails)는 batch_notify_size를 매직 넘버로 두기보다 server_args를 통해 파라미터화하고, 루프 내에서 self.server_args.batch_notify_size를 반복적으로 접근하는 대신 지역 변수로 캐싱하는 것을 제안했습니다. 이는 성능 최적화와 코드 가독성 측면에서 좋은 지적입니다.
2. serving_chat.py: SSE 고속 경로 최적화
기존 SSE 스트리밍 응답 생성 과정에서는 각 토큰마다 Pydantic 모델(DeltaMessage, ChatCompletionResponseStreamChoice, ChatCompletionStreamResponse)을 생성하고 model_dump_json() 메서드를 호출했습니다. 이 과정은 스키마 검증, 필드 순회, 재귀적 직렬화 등 상당한 오버헤드를 포함하며, 특히 고정된 구조를 가진 스트리밍 응답에는 불필요한 작업이었습니다.
이번 PR에서는 이 부분을 개선하기 위해 msgspec 라이브러리를 사용하여 Pydantic 대신 더 빠른 직렬화 방식을 도입했습니다. _fast_sse_content라는 새로운 헬퍼 함수가 추가되었으며, 이는 Pydantic 객체 대신 msgspec.Struct를 사용하여 Python 딕셔너리를 직접 생성하고 orjson.dumps() 또는 msgspec.json.Encoder를 통해 직렬화합니다.
Before (Pydantic 사용):
# ... (중략) ...
choice_data = ChatCompletionResponseStreamChoice(
index=index,
delta=DeltaMessage(content=delta),
finish_reason=None,
matched_stop=None,
logprobs=choice_logprobs,
)
chunk = ChatCompletionStreamResponse(
id=content["meta_info"].get("id"),
created=int(time.time()),
choices=[choice_data],
model=request.model,
)
yield f"data: {chunk.model_dump_json()}\n\n"
# ... (중략) ...
After (msgspec 사용):
# ... (중략) ...
return (_SSE_DATA_B + _stream_encoder.encode(chunk) + _SSE_NL_B).decode()
def _fast_sse_content(
chunk_id: str,
created: int,
model: str,
index: int,
role: Optional[str] = None,
content: Optional[str] = None,
reasoning_content: Optional[str] = None,
finish_reason: Optional[str] = None,
logprobs: Optional[dict] = None,
matched_stop: Union[None, int, str] = None,
usage: Optional[dict] = None,
) -> str:
delta = _StreamDelta(
role=role, content=content, reasoning_content=reasoning_content
)
choice = _StreamChoice(
index=index,
delta=delta,
logprobs=logprobs,
finish_reason=finish_reason,
matched_stop=matched_stop,
)
chunk = _StreamChunk(
id=chunk_id,
object="chat.completion.chunk",
created=created,
model=model,
choices=[choice],
usage=usage,
)
return (_SSE_DATA_B + _stream_encoder.encode(chunk) + _SSE_NL_B).decode()
# ... (중략) ...
# First chunk with role
if is_firsts.get(index, True):
# ... (중략) ...
yield _fast_sse_content(
chunk_id=content["meta_info"].get("id"),
created=int(time.time()),
model=request.model,
index=index,
role="assistant",
content="",
)
stream_started = True
# ... (중략) ...
# Regular content
if delta:
# ... (중략) ...
yield _fast_sse_content(
chunk_id=content["meta_info"].get("id"),
created=int(time.time()),
model=request.model,
index=index,
content=delta,
logprobs=choice_logprobs,
usage=usage,
)
# ... (중략) ...
# Send finish_reason chunks for each index that completed
# ... (중략) ...
yield _fast_sse_content(
chunk_id=content["meta_info"].get("id"),
created=int(time.time()),
model=request.model,
index=idx,
finish_reason=final_finish_reason,
matched_stop=matched_stop,
)
주요 변경점은 다음과 같습니다:
msgspec.Struct를 사용하여_StreamDelta,_StreamChoice,_StreamChunk클래스를 정의했습니다.omit_defaults=True옵션을 사용하여 기본값이None인 필드는 JSON 출력에서 제외시켜 페이로드 크기를 줄였습니다. 이는 OpenAI의 실제 SSE 형식과 일치하며, 직렬화 성능도 향상시킵니다._fast_sse_content함수는 이러한msgspec구조체를 사용하여 SSE 응답 청크를 생성합니다.- 기존 Pydantic 기반의
ChatCompletionStreamResponse및DeltaMessage생성 로직이_fast_sse_content호출로 대체되었습니다. 이는 첫 번째 청크, 일반 콘텐츠 청크, 그리고 종료 이유 청크 등 여러 부분에 적용되었습니다. UsageProcessor.calculate_token_usage의 결과는model_dump()를 호출하여dict형태로 변환한 후_fast_sse_content에 전달됩니다. 이는msgspec이 Pydantic 모델을 직접 처리하지 못하는 문제를 해결합니다.
리뷰어(alexnails)는 tool_calls 필드가 _fast_sse_content에서는 사용되지 않으므로 불필요하며 제거해야 한다고 지적했습니다. 또한, omit_defaults=True 적용, usage 필드의 타입 명확화 (Pydantic 모델 대신 dict 사용), 그리고 SSE 데이터 형식을 바이트(bytes)로 유지하는 등의 세부적인 개선 사항을 제안했습니다. 이러한 제안들은 페이로드 크기를 줄이고 직렬화 속도를 높이는 데 기여합니다.
왜 이게 좋은가?
이번 PR의 최적화는 두 가지 주요 병목 지점을 효과적으로 해결하여 시스템의 전반적인 성능을 크게 향상시켰습니다.
- CPU 병목 완화:
tokenizer_manager에서의 비동기 알림 처리를 배치(batch) 방식으로 변경하고asyncio.sleep(0)을 통해 이벤트 루프에 제어권을 적절히 양보함으로써, 수천 개의 동시 요청 처리 시 발생하는 과도한 컨텍스트 스위칭 오버헤드를 크게 줄였습니다. 이는 CPU 사용률을 낮추고 더 많은 요청을 처리할 수 있는 기반을 마련합니다. - 응답 생성 속도 향상: SSE 스트리밍 응답 생성 시 Pydantic의 무거운 직렬화 과정을
msgspec과orjson을 활용한 고속 경로로 대체했습니다. 이는 각 토큰 생성 시 발생하는 오버헤드를 수 밀리초에서 수 마이크로초 수준으로 줄여, 최종 사용자에게 응답이 전달되는 시간을 단축시킵니다.
벤치마크 결과:
제공된 벤치마크 설정(10,240개 프롬프트, 최대 동시성 2048) 하에서 다음과 같은 성능 향상이 관찰되었습니다:
- 출력 처리량 (Output throughput): 11,488 → 13,802 tok/s (+20.1%)
- 평균 TPOT (Time Per Output Token, ms): 110.74 → 92.62 ms (-16.4%)
- 평균 ITL (Inference Time Latency, ms): 110.61 → 92.53 ms (-16.4%)
이 결과는 제안된 최적화가 실제 운영 환경에서 상당한 성능 개선을 가져옴을 명확히 보여줍니다. 특히, TPOT와 ITL의 감소는 사용자 경험에 직접적인 영향을 미치는 지연 시간 단축을 의미합니다.
일반적인 교훈:
- 비동기 이벤트 처리 최적화: 고부하 비동기 시스템에서는 개별 이벤트 처리가 병목이 될 수 있습니다. 이벤트를 배치 처리하고 이벤트 루프에 제어권을 적절히 양보하는 패턴은 CPU 오버헤드를 줄이는 데 효과적입니다.
- 직렬화/역직렬화 오버헤드 최소화: API 응답 생성 시, 특히 스트리밍과 같이 빈번하게 발생하는 작업에서는 직렬화 라이브러리 선택이 중요합니다. Pydantic과 같은 ORM/데이터 검증 라이브러리는 편리하지만, 성능이 중요한 경로에서는
msgspec,orjson등 더 가벼운 라이브러리를 고려해야 합니다.omit_defaults=True와 같은 옵션 활용은 페이로드 크기 및 직렬화 속도 향상에 큰 도움이 됩니다. - 프로파일링 기반 최적화: 성능 병목은 종종 예상치 못한 곳에서 발생합니다. 프로파일링을 통해 실제 병목 지점을 정확히 파악하고, 해당 지점에 대한 집중적인 최적화를 수행하는 것이 중요합니다.
References
참고 자료
- https://jcrisp.github.io/msgspec/
- https://github.com/ijl/orjson/
- https://docs.pydantic.dev/latest/
- https://docs.python.org/3/library/asyncio-task.html#asyncio.sleep
⚠️ 알림: 이 분석은 AI가 실제 코드 diff를 기반으로 작성했습니다.
관련 포스트
PR Analysis 의 다른글
- 이전글 [abtop] Codex 세션 파일 검색 성능 개선: lsof 대신 /proc/pid/fd 활용
- 현재글 : [sglang] SGLang 고성능 서빙: 비동기 알림 배치 처리와 SSE 고속 경로 최적화 분석
- 다음글 [sglang] SGLang Triton 커널 최적화: libdevice.tanh 도입과 2D Strided Tensor 지원
댓글