본문으로 건너뛰기

[Ray] iter_batches 속도 향상: block ref 해석을 배치 ray.get()으로 전환

PR 링크: ray-project/ray#58467 상태: Merged | 변경: +184 / -16

들어가며

Ray Data의 iter_batches()는 내부적으로 resolve_block_refs()를 호출하여 ObjectRef를 실제 Block 데이터로 변환합니다. 기존에는 각 block ref마다 개별 ray.get() 호출을 실행했습니다. ray.get()은 오브젝트 스토어와의 통신을 수반하므로, 여러 ref를 한 번에 해석하면 오버헤드를 크게 줄일 수 있습니다.

핵심 코드 분석

개별 ray.get()을 배치 ray.get()으로 전환

Before (util.py):

def resolve_block_refs(block_ref_iter, stats=None):
    for block_ref in block_ref_iter:
        current_hit, current_miss, current_unknown = _calculate_ref_hits([block_ref])
        hits += current_hit
        # TODO: Optimize further by batching multiple references
        # in a single ray.get() call.
        with stats.iter_get_s.timer() if stats else nullcontext():
            block = ray.get(block_ref)
        yield block

After:

def resolve_block_refs(block_ref_iter, ctx, stats=None, max_get_batch_size=None):
    pending = []
    for block_ref in block_ref_iter:
        pending.append(block_ref)
        if len(pending) >= _get_effective_batch_size():
            for block in _resolve_pending():
                yield block
    for block in _resolve_pending():
        yield block

def _resolve_pending():
    # 한 번의 ray.get()으로 여러 ref를 동시 해석
    blocks = ray.get(pending)
    pending.clear()
    return blocks

동적 배치 크기 결정

def _max_block_get_batch_size(self) -> int:
    prefetched_blocks = self._prefetcher.num_prefetched_blocks()
    if prefetched_blocks <= 0:
        prefetched_blocks = (
            self._prefetch_batches if self._prefetch_batches > 0 else 0
        )
    limit = max(1, prefetched_blocks + 1)
    return min(self._ctx.iter_get_block_batch_size, limit)

배치 크기가 프리페치된 블록 수에 따라 동적으로 조절되어, 아직 준비되지 않은 블록을 기다리느라 파이프라인이 멈추는 것을 방지합니다.

새로운 설정 옵션

DataContext.iter_get_block_batch_size를 통해 사용자가 최대 배치 크기를 조절할 수 있습니다.

왜 이게 좋은가

  • ray.get([ref1, ref2, ...]) 단일 호출이 ray.get(ref1) + ray.get(ref2) + ... 개별 호출보다 훨씬 효율적입니다
  • 오브젝트 스토어 RPC 라운드트립 수가 N에서 N/batch_size로 감소합니다
  • 프리페치와 연동되어 파이프라인 버블을 최소화합니다
  • 기존 TODO 주석(# TODO: Optimize further by batching)을 실제로 구현한 변경입니다

참고 자료

댓글

관련 포스트

PR Analysis 의 다른글