[Grafana Loki] 배치 처리를 파이프라인 래퍼로 분리하여 캐시 통합 준비
PR 링크: grafana/loki#21123 상태: Merged | 변경: +1015 / -386
들어가며
Loki의 쿼리 엔진은 Arrow 레코드 배치를 처리하는 파이프라인 구조로 동작한다. 기존에는 배치 크기 조절(작은 레코드들을 모아서 큰 배치로 합치는 것)이 drain 파이프라인 내부에 하드코딩되어 있었다. 이 PR은 배치 처리를 독립적인 batchingPipeline 래퍼로 분리하여, 향후 태스크 캐시가 배치 단위로 결과를 저장하고 재생할 수 있도록 기반을 마련한다.
핵심 코드 분석
After: 독립 batchingPipeline 래퍼
type batchingPipeline struct {
inner Pipeline
batchSize int64
agg *arrowagg.Records
pending arrow.RecordBatch // 오버플로우 레코드 보관
done bool
}
func (p *batchingPipeline) Read(ctx context.Context) (arrow.RecordBatch, error) {
if p.batchSize <= 0 {
return p.inner.Read(ctx) // 패스스루
}
// pending 레코드 포함
if p.pending != nil {
p.agg.Append(p.pending)
currentCount += p.pending.NumRows()
p.pending = nil
}
for {
rec, err := p.inner.Read(ctx)
// ...
// 배치 오버플로우 시 다음 Read로 이월
if currentCount > 0 && currentCount+rec.NumRows() > p.batchSize {
p.pending = rec
break
}
p.agg.Append(rec)
currentCount += rec.NumRows()
if currentCount >= p.batchSize {
break
}
}
return p.agg.Aggregate()
}
물리적 계획에 배치 래핑을 적용하는 부분:
plan, err = physical.WrapWithBatching(plan, e.cfg.BatchSize)
왜 이게 좋은가
- 관심사 분리: 배치 처리 로직이 drain 파이프라인에서 완전히 분리되었다. drain은 파이프라인에서 읽어 싱크에 쓰는 것만 담당하고, 배치 크기 조절은 래퍼가 담당한다.
- 캐시 통합 기반: 배치가 파이프라인 경계에서 생성되므로, 태스크 캐시가 완성된 배치를 저장하고 캐시 히트 시 해당 배치를 그대로 재생할 수 있다.
- 스키마 조정 자동화:
arrowagg.Records를 사용하여 서로 다른 스키마를 가진 레코드들을 자동으로 조정하면서 합친다. Arrow 레코드 배치 간의 스키마 불일치 문제를 래퍼 레벨에서 해결한다. - 오버플로우 처리:
pending필드로 배치 크기를 초과하는 레코드를 다음 Read 호출로 정확하게 이월하여, 데이터 손실 없이 배치 경계를 유지한다.
참고 자료
관련 포스트
PR Analysis 의 다른글
- 이전글 [PyTorch] Inductor MPS Metal 셰이더 half-precision 타입 불일치 수정
- 현재글 : [Grafana Loki] 배치 처리를 파이프라인 래퍼로 분리하여 캐시 통합 준비
- 다음글 [pytorch] Inductor: bf16/fp16에서 addmm unfuse를 방지하여 정밀도 손실 해결
댓글