[Ray] 워커 리스너 스레드 최적화: list를 frozenset으로 교체
PR #61353 - [Core] Optimize the worker listener thread
들어가며
Ray의 워커 프로세스에는 GCS로부터 에러 메시지를 수신하는 리스너 스레드가 있습니다. 이 스레드는 무한 루프에서 에러 메시지를 폴링하며, 각 메시지의 job_id가 현재 워커의 job에 해당하는지 확인합니다. 기존 코드에서는 매 반복마다 리스트를 새로 생성하여 in 연산을 수행했는데, 이를 루프 밖에서 한 번만 생성하는 frozenset으로 변경했습니다.
핵심 코드 분석
Before
while True:
_, error_data = worker.gcs_error_subscriber.poll()
if error_data is None:
continue
if error_data["job_id"] is not None and error_data["job_id"] not in [
worker.current_job_id.binary(),
JobID.nil().binary(),
]:
continue
매 폴링마다 2개 요소의 리스트를 새로 생성하고, in 연산이 O(n) 선형 탐색을 수행합니다.
After
expected_job_ids = frozenset(
[worker.current_job_id.binary(), JobID.nil().binary()]
)
while True:
_, error_data = worker.gcs_error_subscriber.poll()
if error_data is None:
continue
if (
error_data["job_id"] is not None
and error_data["job_id"] not in expected_job_ids
):
continue
루프 시작 전에 frozenset을 한 번 생성하여, 매 반복에서 리스트 생성 비용을 제거하고 in 연산을 O(1) 해시 탐색으로 변경합니다.
왜 이게 좋은가
- 객체 생성 제거: 무한 루프 내에서 매번 리스트를 생성하는 비용이 사라집니다.
- O(n) -> O(1):
in연산이 리스트의 선형 탐색에서 frozenset의 해시 탐색으로 변경됩니다. - 불변 보장:
frozenset은 불변이므로 스레드 안전성이 보장됩니다. - 간단하지만 효과적: 7줄의 변경으로 핫 루프의 불필요한 오버헤드를 제거합니다.
참고 자료
관련 포스트
PR Analysis 의 다른글
- 이전글 [Triton] FenceAsync에 비동기 읽기 의존성 추가 — st.shared와 copy_local_to_global 간 정합성 보장
- 현재글 : [Ray] 워커 리스너 스레드 최적화: list를 frozenset으로 교체
- 다음글 [pydantic-ai] 병렬 도구 실행 시 예외 발생 시 형제 태스크 취소 버그 수정
댓글