본문으로 건너뛰기

[Feast] 시간 윈도우 집계를 위한 타일링(Tiling) 지원 구현

PR 링크: feast-dev/feast#5724 상태: Merged | 변경: +1139 / -15

들어가며

Feature store에서 시간 윈도우 집계(time-windowed aggregation)는 핵심 기능이다. 예를 들어 "최근 1시간 거래 금액의 평균"을 실시간으로 계산해야 한다. 단순한 접근은 매번 전체 윈도우를 재스캔하는 것이지만, 스트리밍 환경에서 5분마다 업데이트가 들어올 때 이는 비효율적이다. 이 PR은 **타일링(Tiling)**이라는 기법을 도입하여, 윈도우를 작은 hop 단위로 분할하고 Intermediate Representation(IR)을 저장해 기존 타일을 재사용하는 방식으로 성능을 개선한다.

핵심 코드 분석

1. IRMetadata: 집계 유형별 중간 표현 정의

평균(avg)처럼 단순 병합이 불가능한 "holistic" 집계를 올바르게 처리하기 위해, 집계 유형마다 어떤 중간값을 저장해야 하는지 정의한다.

@dataclass
class IRMetadata:
    type: str  # "algebraic" or "holistic"
    ir_columns: Optional[List[str]] = None
    computation: Optional[str] = None

핵심 문제는 다음과 같다:

WRONG: avg(tile1, tile2) != (avg_tile1 + avg_tile2) / 2

Example:
  tile1: [10, 20, 30] -> avg = 20
  tile2: [100]        -> avg = 100
  
  Correct merged avg: (10+20+30+100) / 4 = 40
  Wrong merged avg:   (20 + 100) / 2     = 60

avg의 경우 sumcount를 IR로 저장하면 올바른 병합이 가능하다. std/var는 추가로 sum_of_squares가 필요하다.

2. Sawtooth Window Tiling 알고리즘

def apply_sawtooth_window_tiling(
    df: pd.DataFrame,
    aggregations: List[Aggregation],
    group_by_keys: List[str],
    timestamp_col: str,
    window_size: timedelta,
    hop_size: timedelta,
) -> pd.DataFrame:
    # Step 1: hop 구간 할당
    hop_size_ms = int(hop_size.total_seconds() * 1000)
    timestamp_ms = df[timestamp_col].astype("int64") // 10**6
    df["_hop_interval"] = (timestamp_ms // hop_size_ms) * hop_size_ms

    # Step 2: entity key + hop 구간으로 그룹핑 후 집계
    # algebraic (sum, count, max, min) -> 직접 집계
    # holistic (avg, std, var) -> IR 컬럼으로 집계
    ...

이벤트를 hop 크기(예: 5분) 단위로 분류하고, 각 hop에 대해 IR을 계산한다. 이후 누적 타일(cumulative tile)에서 윈도우 타일로 변환(subtraction)하여 최종 집계를 도출한다.

3. StreamFeatureView 설정

protobuf 스키마에 타일링 설정 필드가 추가되었다.

Before:

message StreamFeatureViewSpec {
    string name = 1;
    // ... 기존 필드 ...
    FeatureTransformationV2 feature_transformation = 17;
}

After:

message StreamFeatureViewSpec {
    string name = 1;
    // ... 기존 필드 ...
    FeatureTransformationV2 feature_transformation = 17;

    // Enable tiling for efficient window aggregation
    bool enable_tiling = 18;

    // Hop size for tiling (default: 5 minutes)
    google.protobuf.Duration tiling_hop_size = 19;
}

Python SDK에서는 다음과 같이 사용한다:

customer_features = StreamFeatureView(
    name="customer_transaction_features",
    entities=[customer],
    source=KafkaSource(...),
    aggregations=[
        Aggregation(column="amount", function="sum", time_window=timedelta(hours=1)),
        Aggregation(column="amount", function="avg", time_window=timedelta(hours=1)),
    ],
    enable_tiling=True,
    tiling_hop_size=timedelta(minutes=5),
)

왜 이게 좋은가

시점 타일링 없음 타일링 적용 타일 재사용률
T=00:00 1시간 전체 스캔 12개 타일 초기 계산 0%
T=00:05 1시간 전체 스캔 1개 타일 계산 + 11개 재사용 92%
T=00:10 1시간 전체 스캔 1개 타일 계산 + 11개 재사용 92%
  • 스트리밍 환경에서 극적인 성능 향상: 매 업데이트마다 전체 윈도우 대신 hop 크기만큼의 데이터만 처리
  • 수학적 정확성 보장: IR을 통해 avg, std, var 등 holistic 집계도 올바르게 병합
  • Pure pandas 구현: Spark, Ray 등 엔진에서 pandas로 변환 후 처리하므로 엔진 독립적
  • opt-in 설계: enable_tiling=True를 명시해야만 활성화되므로 기존 동작에 영향 없음

정리

이 PR은 feature store의 핵심 과제인 시간 윈도우 집계를 타일링으로 최적화한다. 특히 "홀리스틱 집계의 올바른 병합"이라는 이론적 문제를 IR(Intermediate Representation) 패턴으로 깔끔하게 해결한 점이 인상적이다. protobuf 스키마 변경, 순수 pandas 기반 알고리즘, StreamFeatureView 통합까지 전체 스택을 아우르는 체계적인 구현이다.

참고 자료

⚠️ 알림: 이 분석은 AI가 실제 코드 diff를 기반으로 작성했습니다.

댓글

관련 포스트

PR Analysis 의 다른글