본문으로 건너뛰기

[Ray Core] request ID 생성을 worker로 이동하여 plasma get 성능 회귀 수정

PR 링크: ray-project/ray#58390 상태: Merged | 변경: +50 / -67

들어가며

Ray의 ray.get을 스레드 안전하게 만들기 위한 이전 PR에서, worker가 AsyncGetObjects 요청을 보낸 후 raylet으로부터 request ID가 담긴 응답을 동기적으로 기다리는 단계가 추가되었다. 이 추가적인 동기 통신 단계로 인해 plasma store의 get 성능이 10,000 ops/s에서 5,000 ops/s 미만으로 50% 이상 회귀했다.

핵심 코드 분석

핵심 아이디어: request ID를 worker에서 생성

raylet에서 request ID를 생성하고 응답으로 전달하는 대신, worker에서 atomic counter로 직접 생성하여 요청과 함께 전송한다.

Worker 측: atomic counter 추가

class CoreWorkerPlasmaStoreProvider {
    // ...
    std::atomic<int64_t> get_request_counter_;
};

// 생성자에서 초기화
CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider(...)
    : // ...
      get_request_counter_(0) {
}

요청에 request ID 포함

// Before: 응답에서 request_id를 받음
StatusOr<ipc::ScopedResponse> status_or_cleanup =
    raylet_ipc_client_->AsyncGetObjects(batch_ids, owner_addresses);

// After: 요청에 request_id를 포함
StatusOr<ipc::ScopedResponse> status_or_cleanup =
    raylet_ipc_client_->AsyncGetObjects(
        batch_ids, owner_addresses,
        get_request_counter_.fetch_add(1));

Flatbuffers 스키마 변경

// 삭제된 메시지
table AsyncGetObjectsReply {
    request_id: long;
}

// AsyncGetObjectsRequest에 필드 추가
table AsyncGetObjectsRequest {
    object_ids: [string];
    owner_addresses: [Address];
    get_request_id: long;  // 새로 추가
}

Raylet 측: 응답 전송 코드 전체 제거

// Before: 요청 처리 후 응답 전송
void NodeManager::HandleAsyncGetObjectsRequest(...) {
    // ...
    int64_t request_id = AsyncGet(client, refs);
    // flatbuffer 응답 생성 및 전송
    auto get_reply = protocol::CreateAsyncGetObjectsReply(fbb, request_id);
    fbb.Finish(get_reply);
    client->WriteMessage(...);
}

// After: 요청만 처리, 응답 없음
void NodeManager::HandleAsyncGetObjectsRequest(...) {
    auto request = flatbuffers::GetRoot<protocol::AsyncGetObjectsRequest>(message_data);
    std::vector<rpc::ObjectReference> refs = ...;
    AsyncGet(client, refs, request->get_request_id());
}

왜 이게 좋은가

  1. 성능 회복: plasma store get이 다시 10,000 ops/s 수준으로 복구되었다. 50% 이상의 성능 회귀가 해결되었다.
  2. 동기 통신 제거: worker가 raylet 응답을 기다리는 블로킹 단계를 완전히 제거했다.
  3. Atomic 안전성: std::atomic<int64_t>fetch_add는 lock-free하고 스레드 안전하여, 원래의 스레드 안전성 요구사항도 충족한다.
  4. 코드 순삭제: 50줄 추가에 67줄 삭제로, 불필요한 응답 메시지 타입과 처리 코드가 정리되었다.

참고 자료

댓글

관련 포스트

PR Analysis 의 다른글