본문으로 건너뛰기

[Loki] 인메모리 레이트 트래커로 UpdateRates RPC 구현

PR 링크: grafana/loki#19907 상태: Merged | 변경: +168 / -73

들어가며

Grafana Loki의 인제스트 리미터 서비스에는 스트림별 수집 속도를 추적하기 위한 UpdateRates RPC가 있었지만, 빈 응답만 반환하는 스텁 상태였습니다. 레이트 버킷 관련 코드도 모두 주석 처리되어 있었습니다. 이 PR은 순환 리스트(circular buffer) 기반의 인메모리 레이트 트래커를 구현하여 이 기능을 활성화합니다. Kafka 복제는 아직 추가하지 않았는데, 재시작 시 리플레이 볼륨이 너무 크기 때문입니다.

핵심 코드 분석

UpdateRates RPC 구현

Before:

func (s *Service) UpdateRates(
    _ context.Context,
    _ *proto.UpdateRatesRequest,
) (*proto.UpdateRatesResponse, error) {
    return &proto.UpdateRatesResponse{}, nil
}

After:

func (s *Service) UpdateRates(
    _ context.Context,
    req *proto.UpdateRatesRequest,
) (*proto.UpdateRatesResponse, error) {
    updated, err := s.usage.UpdateRates(req.Tenant, req.Streams, s.clock.Now())
    if err != nil {
        return nil, err
    }
    resp := proto.UpdateRatesResponse{
        Results: make([]*proto.UpdateRatesResult, len(updated)),
    }
    for i, stream := range updated {
        var totalSize uint64
        for _, bucket := range stream.rateBuckets {
            totalSize += bucket.size
        }
        averageRate := totalSize / (uint64(s.cfg.BucketSize.Seconds()) * uint64(len(stream.rateBuckets)))
        resp.Results[i] = &proto.UpdateRatesResult{
            StreamHash: stream.hash,
            Rate:       averageRate,
        }
    }
    return &resp, nil
}

순환 버퍼 기반 레이트 버킷

func (s *usageStore) updateWithBuckets(...) {
    // 순환 리스트의 버킷 인덱스 계산
    bucketNum := seenAtUnixNano / int64(s.bucketSize)
    bucketIdx := int(bucketNum % int64(s.numBuckets))
    bucket := stream.rateBuckets[bucketIdx]

    // 레이트 윈도우 밖의 오래된 버킷이면 리셋
    bucketStart := seenAt.Truncate(s.bucketSize).UnixNano()
    if bucket.timestamp < bucketStart {
        bucket.timestamp = bucketStart
        bucket.size = 0
    }
    bucket.size += metadata.TotalSize
    stream.rateBuckets[bucketIdx] = bucket
}

왜 이게 좋은가

  • 인메모리 설계: Kafka 복제 없이 인메모리로 동작하므로 빠르게 레이트 버킷이 채워지고, 재시작 시 복구도 빠릅니다.
  • 순환 버퍼: 고정 크기 배열을 순환 사용하므로 추가 메모리 할당이 없습니다.
  • 정확한 평균 계산: 활성 버킷만 사용하여 평균 속도를 계산하므로, 빈 버킷이 평균을 끌어내리지 않습니다.
  • 기존 로직 분리: ExceedsLimits RPC에 영향을 주지 않기 위해 updateWithBuckets를 별도 함수로 분리했습니다.

참고 자료

댓글

관련 포스트

PR Analysis 의 다른글