[Ray Data] RAPIDS MPF 기반 GPU 셔플 지원으로 GPU 데이터 처리 파이프라인 가속
PR 링크: ray-project/ray#61371 상태: Merged | 변경: +1935 / -3
들어가며
Ray Data에서 대규모 데이터셋의 셔플 연산은 전통적으로 CPU 메모리를 경유했습니다. GPU에서 처리 중인 데이터를 셔플하려면 GPU -> CPU -> 네트워크 -> CPU -> GPU 경로를 따라야 하는데, 이는 PCIe 대역폭 병목과 불필요한 메모리 복사를 발생시킵니다. 이 PR은 RAPIDS MPF(Multi-Process Framework)와 UCXX를 활용하여 GPU 메모리에서 직접 해시 셔플을 수행하는 GPUShuffleOperator를 추가합니다.
핵심 코드 분석
GPU Shuffle Actor 구조
@ray.remote(num_gpus=1)
class GPUShuffleActor:
"""RAPIDS MPF 기반 분산 셔플 액터.
UCXX 통신 링으로 구성되며, 데이터가 Ray 오브젝트 스토어나
CPU를 경유하지 않고 GPU 간 직접 전송됩니다."""
def __init__(self, nranks, total_nparts, key_columns, ...):
self._shuffler = BulkRapidsMPFShuffler(
nranks=nranks,
total_nparts=total_nparts,
shuffle_on=key_columns,
rmm_pool_size=rmm_pool_size,
)
def insert_batch(self, batch: pa.Table) -> int:
df = cudf.DataFrame.from_arrow(batch)
self._shuffler.insert_chunk(table=df, column_names=self._columns)
return len(df)
def finish_and_extract(self) -> Iterator:
self._shuffler.insert_finished()
for _, partition in self._shuffler.extract():
cdf = pylibcudf_to_cudf_dataframe(partition, ...)
block = cdf.to_arrow(preserve_index=False)
yield block
UCXX 통신 설정
# Rank 0에서 루트 주소 생성
rank, root_address = actor_0.setup_root.remote()
# 모든 rank에 루트 주소 브로드캐스트
for actor in actors:
actor.setup_worker.remote(root_address)
왜 이게 좋은가
- GPU 직접 전송: UCXX를 통해 GPU 메모리 간 직접 통신하므로, GPU -> CPU -> GPU 복사 오버헤드가 제거된다.
- RMM 풀 관리: RAPIDS Memory Manager(RMM) 풀을 사용하여 GPU 메모리 할당/해제의 오버헤드를 줄인다.
- 스트리밍 프로토콜:
insert_batch와finish_and_extract를 분리하여, 데이터 삽입과 추출을 파이프라인화할 수 있다. - 자동 스필링:
spill_memory_limit옵션으로 GPU 메모리 부족 시 자동으로 호스트 메모리로 스필링한다.
참고 자료
관련 포스트
PR Analysis 의 다른글
- 이전글 [axolotl] Async GRPO 지원: vLLM 비동기 생성과 Importance Sampling으로 RLHF 학습 가속화
- 현재글 : [Ray Data] RAPIDS MPF 기반 GPU 셔플 지원으로 GPU 데이터 처리 파이프라인 가속
- 다음글 [triton] AMD gfx1250에서 Async Copy와 TDM 경로의 Padded Layout 휴리스틱 통합
댓글