[Ray] 파이프라인 최적 처리량 계산 유틸리티 함수 추가
PR 링크: ray-project/ray#61273 상태: Merged | 변경: +272 / -0
들어가며
데이터 파이프라인의 모든 연산자는 같은 처리량을 유지해야 합니다. 연산자 i의 태스크당 처리 속도가 r_i라면, 처리량 T를 유지하기 위해 T/r_i개의 태스크가 필요합니다. 이 PR은 리소스 제약과 동시성 제한을 고려하여 파이프라인의 최적 처리량과 리소스 할당을 계산하는 유틸리티 함수를 추가합니다.
핵심 코드 분석
리소스 할당 함수
def allocate_resources(
throughput: float,
rates: Dict[T, float],
resource_requirements: Dict[T, ExecutionResources],
) -> Dict[T, ExecutionResources]:
"""파이프라인이 주어진 처리량을 유지하기 위한 리소스를 할당합니다.
핵심 아이디어: 파이프라인의 모든 연산자는 같은 처리량 T를 유지해야 합니다.
연산자 i의 태스크당 속도가 r_i이면 T/r_i개 태스크가 필요합니다.
"""
task_counts = {op: throughput / rate for op, rate in rates.items()}
return {
op: resource_requirements[op].scale(task_counts[op])
for op in rates
}
최적 처리량 계산
def compute_optimal_throughput(
rates, resource_requirements, resource_limits, concurrency_limits
) -> float:
"""두 가지 제약 중 더 타이트한 것을 선택합니다:
1. 리소스 제한: 모든 연산자의 총 리소스 사용이 예산 내에 있어야 함
2. 동시성 제한: 각 연산자의 태스크 수가 제한을 넘지 않아야 함
"""
return min(
_max_throughput_from_resources(rates, resource_requirements, resource_limits),
_max_throughput_from_concurrency(rates, concurrency_limits),
)
리소스별 최대 처리량 계산
def _max_throughput_from_resources(rates, resource_requirements, resource_limits):
for resource_name in ("cpu", "gpu", "memory"):
resource_limit = getattr(resource_limits, resource_name)
# 단위 처리량당 리소스 비용 = sum(요구량_i / 속도_i)
cost_per_unit = sum(
getattr(resource_requirements[op], resource_name) / rates[op]
for op in rates
)
if cost_per_unit > 0:
max_throughput = min(max_throughput, resource_limit / cost_per_unit)
return max_throughput
왜 이게 좋은가
- 파이프라인 최적화의 핵심 수학을 깔끔한 유틸리티 함수로 캡슐화합니다
- CPU, GPU, 메모리 세 가지 리소스를 모두 고려하여 병목 리소스를 자동으로 식별합니다
- 동시성 제한(예: 액터 풀 최대 크기)도 함께 고려합니다
- 제네릭 타입
T를 사용하여 실제 연산자 클래스에 의존하지 않아 단위 테스트가 쉽습니다 - Ray Data의 auto-scaler나 리소스 할당 컴포넌트에서 활용할 수 있는 기반 모듈입니다
참고 자료
관련 포스트
PR Analysis 의 다른글
- 이전글 [triton] NVIDIA inval_barrier를 leader CTA에서만 실행하도록 변경
- 현재글 : [Ray] 파이프라인 최적 처리량 계산 유틸리티 함수 추가
- 다음글 [Ray] RLlib 커넥터와 배치 유틸리티에 ndarray 빠른 경로 추가
댓글