본문으로 건너뛰기

[Grafana Loki] 배치 처리를 파이프라인 래퍼로 분리하여 캐시 통합 준비

PR 링크: grafana/loki#21123 상태: Merged | 변경: +1015 / -386

들어가며

Loki의 쿼리 엔진은 Arrow 레코드 배치를 처리하는 파이프라인 구조로 동작한다. 기존에는 배치 크기 조절(작은 레코드들을 모아서 큰 배치로 합치는 것)이 drain 파이프라인 내부에 하드코딩되어 있었다. 이 PR은 배치 처리를 독립적인 batchingPipeline 래퍼로 분리하여, 향후 태스크 캐시가 배치 단위로 결과를 저장하고 재생할 수 있도록 기반을 마련한다.

핵심 코드 분석

After: 독립 batchingPipeline 래퍼

type batchingPipeline struct {
    inner     Pipeline
    batchSize int64
    agg       *arrowagg.Records
    pending   arrow.RecordBatch  // 오버플로우 레코드 보관
    done      bool
}

func (p *batchingPipeline) Read(ctx context.Context) (arrow.RecordBatch, error) {
    if p.batchSize <= 0 {
        return p.inner.Read(ctx)  // 패스스루
    }
    // pending 레코드 포함
    if p.pending != nil {
        p.agg.Append(p.pending)
        currentCount += p.pending.NumRows()
        p.pending = nil
    }
    for {
        rec, err := p.inner.Read(ctx)
        // ...
        // 배치 오버플로우 시 다음 Read로 이월
        if currentCount > 0 && currentCount+rec.NumRows() > p.batchSize {
            p.pending = rec
            break
        }
        p.agg.Append(rec)
        currentCount += rec.NumRows()
        if currentCount >= p.batchSize {
            break
        }
    }
    return p.agg.Aggregate()
}

물리적 계획에 배치 래핑을 적용하는 부분:

plan, err = physical.WrapWithBatching(plan, e.cfg.BatchSize)

왜 이게 좋은가

  1. 관심사 분리: 배치 처리 로직이 drain 파이프라인에서 완전히 분리되었다. drain은 파이프라인에서 읽어 싱크에 쓰는 것만 담당하고, 배치 크기 조절은 래퍼가 담당한다.
  2. 캐시 통합 기반: 배치가 파이프라인 경계에서 생성되므로, 태스크 캐시가 완성된 배치를 저장하고 캐시 히트 시 해당 배치를 그대로 재생할 수 있다.
  3. 스키마 조정 자동화: arrowagg.Records를 사용하여 서로 다른 스키마를 가진 레코드들을 자동으로 조정하면서 합친다. Arrow 레코드 배치 간의 스키마 불일치 문제를 래퍼 레벨에서 해결한다.
  4. 오버플로우 처리: pending 필드로 배치 크기를 초과하는 레코드를 다음 Read 호출로 정확하게 이월하여, 데이터 손실 없이 배치 경계를 유지한다.

참고 자료

댓글

관련 포스트

PR Analysis 의 다른글