본문으로 건너뛰기

[SGLang] 하드웨어별 통신: HPU, NPU, XPU 커뮤니케이터

들어가며

SGLang의 분산 통신은 NVIDIA GPU를 전제로 설계되었지만, 실제로는 세 가지 비-NVIDIA 하드웨어를 지원한다: Intel Gaudi(HPU), Huawei Ascend(NPU), Intel GPU(XPU). 각각의 하드웨어는 torch.distributed의 커스텀 백엔드를 사용하며, 전용 커뮤니케이터 클래스가 하드웨어 고유의 quirk를 처리한다.

이 세 커뮤니케이터는 GroupCoordinator.__init__()에서 조건부로 생성되며, all_reduce()의 디스패치 체인에서 최우선으로 검사된다.

구조도

GroupCoordinator.all_reduce(input_)
    │
    ├── [1순위] hpu_communicator.all_reduce()  ── Intel Gaudi (HPU)
    │                                               Backend: HCCL
    │
    ├── [2순위] xpu_communicator.all_reduce()  ── Intel GPU (XPU)
    │                                               Backend: XCCL
    │
    ├── [3순위] npu_communicator.all_reduce()  ── Huawei Ascend (NPU)
    │                                               Backend: HCCL
    │
    └── [4순위 이하] PyNccl → Custom AR → torch.distributed
                                            (NVIDIA GPU 경로)

디바이스 → 백엔드 매핑:
  "cuda" → "nccl"
  "hpu"  → "hccl"
  "npu"  → "hccl"
  "xpu"  → "xccl"
  "musa" → "mccl"
  "cpu"  → "gloo"

핵심 코드 분석

GroupCoordinator에서의 생성

세 커뮤니케이터 모두 GroupCoordinator.__init__()에서 동일한 패턴으로 생성된다.

# parallel_state.py - GroupCoordinator.__init__()
from sglang.srt.distributed.device_communicators.hpu_communicator import HpuCommunicator
from sglang.srt.distributed.device_communicators.npu_communicator import NpuCommunicator
from sglang.srt.distributed.device_communicators.xpu_communicator import XpuCommunicator

self.hpu_communicator: Optional[HpuCommunicator] = None
if use_hpu_communicator and self.world_size > 1:
    self.hpu_communicator = HpuCommunicator(group=self.device_group)

self.xpu_communicator: Optional[XpuCommunicator] = None
if use_xpu_communicator and self.world_size > 1:
    self.xpu_communicator = XpuCommunicator(group=self.device_group)

self.npu_communicator: Optional[NpuCommunicator] = None
if use_npu_communicator and self.world_size > 1:
    self.npu_communicator = NpuCommunicator(group=self.device_group)

use_hpu_communicator, use_xpu_communicator, use_npu_communicatorinit_model_parallel_group()에서 모두 True로 설정된다. 실제 활성화 여부는 각 커뮤니케이터의 __init__에서 is_hpu(), is_xpu(), is_npu() 검사로 결정된다.

HpuCommunicator (Intel Gaudi)

class HpuCommunicator:
    def __init__(self, group: ProcessGroup):
        if not is_hpu():
            self.disabled = True
            return
        self.disabled = False
        self.group = group
        self.world_size = dist.get_world_size(self.group)

    def all_reduce(self, x: torch.Tensor) -> torch.Tensor:
        # FIXME: workaround for a bug in Habana PT bridge
        # occurring when PT_HPU_ENABLE_LAZY_COLLECTIVES=true
        htorch.core.mark_step()
        dist.all_reduce(x, group=self.group)
        return x

    def all_gather(self, x: torch.Tensor, dim: int = -1) -> torch.Tensor:
        world_size = self.world_size
        if dim < 0:
            dim += x.dim()
        input_size = x.size()
        output_tensor = torch.empty(
            (world_size,) + input_size, dtype=x.dtype, device=x.device)
        htorch.core.mark_step()
        dist.all_gather_into_tensor(output_tensor, x, group=self.group)
        output_tensor = output_tensor.movedim(0, dim)
        output_tensor = output_tensor.reshape(
            input_size[:dim] + (world_size * input_size[dim],) + input_size[dim + 1:])
        return output_tensor

핵심 quirk: htorch.core.mark_step() 호출이 필수다. Intel Gaudi의 Lazy Execution 모드에서 집합 통신 전에 이전 연산을 확정(materialize)해야 한다. 이 호출 없이는 PT_HPU_ENABLE_LAZY_COLLECTIVES=true 환경에서 버그가 발생한다.

NpuCommunicator (Huawei Ascend)

