본문으로 건너뛰기

[Grafana Loki] 범위 집계를 병렬 파티션으로 푸시다운하여 쿼리 처리 최적화

PR 링크: grafana/loki#20655 상태: Merged | 변경: +281 / -10

들어가며

Loki의 쿼리 엔진은 물리적 실행 계획을 최적화할 때 parallelPushdown이라는 최적화 패스를 사용한다. 이 최적화는 특정 연산 노드를 Parallelize 노드 안으로 밀어넣어 각 데이터 파티션에서 로컬로 실행되게 한다. 기존에는 Projection, Filter, TopK 같은 노드만 지원했는데, 이 PR은 RangeAggregation도 푸시다운 대상에 추가한다. 단, 수학적으로 안전한 조합만 허용한다.

핵심 코드 분석

Before: RangeAggregation은 병렬화 불가

switch node.(type) {
case *Projection, *Filter, *ColumnCompat:
    // 시프트 노드: Parallelize 안으로 이동
case *TopK:
    // 샤딩 노드: Parallelize 안으로 이동
}
// RangeAggregation은 항상 글로벌에서 실행

After: 안전한 집계 조합만 병렬화

case *RangeAggregation:
    vecAgg := p.findParentVectorAggregation(node)
    if vecAgg == nil {
        return false
    }
    if !canShardAggregation(vecAgg, node) {
        return false
    }
    for _, parallelize := range p.plan.Children(node) {
        p.plan.graph.Inject(parallelize, node.Clone())
    }
    p.plan.graph.Eliminate(node)
    p.pushed[node] = struct{}{}
    return true

허용되는 조합을 판단하는 canShardAggregation 함수:

func canShardAggregation(vec *VectorAggregation, rng *RangeAggregation) bool {
    switch vec.Operation {
    case types.VectorAggregationTypeSum:
        return rng.Operation == types.RangeAggregationTypeSum ||
               rng.Operation == types.RangeAggregationTypeCount
    case types.VectorAggregationTypeMax:
        return rng.Operation == types.RangeAggregationTypeMax
    case types.VectorAggregationTypeMin:
        return rng.Operation == types.RangeAggregationTypeMin
    }
    return false
}

왜 이게 좋은가

  1. 네트워크 전송량 감소: 기존에는 미집계된 원시 로그 데이터가 모든 파티션에서 중앙 집계 노드로 전송되었다. 이제 각 파티션에서 로컬 집계를 수행하므로 전송되는 데이터가 집계 결과로 축소된다.
  2. 파이프라인 병목 해소: 단일 다운스트림 집계 노드가 병목이 되어 각 스레드가 레코드 전송을 기다리는 문제가 있었다. 로컬 집계가 파이프라인 브레이커 역할을 하여 각 스레드가 독립적으로 처리할 수 있다.
  3. 수학적 정확성 보장: sum(sum), sum(count), max(max), min(min)만 허용하고, avg나 그룹핑 불일치 같은 위험한 조합은 거부한다. 분배법칙이 성립하는 연산만 병렬화하여 결과 정확성을 보장한다.

참고 자료

댓글

관련 포스트

PR Analysis 의 다른글