Skip to content

Commit 8e7d305

Browse files
committed
Adapt PD disaggregation to the config refactor.
Signed-off-by: LiuBingyu <liubingyu62@gmail.com>
1 parent 8a9add1 commit 8e7d305

5 files changed

Lines changed: 351 additions & 26 deletions

File tree

tests/e2e/online_serving/test_qwen3_omni.py

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818

1919
models = ["Qwen/Qwen3-Omni-30B-A3B-Instruct"]
2020

21-
# Set VLLM_TEST_PD_MODE=1 to test PD disaggregation (follow-up — deploy overlay not yet migrated).
21+
# Set VLLM_TEST_PD_MODE=1 to test PD disaggregation via the deploy config's
22+
# ``pd_separation`` section.
2223
_USE_PD = os.environ.get("VLLM_TEST_PD_MODE", "0") == "1"
2324

2425
_CI_DEPLOY = get_deploy_config_path("ci/qwen3_omni_moe.yaml")
@@ -37,6 +38,23 @@ def get_chunk_config(config_path: str | None = None):
3738
return modify_stage_config(config_path, updates={"async_chunk": True})
3839

3940

41+
def get_pd_config(config_path: str | None = None):
42+
"""Load the qwen3_omni CI deploy yaml with PD separation enabled."""
43+
if config_path is None:
44+
config_path = _CI_DEPLOY
45+
return modify_stage_config(
46+
config_path,
47+
updates={
48+
"pd_separation.enabled": True,
49+
"pd_separation.async_chunk": False,
50+
"stages": {
51+
1: {"devices": "2"},
52+
2: {"devices": "2"},
53+
},
54+
},
55+
)
56+
57+
4058
def get_prefix_caching_config(config_path: str):
4159
"""Create a stage config with prefix caching enabled on the thinker (stage 0)."""
4260
path = modify_stage_config(
@@ -52,10 +70,9 @@ def get_prefix_caching_config(config_path: str):
5270

5371
# Platform-specific overrides live inside the new deploy yaml's ``platforms:``
5472
# section, so a single ``_CI_DEPLOY`` path serves CUDA, ROCm, and XPU.
55-
# TODO: re-add VLLM_TEST_PD_MODE branch once the PD-disaggregation deploy
56-
# overlay has been migrated to the new schema (previously used the deleted
57-
# ``qwen3_omni_moe_pd_ci.yaml`` stage-configs file).
58-
if current_omni_platform.is_xpu():
73+
if _USE_PD:
74+
stage_configs = [get_pd_config()]
75+
elif current_omni_platform.is_xpu():
5976
stage_configs = [_CI_DEPLOY]
6077
else: # CUDA + ROCm MI325 share the same deploy config
6178
stage_configs = [get_chunk_config()]
@@ -111,7 +128,6 @@ def get_max_batch_size(size_type="few"):
111128
@pytest.mark.advanced_model
112129
@pytest.mark.core_model
113130
@pytest.mark.omni
114-
@pytest.mark.skipif(_USE_PD, reason="Temporarily skip PD mode in this test module.")
115131
@hardware_test(res={"cuda": "H100", "rocm": "MI325"}, num_cards=3 if _USE_PD else 2)
116132
@pytest.mark.parametrize("omni_server", test_params, indirect=True)
117133
def test_mix_to_text_audio_001(omni_server, openai_client) -> None:
@@ -151,7 +167,6 @@ def test_mix_to_text_audio_001(omni_server, openai_client) -> None:
151167
@pytest.mark.advanced_model
152168
@pytest.mark.core_model
153169
@pytest.mark.omni
154-
@pytest.mark.skipif(_USE_PD, reason="Temporarily skip PD mode in this test module.")
155170
@hardware_test(res={"cuda": "H100", "rocm": "MI325"}, num_cards=3 if _USE_PD else 2)
156171
@pytest.mark.parametrize("omni_server", test_params, indirect=True)
157172
def test_text_to_text_001(omni_server, openai_client) -> None:

tests/entrypoints/test_pd_disaggregation.py

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1081,41 +1081,49 @@ def test_pop_uses_fallback_when_no_stored(self, monkeypatch):
10811081

10821082

10831083
class TestPDYAMLConfig:
1084-
def test_pd_yaml_loads(self):
1085-
"""The PD separation YAML config should load without errors."""
1084+
def test_pd_yaml_loads(self, tmp_path):
1085+
"""PD deploy config should merge into a 4-stage runtime config."""
10861086
import os
10871087

1088-
yaml_path = os.path.join(
1089-
os.path.dirname(__file__),
1090-
"../../vllm_omni/model_executor/stage_configs/qwen3_omni_moe_pd_separation.yaml",
1091-
)
1092-
yaml_path = os.path.abspath(yaml_path)
1093-
if not os.path.exists(yaml_path):
1094-
pytest.skip("PD separation YAML not found")
1088+
import vllm_omni.model_executor.models.qwen3_omni.pipeline # noqa: F401
1089+
from vllm_omni.config.stage_config import _PIPELINE_REGISTRY, load_deploy_config, merge_pipeline_deploy
10951090

1096-
from omegaconf import OmegaConf
1091+
base_path = os.path.abspath(
1092+
os.path.join(os.path.dirname(__file__), "../../vllm_omni/deploy/qwen3_omni_moe.yaml")
1093+
)
1094+
if not os.path.exists(base_path):
1095+
pytest.skip("Qwen3-Omni deploy config not found")
1096+
1097+
overlay = tmp_path / "qwen3_omni_pd_overlay.yaml"
1098+
overlay.write_text(
1099+
f"base_config: {base_path}\n"
1100+
"pd_separation:\n"
1101+
" enabled: true\n"
1102+
" async_chunk: false\n",
1103+
encoding="utf-8",
1104+
)
10971105

1098-
cfg = OmegaConf.load(yaml_path)
1099-
stages = cfg.stage_args
1106+
deploy = load_deploy_config(overlay)
1107+
stages = merge_pipeline_deploy(_PIPELINE_REGISTRY["qwen3_omni_moe"], deploy)
11001108
assert len(stages) == 4
11011109

11021110
# Prefill stage
1103-
assert stages[0].is_prefill_only is True
1111+
assert stages[0].yaml_extras["is_prefill_only"] is True
11041112
assert stages[0].final_output is False
11051113
assert stages[0].is_comprehension is True
11061114

11071115
# Decode stage
1108-
assert stages[1].is_decode_only is True
1116+
assert stages[1].yaml_extras["is_decode_only"] is True
11091117
assert stages[1].final_output is True
11101118
assert stages[1].final_output_type == "text"
11111119
assert stages[1].is_comprehension is True
1112-
assert 0 in stages[1].engine_input_source
1120+
assert 0 in stages[1].input_sources
11131121

11141122
# KV transfer configs
1115-
assert stages[0].engine_args.kv_transfer_config.kv_role == "kv_producer"
1116-
assert stages[1].engine_args.kv_transfer_config.kv_role == "kv_consumer"
1117-
assert stages[0].engine_args.kv_transfer_config.kv_connector == "MooncakeConnector"
1118-
assert stages[1].engine_args.kv_transfer_config.kv_connector == "MooncakeConnector"
1123+
assert stages[0].yaml_engine_args["kv_transfer_config"]["kv_role"] == "kv_producer"
1124+
assert stages[1].yaml_engine_args["kv_transfer_config"]["kv_role"] == "kv_consumer"
1125+
assert stages[0].yaml_engine_args["kv_transfer_config"]["kv_connector"] == "MooncakeConnector"
1126+
assert stages[1].yaml_engine_args["kv_transfer_config"]["kv_connector"] == "MooncakeConnector"
11191127

11201128

11211129
class TestPrefillStopNeutralization:

tests/test_config_factory.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -812,6 +812,62 @@ def test_merge_pipeline_deploy(self):
812812
assert s0.yaml_engine_args["engine_output_type"] == "latent"
813813
assert s0.yaml_extras["default_sampling_params"]["detokenize"] is True
814814

815+
def test_merge_pipeline_deploy_with_pd_separation(self, tmp_path):
816+
from pathlib import Path
817+
818+
import vllm_omni.model_executor.models.qwen3_omni.pipeline # noqa: F401
819+
from vllm_omni.config.stage_config import load_deploy_config, merge_pipeline_deploy
820+
821+
pipeline = _PIPELINE_REGISTRY["qwen3_omni_moe"]
822+
base = Path(__file__).parent.parent / "vllm_omni" / "deploy" / "qwen3_omni_moe.yaml"
823+
if not base.exists():
824+
pytest.skip("Deploy config not found")
825+
826+
overlay = tmp_path / "qwen3_omni_pd.yaml"
827+
overlay.write_text(
828+
f"base_config: {base}\n"
829+
"pd_separation:\n"
830+
" enabled: true\n"
831+
" target_stage_id: 0\n"
832+
" async_chunk: false\n"
833+
" stages:\n"
834+
" - role: prefill\n"
835+
" max_num_seqs: 16\n"
836+
" devices: \"0\"\n"
837+
" engine_extras:\n"
838+
" kv_transfer_config:\n"
839+
" kv_connector: MooncakeConnector\n"
840+
" kv_role: kv_producer\n"
841+
" kv_rank: 0\n"
842+
" kv_parallel_size: 2\n"
843+
" - role: decode\n"
844+
" max_num_seqs: 64\n"
845+
" devices: \"1\"\n"
846+
" engine_extras:\n"
847+
" kv_transfer_config:\n"
848+
" kv_connector: MooncakeConnector\n"
849+
" kv_role: kv_consumer\n"
850+
" kv_rank: 1\n"
851+
" kv_parallel_size: 2\n",
852+
encoding="utf-8",
853+
)
854+
855+
deploy = load_deploy_config(overlay)
856+
stages = merge_pipeline_deploy(pipeline, deploy)
857+
858+
assert len(stages) == 4
859+
assert stages[0].yaml_extras["is_prefill_only"] is True
860+
assert stages[1].yaml_extras["is_decode_only"] is True
861+
assert stages[1].input_sources == [0]
862+
assert stages[2].input_sources == [1]
863+
assert stages[3].input_sources == [2]
864+
assert stages[0].yaml_engine_args.get("async_chunk") is not True
865+
assert stages[1].yaml_engine_args.get("custom_process_next_stage_input_func") is None
866+
assert stages[0].yaml_engine_args["kv_transfer_config"]["kv_role"] == "kv_producer"
867+
assert stages[1].yaml_engine_args["kv_transfer_config"]["kv_role"] == "kv_consumer"
868+
assert stages[2].yaml_extras["input_connectors"] == {"from_stage_1": "connector_of_shared_memory"}
869+
assert stages[3].yaml_extras["input_connectors"] == {"from_stage_2": "connector_of_shared_memory"}
870+
815871

816872
class TestQwen3OmniPipeline:
817873
def test_registered(self):

0 commit comments

Comments
 (0)