[Ray] Ray 대규모 리소스 뷰 동기화 -- 메시지 배칭으로 개선
PR 링크: ray-project/ray#57641 상태: Merged | 변경: +329 / -93
들어가며
Ray 클러스터는 RaySyncer를 통해 노드 간 리소스 뷰를 동기화한다. 기존에는 gRPC bidirectional streaming에서 메시지를 하나씩 전송했는데, 수백~수천 노드 규모에서는 개별 메시지 전송의 오버헤드가 누적되어 동기화 지연이 발생한다. 이 PR은 protobuf 레벨에서 RaySyncMessageBatch를 도입하고, 메시지를 모아서 한 번에 전송하는 배칭 메커니즘을 추가한다.
핵심 코드 분석
Protobuf: 배치 메시지 타입 추가
Before:
service RaySyncer {
rpc StartSync(stream RaySyncMessage) returns (stream RaySyncMessage);
}
After:
message RaySyncMessageBatch {
repeated RaySyncMessage messages = 1;
}
service RaySyncer {
rpc StartSync(stream RaySyncMessageBatch) returns (stream RaySyncMessageBatch);
}
gRPC 스트리밍의 단위가 단일 메시지에서 메시지 배치로 변경되었다. 하나의 gRPC write에 여러 메시지를 담아 전송한다.
배칭 전략: 크기 기반 + 타이머 기반
Before:
bool PushToSendingQueue(std::shared_ptr<const RaySyncMessage> message) override {
// ...
sending_buffer_[key] = std::move(message);
StartSend(); // 매번 즉시 전송 시도
return true;
}
After:
bool PushToSendingQueue(std::shared_ptr<const RaySyncMessage> message) override {
// ...
sending_buffer_[key] = std::move(message);
if (sending_buffer_.size() >= max_batch_size_ || max_batch_delay_ms_.count() == 0) {
// 배치 크기 도달 또는 delay 0이면 즉시 전송
if (batch_timer_active_) {
batch_timer_.cancel();
batch_timer_active_ = false;
}
StartSend();
} else {
// 타이머 시작: delay 후 자동 전송
if (!batch_timer_active_) {
batch_timer_active_ = true;
batch_timer_.expires_after(max_batch_delay_ms_);
auto weak_self = std::weak_ptr<RaySyncerBidiReactor>(self_ref_);
batch_timer_.async_wait([weak_self, this](const auto &ec) {
auto self = weak_self.lock();
if (!self) return;
batch_timer_active_ = false;
if (!ec && !*IsDisconnected()) StartSend();
});
}
}
return true;
}
두 가지 조건으로 배치 전송을 트리거한다: (1) max_batch_size에 도달하면 즉시 전송, (2) 도달하지 못하면 max_batch_delay_ms 후에 타이머가 전송을 트리거한다. weak_ptr로 reactor 수명 관리를 안전하게 처리한다.
전송 로직: 개별에서 일괄로
Before:
void StartSend() {
if (sending_) return;
if (sending_buffer_.size() != 0) {
auto iter = sending_buffer_.begin();
auto msg = std::move(iter->second);
sending_buffer_.erase(iter);
Send(std::move(msg), sending_buffer_.empty());
sending_ = true;
}
}
After:
void StartSend() {
if (sending_ || sending_buffer_.empty()) return;
auto message_batch = std::make_shared<RaySyncMessageBatch>();
for (const auto &[key, message] : sending_buffer_) {
*message_batch->add_messages() = *message;
}
sending_buffer_.clear();
Send(std::move(message_batch));
sending_ = true;
}
버퍼의 모든 메시지를 하나의 RaySyncMessageBatch로 묶어 전송한다. 기존에는 하나씩 꺼내서 보냈지만, 이제 한 번의 gRPC write로 여러 메시지를 전송한다.
왜 이게 좋은가
- gRPC 오버헤드 감소: 여러 메시지를 하나의 gRPC write로 전송하므로 serialization, syscall, 네트워크 round-trip 비용이 줄어든다.
- 설정 가능한 trade-off:
gcs_resource_broadcast_max_batch_size와gcs_resource_broadcast_max_batch_delay_ms두 파라미터로 지연과 처리량 사이의 균형을 조절할 수 있다. - 하위 호환성:
batch_size=1, delay=0이 기본값이므로 기존과 동일하게 메시지를 즉시 개별 전송한다. 배칭은 명시적으로 활성화해야 한다. - 메모리 안전성:
shared_ptr+weak_ptr패턴으로 reactor의 비동기 콜백에서 use-after-free를 방지한다. 기존의 raw pointer(new/delete) 방식을shared_ptr로 전환한 것도 이 PR의 중요한 개선이다.
정리
대규모 Ray 클러스터에서 리소스 동기화 병목을 해결하기 위해 gRPC 스트리밍에 메시지 배칭을 도입했다. protobuf 스키마 변경부터 reactor의 전송 로직, 타이머 기반 flush, 메모리 안전성까지 체계적으로 변경한 PR이다. 기본값이 기존 동작을 유지하면서 필요할 때 배칭을 활성화할 수 있는 점진적 도입 전략이 좋다.
참고 자료
- Ray Architecture: RaySyncer -- Ray 내부 동기화 아키텍처
- gRPC Bidirectional Streaming -- gRPC 양방향 스트리밍 개념
알림: 이 분석은 AI가 실제 코드 diff를 기반으로 작성했습니다.
관련 포스트
PR Analysis 의 다른글
- 이전글 [triton] AMD GPU에서 Block Scaled Matmul 지원 추가
- 현재글 : [Ray] Ray 대규모 리소스 뷰 동기화 -- 메시지 배칭으로 개선
- 다음글 [Ray] iter_batches에서 프리페치 버퍼링을 올바르게 처리하여 지연시간 안정화
댓글