본문으로 건너뛰기

[Loki] 쿼리 엔진 병렬 푸시다운 최적화 패스 추가

PR 링크: grafana/loki#19522 상태: Merged | 변경: +410 / -4

들어가며

Loki의 쿼리 엔진은 논리 계획(logical plan)을 물리 계획(physical plan)으로 변환할 때, Parallelize 노드를 통해 작업을 병렬화합니다. 기본적으로 MAKE_TABLE 처리 직후에 Parallelize 노드가 삽입되지만, 그 위에 있는 필터링, 파싱, 제한(Limit) 같은 작업들은 병렬화 범위 밖에 남아 있었습니다. 이 PR은 이러한 작업들을 Parallelize 노드 아래로 푸시다운하여 병렬 처리할 수 있게 하는 최적화 패스를 추가합니다.

핵심 코드 분석

parallelPushdown 규칙

type parallelPushdown struct {
    plan   *Plan
    pushed map[Node]struct{}
}

func (p *parallelPushdown) apply(node Node) bool {
    // canPushdown은 노드의 모든 자식이 Parallelize인 경우에만 true 반환
    if !p.canPushdown(node) {
        return false
    }
    if p.pushed == nil {
        p.pushed = make(map[Node]struct{})
    } else if _, ok := p.pushed[node]; ok {
        // 동일 노드에 규칙을 두 번 이상 적용하지 않음
        return false
    }
    // 노드를 Parallelize 내부로 이동...
}

Clone 인터페이스 추가

노드 재배치를 위해 Node 인터페이스에 Clone 메서드가 추가되었습니다:

// Filter의 Clone 구현
func (f *Filter) Clone() Node {
    return &Filter{
        Predicates: cloneExpressions(f.Predicates),
    }
}

// Limit의 Clone 구현
func (l *Limit) Clone() Node {
    return &Limit{
        Skip:  l.Skip,
        Fetch: l.Fetch,
    }
}

Expression 인터페이스에도 Clone이 추가되어 딥 복사가 가능합니다:

func cloneExpressions[E Expression](exprs []E) []E {
    clonedExprs := make([]E, len(exprs))
    for i, expr := range exprs {
        clonedExprs[i] = expr.Clone().(E)
    }
    return clonedExprs
}

왜 이게 좋은가

  • 병렬 처리 범위 확대: 필터링과 파싱이 각 병렬 워커에서 독립적으로 수행되므로 처리량이 증가합니다.
  • TopK 최적화 기반 마련: Limit을 로컬 TopK와 글로벌 TopK로 분할하여 병렬화하는 패턴의 기반이 됩니다.
  • 안전한 노드 복사: Clone 인터페이스를 통해 최적화 패스가 노드를 안전하게 재배치할 수 있습니다.
  • 확장 가능한 설계: 향후 메트릭 처리 등 다른 연산에 대한 병렬 푸시다운도 동일한 프레임워크로 구현할 수 있습니다.

참고 자료

댓글

관련 포스트

PR Analysis 의 다른글