[P/D] Add P/D disaggregation deployment on Ray #29649
Conversation
Documentation preview: https://vllm--29649.org.readthedocs.build/en/29649/
Code Review
This pull request introduces a significant new feature: Prefill-Decode (P/D) disaggregation deployment on Ray via a new pdjob CLI command. This allows for better resource utilization and independent scaling of prefill and decode workers. The implementation includes new base classes, configuration parsing from YAML, and the CLI entrypoint itself. While the feature is well-structured, I've identified a critical security vulnerability and several high-severity issues related to configuration, maintainability, and efficiency that should be addressed before merging.
```python
process = subprocess.Popen(
    self._command,
    shell=True,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    text=True
)
```
The ProxyServer.start method uses subprocess.Popen with shell=True to execute a command string that is constructed from the user-provided configuration file. This is a critical command injection vulnerability. A malicious user could provide a crafted command in the scheduler.command field of the YAML config (e.g., "my_script.py; rm -rf /") to execute arbitrary code on the machine running the proxy server.
To fix this, you should parse the command string into a list of arguments and execute it with shell=False. Please use shlex.split() for safe parsing. You will need to add import shlex at the top of the file.
```diff
-process = subprocess.Popen(
-    self._command,
-    shell=True,
-    stdout=subprocess.PIPE,
-    stderr=subprocess.PIPE,
-    text=True
-)
+# Use shlex.split to avoid command injection vulnerabilities.
+cmd_list = shlex.split(self._command)
+process = subprocess.Popen(
+    cmd_list,
+    stdout=subprocess.PIPE,
+    stderr=subprocess.PIPE,
+    text=True)
```
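To illustrate why this matters, here is a small standalone sketch (not vLLM code) showing how `shlex.split` keeps shell metacharacters inert:

```python
import shlex

# A malicious command string like one that could appear in the
# scheduler.command field of the YAML config.
cmd = "my_script.py; rm -rf /"

# shlex.split tokenizes like a POSIX shell but never invokes one, so the
# ';' stays part of a literal argument instead of chaining a second command.
print(shlex.split(cmd))  # → ['my_script.py;', 'rm', '-rf', '/']
```

With `shell=False`, `Popen` receives these tokens directly as argv entries; there is no shell left to interpret `;` as a command separator.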
```yaml
decode_dp: 8

# Tensor parallelism size for prefill workers
prefill_tp: 8

# Tensor parallelism size for decode workers
decode_tp: 1
```
The configuration for the decode worker in the scheduler section is inconsistent with the settings in the decode section. Specifically, decode_dp is 8 and decode_tp is 1 in the scheduler config, while data_parallel_size is 4 and tensor_parallel_size is 2 in the decode worker config. This will be confusing for users and may lead to incorrect behavior if the scheduler uses these values for its logic. The example configuration should be internally consistent.
```yaml
decode_dp: 4
# Tensor parallelism size for prefill workers
prefill_tp: 8
# Tensor parallelism size for decode workers
decode_tp: 2
```

```yaml
#   --decode $DECODE_URL_1 --decode $DECODE_URL_2 \
#   --host 0.0.0.0 --port {PORT}"
# -------------------------------------------------------------------------
command: "python tests/v1/kv_connector/nixl_integration/toy_proxy_server.py --port {PORT} --prefiller-hosts $PREFILL_HOST_1 $PREFILL_HOST_2 --prefiller-ports $PREFILL_PORT_1 $PREFILL_PORT_2 --decoder-hosts $DECODE_HOST_1 $DECODE_HOST_2 --decoder-ports $DECODE_PORT_1 $DECODE_PORT_2"
```
The example configuration for the scheduler command points to a toy_proxy_server.py script located within the tests directory. User-facing examples should not rely on test code, as this is confusing and not a good practice. Please update the example to use a production-ready proxy/scheduler script, such as one of the implementations in vllm/entrypoints/cli/pd/llm_scheduler/.
```python
if not ray.get_gpu_ids():
    # hack the flashMLA gpu devices check
    # When current worker has no GPU, try to get available GPU
    # from placement group

    @ray.remote
    def _get_gpu_ids():
        return ray.get_gpu_ids()

    current_placement_group = ray.util.get_current_placement_group()
    if current_placement_group:
        print("Trying get one possible GPU id from current placement group")
        gpu_ids = []
        # Iterate through all bundles in placement group to find available GPUs
        for idx in range(current_placement_group.bundle_count):
            scheduling_strategy = PlacementGroupSchedulingStrategy(
                placement_group=current_placement_group,
                placement_group_bundle_index=idx,
            )
            result = ray.get(_get_gpu_ids.options(
                num_cpus=0,
                num_gpus=1,
                scheduling_strategy=scheduling_strategy).remote())

            if result:
                gpu_ids.extend([str(i) for i in result])

        if not gpu_ids:
            print(f"No GPUs available for this placement group: {current_placement_group}")
        else:
            os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(gpu_ids)
```
The method for obtaining GPU IDs when a worker doesn't have a GPU is inefficient. It launches a remote task for each bundle in the placement group just to get ray.get_gpu_ids(). This can be slow and add significant overhead to service startup time, especially with a large number of bundles. As the comment mentions, this is a "hack".
A more efficient approach would be to inspect the placement group's properties directly. You could explore using pg.bundle_specs to get information about the resources allocated to each bundle without launching remote tasks.
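As a rough, hypothetical sketch of that direction: a Ray placement group's `bundle_specs` is a list of resource dicts, so GPU availability can be checked without launching any remote tasks (note this yields resource counts, not the physical device IDs the current hack collects):

```python
# Hypothetical example data in the shape of a placement group's
# bundle_specs: one {resource_name: amount} dict per bundle.
bundle_specs = [
    {"CPU": 1.0, "GPU": 1.0},
    {"CPU": 1.0, "GPU": 1.0},
    {"CPU": 2.0},  # CPU-only bundle
]

# Count GPU-bearing bundles and total GPUs with no remote round-trips.
gpu_bundles = [spec for spec in bundle_specs if spec.get("GPU", 0) > 0]
total_gpus = int(sum(spec.get("GPU", 0) for spec in bundle_specs))
print(len(gpu_bundles), total_gpus)  # → 2 2
```

Whether counts alone are sufficient here depends on what the flashMLA device check actually needs, so this is a starting point rather than a drop-in replacement.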
```python
supported_connectors = (
    "nixlconnector",
    "p2pncclconnector",
    "multiconnector",
    "sharedstorageconnector",
    "lmcacheconnectorv1",
    "lmcachempconnector",
    "offloadingconnector",
    "decodebenchconnector",
)
```
The list of supported_connectors is hardcoded. This creates a maintenance burden, as this list will need to be manually updated every time a new KV connector is added or removed. A better approach is to dynamically retrieve the list of registered connectors from the KVConnectorFactory.
Please add from vllm.distributed.kv_transfer.kv_connector.factory import KVConnectorFactory to the imports at the top of the file.
```diff
-supported_connectors = (
-    "nixlconnector",
-    "p2pncclconnector",
-    "multiconnector",
-    "sharedstorageconnector",
-    "lmcacheconnectorv1",
-    "lmcachempconnector",
-    "offloadingconnector",
-    "decodebenchconnector",
-)
+# Dynamically get supported connectors from the factory.
+# This is more maintainable than a hardcoded list.
+supported_connectors = tuple(
+    key.lower() for key in KVConnectorFactory._registry.keys()
+)
```
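For illustration, a minimal registry-backed factory of this shape (hypothetical names, not vLLM's actual `KVConnectorFactory`) shows how the supported list tracks registrations automatically:

```python
# Toy registry-based factory: registering a connector class under a name
# makes it discoverable, so no hand-maintained tuple of names is needed.
class ConnectorFactory:
    _registry = {}

    @classmethod
    def register(cls, name):
        def deco(connector_cls):
            cls._registry[name] = connector_cls
            return connector_cls
        return deco

@ConnectorFactory.register("NixlConnector")
class NixlConnector:
    pass

# Derived at runtime from whatever has been registered.
supported = tuple(key.lower() for key in ConnectorFactory._registry)
print(supported)  # → ('nixlconnector',)
```

One caveat: `_registry` is private by convention, so a public accessor on the factory would be cleaner than reaching into it directly.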
💡 Codex Review
Here are some automated review suggestions for this pull request.
```python
for idx in range(prefill_config.replicas):
    rank_id = RankIDGenerator.next_rank_id()
    service = self._start_single_service(
        rank_id, ServiceType.prefill, runtime_env,
        prefill_config.role_params, prefill_config.role_num_gpus,
```
Split role GPU budget across replicas
Each prefill/decode replica calls _start_single_service with prefill_config.role_num_gpus/decode_config.role_num_gpus, which are loaded directly from the role’s num_gpus field (config.py) as the total GPU budget for that role. When replicas > 1, this allocates a full placement group for the entire role on every replica, multiplying the requested GPUs by the replica count and causing placement group creation to fail or hang on any multi-replica deployment. The per-replica placement group should be sized from the role budget divided by replicas (and the decode loop has the same issue below).
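A minimal sketch of the suggested sizing, assuming `role_num_gpus` is the role-wide budget from the config (illustrative values, not the PR's actual code):

```python
# Hypothetical values: total GPU budget for a role and its replica count.
role_num_gpus = 8
replicas = 4

# Size each replica's placement group from the per-replica share, not the
# full role budget; guard against budgets that don't divide evenly.
if role_num_gpus % replicas != 0:
    raise ValueError("num_gpus must be divisible by replicas")
gpus_per_replica = role_num_gpus // replicas
print(gpus_per_replica)  # → 2
```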
```python
# connect to an existed ray cluster
runtime_env = {"env_vars": config.envs, "working_dir": config.working_dir}
ray.init(log_to_driver=not args.disable_log_to_driver,
         runtime_env=runtime_env)
```
pdjob does not join existing Ray cluster
The pdjob entrypoint always calls ray.init without an address, despite the comment about connecting to an existing cluster. In Ray this starts a new local cluster by default; it will not attach to the head node you started with ray start --head, so workers on other nodes are never used and the PD deployment silently runs single-node. Pass the cluster address (e.g., address="auto" or the head IP) so the job attaches to the running cluster.
👋 Hi! Thank you for contributing to the vLLM project.

💬 Join our developer Slack at https://slack.vllm.ai to discuss your PR in #pr-reviews, coordinate on features in #feat- channels, or join special interest groups in #sig- channels.

Just a reminder: PRs do not trigger a full CI run by default; only a small, essential subset of CI tests runs to quickly catch errors. You can ask your reviewers to trigger select CI tests on top of that. Once the PR is approved and ready to go, your PR reviewer(s) can run CI to test the changes comprehensively before merging.

If you have any questions, please reach out to us on Slack at https://slack.vllm.ai. 🚀
Purpose
This PR introduces a lightweight P/D disaggregation deployment mode for vLLM on Ray.
The main motivation is to provide a Ray‑native deployment option that cleanly separates prefill and decode workers, while keeping the architecture simple and easy to operate. Compared to heavier multi‑component solutions, this approach focuses on:
Better resource utilization: prefill and decode workers can be scheduled independently to match heterogeneous cluster resources.
This PR was provided by Tencent. Thanks to @ConeyLiu @SongGuyang for their collaborative development.
Compatible issue #29651
Essential Elements of an Effective PR Description Checklist
- Update `supported_models.md` and `examples` for a new model.