본문으로 건너뛰기

[Ray] iter_batches에서 프리페치 버퍼링을 올바르게 처리하여 지연시간 안정화

PR 링크: ray-project/ray#58657 상태: Merged | 변경: +10 / -1

들어가며

Ray Data의 iter_batches에서 프리페치는 소비자의 지연시간 요구사항과 처리량 최적화된 데이터 파이프라인 사이의 임피던스 매칭(impedance matching)에 사용됩니다. 그러나 두 가지 문제가 있었습니다: (1) _iter_batches의 큐 깊이가 프리페치 수와 일치하지 않아 버퍼링이 제대로 되지 않았고, (2) _format_in_threadpool의 워커 수가 prefetch_count에 비례하여 생성되어 지연시간 변동이 커졌습니다.

핵심 코드 분석

Before: 큐 깊이 미설정 + 프리페치 비례 워커

def _format_batches(self, batches):
    return _format_in_threadpool(
        batch_iter=batches,
        num_threadpool_workers=self._prefetch_batches,  # 프리페치에 비례
    )

def _iter_batches(self):
    return make_async_gen(
        fn=self._pipeline,
        num_workers=1,
        preserve_ordering=False,
        # buffer_size 미설정 -> 프리페치 의도대로 동작하지 않음
    )

After: 적절한 큐 깊이 + 워커 수 제한

DEFAULT_FORMAT_THREADPOOL_NUM_WORKERS = env_integer(
    "RAY_DATA_MAX_FORMAT_THREADPOOL_NUM_WORKERS", 4
)

def _format_batches(self, batches):
    num_threadpool_workers = min(
        DEFAULT_FORMAT_THREADPOOL_NUM_WORKERS, self._prefetch_batches
    )
    return _format_in_threadpool(
        batch_iter=batches,
        num_threadpool_workers=num_threadpool_workers,  # 최대 4로 제한
    )

def _iter_batches(self):
    return make_async_gen(
        fn=self._pipeline,
        num_workers=1,
        preserve_ordering=False,
        buffer_size=max(self._prefetch_batches, 1),  # 프리페치에 맞는 버퍼
    )

왜 이게 좋은가

  • 프리페치 의도 보존: buffer_sizeprefetch_batches로 설정하여 소비자가 요청한 만큼의 배치가 미리 준비됩니다.
  • 지연시간 안정화: 포맷 워커를 최대 4개로 제한하여 과도한 병렬 포맷팅으로 인한 지연시간 변동(variance)을 줄입니다.
  • 환경 변수 설정 가능: RAY_DATA_MAX_FORMAT_THREADPOOL_NUM_WORKERS로 환경에 맞게 조정할 수 있습니다.
  • 최소 변경: 10줄 추가로 프리페치 동작의 정확성을 크게 개선합니다.

참고 자료

댓글

관련 포스트

PR Analysis 의 다른글