[Ray] iter_batches에서 프리페치 버퍼링을 올바르게 처리하여 지연시간 안정화
PR 링크: ray-project/ray#58657 상태: Merged | 변경: +10 / -1
들어가며
Ray Data의 iter_batches에서 프리페치는 소비자의 지연시간 요구사항과 처리량 최적화된 데이터 파이프라인 사이의 임피던스 매칭(impedance matching)에 사용됩니다. 그러나 두 가지 문제가 있었습니다: (1) _iter_batches의 큐 깊이가 프리페치 수와 일치하지 않아 버퍼링이 제대로 되지 않았고, (2) _format_in_threadpool의 워커 수가 prefetch_count에 비례하여 생성되어 지연시간 변동이 커졌습니다.
핵심 코드 분석
Before: 큐 깊이 미설정 + 프리페치 비례 워커
def _format_batches(self, batches):
return _format_in_threadpool(
batch_iter=batches,
num_threadpool_workers=self._prefetch_batches, # 프리페치에 비례
)
def _iter_batches(self):
return make_async_gen(
fn=self._pipeline,
num_workers=1,
preserve_ordering=False,
# buffer_size 미설정 -> 프리페치 의도대로 동작하지 않음
)
After: 적절한 큐 깊이 + 워커 수 제한
DEFAULT_FORMAT_THREADPOOL_NUM_WORKERS = env_integer(
"RAY_DATA_MAX_FORMAT_THREADPOOL_NUM_WORKERS", 4
)
def _format_batches(self, batches):
num_threadpool_workers = min(
DEFAULT_FORMAT_THREADPOOL_NUM_WORKERS, self._prefetch_batches
)
return _format_in_threadpool(
batch_iter=batches,
num_threadpool_workers=num_threadpool_workers, # 최대 4로 제한
)
def _iter_batches(self):
return make_async_gen(
fn=self._pipeline,
num_workers=1,
preserve_ordering=False,
buffer_size=max(self._prefetch_batches, 1), # 프리페치에 맞는 버퍼
)
왜 이게 좋은가
- 프리페치 의도 보존:
buffer_size를prefetch_batches로 설정하여 소비자가 요청한 만큼의 배치가 미리 준비됩니다. - 지연시간 안정화: 포맷 워커를 최대 4개로 제한하여 과도한 병렬 포맷팅으로 인한 지연시간 변동(variance)을 줄입니다.
- 환경 변수 설정 가능:
RAY_DATA_MAX_FORMAT_THREADPOOL_NUM_WORKERS로 환경에 맞게 조정할 수 있습니다. - 최소 변경: 10줄 추가로 프리페치 동작의 정확성을 크게 개선합니다.
참고 자료
관련 포스트
PR Analysis 의 다른글
- 이전글 [Ray] Ray 대규모 리소스 뷰 동기화 -- 메시지 배칭으로 개선
- 현재글 : [Ray] iter_batches에서 프리페치 버퍼링을 올바르게 처리하여 지연시간 안정화
- 다음글 [triton] AMD 비동기 복사에서 block 차원 중복 복사 허용
댓글