[Loki] 인메모리 레이트 트래커로 UpdateRates RPC 구현
PR 링크: grafana/loki#19907 상태: Merged | 변경: +168 / -73
들어가며
Grafana Loki의 인제스트 리미터 서비스에는 스트림별 수집 속도를 추적하기 위한 UpdateRates RPC가 있었지만, 빈 응답만 반환하는 스텁 상태였습니다. 레이트 버킷 관련 코드도 모두 주석 처리되어 있었습니다. 이 PR은 순환 리스트(circular buffer) 기반의 인메모리 레이트 트래커를 구현하여 이 기능을 활성화합니다. Kafka 복제는 아직 추가하지 않았는데, 재시작 시 리플레이 볼륨이 너무 크기 때문입니다.
핵심 코드 분석
UpdateRates RPC 구현
Before:
func (s *Service) UpdateRates(
_ context.Context,
_ *proto.UpdateRatesRequest,
) (*proto.UpdateRatesResponse, error) {
return &proto.UpdateRatesResponse{}, nil
}
After:
func (s *Service) UpdateRates(
_ context.Context,
req *proto.UpdateRatesRequest,
) (*proto.UpdateRatesResponse, error) {
updated, err := s.usage.UpdateRates(req.Tenant, req.Streams, s.clock.Now())
if err != nil {
return nil, err
}
resp := proto.UpdateRatesResponse{
Results: make([]*proto.UpdateRatesResult, len(updated)),
}
for i, stream := range updated {
var totalSize uint64
for _, bucket := range stream.rateBuckets {
totalSize += bucket.size
}
averageRate := totalSize / (uint64(s.cfg.BucketSize.Seconds()) * uint64(len(stream.rateBuckets)))
resp.Results[i] = &proto.UpdateRatesResult{
StreamHash: stream.hash,
Rate: averageRate,
}
}
return &resp, nil
}
순환 버퍼 기반 레이트 버킷
func (s *usageStore) updateWithBuckets(...) {
// 순환 리스트의 버킷 인덱스 계산
bucketNum := seenAtUnixNano / int64(s.bucketSize)
bucketIdx := int(bucketNum % int64(s.numBuckets))
bucket := stream.rateBuckets[bucketIdx]
// 레이트 윈도우 밖의 오래된 버킷이면 리셋
bucketStart := seenAt.Truncate(s.bucketSize).UnixNano()
if bucket.timestamp < bucketStart {
bucket.timestamp = bucketStart
bucket.size = 0
}
bucket.size += metadata.TotalSize
stream.rateBuckets[bucketIdx] = bucket
}
왜 이게 좋은가
- 인메모리 설계: Kafka 복제 없이 인메모리로 동작하므로 빠르게 레이트 버킷이 채워지고, 재시작 시 복구도 빠릅니다.
- 순환 버퍼: 고정 크기 배열을 순환 사용하므로 추가 메모리 할당이 없습니다.
- 정확한 평균 계산: 활성 버킷만 사용하여 평균 속도를 계산하므로, 빈 버킷이 평균을 끌어내리지 않습니다.
- 기존 로직 분리: ExceedsLimits RPC에 영향을 주지 않기 위해
updateWithBuckets를 별도 함수로 분리했습니다.
참고 자료
관련 포스트
PR Analysis 의 다른글
- 이전글 [triton] tl.cat 연산을 permute+reshape+join으로 재구현하여 결정적(deterministic) 동작 보장
- 현재글 : [Loki] 인메모리 레이트 트래커로 UpdateRates RPC 구현
- 다음글 [triton] AMD GPU에서 Block Scaled Matmul 지원 추가
댓글