[Grafana Loki] 범위 집계를 병렬 파티션으로 푸시다운하여 쿼리 처리 최적화
PR 링크: grafana/loki#20655 상태: Merged | 변경: +281 / -10
들어가며
Loki의 쿼리 엔진은 물리적 실행 계획을 최적화할 때 parallelPushdown이라는 최적화 패스를 사용한다. 이 최적화는 특정 연산 노드를 Parallelize 노드 안으로 밀어넣어 각 데이터 파티션에서 로컬로 실행되게 한다. 기존에는 Projection, Filter, TopK 같은 노드만 지원했는데, 이 PR은 RangeAggregation도 푸시다운 대상에 추가한다. 단, 수학적으로 안전한 조합만 허용한다.
핵심 코드 분석
Before: RangeAggregation은 병렬화 불가
switch node.(type) {
case *Projection, *Filter, *ColumnCompat:
// 시프트 노드: Parallelize 안으로 이동
case *TopK:
// 샤딩 노드: Parallelize 안으로 이동
}
// RangeAggregation은 항상 글로벌에서 실행
After: 안전한 집계 조합만 병렬화
case *RangeAggregation:
vecAgg := p.findParentVectorAggregation(node)
if vecAgg == nil {
return false
}
if !canShardAggregation(vecAgg, node) {
return false
}
for _, parallelize := range p.plan.Children(node) {
p.plan.graph.Inject(parallelize, node.Clone())
}
p.plan.graph.Eliminate(node)
p.pushed[node] = struct{}{}
return true
허용되는 조합을 판단하는 canShardAggregation 함수:
func canShardAggregation(vec *VectorAggregation, rng *RangeAggregation) bool {
switch vec.Operation {
case types.VectorAggregationTypeSum:
return rng.Operation == types.RangeAggregationTypeSum ||
rng.Operation == types.RangeAggregationTypeCount
case types.VectorAggregationTypeMax:
return rng.Operation == types.RangeAggregationTypeMax
case types.VectorAggregationTypeMin:
return rng.Operation == types.RangeAggregationTypeMin
}
return false
}
왜 이게 좋은가
- 네트워크 전송량 감소: 기존에는 미집계된 원시 로그 데이터가 모든 파티션에서 중앙 집계 노드로 전송되었다. 이제 각 파티션에서 로컬 집계를 수행하므로 전송되는 데이터가 집계 결과로 축소된다.
- 파이프라인 병목 해소: 단일 다운스트림 집계 노드가 병목이 되어 각 스레드가 레코드 전송을 기다리는 문제가 있었다. 로컬 집계가 파이프라인 브레이커 역할을 하여 각 스레드가 독립적으로 처리할 수 있다.
- 수학적 정확성 보장:
sum(sum),sum(count),max(max),min(min)만 허용하고,avg나 그룹핑 불일치 같은 위험한 조합은 거부한다. 분배법칙이 성립하는 연산만 병렬화하여 결과 정확성을 보장한다.
참고 자료
관련 포스트
PR Analysis 의 다른글
- 이전글 [feast] Feast 성능 최적화: 엔티티 키 직렬화 Hot Path 2.4배 개선하기
- 현재글 : [Grafana Loki] 범위 집계를 병렬 파티션으로 푸시다운하여 쿼리 처리 최적화
- 다음글 [Grafana Loki] 검증이 완료될 때까지 accepted stream 캐시를 비활성화
댓글