본문으로 건너뛰기

[Ray] 파이프라인 최적 처리량 계산 유틸리티 함수 추가

PR 링크: ray-project/ray#61273 상태: Merged | 변경: +272 / -0

들어가며

데이터 파이프라인의 모든 연산자는 같은 처리량을 유지해야 합니다. 연산자 i의 태스크당 처리 속도가 r_i라면, 처리량 T를 유지하기 위해 T/r_i개의 태스크가 필요합니다. 이 PR은 리소스 제약과 동시성 제한을 고려하여 파이프라인의 최적 처리량과 리소스 할당을 계산하는 유틸리티 함수를 추가합니다.

핵심 코드 분석

리소스 할당 함수

def allocate_resources(
    throughput: float,
    rates: Dict[T, float],
    resource_requirements: Dict[T, ExecutionResources],
) -> Dict[T, ExecutionResources]:
    """파이프라인이 주어진 처리량을 유지하기 위한 리소스를 할당합니다.

    핵심 아이디어: 파이프라인의 모든 연산자는 같은 처리량 T를 유지해야 합니다.
    연산자 i의 태스크당 속도가 r_i이면 T/r_i개 태스크가 필요합니다.
    """
    task_counts = {op: throughput / rate for op, rate in rates.items()}
    return {
        op: resource_requirements[op].scale(task_counts[op])
        for op in rates
    }

최적 처리량 계산

def compute_optimal_throughput(
    rates, resource_requirements, resource_limits, concurrency_limits
) -> float:
    """두 가지 제약 중 더 타이트한 것을 선택합니다:
    1. 리소스 제한: 모든 연산자의 총 리소스 사용이 예산 내에 있어야 함
    2. 동시성 제한: 각 연산자의 태스크 수가 제한을 넘지 않아야 함
    """
    return min(
        _max_throughput_from_resources(rates, resource_requirements, resource_limits),
        _max_throughput_from_concurrency(rates, concurrency_limits),
    )

리소스별 최대 처리량 계산

def _max_throughput_from_resources(rates, resource_requirements, resource_limits):
    for resource_name in ("cpu", "gpu", "memory"):
        resource_limit = getattr(resource_limits, resource_name)
        # 단위 처리량당 리소스 비용 = sum(요구량_i / 속도_i)
        cost_per_unit = sum(
            getattr(resource_requirements[op], resource_name) / rates[op]
            for op in rates
        )
        if cost_per_unit > 0:
            max_throughput = min(max_throughput, resource_limit / cost_per_unit)
    return max_throughput

왜 이게 좋은가

  • 파이프라인 최적화의 핵심 수학을 깔끔한 유틸리티 함수로 캡슐화합니다
  • CPU, GPU, 메모리 세 가지 리소스를 모두 고려하여 병목 리소스를 자동으로 식별합니다
  • 동시성 제한(예: 액터 풀 최대 크기)도 함께 고려합니다
  • 제네릭 타입 T를 사용하여 실제 연산자 클래스에 의존하지 않아 단위 테스트가 쉽습니다
  • Ray Data의 auto-scaler나 리소스 할당 컴포넌트에서 활용할 수 있는 기반 모듈입니다

참고 자료

댓글

관련 포스트

PR Analysis 의 다른글