본문으로 건너뛰기

[Loki] 싱크에 쓰기 전 레코드 배치 처리로 라운드트립 감소

PR #20821 - feat: Batch records before writing to sinks

들어가며

Grafana Loki의 쿼리 엔진에서 물리적 계획(physical planning) 단계가 오래 걸리는 문제가 있었습니다. 원인을 분석한 결과, 대부분의 시간이 싱크(sink)에 쓰기를 기다리는 데 소비되고 있었습니다. 기존에는 스케줄러가 생성한 각 태스크가 파이프라인에서 레코드를 읽을 때마다 개별적으로 싱크에 전송했는데, 이를 record_batch_size 설정에 따라 여러 레코드를 묶어서 한 번에 전송하도록 변경했습니다.

핵심 코드 분석

Before

for {
    rec, err := pipeline.Read(ctx)
    // ...
    for _, sink := range job.Sinks {
        err := sink.Send(ctx, rec)
        // 에러 처리
    }
}

매 레코드마다 모든 싱크에 개별 전송합니다.

After

batchAggregator := arrowagg.NewRecords(memory.DefaultAllocator)

flush := func(toSend arrow.RecordBatch) {
    for _, sink := range sinks {
        err := sink.Send(ctx, toSend)
        // 에러 처리
    }
}

for {
    rec, err := pipeline.Read(ctx)
    // ...
    if batchSizeRecords <= 0 {
        flush(rec) // 배치 비활성화 시 즉시 전송
        continue
    }
    if currentBatchRecordCount + rec.NumRows() > batchSizeRecords {
        flushBatch() // 배치 크기 초과 시 플러시
    }
    batchAggregator.Append(rec)
    currentBatchRecordCount += rec.NumRows()
}
// 남은 배치 플러시
flushBatch()

batchSizeRecords 설정에 따라 레코드를 누적한 후 스키마 조정(reconciliation)을 거쳐 한 번에 전송합니다.

왜 이게 좋은가

  1. 네트워크 라운드트립 감소: N개의 개별 전송이 ceil(N/batchSize)로 줄어듭니다.
  2. 싱크 대기 시간 최소화: 물리적 계획 단계에서 싱크 쓰기 대기 시간이 대폭 줄어듭니다.
  3. 설정 가능한 배치 크기: batchSizeRecords <= 0이면 기존 동작과 동일하여 하위 호환성을 유지합니다.
  4. 관측 가능성 강화: TaskDrainBatchesProduced, TaskDrainRecordsReceived 등 새로운 메트릭이 추가되어 배치 동작을 모니터링할 수 있습니다.

참고 자료

댓글

관련 포스트

PR Analysis 의 다른글