본문으로 건너뛰기

[Ray] OpResourceAllocator 리팩토링으로 데이터 흐름 명시화

PR 링크: ray-project/ray#57788 상태: Merged | 변경: +497 / -231

들어가며

Ray Data의 실행 엔진은 OpResourceAllocator를 통해 각 연산자에 리소스를 할당합니다. 기존 구현에서는 메서드 시그니처가 내부 상태에 의존하여, 데이터가 어떻게 흘러가는지 코드만 봐서는 파악하기 어려웠습니다. 이 PR은 API를 리팩토링하여 필요한 파라미터를 명시적으로 받도록 하고, progress bar에 예산 및 할당 정보를 추가하여 디버깅 편의성을 높입니다.

핵심 코드 분석

API 데이터 흐름 명시화

Before:

class ResourceBudgetBackpressurePolicy(BackpressurePolicy):
    def can_add_input(self, op: "PhysicalOperator") -> bool:
        budget = self._resource_manager.get_budget(op)
        if budget is None:
            return True
        return op.incremental_resource_usage().satisfies_limit(budget)

After:

class ResourceBudgetBackpressurePolicy(BackpressurePolicy):
    def can_add_input(self, op: "PhysicalOperator") -> bool:
        if self._resource_manager._op_resource_allocator is not None:
            return self._resource_manager._op_resource_allocator.can_submit_new_task(op)
        return True

메트릭 이름 명확화

# Before
task_completion_time_without_backpressure: float

# After
task_completion_time_s: float  # 전체 소요 시간
task_completion_time_excl_backpressure_s: float  # 백프레셔 제외 시간

연산자 리소스 경계 정의 개선

# Before
def min_max_resource_requirements(self):
    """Returns the min and max resources to start the operator
    and make progress."""

# After
def min_max_resource_requirements(self):
    """Returns lower/upper boundary of resource requirements:
    - Minimal: lower bound (min) of resources required to start
    - Maximum: upper bound (max) of how many resources could be utilized."""

왜 이게 좋은가

  1. 암묵적 의존 제거: 리소스 할당 결정에 필요한 데이터를 메서드 파라미터로 명시하여, 코드 리뷰와 디버깅이 쉬워집니다.
  2. 공통 로직 추상화: OpResourceAllocator 기본 클래스에 공통 메서드를 모아, 구현체마다 중복 코드를 줄였습니다.
  3. 디버깅 강화: verbose 모드 progress bar에 예산과 할당 정보를 추가하여, 리소스 병목을 실시간으로 확인할 수 있습니다.
  4. 메트릭 정밀화: task_completion_time_s(전체)와 task_completion_time_excl_backpressure_s(백프레셔 제외)를 분리하여, 순수 작업 시간과 대기 시간을 구분할 수 있습니다.

참고 자료

댓글

관련 포스트

PR Analysis 의 다른글