[SGLang] Custom All-Reduce: NCCL 너머의 최적화된 집합 통신
들어가며
NCCL은 범용 집합 통신 라이브러리로, 대규모 텐서에서 최고의 대역폭을 발휘한다. 그러나 LLM 추론의 AllReduce 대상은 대부분 8MB 이하의 소규모 텐서다. 이 영역에서 NCCL의 커널 런치 오버헤드가 지연시간의 주된 병목이 된다.
SGLang의 Custom AllReduce는 IPC(Inter-Process Communication) 공유 메모리를 활용하여 NCCL의 지연시간을 크게 줄인다. 같은 노드 내 NVLink 연결 GPU 사이에서 동작하며, v1(기존)과 v2(JIT 컴파일) 두 가지 구현이 존재한다.
구조도
CustomAllreduce (v1)
├── meta_ptrs[] ─── IPC 공유 버퍼 (동기화 메타데이터 + 임시 결과)
├── buffer_ptrs[] ─── IPC 사전 등록 버퍼 (Eager 모드용)
├── rank_data ─── 모든 랭크의 포인터 튜플 저장
└── _ptr ─── C++ 커스텀 커널 핸들
│
├── ops.all_reduce() ── 등록된/미등록 텐서 AllReduce
├── ops.register_buffer() ── IPC 버퍼 등록
└── ops.register_graph_buffers() ── CUDA Graph 버퍼 등록
CustomAllReduceV2 (JIT)
└── sglang.jit_kernel.all_reduce
├── one_shot_push ── 소규모 텐서
├── one_shot_pull ── 중규모 텐서
└── two_shot ── 대규모 텐서
핵심 코드 분석
적용 조건 판단
Custom AllReduce를 사용할지 여부는 should_custom_ar()가 결정한다.
def should_custom_ar(self, inp: torch.Tensor):
if self.disabled:
return False
inp_size = inp.numel() * inp.element_size()
# custom allreduce requires input byte size to be multiples of 16
if inp_size % 16 != 0:
return False
if not is_weak_contiguous(inp):
return False
if not _is_hip:
if self.world_size == 2 or self.full_nvlink:
return inp_size <= self.max_size
return False
return False
핵심 조건은 세 가지다: (1) 16바이트 정렬, (2) 메모리 연속성, (3) NVLink 완전 연결 또는 2-GPU. 4-GPU 이상에서 NVLink 없이는 NCCL 대비 이점이 없다.
최대 크기 임계값
class CustomAllreduce:
_SUPPORTED_WORLD_SIZES = [2, 4, 6, 8]
_MAX_CAR_SIZE = 8192 * 1024 # 8MB (CUDA)
if _is_hip:
_MAX_CAR_SIZE = 2 * 8192 * 1024 # 16MB (ROCm)
if _is_musa:
_MAX_CAR_SIZE = 16 * 8196 * 1024 # 128MB (MUSA)
이 임계값은 Custom AllReduce가 NCCL보다 빠른 크로스오버 포인트를 반영한다. CUDA에서는 8MB, ROCm에서는 16MB가 경계다.
IPC 공유 버퍼 생성
같은 노드의 GPU들이 직접 메모리에 접근할 수 있도록 IPC 핸들을 교환한다.
@staticmethod
def create_shared_buffer(size_in_bytes, group=None):
lib = CudaRTLibrary()
pointer = lib.cudaMalloc(size_in_bytes)
handle = lib.cudaIpcGetMemHandle(pointer)
world_size = dist.get_world_size(group=group)
rank = dist.get_rank(group=group)
handles = [None] * world_size
dist.all_gather_object(handles, handle, group=group)
pointers = []
for i, h in enumerate(handles):
if i == rank:
pointers.append(pointer.value)
else:
pointers.append(lib.cudaIpcOpenMemHandle(h).value)
return pointers
각 GPU가 cudaMalloc으로 버퍼를 할당하고, cudaIpcGetMemHandle로 핸들을 생성한 뒤, all_gather_object로 모든 랭크에 공유한다. 다른 랭크의 핸들은 cudaIpcOpenMemHandle로 열어 직접 접근 가능한 포인터를 얻는다.
AllReduce 실행
def _all_reduce_impl(self, inp: torch.Tensor, registered: bool):
out = torch.empty_like(inp)
if not _is_hip:
if registered:
ops.all_reduce(self._ptr, inp, out, 0, 0)
else:
ops.all_reduce(
self._ptr, inp, out, self.buffer_ptrs[self.rank], self.max_size)
elif self.use_amd_deterministic_impl:
inp_size = inp.numel() * inp.element_size()
if inp_size < self.max_size:
reg_buffer = self.buffer.view(inp.dtype)[:inp.numel()]
ops.deterministic_all_reduce_unreg(self._ptr, inp, reg_buffer, out)
else:
self.register_buffer(inp)
ops.deterministic_all_reduce_reg(self._ptr, inp, out)
return out
registered=True는 CUDA Graph에서 사전 등록된 버퍼를 사용하는 경우다. Eager 모드에서는 입력 텐서를 먼저 IPC 버퍼에 복사한 후 AllReduce를 수행한다.
CUDA Graph 지원
def custom_all_reduce(self, input: torch.Tensor) -> Optional[torch.Tensor]:
if self.disabled or not self.should_custom_ar(input):
return None
if self._IS_CAPTURING:
if torch.cuda.is_current_stream_capturing():
return self._all_reduce_impl(input, registered=not self.tms_cudagraph)
else:
if is_in_piecewise_cuda_graph():
return self._all_reduce_impl(input, registered=False)
else:
return torch.zeros_like(input) # warmup
else:
return self._all_reduce_impl(input, registered=False)
CUDA Graph 캡처 중에는 세 가지 경로가 있다: (1) 실제 캡처 중이면 등록된 버퍼로 AllReduce, (2) Piecewise CUDA Graph의 split op이면 실제 AllReduce, (3) warmup이면 할당 패턴만 모방.
v2: JIT 컴파일 구현
환경변수 SGLANG_USE_JIT_ALL_REDUCE=1로 활성화되는 v2는 세 가지 알고리즘을 크기에 따라 선택한다.
@dataclass(frozen=True)
class ModeConfig:
one_shot_push_threshold: int # 이하: one-shot push
one_shot_pull_threshold: int # 이하: one-shot pull
# 초과: two-shot
class CustomAllReduceV2:
def __init__(self, group, device, max_pull_size=None, max_push_size=None):
# ...
self.obj = get_custom_all_reduce_cls()(
rank=self.rank, world_size=self.world_size,
pull_buffer_bytes=self.max_pull_size, # default 16MB
push_buffer_bytes=self.max_push_size,
graph_input_count=131072,
)
디스패치 로직
어떤 CustomAllreduce 구현을 사용할지 결정하는 dispatch_custom_allreduce() 함수가 있다.
def dispatch_custom_allreduce():
if _is_cuda and get_bool_env_var("SGLANG_USE_JIT_ALL_REDUCE", default="false"):
from .custom_all_reduce_v2 import CustomAllReduceV2
return CustomAllReduceV2
if _is_cuda or _is_musa:
return CustomAllreduce
# AMD ROCm: AITER 구현 또는 sglang 기본 구현
if get_bool_env_var("SGLANG_USE_AITER_AR", default="true"):
try:
from aiter.dist.device_communicators.custom_all_reduce import (
CustomAllreduce as AiterCustomAllreduce)
return partial(AiterCustomAllreduce, ...)
except ImportError:
return CustomAllreduce
비교: NCCL vs Custom AllReduce
| 특성 | NCCL | Custom AllReduce |
|---|---|---|
| 지연시간 (소규모) | 높음 (커널 런치 오버헤드) | 낮음 (IPC 직접 접근) |
| 대역폭 (대규모) | 최적화됨 | 8MB 이상에서 열세 |
| 멀티노드 | 지원 | 미지원 (같은 노드만) |
| NVLink 필요 | 불필요 | 4+ GPU에서 필수 |
관련 포스트
- SGLang Parallel State: TP/PP/DP/EP 병렬화 상태 관리
- SGLang NCCL & MSCCL++: 집합 통신 라이브러리 통합
- SGLang 하드웨어별 통신: HPU, NPU, XPU 커뮤니케이터
참고
관련 포스트
- [SGLang] Shared Memory Broadcast: 프로세스 간 고속 통신
- [sglang] DeepSeek-V4의 Latency 최적화: Fused mHC Post/Pre Kernel 도입
- [sglang] sglang ROCm MXFP4 어텐션에서 불필요한 contiguous copy 제거를 통한 성능 최적화
- [sglang] sglang의 torch.compile 활용: Advanced Indexing Gather 최적화로 LLM 추론 가속화
- [sglang] sglang diffusion 모델 성능 향상: Cache-DiT와 torch.compile의 최적화된 적용 순서
SGLang 의 다른글
- 이전글 [SGLang] 통신 연산: AllReduce, Broadcast, AllGather 구현
- 현재글 : [SGLang] Custom All-Reduce: NCCL 너머의 최적화된 집합 통신
- 다음글 [SGLang] NCCL & MSCCL++: 집합 통신 라이브러리 통합
댓글