본문으로 건너뛰기

[SGLang] Data Parallel Controller: 다중 인스턴스 조율

들어가며

TP(Tensor Parallelism)가 하나의 모델을 여러 GPU에 분할하는 것이라면, DP(Data Parallelism)는 모델 복제본 여러 개를 두고 요청을 분산하는 전략이다. SGLang의 DataParallelController는 이 분산의 핵심으로, Tokenizer에서 들어오는 요청을 여러 Scheduler 인스턴스에 라우팅한다.

구조도

Tokenizer
    │ (ZMQ PUSH)
    ▼
DataParallelController
    │
    ├── LoadBalanceMethod 선택
    │     ├── ROUND_ROBIN
    │     ├── FOLLOW_BOOTSTRAP_ROOM
    │     ├── TOTAL_REQUESTS
    │     └── TOTAL_TOKENS
    │
    ├── workers[0] ──ZMQ PUSH──▶ Scheduler(DP=0, TP 그룹)
    ├── workers[1] ──ZMQ PUSH──▶ Scheduler(DP=1, TP 그룹)
    └── workers[N] ──ZMQ PUSH──▶ Scheduler(DP=N, TP 그룹)

DPBudget
    ├── total_requests[dp_size]
    └── total_tokens[dp_size]

핵심 코드 분석

로드 밸런싱 전략

SGLang은 4가지 로드 밸런싱 방법을 지원한다.

class LoadBalanceMethod(Enum):
    ROUND_ROBIN = auto()
    FOLLOW_BOOTSTRAP_ROOM = auto()
    TOTAL_REQUESTS = auto()
    TOTAL_TOKENS = auto()

각 방법에 대응하는 디스패처가 초기화 시 선택된다:

dispatch_lookup = {
    LoadBalanceMethod.ROUND_ROBIN: self.round_robin_scheduler,
    LoadBalanceMethod.FOLLOW_BOOTSTRAP_ROOM: self.follow_bootstrap_room_scheduler,
    LoadBalanceMethod.TOTAL_REQUESTS: self.total_requests_scheduler,
    LoadBalanceMethod.TOTAL_TOKENS: self.total_tokens_scheduler,
}
self.dispatching = dispatch_lookup[self.load_balance_method]

DPBudget: 부하 추적

각 DP 워커의 부하를 실시간으로 추적하는 클래스다.

class DPBudget:
    def __init__(self, dp_size: int):
        self.dp_size = dp_size
        self.total_requests = [0] * dp_size
        self.total_tokens = [0] * dp_size

    def dispatch(self, method: LoadBalanceMethod):
        if method == LoadBalanceMethod.TOTAL_REQUESTS:
            target_rank = self.total_requests.index(min(self.total_requests))
        elif method == LoadBalanceMethod.TOTAL_TOKENS:
            target_rank = min(
                range(self.dp_size),
                key=lambda i: (self.total_tokens[i], self.total_requests[i]))
        # 휴리스틱: 선택된 워커의 부하를 1 증가
        self.total_requests[target_rank] += 1
        return target_rank

TOTAL_TOKENS 방식은 토큰 수를 1차 기준, 요청 수를 2차 tie-breaker로 사용한다. dispatch() 호출 시 선택된 워커의 요청 수를 즉시 1 증가시켜, 연속 요청이 같은 워커에 몰리는 것을 방지한다.

Round Robin 스케줄러

가장 단순한 방식이지만, 비활성 워커를 건너뛴다.

def round_robin_scheduler(self, req: Req):
    if self.maybe_external_dp_rank_routing(req):
        return
    while True:
        if self.status[self.round_robin_counter]:
            self.workers[self.round_robin_counter].send_pyobj(req)
            self.round_robin_counter = (self.round_robin_counter + 1) % len(self.workers)
            break
        self.round_robin_counter = (self.round_robin_counter + 1) % len(self.workers)

