본문으로 건너뛰기

[SGLang] Custom All-Reduce: NCCL 너머의 최적화된 집합 통신

들어가며

NCCL은 범용 집합 통신 라이브러리로, 대규모 텐서에서 최고의 대역폭을 발휘한다. 그러나 LLM 추론의 AllReduce 대상은 대부분 8MB 이하의 소규모 텐서다. 이 영역에서 NCCL의 커널 런치 오버헤드가 지연시간의 주된 병목이 된다.

SGLang의 Custom AllReduce는 IPC(Inter-Process Communication) 공유 메모리를 활용하여 NCCL의 지연시간을 크게 줄인다. 같은 노드 내 NVLink 연결 GPU 사이에서 동작하며, v1(기존)과 v2(JIT 컴파일) 두 가지 구현이 존재한다.

구조도

CustomAllreduce (v1)
  ├── meta_ptrs[]       ─── IPC 공유 버퍼 (동기화 메타데이터 + 임시 결과)
  ├── buffer_ptrs[]     ─── IPC 사전 등록 버퍼 (Eager 모드용)
  ├── rank_data         ─── 모든 랭크의 포인터 튜플 저장
  └── _ptr              ─── C++ 커스텀 커널 핸들
       │
       ├── ops.all_reduce()          ── 등록된/미등록 텐서 AllReduce
       ├── ops.register_buffer()     ── IPC 버퍼 등록
       └── ops.register_graph_buffers() ── CUDA Graph 버퍼 등록

CustomAllReduceV2 (JIT)
  └── sglang.jit_kernel.all_reduce
       ├── one_shot_push     ── 소규모 텐서
       ├── one_shot_pull     ── 중규모 텐서
       └── two_shot          ── 대규모 텐서

핵심 코드 분석

적용 조건 판단

Custom AllReduce를 사용할지 여부는 should_custom_ar()가 결정한다.

def should_custom_ar(self, inp: torch.Tensor):
    if self.disabled:
        return False
    inp_size = inp.numel() * inp.element_size()
    # custom allreduce requires input byte size to be multiples of 16
    if inp_size % 16 != 0:
        return False
    if not is_weak_contiguous(inp):
        return False
    if not _is_hip:
        if self.world_size == 2 or self.full_nvlink:
            return inp_size <= self.max_size
        return False
    return False

핵심 조건은 세 가지다: (1) 16바이트 정렬, (2) 메모리 연속성, (3) NVLink 완전 연결 또는 2-GPU. 4-GPU 이상에서 NVLink 없이는 NCCL 대비 이점이 없다.

최대 크기 임계값

class CustomAllreduce:
    _SUPPORTED_WORLD_SIZES = [2, 4, 6, 8]
    _MAX_CAR_SIZE = 8192 * 1024  # 8MB (CUDA)
    if _is_hip:
        _MAX_CAR_SIZE = 2 * 8192 * 1024  # 16MB (ROCm)
    if _is_musa:
        _MAX_CAR_SIZE = 16 * 8196 * 1024  # 128MB (MUSA)

이 임계값은 Custom AllReduce가 NCCL보다 빠른 크로스오버 포인트를 반영한다. CUDA에서는 8MB, ROCm에서는 16MB가 경계다.

IPC 공유 버퍼 생성

같은 노드의 GPU들이 직접 메모리에 접근할 수 있도록 IPC 핸들을 교환한다.

@staticmethod
def create_shared_buffer(size_in_bytes, group=None):
    lib = CudaRTLibrary()
    pointer = lib.cudaMalloc(size_in_bytes)
    handle = lib.cudaIpcGetMemHandle(pointer)
    world_size = dist.get_world_size(group=group)
    rank = dist.get_rank(group=group)
    handles = [None] * world_size
    dist.all_gather_object(handles, handle, group=group)

    pointers = []
    for i, h in enumerate(handles):
        if i == rank:
            pointers.append(pointer.value)
        else:
            pointers.append(lib.cudaIpcOpenMemHandle(h).value)
    return pointers

각 GPU가 cudaMalloc으로 버퍼를 할당하고, cudaIpcGetMemHandle로 핸들을 생성한 뒤, all_gather_object로 모든 랭크에 공유한다. 다른 랭크의 핸들은 cudaIpcOpenMemHandle로 열어 직접 접근 가능한 포인터를 얻는다.

