[Loki] 싱크에 쓰기 전 레코드 배치 처리로 라운드트립 감소
PR #20821 - feat: Batch records before writing to sinks
들어가며
Grafana Loki의 쿼리 엔진에서 물리적 계획(physical planning) 단계가 오래 걸리는 문제가 있었습니다. 원인을 분석한 결과, 대부분의 시간이 싱크(sink)에 쓰기를 기다리는 데 소비되고 있었습니다. 기존에는 스케줄러가 생성한 각 태스크가 파이프라인에서 레코드를 읽을 때마다 개별적으로 싱크에 전송했는데, 이를 record_batch_size 설정에 따라 여러 레코드를 묶어서 한 번에 전송하도록 변경했습니다.
핵심 코드 분석
Before
for {
rec, err := pipeline.Read(ctx)
// ...
for _, sink := range job.Sinks {
err := sink.Send(ctx, rec)
// 에러 처리
}
}
매 레코드마다 모든 싱크에 개별 전송합니다.
After
batchAggregator := arrowagg.NewRecords(memory.DefaultAllocator)
flush := func(toSend arrow.RecordBatch) {
for _, sink := range sinks {
err := sink.Send(ctx, toSend)
// 에러 처리
}
}
for {
rec, err := pipeline.Read(ctx)
// ...
if batchSizeRecords <= 0 {
flush(rec) // 배치 비활성화 시 즉시 전송
continue
}
if currentBatchRecordCount + rec.NumRows() > batchSizeRecords {
flushBatch() // 배치 크기 초과 시 플러시
}
batchAggregator.Append(rec)
currentBatchRecordCount += rec.NumRows()
}
// 남은 배치 플러시
flushBatch()
batchSizeRecords 설정에 따라 레코드를 누적한 후 스키마 조정(reconciliation)을 거쳐 한 번에 전송합니다.
왜 이게 좋은가
- 네트워크 라운드트립 감소: N개의 개별 전송이 ceil(N/batchSize)로 줄어듭니다.
- 싱크 대기 시간 최소화: 물리적 계획 단계에서 싱크 쓰기 대기 시간이 대폭 줄어듭니다.
- 설정 가능한 배치 크기:
batchSizeRecords <= 0이면 기존 동작과 동일하여 하위 호환성을 유지합니다. - 관측 가능성 강화:
TaskDrainBatchesProduced,TaskDrainRecordsReceived등 새로운 메트릭이 추가되어 배치 동작을 모니터링할 수 있습니다.
참고 자료
관련 포스트
PR Analysis 의 다른글
- 이전글 [Loki] 빈 레이블 제거에 더 단순한 함수 사용
- 현재글 : [Loki] 싱크에 쓰기 전 레코드 배치 처리로 라운드트립 감소
- 다음글 [Ray RLlib] space_utils.batch()에서 np.stack 대신 사전 할당 배열로 연결 속도 개선
댓글