[SGLang] 하드웨어별 통신: HPU, NPU, XPU 커뮤니케이터
들어가며
SGLang의 분산 통신은 NVIDIA GPU를 전제로 설계되었지만, 실제로는 세 가지 비-NVIDIA 하드웨어를 지원한다: Intel Gaudi(HPU), Huawei Ascend(NPU), Intel GPU(XPU). 각각의 하드웨어는 torch.distributed의 커스텀 백엔드를 사용하며, 전용 커뮤니케이터 클래스가 하드웨어 고유의 quirk를 처리한다.
이 세 커뮤니케이터는 GroupCoordinator.__init__()에서 조건부로 생성되며, all_reduce()의 디스패치 체인에서 최우선으로 검사된다.
구조도
GroupCoordinator.all_reduce(input_)
│
├── [1순위] hpu_communicator.all_reduce() ── Intel Gaudi (HPU)
│ Backend: HCCL
│
├── [2순위] xpu_communicator.all_reduce() ── Intel GPU (XPU)
│ Backend: XCCL
│
├── [3순위] npu_communicator.all_reduce() ── Huawei Ascend (NPU)
│ Backend: HCCL
│
└── [4순위 이하] PyNccl → Custom AR → torch.distributed
(NVIDIA GPU 경로)
디바이스 → 백엔드 매핑:
"cuda" → "nccl"
"hpu" → "hccl"
"npu" → "hccl"
"xpu" → "xccl"
"musa" → "mccl"
"cpu" → "gloo"
핵심 코드 분석
GroupCoordinator에서의 생성
세 커뮤니케이터 모두 GroupCoordinator.__init__()에서 동일한 패턴으로 생성된다.
# parallel_state.py - GroupCoordinator.__init__()
from sglang.srt.distributed.device_communicators.hpu_communicator import HpuCommunicator
from sglang.srt.distributed.device_communicators.npu_communicator import NpuCommunicator
from sglang.srt.distributed.device_communicators.xpu_communicator import XpuCommunicator
self.hpu_communicator: Optional[HpuCommunicator] = None
if use_hpu_communicator and self.world_size > 1:
self.hpu_communicator = HpuCommunicator(group=self.device_group)
self.xpu_communicator: Optional[XpuCommunicator] = None
if use_xpu_communicator and self.world_size > 1:
self.xpu_communicator = XpuCommunicator(group=self.device_group)
self.npu_communicator: Optional[NpuCommunicator] = None
if use_npu_communicator and self.world_size > 1:
self.npu_communicator = NpuCommunicator(group=self.device_group)
use_hpu_communicator, use_xpu_communicator, use_npu_communicator는 init_model_parallel_group()에서 모두 True로 설정된다. 실제 활성화 여부는 각 커뮤니케이터의 __init__에서 is_hpu(), is_xpu(), is_npu() 검사로 결정된다.
HpuCommunicator (Intel Gaudi)
class HpuCommunicator:
def __init__(self, group: ProcessGroup):
if not is_hpu():
self.disabled = True
return
self.disabled = False
self.group = group
self.world_size = dist.get_world_size(self.group)
def all_reduce(self, x: torch.Tensor) -> torch.Tensor:
# FIXME: workaround for a bug in Habana PT bridge
# occurring when PT_HPU_ENABLE_LAZY_COLLECTIVES=true
htorch.core.mark_step()
dist.all_reduce(x, group=self.group)
return x
def all_gather(self, x: torch.Tensor, dim: int = -1) -> torch.Tensor:
world_size = self.world_size
if dim < 0:
dim += x.dim()
input_size = x.size()
output_tensor = torch.empty(
(world_size,) + input_size, dtype=x.dtype, device=x.device)
htorch.core.mark_step()
dist.all_gather_into_tensor(output_tensor, x, group=self.group)
output_tensor = output_tensor.movedim(0, dim)
output_tensor = output_tensor.reshape(
input_size[:dim] + (world_size * input_size[dim],) + input_size[dim + 1:])
return output_tensor
핵심 quirk: htorch.core.mark_step() 호출이 필수다. Intel Gaudi의 Lazy Execution 모드에서 집합 통신 전에 이전 연산을 확정(materialize)해야 한다. 이 호출 없이는 PT_HPU_ENABLE_LAZY_COLLECTIVES=true 환경에서 버그가 발생한다.
NpuCommunicator (Huawei Ascend)
class NpuCommunicator:
def __init__(self, group: ProcessGroup):
if not is_npu():
self.disabled = True
return
self.disabled = False
self.group = group
self.world_size = dist.get_world_size(self.group)
def all_reduce(self, x: torch.Tensor) -> torch.Tensor:
dist.all_reduce(x, group=self.group)
return x
def all_gather(self, x: torch.Tensor, dim: int = -1) -> torch.Tensor:
world_size = self.world_size
if dim < 0:
dim += x.dim()
input_size = x.size()
output_size = (input_size[0] * world_size,) + input_size[1:]
output_tensor = torch.empty(output_size, dtype=x.dtype, device=x.device)
dist.all_gather_into_tensor(output_tensor, x, group=self.group)
output_tensor = output_tensor.reshape((world_size,) + input_size)
output_tensor = output_tensor.movedim(0, dim)
output_tensor = output_tensor.reshape(
input_size[:dim] + (world_size * input_size[dim],) + input_size[dim + 1:])
return output_tensor
NPU의 구현은 가장 깔끔하다. 특별한 workaround 없이 torch.distributed를 직접 호출한다. 다만 HCCL 백엔드에 전용 옵션을 전달하는 부분이 별도로 존재한다:
# parallel_state.py
def get_torch_distributed_pg_options(group_name=None):
if not _is_npu:
return None
if group_name is not None and "moe" not in group_name:
return None
import torch_npu
options = torch_npu._C._distributed_c10d.ProcessGroupHCCL.Options()
hccl_buffer_size = int(
os.environ.get("DEEPEP_HCCL_BUFFSIZE") or
os.environ.get("HCCL_BUFFSIZE") or 200)
options.hccl_config = {"hccl_buffer_size": hccl_buffer_size}
return options
MoE 그룹에서는 HCCL 버퍼 크기를 DEEPEP_HCCL_BUFFSIZE 환경변수로 조정할 수 있다.
XpuCommunicator (Intel GPU)
class XpuCommunicator:
def __init__(self, group: ProcessGroup):
if not is_xpu():
self.disabled = True
return
self.disabled = False
self.group = group
self.world_size = dist.get_world_size(self.group)
def all_reduce(self, x: torch.Tensor) -> torch.Tensor:
dist.all_reduce(x, group=self.group)
return x
def gather(self, input_, rank_in_group, dst=0, dim=-1):
# For xpu path, gather doesn't work properly together with ray
# cluster so we use all_gather instead for now.
output_tensor = torch.empty(
(self.world_size,) + input_.size(), dtype=input_.dtype, device=input_.device)
torch.distributed.all_gather_into_tensor(
output_tensor, input_, group=self.group)
if rank_in_group == dst:
output_tensor = output_tensor.movedim(0, dim)
output_tensor = output_tensor.reshape(...)
else:
output_tensor = None
return output_tensor
핵심 quirk: gather()를 all_gather()로 대체한다. Intel XPU에서 gather가 Ray 클러스터와 함께 동작하지 않는 버그 때문이다. 모든 랭크가 전체 데이터를 받은 후, dst가 아닌 랭크는 None을 반환한다.
AllReduce 디스패치 우선순위
# GroupCoordinator.all_reduce()
def all_reduce(self, input_):
if self.world_size == 1:
return input_
# 하드웨어별 커뮤니케이터가 최우선
if self.hpu_communicator is not None and not self.hpu_communicator.disabled:
return self.hpu_communicator.all_reduce(input_)
if self.xpu_communicator is not None and not self.xpu_communicator.disabled:
return self.xpu_communicator.all_reduce(input_)
if self.npu_communicator is not None and not self.npu_communicator.disabled:
return self.npu_communicator.all_reduce(input_)
# 이후 NVIDIA GPU 경로: PyNccl, Custom AR, torch.distributed
...
비교: 세 하드웨어 커뮤니케이터
| 특성 | HPU (Gaudi) | NPU (Ascend) | XPU (Intel GPU) |
|---|---|---|---|
| 백엔드 | HCCL | HCCL | XCCL |
| 특수 처리 | mark_step() 필요 | HCCL 버퍼 크기 설정 | gather → all_gather 우회 |
| all_reduce | in-place | in-place | in-place |
| all_gather | 지원 | 지원 | 지원 |
| gather | 미구현 | 미구현 | all_gather로 에뮬레이션 |
| PyNccl 사용 | 불가 | 불가 | 불가 |
세 커뮤니케이터 모두 torch.distributed를 내부적으로 호출한다는 공통점이 있다. NVIDIA의 Custom AllReduce나 PyNccl 같은 저수준 최적화는 적용되지 않으며, 각 하드웨어 벤더의 통신 라이브러리(HCCL, XCCL)에 의존한다.
설계 근거
왜 별도 커뮤니케이터 클래스가 필요한가? 단순히 torch.distributed.all_reduce()를 호출하면 될 것 같지만, 하드웨어별 quirk가 있다. HPU의 mark_step(), NPU의 HCCL 옵션, XPU의 gather 우회 등은 공통 코드로 처리할 수 없다.
왜 모두 use_xxx_communicator=True로 설정하는가? 각 커뮤니케이터의 __init__에서 is_hpu(), is_npu(), is_xpu() 검사를 수행하므로, 해당 하드웨어가 아니면 self.disabled = True로 설정된다. 조건부 import보다 이 패턴이 더 깔끔하다.
관련 포스트
- SGLang Parallel State: TP/PP/DP/EP 병렬화 상태 관리
- SGLang NCCL & MSCCL++: 집합 통신 라이브러리 통합
- SGLang Custom All-Reduce: NCCL 너머의 최적화된 집합 통신
참고
관련 포스트
SGLang 의 다른글
- 이전글 [SGLang] Shared Memory Broadcast: 프로세스 간 고속 통신
- 현재글 : [SGLang] 하드웨어별 통신: HPU, NPU, XPU 커뮤니케이터
- 다음글 [SGLang] Prefill-Decode Disaggregation 개요: PD 분리 아키텍처
댓글