self.statusActiveRanksOutput으로 업데이트되며, 장애가 발생한 워커를 자동으로 우회한다.

DP Attention 모드 워커 실행

DP Attention이 활성화되면, TP 그룹이 DP를 겸한다. 워커 실행 로직이 달라진다:

def launch_dp_attention_schedulers(self, server_args, port_args):
    # 노드 0에서 포트 사전 할당
    worker_ports = []
    if server_args.node_rank == 0:
        for dp_rank in range(server_args.dp_size):
            worker_port, worker_socket = get_zmq_socket_on_host(
                self.context, zmq.PUSH, host=bind_host)
            worker_ports.append(worker_port)
            self.workers[dp_rank] = worker_socket

    # 멀티노드: 포트 정보를 모든 노드에 broadcast
    broadcasted_ports = self._broadcast_worker_ports(server_args, worker_ports)
    self.launch_tensor_parallel_group(
        server_args, port_args, 0, None, broadcasted_ports)

일반 DP에서는 각 DP 랭크가 독립적인 TP 그룹을 갖지만, DP Attention에서는 하나의 큰 TP 그룹 안에서 DP를 시뮬레이션한다.

프로세스 계층 구조

각 DP 워커는 하나의 TP 그룹(여러 GPU)을 관리하는 Scheduler 프로세스를 생성한다.

def launch_tensor_parallel_group(self, server_args, port_args,
                                  base_gpu_id, dp_rank, ...):
    for pp_rank in pp_rank_range:
        for tp_rank in tp_rank_range:
            gpu_id = base_gpu_id + ...
            proc = mp.Process(
                target=self.run_scheduler_process_func,
                args=(server_args, rank_port_args, gpu_id,
                      tp_rank, attn_cp_rank, moe_dp_rank,
                      moe_ep_rank, pp_rank, dp_rank, writer))
            proc.start()

병렬화 hierarchy가 복잡하게 조합된다: Attention 쪽은 attn_cp_rank, MoE 쪽은 moe_dp_rank, moe_ep_rank가 별도로 계산된다.

이벤트 루프

Controller의 메인 루프는 ZMQ non-blocking 수신으로 동작한다.

def event_loop(self):
    while True:
        while True:
            self.soft_watchdog.feed()
            try:
                recv_req = self.recv_from_tokenizer.recv_pyobj(zmq.NOBLOCK)
            except zmq.ZMQError:
                break
            self._request_dispatcher(recv_req)

TypeBasedDispatcher가 요청 타입에 따라 적절한 핸들러를 호출한다. TokenizedGenerateReqInput은 로드 밸런싱을 거치고, ProfileReq 같은 제어 메시지는 모든 워커에 브로드캐스트된다.

멀티노드 포트 브로드캐스트

멀티노드 환경에서는 노드 0이 할당한 ZMQ 포트를 다른 노드에 전파해야 한다.

def _broadcast_worker_ports(self, server_args, worker_ports=None):
    if server_args.node_rank == 0:
        return self._broadcast_ports_as_server(endpoint, nnodes - 1, worker_ports)
    else:
        return self._receive_ports_as_client(endpoint, server_args.node_rank)

REQ-REP 패턴으로 구현되며, 클라이언트 노드는 10분 타임아웃으로 포트 정보를 수신한다.

설계 근거

왜 ZMQ인가? DP Controller는 CPU에서 동작하는 조율 프로세스다. GPU 통신(NCCL)이 아닌 프로세스 간 메시지 전달이 필요하며, ZMQ의 PUSH/PULL 패턴이 이 용도에 적합하다.

왜 TOTAL_TOKENS가 기본이 아닌가? 토큰 수 기반 밸런싱은 정확하지만, 부하 정보가 비동기로 업데이트되므로 약간의 지연이 있다. 간단한 시나리오에서는 ROUND_ROBIN이 오버헤드가 적다.

관련 포스트

참고

댓글

관련 포스트

SGLang 의 다른글