본문으로 건너뛰기

[Ray Data] 논리적 최적화 규칙에서 in-place 변형을 제거하여 불변성 준비

PR 링크: ray-project/ray#60334 상태: Merged | 변경: +49 / -45

들어가며

Ray Data의 논리적 최적화 파이프라인은 LogicalOperator로 구성된 DAG를 변환하여 쿼리 실행 계획을 개선한다. 기존에는 limit_pushdown, predicate_pushdown, inherit_batch_format 규칙들이 DAG 노드의 _input_dependencies_output_dependencies를 직접 수정(in-place mutation)했다. 이 PR은 노드를 복사하고 새로 연결하는 방식으로 전환하여, 향후 LogicalOperator를 불변(immutable)하고 비교 가능(comparable)하게 만들기 위한 기반을 마련한다.

핵심 코드 분석

Before: in-place 변형 (inherit_batch_format)

def _apply(self, op: LogicalOperator):
    nodes = deque()
    for node in op.post_order_iter():
        nodes.appendleft(node)

    while len(nodes) > 0:
        current_op = nodes.pop()
        if isinstance(current_op, AbstractAllToAll):
            upstream_op = current_op.input_dependencies[0]
            while upstream_op.input_dependencies:
                if isinstance(upstream_op, MapBatches) and upstream_op._batch_format:
                    current_op._batch_format = upstream_op._batch_format  # 직접 수정
                    break
                upstream_op = upstream_op.input_dependencies[0]
    return op

After: 복사-재구축 (inherit_batch_format)

def _apply(self, op: LogicalOperator):
    def transform(node: LogicalOperator) -> LogicalOperator:
        if not isinstance(node, AbstractAllToAll):
            return node

        upstream_op = node.input_dependencies[0]
        while upstream_op.input_dependencies:
            if isinstance(upstream_op, MapBatches) and upstream_op._batch_format:
                new_op = copy.copy(node)  # 복사본 생성
                new_op._batch_format = upstream_op._batch_format
                new_op._output_dependencies = []
                return new_op
            upstream_op = upstream_op.input_dependencies[0]
        return node

    return op._apply_transform(transform)

limit_pushdown 개선

# Before: 기존 노드의 의존성을 직접 조작
original_children = list(union_op.input_dependencies)
for child in original_children:
    child._output_dependencies.remove(union_op)
limit_op._input_dependencies = [new_union]
new_union._output_dependencies = [limit_op]

# After: 새 노드를 생성하여 재구축
new_union = Union(*branch_tails)
return Limit(new_union, limit_op._limit)

왜 이게 좋은가

  1. 부작용 제거: in-place 변형은 동일 노드를 참조하는 다른 코드에 예기치 않은 영향을 줄 수 있다. 복사-재구축은 원본 DAG를 보존하면서 새 DAG를 생성한다.
  2. 불변성 기반 마련: 이 PR은 LogicalOperator를 불변으로 만드는 2단계 작업의 1단계다. 불변 노드는 __eq____hash__를 구현할 수 있어, 최적화 결과 캐싱이나 중복 제거가 가능해진다.
  3. 중복 Limit 삽입 방지: _push_limit_into_union에서 이미 같은 limit이 있는 브랜치에 중복 Limit을 삽입하지 않는 가드를 추가했다.

참고 자료

댓글

관련 포스트

PR Analysis 의 다른글