본문으로 건너뛰기

[Ray] 다중 입력 연산자의 메모리 귀속 오류 수정으로 데드락 해결

PR 링크: ray-project/ray#61208 상태: Merged | 변경: +107 / -45

들어가며

Ray Data의 실행 엔진에서 Union이나 Zip 같은 다중 입력 연산자는 여러 업스트림 연산자로부터 블록을 받습니다. 기존에는 리소스 매니저가 각 업스트림에 다운스트림의 전체 내부 큐 크기를 귀속시켰습니다. 즉, 입력 A가 10B, 입력 B가 90B를 큐에 넣었을 때, A와 B 모두 100B를 사용한 것으로 계산되어 이중 계산(double-counting)이 발생했습니다.

이 문제는 preserve_order=True 옵션에서 데드락을 유발합니다. Union이 라운드 로빈 순서로 느린 입력을 기다리는데, 빠른 입력이 큐를 채워 전체 큐 크기가 커지면 느린 입력까지 백프레셔를 받아 블록을 생산하지 못하게 됩니다.

핵심 코드 분석

입력별 큐 분리

Before:

class OpRuntimeMetrics:
    def __init__(self, op):
        self._internal_inqueue = create_bundle_queue()

    def on_input_queued(self, input: RefBundle):
        self._internal_inqueue.add(input)

    def on_input_dequeued(self, input: RefBundle):
        self._internal_inqueue.remove(input)

    @property
    def obj_store_mem_internal_inqueue(self) -> int:
        return self._internal_inqueue.estimate_size_bytes()

After:

class OpRuntimeMetrics:
    def __init__(self, op):
        num_inputs = max(len(op.input_dependencies), 1)
        self._internal_inqueues = [
            create_bundle_queue() for _ in range(num_inputs)
        ]

    def on_input_queued(self, input: RefBundle, *, input_index: int):
        self._internal_inqueues[input_index].add(input)

    def on_input_dequeued(self, input: RefBundle, *, input_index: int):
        self._internal_inqueues[input_index].remove(input)

    def obj_store_mem_internal_inqueue_for_input(self, input_index: int) -> int:
        return self._internal_inqueues[input_index].estimate_size_bytes()

UnionOp에서의 올바른 input_index 전달

# union_operator.py
def _add_input_inner(self, refs: RefBundle, input_index: int) -> None:
    if self._preserve_order:
        self._input_buffers[input_index].add(refs)
        self._metrics.on_input_queued(refs, input_index=input_index)
        self._try_round_robin()
    else:
        self._output_buffer.add(refs)
        self._metrics.on_output_queued(refs)

왜 이게 좋은가

  1. 데드락 해결: preserve_order=True에서 느린 입력이 빠른 입력의 큐 크기 때문에 잘못 백프레셔를 받던 문제가 사라집니다.
  2. 정확한 메모리 귀속: 각 업스트림 연산자는 자신이 실제로 기여한 블록 크기만큼만 메모리 사용량으로 계산됩니다.
  3. 모든 연산자 일관 적용: Union, Zip, Map, ActorPool, OutputSplitter 등 모든 연산자에서 input_index를 명시적으로 전달하도록 변경했습니다.
  4. 후속 작업 기반: preserve_order=False 케이스의 output queue 귀속 문제를 후속 PR에서 해결할 수 있도록 기반을 마련했습니다.

참고 자료

댓글

관련 포스트

PR Analysis 의 다른글