본문으로 건너뛰기

[SGLang] Ray 통합: 분산 엔진과 스케줄러 액터

들어가며

SGLang은 기본적으로 multiprocessing으로 Scheduler 프로세스를 실행한다. 그러나 멀티노드 환경이나 Kubernetes 클러스터에서는 Ray를 사용하여 프로세스 배치와 생명주기를 관리할 수 있다. RayEngineEngine을 상속하며, Scheduler를 Ray Actor로 실행하는 것이 핵심 차이다.

구조도

RayEngine (Engine 상속)
    │
    ├── PlacementGroup (Ray 리소스 관리)
    │     ├── Bundle 0 (Node 0) ── GPU x N
    │     ├── Bundle 1 (Node 1) ── GPU x N
    │     └── ...
    │
    └── _launch_scheduler_processes()
          │
          ├── SchedulerActor (Ray Actor)
          │     ├── __init__: Scheduler 생성, 모델 로딩
          │     ├── get_info(): 초기화 정보 반환
          │     └── run_event_loop(): 이벤트 루프 실행
          │
          ├── SchedulerActor (pp=0, tp=1)
          ├── SchedulerActor (pp=1, tp=0)
          └── ...

통신: ZMQ (요청/응답) + NCCL (텐서)

핵심 코드 분석

RayEngine

RayEngineEngine을 상속하여 _launch_scheduler_processes()만 오버라이드한다.

class RayEngine(Engine):
    @classmethod
    def _launch_scheduler_processes(cls, server_args, port_args,
                                     run_scheduler_process_func):
        if server_args.dp_size > 1:
            raise NotImplementedError(
                "Ray support for dp_size > 1 is not yet implemented.")

        pg = ray.util.get_current_placement_group()
        if pg is None:
            raise RuntimeError(
                "use_ray=True requires a placement group")

        world_size = server_args.tp_size * server_args.pp_size
        nnodes = server_args.nnodes
        gpus_per_node = world_size // nnodes

현재 Ray 모드는 dp_size=1만 지원한다. Placement Group이 필수적이며, GPU 배치를 Ray가 관리한다.

Engine-Scheduler 공존 보장

Rank 0 Scheduler는 Engine과 같은 노드에 있어야 한다. _find_engine_bundle()이 이를 보장한다.

def _find_engine_bundle(placement_group, nnodes):
    engine_ip = ray.util.get_node_ip_address()

    @ray.remote(num_cpus=0, num_gpus=0)
    def get_node_ip():
        return ray.util.get_node_ip_address()

    bundle_ips = ray.get([
        get_node_ip.options(
            scheduling_strategy=PlacementGroupSchedulingStrategy(
                placement_group=placement_group,
                placement_group_bundle_index=i,
            ),
        ).remote()
        for i in range(nnodes)
    ])
    return bundle_ips.index(engine_ip), engine_ip

각 번들에 경량 리모트 함수를 배치하여 IP를 확인하고, Engine IP와 일치하는 번들을 찾는다.

SchedulerActor 생성

for node_idx in range(nnodes):
    bundle_idx = bundle_for_node[node_idx]
    for pp_rank in pp_range:
        for tp_rank in tp_range:
            actor = SchedulerActor.options(
                num_cpus=0, num_gpus=1,
                name=f"sglang_scheduler_rank0node={rank0_node_ip}_pp{pp_rank}_tp{tp_rank}",
                scheduling_strategy=PlacementGroupSchedulingStrategy(
                    placement_group=pg,
                    placement_group_bundle_index=bundle_idx,
                ),
            ).remote(
                server_args=server_args, port_args=port_args,
                gpu_id=local_gpu_idx, tp_rank=tp_rank,
                attn_cp_rank=attn_cp_rank, moe_dp_rank=moe_dp_rank,
                moe_ep_rank=moe_ep_rank, pp_rank=pp_rank,
                dp_rank=0, dist_init_addr=dist_init_addr,
            )

각 Actor는 num_gpus=1로 GPU 1개를 요청하고, PlacementGroupSchedulingStrategy로 특정 노드에 배치된다.

