본문으로 건너뛰기

[SGLang] Interpreter: SGL 프로그램 실행 엔진의 설계와 구현

들어가며

SGLang은 LLM 프로그램을 Python 함수 형태로 작성하고 실행하는 프레임워크다. 사용자가 sgl.function으로 정의한 프로그램은 내부적으로 IR(Intermediate Representation) 노드의 시퀀스로 변환되고, Interpreter가 이를 하나씩 실행한다.

이 글에서는 python/sglang/lang/interpreter.py의 핵심 구조를 분석한다. Interpreter는 크게 세 가지 역할을 맡는다. 첫째, StreamExecutor가 IR 노드를 백그라운드 스레드에서 순차 실행한다. 둘째, ProgramState가 사용자에게 s += gen("answer") 같은 직관적 인터페이스를 제공한다. 셋째, run_program_batch가 다수의 프로그램을 ThreadPoolExecutor로 병렬 실행한다.

Interpreter 실행 구조도

User Program (sgl.function)
        |
        v
  run_program() / run_program_batch()
        |
        v
  +------------------+
  |  ProgramState    |  <-- 사용자 인터페이스 (s += ..., s["var"])
  |  ┌──────────────┐|
  |  │StreamExecutor││  <-- 백그라운드 스레드에서 IR 실행
  |  │  ┌──────────┐││
  |  │  │  Queue    │││  <-- IR 노드 큐
  |  │  └──────────┘││
  |  │  _execute()  ││  <-- IR 타입별 디스패치
  |  │    |         ││
  |  │    v         ││
  |  │  Backend     ││  <-- generate(), select() 호출
  |  └──────────────┘|
  +------------------+
        |
        v
  ProgramState (결과: text_, variables, meta_info)

핵심 코드 분석

1. StreamExecutor: 백그라운드 스레드 실행 엔진

StreamExecutor는 Interpreter의 핵심이다. 내부에 queue.Queue를 두고 별도 워커 스레드가 IR 노드를 하나씩 꺼내 실행한다. 이 구조 덕분에 사용자 코드는 s += expr 호출 후 즉시 반환되고, 실제 LLM 호출은 비동기로 처리된다.

class StreamExecutor:
    """A stream executor that executes SGL expressions in a background thread."""

    def __init__(self, backend, arguments, default_sampling_para,
                 chat_template, stream, num_api_spec_tokens=None, use_thread=True):
        self.sid = uuid.uuid4().hex
        self.backend: BaseBackend = backend
        self.variables = {}          # Dict[name: str -> value: str]
        self.variable_event = {}     # Dict[name: str -> event: threading.Event]
        self.meta_info = {}
        self.text_ = ""              # 전체 텍스트 누적

        if self.use_thread:
            self.queue = queue.Queue()
            self.worker = threading.Thread(
                target=contextvars.copy_context().run,
                args=(_run_worker_in_context,)
            )
            self.worker.start()

워커 스레드는 _thread_worker_func에서 무한 루프를 돌며 큐에서 표현식을 꺼내 실행한다. None이 들어오면 종료 신호로 처리한다.

def _thread_worker_func(self):
    while True:
        expr = self.queue.get()
        if expr is None:
            self.queue.task_done()
            break
        try:
            self._execute(expr)
        except Exception as e:
            warnings.warn(f"Error in stream_executor: {get_exception_traceback()}")
            error = e
            break
        self.queue.task_done()
        if self.stream_text_event:
            self.stream_text_event.set()

에러 발생 시 남은 큐를 비우고 모든 variable_event를 set하여 대기 중인 스레드가 블로킹되지 않도록 정리한다.

2. IR 타입별 디스패치: _execute 메서드

_execute는 IR 노드의 타입에 따라 적절한 핸들러를 호출하는 디스패치 함수다. SglConstantText, SglGen, SglSelect, SglRoleBegin/End 등 13가지 IR 타입을 처리한다.

def _execute(self, other):
    if isinstance(other, str):
        other = SglConstantText(other)

    if isinstance(other, SglConstantText):
        self._execute_fill(other.value)
    elif isinstance(other, SglGen):
        self._execute_gen(other)
    elif isinstance(other, SglSelect):
        self._execute_select(other)
    elif isinstance(other, SglExprList):
        for x in other.expr_list:
            self._execute(x)
    elif isinstance(other, SglRoleBegin):
        self._execute_role_begin(other)
    elif isinstance(other, SglConcateAndAppend):
        if (global_config.enable_parallel_encoding
                and self.backend.support_concate_and_append):
            self._execute_concatenate_and_append_kv_cache(other)
        else:
            self._execute_concatenate_and_append_text(other)

SglExprList는 재귀적으로 내부 표현식을 순회하고, SglConcateAndAppend는 백엔드의 KV cache 결합 지원 여부에 따라 분기한다. 이 디스패치 구조는 visitor pattern 없이 단순한 isinstance 체인으로 구현되어 있어 새로운 IR 노드 추가가 용이하다.

3. 생성(Generation) 실행과 변수 관리

_execute_gen은 백엔드에 실제 LLM 생성 요청을 보내는 핵심 메서드다. 스트리밍 여부에 따라 실행 경로가 갈린다.

