본문으로 건너뛰기

[Ray] StreamingRepartition과 MapBatches 연산자 퓨전으로 스케줄링 오버헤드 제거

PR 링크: ray-project/ray#59108 상태: Merged | 변경: +356 / -55

들어가며

Ray Data 파이프라인에서 MapBatchesStreamingRepartition이 연속으로 배치되면, 각각 별도의 스케줄링 단계를 거칩니다. batch_sizetarget_num_rows_per_block이 동일할 때 이 두 연산자를 퓨전하면 불필요한 중간 스케줄링과 데이터 셔플을 제거할 수 있습니다. 이 PR은 이 최적화를 구현합니다.

핵심 코드 분석

Before: 별도 스케줄링

map_batches(fn, batch_size=64) -> streaming_repartition(target_num_rows=64)

두 연산자가 독립적으로 스케줄링되어, 중간 결과를 직렬화/역직렬화하고 별도의 태스크로 실행됩니다.

After: 퓨전된 단일 연산자

# operator_fusion.py
def _fuse_streaming_repartition_operators_in_dag(self, dag):
    upstream_ops = dag.input_dependencies
    while (
        len(upstream_ops) == 1
        and isinstance(self._op_map[dag], StreamingRepartition)
        and isinstance(self._op_map[upstream_ops[0]], MapBatches)
        and self._can_fuse(dag, upstream_ops[0])
    ):
        dag = self._get_fused_streaming_repartition_operator(dag, upstream_ops[0])
        upstream_ops = dag.input_dependencies
    return dag

퓨전 조건은 batch_size == target_num_rows_per_block일 때만 적용됩니다. 퓨전된 연산자는 supports_fusion=False로 설정되어 추가 퓨전을 방지합니다.

퓨전 규칙 예시

map -> s_r -> map -> map  =>  (map -> s_r) -> (map -> map)
s_r -> map -> map -> s_r  =>  s_r -> map -> (map -> s_r)

왜 이게 좋은가

1. 스케줄링 오버헤드 제거

퓨전하지 않으면 중간 단계에서 블록을 직렬화하고 새 태스크를 스케줄링해야 합니다. 퓨전하면 단일 태스크 안에서 map 함수 실행과 블록 크기 조정이 함께 이루어집니다.

2. LLM 배치 추론의 collate 최적화

이 PR의 주요 동기는 LLM 배치 추론에서 collate 연산을 최적화하는 것입니다. collate는 여러 샘플을 하나의 배치로 묶는 과정인데, MapBatches로 collate 후 StreamingRepartition으로 블록을 재분배하는 패턴이 일반적입니다. 퓨전으로 이 과정의 레이턴시를 크게 줄일 수 있습니다.

3. 단일 텐서 최적화

# torch_utils.py
if len(tensor_sequence) == 1 and (
    device is None or tensor_sequence[0].device == torch.device(device)
):
    return tensor_sequence[0]

텐서가 하나뿐이고 이미 올바른 디바이스에 있을 때 불필요한 concat을 건너뛰는 fast-path도 추가되었습니다.

참고 자료

댓글

관련 포스트

PR Analysis 의 다른글