본문으로 건너뛰기

[vLLM] Executor 아키텍처: UniProc, Multiproc, Ray

들어가며

vLLM은 단일 GPU에서부터 다중 노드 클러스터까지 다양한 환경을 지원한다. 이 확장성의 핵심은 Executor 추상화이다. 스케줄러가 생성한 배치를 워커에게 전달하고 결과를 수집하는 역할을 하며, 실행 환경에 따라 UniProc, Multiproc, Ray 등의 구현체가 선택된다.

코드: vllm/v1/executor/abstract.py, multiproc_executor.py, uniproc_executor.py

공식 문서

vLLM 공식 문서: Multiprocessing

핵심 구조/코드 분석

Executor 추상 클래스

class Executor(ABC):
    """Abstract base class for vLLM executors."""

    uses_ray: bool = False
    supports_pp: bool = False

    @staticmethod
    def get_class(vllm_config: VllmConfig) -> type["Executor"]:
        distributed_executor_backend = parallel_config.distributed_executor_backend
        if distributed_executor_backend == "ray":
            from vllm.v1.executor.ray_executor import RayDistributedExecutor
            executor_class = RayDistributedExecutor
        elif distributed_executor_backend == "mp":
            from vllm.v1.executor.multiproc_executor import MultiprocExecutor
            executor_class = MultiprocExecutor
        elif distributed_executor_backend == "uni":
            from vllm.v1.executor.uniproc_executor import UniProcExecutor
            executor_class = UniProcExecutor
        return executor_class

팩토리 패턴으로 구현되어, distributed_executor_backend 설정 하나로 실행 환경이 결정된다. 지연 임포트를 사용하여 불필요한 의존성(Ray 등)을 로딩하지 않는다.

collective_rpc: 분산 RPC 인터페이스

@abstractmethod
def collective_rpc(
    self,
    method: str | Callable[[WorkerBase], _R],
    timeout: float | None = None,
    args: tuple = (),
    kwargs: dict | None = None,
    non_block: bool = False,
) -> list[_R]:
    """Execute an RPC call on all workers."""
    raise NotImplementedError

모든 Executor의 핵심 API이다. 워커의 메서드 이름(문자열) 또는 직접 callable을 전달할 수 있다. non_block=True면 Future를 반환하여 비동기 실행이 가능하다.

UniProcExecutor: 단일 프로세스

class UniProcExecutor(Executor):
    def _init_executor(self) -> None:
        self.driver_worker = WorkerWrapperBase(rpc_rank=0)
        distributed_init_method, rank, local_rank = self._distributed_args()
        kwargs = dict(
            vllm_config=self.vllm_config,
            local_rank=local_rank,
            rank=rank,
            distributed_init_method=distributed_init_method,
            is_driver_worker=True,
            shared_worker_lock=Lock(),
        )
        self.driver_worker.init_worker(all_kwargs=[kwargs])
        self.driver_worker.init_device()
        self.driver_worker.load_model()

    def collective_rpc(self, method, timeout=None, args=(), kwargs=None, non_block=False):
        if not non_block:
            result = run_method(self.driver_worker, method, args, kwargs)
            return [result]

단일 GPU 환경에서 사용되며, 프로세스 내에서 직접 워커를 호출한다. 가장 단순한 구현으로 오버헤드가 거의 없다. async_scheduling이 활성화되면 ThreadPoolExecutor를 사용하여 비동기 모델 실행도 지원한다.

MultiprocExecutor: 멀티프로세스

class MultiprocExecutor(Executor):
    supports_pp: bool = True

    def _init_executor(self) -> None:
        tp_size, pp_size, pcp_size = self._get_parallel_sizes()

        distributed_init_method = get_distributed_init_method(
            get_loopback_ip(), get_open_port()
        )

        self.rpc_broadcast_mq = MessageQueue(
            self.world_size,
            self.local_world_size,
            max_chunk_bytes=max_chunk_bytes,
        )

        for local_rank in range(self.local_world_size):
            unready_worker_handle = WorkerProc.make_worker_process(
                vllm_config=self.vllm_config,
                local_rank=local_rank,
                rank=global_rank,
                distributed_init_method=distributed_init_method,
                input_shm_handle=scheduler_output_handle,
                shared_worker_lock=shared_worker_lock,
                is_driver_worker=is_driver_worker,
            )

파이프라인 병렬화(PP)를 지원하는 멀티프로세스 Executor이다. 핵심 특징:

  1. MessageQueue(공유 메모리): 스케줄러 출력을 공유 메모리 기반 메시지 큐로 브로드캐스트한다.
  2. 워커 프로세스 생성: multiprocessing으로 각 GPU에 대해 독립 프로세스를 생성한다.
  3. 워커 모니터링: 백그라운드 스레드가 워커 프로세스의 건강 상태를 감시한다.

execute_model: 모델 실행

def execute_model(
    self, scheduler_output: SchedulerOutput, non_block: bool = False
) -> ModelRunnerOutput | None | Future[ModelRunnerOutput | None]:
    output = self.collective_rpc(
        "execute_model", args=(scheduler_output,), non_block=non_block
    )
    return output[0]

스케줄러가 결정한 배치(SchedulerOutput)를 워커에 전달하고 결과를 수집한다. non_block=True를 사용하면 다음 스케줄링과 현재 모델 실행을 겹칠 수 있다(async scheduling).

Sleep/Wake Up 메커니즘

def sleep(self, level: int = 1):
    self.collective_rpc("sleep", kwargs=dict(level=level))
    self.sleeping_tags = {"weights", "kv_cache"}
    self.is_sleeping = True

def wake_up(self, tags: list[str] | None = None):
    self.collective_rpc("wake_up", kwargs=dict(tags=tags))

vLLM 특유의 sleep/wake_up 메커니즘이다. GPU 메모리를 해제하고(sleep) 필요할 때 다시 할당한다(wake_up). tags로 가중치와 KV 캐시를 선택적으로 관리할 수 있다.

왜 이 설계인가

  1. 전략 패턴: Executor 인터페이스를 통해 실행 환경의 차이를 캡슐화한다. 스케줄러와 엔진 코어는 Executor의 구체적 구현을 알 필요가 없다.

  2. 공유 메모리 메시지 큐: 프로세스 간 통신에서 직렬화/역직렬화 오버헤드를 최소화한다. SchedulerOutput 같은 큰 데이터를 효율적으로 브로드캐스트한다.

  3. 지연 임포트: Ray가 설치되지 않은 환경에서도 vLLM이 정상 동작한다. 실제로 Ray executor를 사용할 때만 Ray를 임포트한다.

  4. Sleep 메커니즘: 멀티테넌시 환경에서 유휴 모델의 GPU 메모리를 반환하고, 다른 모델에게 할당할 수 있다. 클라우드 환경에서 리소스 효율성을 크게 높인다.

정리

Executor는 vLLM 아키텍처에서 스케줄러와 워커를 연결하는 브릿지이다. collective_rpc 하나의 인터페이스로 단일 GPU, 멀티 GPU, 멀티 노드 환경을 투명하게 추상화한다. 새로운 실행 환경(TPU, 커스텀 클러스터 등)을 지원할 때도 Executor만 구현하면 되므로, 확장성의 핵심이다.

댓글

관련 포스트

vLLM 의 다른글