본문으로 건너뛰기

[Ray Data] RAPIDS MPF 기반 GPU 셔플 지원으로 GPU 데이터 처리 파이프라인 가속

PR 링크: ray-project/ray#61371 상태: Merged | 변경: +1935 / -3

들어가며

Ray Data에서 대규모 데이터셋의 셔플 연산은 전통적으로 CPU 메모리를 경유했습니다. GPU에서 처리 중인 데이터를 셔플하려면 GPU -> CPU -> 네트워크 -> CPU -> GPU 경로를 따라야 하는데, 이는 PCIe 대역폭 병목과 불필요한 메모리 복사를 발생시킵니다. 이 PR은 RAPIDS MPF(Multi-Process Framework)와 UCXX를 활용하여 GPU 메모리에서 직접 해시 셔플을 수행하는 GPUShuffleOperator를 추가합니다.

핵심 코드 분석

GPU Shuffle Actor 구조

@ray.remote(num_gpus=1)
class GPUShuffleActor:
    """RAPIDS MPF 기반 분산 셔플 액터.
    UCXX 통신 링으로 구성되며, 데이터가 Ray 오브젝트 스토어나
    CPU를 경유하지 않고 GPU 간 직접 전송됩니다."""

    def __init__(self, nranks, total_nparts, key_columns, ...):
        self._shuffler = BulkRapidsMPFShuffler(
            nranks=nranks,
            total_nparts=total_nparts,
            shuffle_on=key_columns,
            rmm_pool_size=rmm_pool_size,
        )

    def insert_batch(self, batch: pa.Table) -> int:
        df = cudf.DataFrame.from_arrow(batch)
        self._shuffler.insert_chunk(table=df, column_names=self._columns)
        return len(df)

    def finish_and_extract(self) -> Iterator:
        self._shuffler.insert_finished()
        for _, partition in self._shuffler.extract():
            cdf = pylibcudf_to_cudf_dataframe(partition, ...)
            block = cdf.to_arrow(preserve_index=False)
            yield block

UCXX 통신 설정

# Rank 0에서 루트 주소 생성
rank, root_address = actor_0.setup_root.remote()

# 모든 rank에 루트 주소 브로드캐스트
for actor in actors:
    actor.setup_worker.remote(root_address)

왜 이게 좋은가

  1. GPU 직접 전송: UCXX를 통해 GPU 메모리 간 직접 통신하므로, GPU -> CPU -> GPU 복사 오버헤드가 제거된다.
  2. RMM 풀 관리: RAPIDS Memory Manager(RMM) 풀을 사용하여 GPU 메모리 할당/해제의 오버헤드를 줄인다.
  3. 스트리밍 프로토콜: insert_batchfinish_and_extract를 분리하여, 데이터 삽입과 추출을 파이프라인화할 수 있다.
  4. 자동 스필링: spill_memory_limit 옵션으로 GPU 메모리 부족 시 자동으로 호스트 메모리로 스필링한다.

참고 자료

댓글

관련 포스트

PR Analysis 의 다른글