본문으로 건너뛰기

[Ray Data] _map_task 공통 인자 캐싱으로 직렬화 오버헤드 절감

PR 링크: ray-project/ray#61996 상태: Merged | 변경: +33 / -83

들어가며

Ray Data에서 map 태스크를 제출하거나 actor를 생성할 때, map_transformerDataContext 객체가 매번 pickle 직렬화되어 전달되고 있었다. 이 직렬화는 flamegraph에서 상당한 비중을 차지하고 있었으며, 특히 actor 생성 시 전체 시간의 절반을 차지할 수 있었다. 이 객체들은 한 번 만들어지면 변하지 않으므로, Object Store에 한 번만 저장하고 ObjectRef로 재사용할 수 있다.

핵심 코드 분석

map_transformer_ref를 cached_property로 변경

@functools.cached_property
def _map_transformer_ref(self) -> ObjectRef[MapTransformer]:
    """Lazily serialize _map_transformer to object store on first access."""
    ref = ray.put(self._map_transformer)
    self._warn_large_udf(ref)
    return ref

@functools.cached_property
def _data_context_ref(self) -> ObjectRef[DataContext]:
    return ray.put(self.data_context)

Actor Pool에서도 캐싱된 ref 사용

# Before
actor = self._actor_cls.remote(
    ctx=ctx,  # 매번 직렬화
    map_transformer=self._map_transformer,  # 매번 직렬화
)

# After
actor = self._actor_cls.remote(
    ctx=self._data_context_ref,  # ObjectRef 전달
    map_transformer=self._map_transformer_ref,  # ObjectRef 전달
)

Task Pool에서도 동일

# Before
gen = self._map_task.remote(
    self._map_transformer_ref,
    data_context,  # 매번 직렬화
    ctx,
)

# After
gen = self._map_task.remote(
    self._map_transformer_ref,
    self._data_context_ref,  # ObjectRef 전달
    ctx,
)

왜 이게 좋은가

  1. 직렬화 비용 제거: ray.put()으로 한 번 Object Store에 저장한 후, ObjectRef만 전달하면 직렬화 없이 데이터를 공유할 수 있다.
  2. Actor 생성 2배 가속: 이전 기여자의 측정에 따르면 actor 생성 시간이 절반으로 줄었다.
  3. 순 삭제 50줄: 33줄 추가에 83줄 삭제로, 코드가 간결해졌다.
  4. functools.cached_property: lazy initialization과 캐싱을 한 번에 처리하여, 기존의 수동 __map_transformer_ref 관리 코드를 제거했다.

참고 자료

댓글

관련 포스트

PR Analysis 의 다른글