[Ray] 다중 입력 연산자의 메모리 귀속 오류 수정으로 데드락 해결
PR 링크: ray-project/ray#61208 상태: Merged | 변경: +107 / -45
들어가며
Ray Data의 실행 엔진에서 Union이나 Zip 같은 다중 입력 연산자는 여러 업스트림 연산자로부터 블록을 받습니다. 기존에는 리소스 매니저가 각 업스트림에 다운스트림의 전체 내부 큐 크기를 귀속시켰습니다. 즉, 입력 A가 10B, 입력 B가 90B를 큐에 넣었을 때, A와 B 모두 100B를 사용한 것으로 계산되어 이중 계산(double-counting)이 발생했습니다.
이 문제는 preserve_order=True 옵션에서 데드락을 유발합니다. Union이 라운드 로빈 순서로 느린 입력을 기다리는데, 빠른 입력이 큐를 채워 전체 큐 크기가 커지면 느린 입력까지 백프레셔를 받아 블록을 생산하지 못하게 됩니다.
핵심 코드 분석
입력별 큐 분리
Before:
class OpRuntimeMetrics:
def __init__(self, op):
self._internal_inqueue = create_bundle_queue()
def on_input_queued(self, input: RefBundle):
self._internal_inqueue.add(input)
def on_input_dequeued(self, input: RefBundle):
self._internal_inqueue.remove(input)
@property
def obj_store_mem_internal_inqueue(self) -> int:
return self._internal_inqueue.estimate_size_bytes()
After:
class OpRuntimeMetrics:
def __init__(self, op):
num_inputs = max(len(op.input_dependencies), 1)
self._internal_inqueues = [
create_bundle_queue() for _ in range(num_inputs)
]
def on_input_queued(self, input: RefBundle, *, input_index: int):
self._internal_inqueues[input_index].add(input)
def on_input_dequeued(self, input: RefBundle, *, input_index: int):
self._internal_inqueues[input_index].remove(input)
def obj_store_mem_internal_inqueue_for_input(self, input_index: int) -> int:
return self._internal_inqueues[input_index].estimate_size_bytes()
UnionOp에서의 올바른 input_index 전달
# union_operator.py
def _add_input_inner(self, refs: RefBundle, input_index: int) -> None:
if self._preserve_order:
self._input_buffers[input_index].add(refs)
self._metrics.on_input_queued(refs, input_index=input_index)
self._try_round_robin()
else:
self._output_buffer.add(refs)
self._metrics.on_output_queued(refs)
왜 이게 좋은가
- 데드락 해결:
preserve_order=True에서 느린 입력이 빠른 입력의 큐 크기 때문에 잘못 백프레셔를 받던 문제가 사라집니다. - 정확한 메모리 귀속: 각 업스트림 연산자는 자신이 실제로 기여한 블록 크기만큼만 메모리 사용량으로 계산됩니다.
- 모든 연산자 일관 적용: Union, Zip, Map, ActorPool, OutputSplitter 등 모든 연산자에서
input_index를 명시적으로 전달하도록 변경했습니다. - 후속 작업 기반:
preserve_order=False케이스의 output queue 귀속 문제를 후속 PR에서 해결할 수 있도록 기반을 마련했습니다.
참고 자료
관련 포스트
PR Analysis 의 다른글
- 이전글 [Triton] AsyncCompileMode 에러 발생 시 active_mode 초기화 보장
- 현재글 : [Ray] 다중 입력 연산자의 메모리 귀속 오류 수정으로 데드락 해결
- 다음글 [Loki] 쿼리 엔진 정합성 테스트 병렬 실행으로 CI 시간 15% 단축
댓글