AllReduce 실행

def _all_reduce_impl(self, inp: torch.Tensor, registered: bool):
    out = torch.empty_like(inp)
    if not _is_hip:
        if registered:
            ops.all_reduce(self._ptr, inp, out, 0, 0)
        else:
            ops.all_reduce(
                self._ptr, inp, out, self.buffer_ptrs[self.rank], self.max_size)
    elif self.use_amd_deterministic_impl:
        inp_size = inp.numel() * inp.element_size()
        if inp_size < self.max_size:
            reg_buffer = self.buffer.view(inp.dtype)[:inp.numel()]
            ops.deterministic_all_reduce_unreg(self._ptr, inp, reg_buffer, out)
        else:
            self.register_buffer(inp)
            ops.deterministic_all_reduce_reg(self._ptr, inp, out)
    return out

registered=True는 CUDA Graph에서 사전 등록된 버퍼를 사용하는 경우다. Eager 모드에서는 입력 텐서를 먼저 IPC 버퍼에 복사한 후 AllReduce를 수행한다.

CUDA Graph 지원

def custom_all_reduce(self, input: torch.Tensor) -> Optional[torch.Tensor]:
    if self.disabled or not self.should_custom_ar(input):
        return None
    if self._IS_CAPTURING:
        if torch.cuda.is_current_stream_capturing():
            return self._all_reduce_impl(input, registered=not self.tms_cudagraph)
        else:
            if is_in_piecewise_cuda_graph():
                return self._all_reduce_impl(input, registered=False)
            else:
                return torch.zeros_like(input)  # warmup
    else:
        return self._all_reduce_impl(input, registered=False)

CUDA Graph 캡처 중에는 세 가지 경로가 있다: (1) 실제 캡처 중이면 등록된 버퍼로 AllReduce, (2) Piecewise CUDA Graph의 split op이면 실제 AllReduce, (3) warmup이면 할당 패턴만 모방.

v2: JIT 컴파일 구현

환경변수 SGLANG_USE_JIT_ALL_REDUCE=1로 활성화되는 v2는 세 가지 알고리즘을 크기에 따라 선택한다.

@dataclass(frozen=True)
class ModeConfig:
    one_shot_push_threshold: int  # 이하: one-shot push
    one_shot_pull_threshold: int  # 이하: one-shot pull
    # 초과: two-shot

class CustomAllReduceV2:
    def __init__(self, group, device, max_pull_size=None, max_push_size=None):
        # ...
        self.obj = get_custom_all_reduce_cls()(
            rank=self.rank, world_size=self.world_size,
            pull_buffer_bytes=self.max_pull_size,  # default 16MB
            push_buffer_bytes=self.max_push_size,
            graph_input_count=131072,
        )

디스패치 로직

어떤 CustomAllreduce 구현을 사용할지 결정하는 dispatch_custom_allreduce() 함수가 있다.

def dispatch_custom_allreduce():
    if _is_cuda and get_bool_env_var("SGLANG_USE_JIT_ALL_REDUCE", default="false"):
        from .custom_all_reduce_v2 import CustomAllReduceV2
        return CustomAllReduceV2
    if _is_cuda or _is_musa:
        return CustomAllreduce
    # AMD ROCm: AITER 구현 또는 sglang 기본 구현
    if get_bool_env_var("SGLANG_USE_AITER_AR", default="true"):
        try:
            from aiter.dist.device_communicators.custom_all_reduce import (
                CustomAllreduce as AiterCustomAllreduce)
            return partial(AiterCustomAllreduce, ...)
        except ImportError:
            return CustomAllreduce

비교: NCCL vs Custom AllReduce

특성 NCCL Custom AllReduce
지연시간 (소규모) 높음 (커널 런치 오버헤드) 낮음 (IPC 직접 접근)
대역폭 (대규모) 최적화됨 8MB 이상에서 열세
멀티노드 지원 미지원 (같은 노드만)
NVLink 필요 불필요 4+ GPU에서 필수

관련 포스트

참고

댓글

관련 포스트

SGLang 의 다른글