SchedulerActor 내부

@ray.remote
class SchedulerActor:
    def __init__(self, server_args, port_args, gpu_id, tp_rank,
                 attn_cp_rank, moe_dp_rank, moe_ep_rank, pp_rank,
                 dp_rank, dist_init_addr=None):
        # dist_init_addr 오버라이드 (멀티노드)
        if dist_init_addr:
            server_args = dataclasses.replace(
                server_args, dist_init_addr=dist_init_addr)

        # Ray가 할당한 실제 GPU ID 획득
        accelerator_ids = ray.get_runtime_context().get_accelerator_ids()
        assigned_gpus = accelerator_ids.get("GPU", [])
        if assigned_gpus:
            actual_gpu_id = int(assigned_gpus[0])

        # Scheduler 생성 (모델 로딩 포함)
        self.scheduler = Scheduler(
            server_args=server_args, port_args=port_args,
            gpu_id=actual_gpu_id, tp_rank=tp_rank, ...)

핵심 포인트: Ray의 런타임 컨텍스트에서 실제 GPU ID를 가져온다. 사용자가 전달한 gpu_id는 논리적 인덱스일 뿐, Ray가 할당한 물리적 GPU ID와 다를 수 있다.

이벤트 루프 실행

def run_event_loop(self) -> None:
    try:
        import torch
        torch.cuda.set_device(self.scheduler.gpu_id)
        self.scheduler.run_event_loop()
    except Exception as e:
        logger.error(f"Scheduler PP{self._pp_rank} TP{self._tp_rank} crashed: {e}")
        raise

이벤트 루프 시작 전에 torch.cuda.set_device를 호출하여 NCCL 통신이 올바른 GPU에서 동작하도록 보장한다.

초기화 대기와 에러 처리

try:
    scheduler_infos = ray.get(
        [actor.get_info.remote() for actor in scheduler_actors])
except ray.exceptions.RayActorError as e:
    for actor in scheduler_actors:
        try:
            ray.kill(actor)
        except Exception:
            pass
    raise RuntimeError(f"Scheduler actor failed to initialize: {e}")

event_loop_refs = [actor.run_event_loop.remote() for actor in scheduler_actors]

모든 Actor가 get_info()를 반환할 때까지 동기적으로 대기한다. 하나라도 실패하면 전체 Actor를 kill한다.

Shutdown

class RayEngine(Engine):
    def shutdown(self):
        for actor in self._scheduler_init_result.scheduler_actors:
            try:
                ray.kill(actor)
            except Exception:
                logger.error(f"Failed to kill Ray scheduler actor: {actor}")
        super().shutdown()

Ray Actor는 ray.kill()로 강제 종료하고, 부모 클래스의 shutdown()으로 나머지 리소스를 정리한다.

비교: multiprocessing vs Ray

특성 multiprocessing Ray
멀티노드 수동 설정 Placement Group 자동 배치
GPU 할당 CUDA_VISIBLE_DEVICES 수동 Ray 자동 할당
장애 복구 프로세스 모니터링 필요 Actor 재시작 가능
DP 지원 지원 미구현 (dp_size=1만)
오버헤드 낮음 Ray 런타임 오버헤드

설계 근거

왜 ZMQ와 NCCL을 함께 쓰는가? Ray는 Actor 생명주기와 GPU 배치만 관리한다. 실제 추론 요청의 빠른 전달에는 ZMQ가, GPU 텐서 통신에는 NCCL이 적합하다. Ray의 Object Store를 통한 데이터 전달은 직렬화 오버헤드가 크므로 추론 경로에서는 사용하지 않는다.

왜 dp_size > 1을 아직 지원하지 않는가? DataParallelController가 multiprocessing과 ZMQ에 강하게 결합되어 있다. Ray Actor 기반으로 DP Controller를 재구현하려면 포트 관리, 워커 생성 등 상당한 리팩터링이 필요하다.

관련 포스트

참고

댓글

관련 포스트

SGLang 의 다른글