[Ray] StreamingRepartition과 MapBatches 연산자 퓨전으로 스케줄링 오버헤드 제거
PR 링크: ray-project/ray#59108 상태: Merged | 변경: +356 / -55
들어가며
Ray Data 파이프라인에서 MapBatches와 StreamingRepartition이 연속으로 배치되면, 각각 별도의 스케줄링 단계를 거칩니다. batch_size와 target_num_rows_per_block이 동일할 때 이 두 연산자를 퓨전하면 불필요한 중간 스케줄링과 데이터 셔플을 제거할 수 있습니다. 이 PR은 이 최적화를 구현합니다.
핵심 코드 분석
Before: 별도 스케줄링
map_batches(fn, batch_size=64) -> streaming_repartition(target_num_rows=64)
두 연산자가 독립적으로 스케줄링되어, 중간 결과를 직렬화/역직렬화하고 별도의 태스크로 실행됩니다.
After: 퓨전된 단일 연산자
# operator_fusion.py
def _fuse_streaming_repartition_operators_in_dag(self, dag):
upstream_ops = dag.input_dependencies
while (
len(upstream_ops) == 1
and isinstance(self._op_map[dag], StreamingRepartition)
and isinstance(self._op_map[upstream_ops[0]], MapBatches)
and self._can_fuse(dag, upstream_ops[0])
):
dag = self._get_fused_streaming_repartition_operator(dag, upstream_ops[0])
upstream_ops = dag.input_dependencies
return dag
퓨전 조건은 batch_size == target_num_rows_per_block일 때만 적용됩니다. 퓨전된 연산자는 supports_fusion=False로 설정되어 추가 퓨전을 방지합니다.
퓨전 규칙 예시
map -> s_r -> map -> map => (map -> s_r) -> (map -> map)
s_r -> map -> map -> s_r => s_r -> map -> (map -> s_r)
왜 이게 좋은가
1. 스케줄링 오버헤드 제거
퓨전하지 않으면 중간 단계에서 블록을 직렬화하고 새 태스크를 스케줄링해야 합니다. 퓨전하면 단일 태스크 안에서 map 함수 실행과 블록 크기 조정이 함께 이루어집니다.
2. LLM 배치 추론의 collate 최적화
이 PR의 주요 동기는 LLM 배치 추론에서 collate 연산을 최적화하는 것입니다. collate는 여러 샘플을 하나의 배치로 묶는 과정인데, MapBatches로 collate 후 StreamingRepartition으로 블록을 재분배하는 패턴이 일반적입니다. 퓨전으로 이 과정의 레이턴시를 크게 줄일 수 있습니다.
3. 단일 텐서 최적화
# torch_utils.py
if len(tensor_sequence) == 1 and (
device is None or tensor_sequence[0].device == torch.device(device)
):
return tensor_sequence[0]
텐서가 하나뿐이고 이미 올바른 디바이스에 있을 때 불필요한 concat을 건너뛰는 fast-path도 추가되었습니다.
참고 자료
- Ray Data Operator Fusion 문서 — Ray Data의 연산자 퓨전 전략
- Operator Fusion in Query Engines — 쿼리 엔진에서의 연산자 퓨전 이론
관련 포스트
PR Analysis 의 다른글
- 이전글 [vllm] xxHash로 Prefix Caching 해싱 성능 가속
- 현재글 : [Ray] StreamingRepartition과 MapBatches 연산자 퓨전으로 스케줄링 오버헤드 제거
- 다음글 [Triton] SwiGLU exp2 최적화 부분 롤백 — 수치 정확도 우선
댓글