class NpuCommunicator:
    def __init__(self, group: ProcessGroup):
        if not is_npu():
            self.disabled = True
            return
        self.disabled = False
        self.group = group
        self.world_size = dist.get_world_size(self.group)

    def all_reduce(self, x: torch.Tensor) -> torch.Tensor:
        dist.all_reduce(x, group=self.group)
        return x

    def all_gather(self, x: torch.Tensor, dim: int = -1) -> torch.Tensor:
        world_size = self.world_size
        if dim < 0:
            dim += x.dim()
        input_size = x.size()
        output_size = (input_size[0] * world_size,) + input_size[1:]
        output_tensor = torch.empty(output_size, dtype=x.dtype, device=x.device)
        dist.all_gather_into_tensor(output_tensor, x, group=self.group)
        output_tensor = output_tensor.reshape((world_size,) + input_size)
        output_tensor = output_tensor.movedim(0, dim)
        output_tensor = output_tensor.reshape(
            input_size[:dim] + (world_size * input_size[dim],) + input_size[dim + 1:])
        return output_tensor

NPU의 구현은 가장 깔끔하다. 특별한 workaround 없이 torch.distributed를 직접 호출한다. 다만 HCCL 백엔드에 전용 옵션을 전달하는 부분이 별도로 존재한다:

# parallel_state.py
def get_torch_distributed_pg_options(group_name=None):
    if not _is_npu:
        return None
    if group_name is not None and "moe" not in group_name:
        return None
    import torch_npu
    options = torch_npu._C._distributed_c10d.ProcessGroupHCCL.Options()
    hccl_buffer_size = int(
        os.environ.get("DEEPEP_HCCL_BUFFSIZE") or
        os.environ.get("HCCL_BUFFSIZE") or 200)
    options.hccl_config = {"hccl_buffer_size": hccl_buffer_size}
    return options

MoE 그룹에서는 HCCL 버퍼 크기를 DEEPEP_HCCL_BUFFSIZE 환경변수로 조정할 수 있다.

XpuCommunicator (Intel GPU)

class XpuCommunicator:
    def __init__(self, group: ProcessGroup):
        if not is_xpu():
            self.disabled = True
            return
        self.disabled = False
        self.group = group
        self.world_size = dist.get_world_size(self.group)

    def all_reduce(self, x: torch.Tensor) -> torch.Tensor:
        dist.all_reduce(x, group=self.group)
        return x

    def gather(self, input_, rank_in_group, dst=0, dim=-1):
        # For xpu path, gather doesn't work properly together with ray
        # cluster so we use all_gather instead for now.
        output_tensor = torch.empty(
            (self.world_size,) + input_.size(), dtype=input_.dtype, device=input_.device)
        torch.distributed.all_gather_into_tensor(
            output_tensor, input_, group=self.group)
        if rank_in_group == dst:
            output_tensor = output_tensor.movedim(0, dim)
            output_tensor = output_tensor.reshape(...)
        else:
            output_tensor = None
        return output_tensor

핵심 quirk: gather()all_gather()로 대체한다. Intel XPU에서 gather가 Ray 클러스터와 함께 동작하지 않는 버그 때문이다. 모든 랭크가 전체 데이터를 받은 후, dst가 아닌 랭크는 None을 반환한다.

AllReduce 디스패치 우선순위

# GroupCoordinator.all_reduce()
def all_reduce(self, input_):
    if self.world_size == 1:
        return input_

    # 하드웨어별 커뮤니케이터가 최우선
    if self.hpu_communicator is not None and not self.hpu_communicator.disabled:
        return self.hpu_communicator.all_reduce(input_)
    if self.xpu_communicator is not None and not self.xpu_communicator.disabled:
        return self.xpu_communicator.all_reduce(input_)
    if self.npu_communicator is not None and not self.npu_communicator.disabled:
        return self.npu_communicator.all_reduce(input_)

    # 이후 NVIDIA GPU 경로: PyNccl, Custom AR, torch.distributed
    ...

비교: 세 하드웨어 커뮤니케이터

특성 HPU (Gaudi) NPU (Ascend) XPU (Intel GPU)
백엔드 HCCL HCCL XCCL
특수 처리 mark_step() 필요 HCCL 버퍼 크기 설정 gather → all_gather 우회
all_reduce in-place in-place in-place
all_gather 지원 지원 지원
gather 미구현 미구현 all_gather로 에뮬레이션
PyNccl 사용 불가 불가 불가

세 커뮤니케이터 모두 torch.distributed를 내부적으로 호출한다는 공통점이 있다. NVIDIA의 Custom AllReduce나 PyNccl 같은 저수준 최적화는 적용되지 않으며, 각 하드웨어 벤더의 통신 라이브러리(HCCL, XCCL)에 의존한다.

설계 근거

왜 별도 커뮤니케이터 클래스가 필요한가? 단순히 torch.distributed.all_reduce()를 호출하면 될 것 같지만, 하드웨어별 quirk가 있다. HPU의 mark_step(), NPU의 HCCL 옵션, XPU의 gather 우회 등은 공통 코드로 처리할 수 없다.

왜 모두 use_xxx_communicator=True로 설정하는가? 각 커뮤니케이터의 __init__에서 is_hpu(), is_npu(), is_xpu() 검사를 수행하므로, 해당 하드웨어가 아니면 self.disabled = True로 설정된다. 조건부 import보다 이 패턴이 더 깔끔하다.

관련 포스트

참고

댓글

관련 포스트

SGLang 의 다른글