def _execute_gen(self, expr: SglGen):
    sampling_params = self._resolve_sampling_params(expr.sampling_params)
    name = expr.name
    if not self.stream:
        comp, meta_info = self.backend.generate(self, sampling_params=sampling_params)
        self.text_ += comp
        self.variables[name] = comp
        self.meta_info[name] = meta_info
        self.variable_event[name].set()
    else:
        generator = self.backend.generate_stream(self, sampling_params=sampling_params)
        self.variables[name] = ""
        self.stream_var_event[name].set()
        for comp, meta_info in generator:
            self.text_ += comp
            self.variables[name] += comp
            self.meta_info[name] = meta_info
            self.stream_var_event[name].set()
            self.stream_text_event.set()
        self.variable_event[name].set()

비스트리밍 모드에서는 backend.generate()로 전체 결과를 한 번에 받아 text_에 누적하고, variable_event를 set하여 해당 변수를 기다리는 다른 스레드에 알린다. 스트리밍 모드에서는 generate_stream()의 제너레이터를 순회하며 토큰 단위로 누적한다. 이때 stream_var_eventstream_text_event를 매 청크마다 set하여 실시간 소비가 가능하다.

4. ProgramState: 사용자 인터페이스

ProgramStateStreamExecutor를 감싸는 사용자 인터페이스 계층이다. __iadd__를 오버로딩하여 s += expr 문법을 가능하게 한다.

class ProgramState:
    def __init__(self, stream_executor: StreamExecutor):
        self.stream_executor = stream_executor

    def __iadd__(self, other):
        if other is None:
            raise ValueError("Tried to append None to state.")
        self.stream_executor.submit(other)
        return self

    def __getitem__(self, name):
        return self.get_var(name)

role 관리도 ProgramState가 담당한다. system(), user(), assistant() 메서드는 context manager를 반환하여 with s.user(): 블록 안에서 역할별 메시지를 자연스럽게 구성할 수 있다.

5. fork/join: 병렬 분기 실행

StreamExecutor.fork()는 현재 상태를 복제한 여러 executor를 생성한다. 기존 변수, 텍스트, 메시지 히스토리를 복사하고 fork_start_text_pos로 분기 시점을 기록한다.

def fork(self, size: int = 1, position_ids_offset=None):
    if size > 1 and str(self.text_):
        self.submit(SglCommitLazy())
    self.sync()
    exes = [StreamExecutor(self.backend, self.arguments, ...) for _ in range(size)]
    for i in range(size):
        exes[i].variables = dict(self.variables)
        exes[i].text_ = str(self.text_)
        exes[i].messages_ = list(self.messages_)
        exes[i].fork_start_text_pos = len(self.text_)
    return exes

ProgramStateGroup.join()은 두 가지 모드를 지원한다. gather_variable 모드는 자식 상태의 새 변수를 부모로 수집하고, concate_and_append 모드는 KV cache를 직접 결합하여 중복 연산을 피한다.

6. 배치 실행: run_program_batch

다수의 입력에 대해 같은 프로그램을 실행할 때는 run_program_batch를 사용한다.

def run_program_batch(program, backend, batch_arguments, default_sampling_para,
                      num_threads, progress_bar, generator_style=False):
    if global_config.enable_precache_with_tracing and len(batch_arguments) > 1:
        cache_program(program, backend)

    if num_threads == "auto":
        num_threads = max(96, multiprocessing.cpu_count() * 16)
    num_threads = min(num_threads, len(batch_arguments))

배치 실행의 첫 단계에서 cache_program을 호출한다. 이 함수는 tracer로 프로그램을 추적하여 공통 prefix를 추출하고 백엔드에 캐싱을 요청한다. 동일한 system prompt를 공유하는 배치에서 prefix 캐싱은 중복 연산을 크게 줄인다.

generator_style=True일 때는 chunk 단위(200개)로 작업을 제출하고 결과를 yield한다. ThreadPoolExecutor에 전체 작업을 한 번에 넣으면 submit 자체가 블로킹되어 제너레이터의 lazy 특성이 깨지기 때문이다.

인터프리터 vs 컴파일러 접근 비교

SGLang은 Interpreter 외에 tracer 기반 컴파일러 경로도 제공한다. 두 접근 방식의 차이는 다음과 같다.

항목 Interpreter Compiler (Tracer)
실행 방식 런타임에 IR 노드를 즉시 실행 프로그램을 먼저 추적하여 그래프 생성
제어 흐름 Python if/for 자유롭게 사용 정적 트레이싱이므로 동적 분기 제한
최적화 speculative execution, prefix caching prefix 추출, 그래프 수준 최적화
활용 기본 실행 경로 배치 prefix 캐싱에 부분 활용

실제로 cache_program에서 tracer의 extract_prefix_by_tracing을 호출하여 배치의 공통 prefix를 추출하는 것은, 두 접근이 상호 보완적으로 동작하는 사례다.

Interpreter 방식은 Python의 모든 제어 흐름을 지원하므로 범용성이 높다. 반면 tracer는 정적 분석이 가능한 부분에서 더 공격적인 최적화를 수행할 수 있다. SGLang은 기본 실행 경로로 Interpreter를 사용하면서, 필요한 지점에서만 tracer를 보조적으로 활용하는 실용적 설계를 택했다.

관련 포스트

참고

댓글

관련 포스트

SGLang 의 다른글