89 changes: 89 additions & 0 deletions docs/source/features/disagg-serving.md
@@ -233,6 +233,95 @@ Example (two-node deployment):
- **Client entrypoint**
- Send requests or use a load balancer forwarding to `node-a:8000` and `node-b:8000`

### Coordinator and Worker Fleet

A single disaggregated server process is a single-threaded orchestrator and can become a throughput bottleneck: it terminates every client connection, runs routing, and proxies the ctx→gen hop. To scale the orchestrator on one node without standing up multiple independent instances, `trtllm-serve disaggregated` can run a **fleet** of stateless disaggregated-server worker processes behind a shared **coordinator**.

The two roles split as follows:

- **Coordinator** — a single process that owns all cluster state: the ctx/gen routers, worker readiness, and (for the KV-cache-aware router) the single ZMQ event-ingest endpoint. It exposes an internal coordination API (`/select`, `/finish`, `/cluster_info`, `/health`).
- **Fleet workers** — `num_workers` stateless disaggregated servers sharing the public port (via uvicorn `--workers`). Each holds a lightweight delegating client: it computes the routing key locally (e.g. block hashes) and delegates the placement decision to the coordinator over HTTP. Workers own no routing state, so routing stays globally consistent no matter which worker terminates a connection.
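
As a rough illustration of "computes the routing key locally", the sketch below chain-hashes full blocks of token ids so that two prompts sharing a prefix share their leading hashes. The hashing scheme, the `block_size` default, and the payload shape passed to `/select` are all assumptions made for this example; they are not taken from the TRT-LLM implementation.

```python
import hashlib
from typing import Dict, List


def block_hashes(token_ids: List[int], block_size: int = 32) -> List[str]:
    """Chain-hash full blocks of token ids (hypothetical scheme)."""
    hashes: List[str] = []
    prev = b""
    # Only full blocks are hashed; a trailing partial block is ignored.
    for start in range(0, len(token_ids) - block_size + 1, block_size):
        block = token_ids[start:start + block_size]
        digest = hashlib.sha256(prev + ",".join(map(str, block)).encode()).digest()
        hashes.append(digest.hex())
        # Feeding the previous digest forward makes each hash depend on the whole prefix.
        prev = digest
    return hashes


def build_select_payload(token_ids: List[int], block_size: int = 32) -> Dict[str, List[str]]:
    """Hypothetical body a worker might POST to the coordinator's /select endpoint."""
    return {"block_hashes": block_hashes(token_ids, block_size)}
```

Because the worker only computes hashes and holds no routing tables, any worker can serve any connection and the coordinator still sees a consistent key for the same prefix.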

This is controlled by two fields in the disaggregated config:

- `num_workers` (int, default `1`) — number of disaggregated-server worker processes to run on the public port.
- `disagg_coordinator_url` (str, optional) — URL of an already-running coordinator. When set, this process starts **no** coordinator and its fleet delegates to that external one.

The three resulting topologies:

| `num_workers` | `disagg_coordinator_url` | Behavior |
|---------------|--------------------------|----------|
| `1` | unset | Single self-contained server with an in-process coordinator (the default; unchanged from earlier examples). |
| `> 1` | unset | An **implicit** coordinator starts in this process (on `port - 1`, or on the port given by the `TLLM_DISAGG_COORDINATOR_PORT` environment variable if it is set) and a fleet of `num_workers` delegating servers runs on the public port. |
| any | set | **No** coordinator starts here; a fleet of `num_workers` delegating servers points at the external `disagg_coordinator_url`. |
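
The table can be read as a small decision function. The sketch below mirrors the order of checks in `disaggregated()` from this change; the function names and the returned labels are illustrative only and do not exist in the codebase.

```python
from typing import Mapping, Optional


def resolve_topology(num_workers: int, coordinator_url: Optional[str]) -> str:
    """Return an illustrative label for the startup topology."""
    if coordinator_url:
        # External coordinator: never start one here, whatever num_workers is.
        return "external_coordinator_fleet"
    if num_workers > 1:
        # Implicit coordinator in this process plus a delegating fleet.
        return "implicit_coordinator_fleet"
    # Default: one self-contained server with an in-process coordinator.
    return "single_server"


def implicit_coordinator_port(public_port: int, env: Mapping[str, str]) -> int:
    """Port of the implicit coordinator: env override if present, else port - 1."""
    return int(env.get("TLLM_DISAGG_COORDINATOR_PORT", public_port - 1))
```

Note that the URL check comes first, so setting `disagg_coordinator_url` wins even when `num_workers` is `1`.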

```{note}
The fleet is most useful with a *stateful* router (`kv_cache_aware`, `conversation`) where placement must be globally consistent — that decision is delegated to the coordinator. With a *stateless* router (`round_robin`, `load_balancing`) each worker simply places locally and no coordinator round-trip occurs.
```
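
To make the stateful/stateless distinction concrete, here is a minimal sketch that classifies a router type by whether placement would involve the coordinator. The set names and the helper are hypothetical; only the four router type strings come from the note above.

```python
# Router types named in the note above; the grouping constants are illustrative.
STATEFUL_ROUTERS = frozenset({"kv_cache_aware", "conversation"})
STATELESS_ROUTERS = frozenset({"round_robin", "load_balancing"})


def needs_coordinator_round_trip(router_type: str) -> bool:
    """True if placement for this router type is delegated to the coordinator."""
    if router_type in STATEFUL_ROUTERS:
        return True
    if router_type in STATELESS_ROUTERS:
        return False
    raise ValueError(f"unknown router type: {router_type!r}")
```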

#### Example: implicit coordinator + 4-worker fleet

Extend the `disagg_config.yaml` from the [trtllm-serve](#trtllm-serve) example with `num_workers` and a router type:

```yaml
hostname: localhost
port: 8000
backend: pytorch
# Run 4 stateless disaggregated-server workers on port 8000, with an implicit
# coordinator started in-process on port 7999 (port - 1).
num_workers: 4
context_servers:
  num_instances: 2
  urls:
    - "localhost:8001"
    - "localhost:8002"
  router:
    type: kv_cache_aware
generation_servers:
  num_instances: 1
  urls:
    - "localhost:8003"
  router:
    type: kv_cache_aware
```

Launch it exactly as before — the coordinator and fleet are started for you:

```
trtllm-serve disaggregated -c disagg_config.yaml
```

Clients still send requests to the public endpoint (`localhost:8000`); the fleet transparently delegates routing to the coordinator.
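
From the client's point of view nothing changes. The sketch below builds a request against the public endpoint using only the standard library; it assumes the OpenAI-compatible `/v1/completions` route and uses a placeholder model name (`my-model`), so adjust both to your deployment. The request is only constructed here, and the commented lines show how it could be sent.

```python
import json
import urllib.request


def build_completion_request(prompt: str, model: str,
                             base_url: str = "http://localhost:8000"):
    """Build (but do not send) a completion request for the public fleet port."""
    body = json.dumps({"model": model, "prompt": prompt, "max_tokens": 16})
    return urllib.request.Request(
        f"{base_url}/v1/completions",  # assumed OpenAI-compatible route
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_completion_request("Hello", model="my-model")  # placeholder model name
# To send it against a running deployment:
# with urllib.request.urlopen(req, timeout=30) as resp:
#     print(json.load(resp))
```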

#### Example: external coordinator

To point a fleet at a coordinator already running elsewhere (for example, one shared across nodes), set `disagg_coordinator_url`. This process then starts no coordinator of its own:

```yaml
hostname: localhost
port: 8000
backend: pytorch
num_workers: 4
disagg_coordinator_url: "http://coordinator-host:7999"
context_servers:
  num_instances: 2
  urls:
    - "localhost:8001"
    - "localhost:8002"
  router:
    type: kv_cache_aware
generation_servers:
  num_instances: 1
  urls:
    - "localhost:8003"
  router:
    type: kv_cache_aware
```

```{note}
A fleet worker fails fast if its coordinator is unreachable: on startup it probes the coordinator's `/cluster_info` with bounded retry (up to `--server_start_timeout` seconds) and exits with an error rather than coming up and returning `Cluster is not ready` for every request.
```
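
The fail-fast behavior amounts to a bounded retry loop around a health probe. The following is a minimal sketch of that pattern, not the worker's actual startup code: the function names, the one-second retry interval, and the injectable `sleep`/`clock` parameters are assumptions made so the example is self-contained and testable.

```python
import time
import urllib.request
from typing import Callable


def probe_cluster_info(coordinator_url: str, timeout_secs: float = 2.0) -> bool:
    """Return True if GET <coordinator_url>/cluster_info answers with HTTP 200."""
    url = f"{coordinator_url}/cluster_info"
    with urllib.request.urlopen(url, timeout=timeout_secs) as resp:
        return resp.status == 200


def wait_for_coordinator(probe: Callable[[], bool], timeout_secs: float,
                         interval_secs: float = 1.0,
                         sleep: Callable[[float], None] = time.sleep,
                         clock: Callable[[], float] = time.monotonic) -> None:
    """Retry `probe` until it succeeds or `timeout_secs` elapses, then raise."""
    deadline = clock() + timeout_secs
    while True:
        try:
            if probe():
                return
        except OSError:
            # Connection refused, timeouts, and HTTP errors all mean "not ready yet".
            pass
        if clock() >= deadline:
            raise RuntimeError("coordinator unreachable; refusing to start")
        sleep(interval_secs)
```

A worker-like caller would run `wait_for_coordinator(lambda: probe_cluster_info(url), timeout_secs=server_start_timeout)` before serving and let the `RuntimeError` terminate the process.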

## Environment Variables

TRT-LLM uses some environment variables to control the behavior of disaggregated service.
219 changes: 203 additions & 16 deletions tensorrt_llm/commands/serve.py
@@ -1,4 +1,5 @@
import asyncio
import atexit
import gc
import importlib
import inspect
@@ -1238,6 +1239,43 @@ def disaggregated(
    # Inherited by child processes via env var; used for deduplication at query time.
    os.environ[DisaggLauncherEnvs.TLLM_DISAGG_DEPLOYMENT_ID] = uuid.uuid4().hex

    metadata_server_cfg = parse_metadata_server_config_file(
        metadata_server_config_file)

    # Disable GC by default (see note below).
    if os.getenv("TRTLLM_DISAGG_SERVER_DISABLE_GC", "1") == "1":
        gc.disable()

    # Startup topology is driven by explicit config (num_workers +
    # disagg_coordinator_url), NOT the WEB_CONCURRENCY env var. Three cases:
    #   (a) disagg_coordinator_url set -> don't start a coordinator here; the
    #       fleet delegates to that external one (num_workers sizes the fleet).
    #   (b) url absent, num_workers>1 -> start an implicit in-process
    #       coordinator (port-1) + a delegating uvicorn fleet on the public port.
    #   (c) url absent, num_workers==1 -> one self-contained server with a local
    #       (in-process) coordinator.
    num_workers = disagg_cfg.num_workers
    coordinator_url = disagg_cfg.disagg_coordinator_url

    if coordinator_url:
        # (a) External coordinator: fork a fleet of delegating servers (or a
        # single one) pointed at it; never start a coordinator in this process.
        _serve_disagg_fleet(disagg_cfg, config_file,
                            metadata_server_config_file, request_timeout,
                            server_start_timeout, num_workers, coordinator_url)
        return

    if num_workers > 1:
        # (b) Implicit coordinator in this process (on port-1) + a delegating
        # uvicorn fleet (workers=N) on the public port. See below.
        _serve_coordinator_and_fleet(disagg_cfg, config_file,
                                     metadata_server_config_file,
                                     metadata_server_cfg, request_timeout,
                                     server_start_timeout, num_workers)
        return

    # (c) num_workers==1, no external coordinator: a single disagg server with an
    # in-process (local) coordinator. Pre-bind the socket (validates port), serve.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            s.bind((disagg_cfg.hostname, disagg_cfg.port))
@@ -1246,32 +1284,171 @@ def disaggregated(
                f"Failed to bind socket to {disagg_cfg.hostname}:{disagg_cfg.port}: {e}"
            )

        metadata_server_cfg = parse_metadata_server_config_file(
            metadata_server_config_file)

        server = OpenAIDisaggServer(
            config=disagg_cfg,
            req_timeout_secs=request_timeout,
            server_start_timeout_secs=server_start_timeout,
            metadata_server_cfg=metadata_server_cfg,
            metrics_interval_secs=metrics_log_interval)

        # Disable GC by default
        #   When concurrency is high, the number of Python objects increases, so
        #   GC runs frequently and takes a long time to process. In this case,
        #   requests are not immediately forwarded to CTX workers and GEN workers,
        #   causing them to run with small batch sizes. Disabling GC can mitigate
        #   this problem.
        #   By testing this feature, we didn't observe significant RSS or VMS
        #   increment, and observed that `count0` (obtained by `gc.get_count()`)
        #   increases by fewer than 1,000 after every 200,000 requests, while the
        #   maximum value of `count0` exceeded 3,000,000 during the test.
        if os.getenv("TRTLLM_DISAGG_SERVER_DISABLE_GC", "1") == "1":
            gc.disable()

        asyncio.run(server(disagg_cfg.hostname, disagg_cfg.port, sockets=[s]))


def _launch_disagg_fleet(disagg_cfg, config_file, metadata_server_config_file,
                         request_timeout, server_start_timeout, num_workers,
                         coordinator_url):
    """Fork a uvicorn fleet of delegating disagg servers pointed at ``coordinator_url``.

    Each worker is an ordinary ``OpenAIDisaggServer`` built with ``coordinator_url``
    so it holds a remote ``CoordinatorClient``. Invoked as ``python -m uvicorn`` so
    there is no bespoke worker command; uvicorn owns the shared socket, worker
    supervision, and graceful shutdown. MPI/PMIX/SLURM env is stripped so a worker
    (a plain HTTP process) never joins an MPI namespace. Returns the Popen handle.
    """
    public_host, public_port = disagg_cfg.hostname, disagg_cfg.port
    child_env = {
        k: v for k, v in os.environ.items()
        if not k.startswith(("SLURM_", "PMIX_", "PMI_", "OMPI_", "UCX_",
                             "I_MPI_", "HYDRA_", "MPI_"))
    }
    # num_workers is explicit config now; ensure no stale WEB_CONCURRENCY leaks in
    # and re-forks each plain-HTTP worker into a nested fleet.
    child_env.pop("WEB_CONCURRENCY", None)
    child_env[DisaggWorkerEnvs.TLLM_DISAGG_COORDINATOR_URL] = coordinator_url
    child_env[DisaggWorkerEnvs.TLLM_DISAGG_CONFIG_FILE] = os.path.abspath(
        config_file)
    if metadata_server_config_file:
        child_env[DisaggWorkerEnvs.TLLM_DISAGG_METADATA_CONFIG_FILE] = \
            os.path.abspath(metadata_server_config_file)
    child_env[DisaggWorkerEnvs.TLLM_DISAGG_REQUEST_TIMEOUT] = str(request_timeout)
    child_env[DisaggWorkerEnvs.TLLM_DISAGG_SERVER_START_TIMEOUT] = str(
        server_start_timeout)
    cmd = [sys.executable, "-m", "uvicorn", "--factory",
           "--host", str(public_host), "--port", str(public_port),
           "--workers", str(num_workers), "--timeout-keep-alive", "10",
           "tensorrt_llm.commands.serve:create_disagg_server_app"]
    logger.info(f"Launching disagg fleet: {num_workers} uvicorn workers on "
                f"{public_host}:{public_port}, coordinator={coordinator_url}")
    logger.info(f"Disagg fleet command: {' '.join(cmd)}")
    fleet = subprocess.Popen(cmd, env=child_env, stdout=sys.stdout,
                             stderr=sys.stderr, start_new_session=True)
    logger.info(f"Disagg fleet launched (pid={fleet.pid})")

    def _cleanup():
        if fleet.poll() is None:
            fleet.terminate()
            try:
                fleet.wait(timeout=10)
            except Exception:
                fleet.kill()

    atexit.register(_cleanup)
    return fleet


def _serve_disagg_fleet(disagg_cfg, config_file, metadata_server_config_file,
                        request_timeout, server_start_timeout, num_workers,
                        coordinator_url):
    """External coordinator: fork the delegating fleet and block on it.

    No coordinator is started in this process -- the fleet delegates to the
    already-running coordinator at ``coordinator_url``.
    """
    fleet = _launch_disagg_fleet(disagg_cfg, config_file,
                                 metadata_server_config_file, request_timeout,
                                 server_start_timeout, num_workers,
                                 coordinator_url)
    rc = fleet.wait()
    if rc != 0:
        raise RuntimeError(f"Disagg fleet exited with code {rc}")


def _serve_coordinator_and_fleet(disagg_cfg, config_file,
                                 metadata_server_config_file,
                                 metadata_server_cfg, request_timeout,
                                 server_start_timeout, num_workers):
    """workers>1, no external URL: coordinator server here + a delegating fleet.

    This process runs the coordinator server (owns the ctx/gen routers, cluster
    state, and centralized ZMQ ingest) on ``port-1``; a uvicorn fleet on the
    public port delegates to it.
    """
    from tensorrt_llm.serve.coordinator_server import CoordinatorServer
    from tensorrt_llm.serve.disagg_coordinator import DisaggCoordinatorService

    public_host, public_port = disagg_cfg.hostname, disagg_cfg.port
    coord_port = int(os.environ.get("TLLM_DISAGG_COORDINATOR_PORT",
                                    public_port - 1))
    coord_url = f"http://{public_host}:{coord_port}"

    # 1. Launch the delegating fleet pointed at the implicit coordinator we start
    #    below (port-1). Workers hold CoordinatorClients (no core), so they can't
    #    race the coordinator's single ZMQ ingest bind.
    _launch_disagg_fleet(disagg_cfg, config_file, metadata_server_config_file,
                         request_timeout, server_start_timeout, num_workers,
                         coord_url)

    # 2. Build + serve the coordinator in this process. It OWNS routing state and
    #    builds the owner routers itself (single shared namespace-aware core + ONE
    #    ZMQ ingest server, started once here).
    def _client_factory(router, role, max_retries=1):
        from tensorrt_llm.serve.openai_client import OpenAIHttpClient
        return OpenAIHttpClient(router, role, request_timeout, max_retries)

    coordinator = DisaggCoordinatorService(
        disagg_cfg, _client_factory,
        metadata_config=metadata_server_cfg,
        server_start_timeout_secs=server_start_timeout)
    logger.info(f"Coordinator serving on {public_host}:{coord_port} "
                f"(fleet on public port {public_port})")
    asyncio.run(CoordinatorServer(coordinator)(public_host, coord_port))


def create_disagg_server_app():
    """uvicorn import-string factory: build one disagg server's FastAPI app.

    Rebuilt inside each uvicorn worker process (workers=N). Fully stateless --
    reads config + coordinator URL from the env that ``_launch_disagg_fleet``
    exported (see ``DisaggWorkerEnvs``); the server holds a remote
    ``CoordinatorClient`` so routing/readiness are delegated to the coordinator.
    """
    # A worker is a plain HTTP process, never an MPI rank; drop WEB_CONCURRENCY so
    # it is never itself re-forked into multiple uvicorn workers.
    os.environ.pop("WEB_CONCURRENCY", None)
    if os.getenv("TRTLLM_DISAGG_SERVER_DISABLE_GC", "1") == "1":
        gc.disable()

    # All N fleet workers share one stdout; tag every trtllm log line from this
    # worker with its PID so interleaved fleet output is attributable.
    import logging as _logging
    for _h in _logging.getLogger("TRT-LLM").handlers:
        _h.setFormatter(_logging.Formatter(
            fmt=f"[%(asctime)s] [fleet-worker pid={os.getpid()}] %(message)s",
            datefmt="%m/%d/%Y-%H:%M:%S"))

    config_file = os.environ[DisaggWorkerEnvs.TLLM_DISAGG_CONFIG_FILE]
    coordinator_url = os.environ[DisaggWorkerEnvs.TLLM_DISAGG_COORDINATOR_URL]
    metadata_config_file = os.environ.get(
        DisaggWorkerEnvs.TLLM_DISAGG_METADATA_CONFIG_FILE)
    request_timeout = int(os.environ.get(
        DisaggWorkerEnvs.TLLM_DISAGG_REQUEST_TIMEOUT, "180"))
    server_start_timeout = int(os.environ.get(
        DisaggWorkerEnvs.TLLM_DISAGG_SERVER_START_TIMEOUT, "180"))

    disagg_cfg = parse_disagg_config_file(config_file)
    metadata_server_cfg = parse_metadata_server_config_file(
        metadata_config_file)

    server = OpenAIDisaggServer(
        config=disagg_cfg,
        req_timeout_secs=request_timeout,
        server_start_timeout_secs=server_start_timeout,
        metadata_server_cfg=metadata_server_cfg,
        coordinator_url=coordinator_url)
    logger.info(f"Disagg server app built, coordinator={coordinator_url}")
    return server.app


def set_cuda_device():
    if (os.getenv("OMPI_COMM_WORLD_RANK")):
        env_global_rank = int(os.environ["OMPI_COMM_WORLD_RANK"])
@@ -1367,6 +1544,16 @@ class DisaggLauncherEnvs(StrEnum):
    TLLM_DISAGG_ROLE = "TRTLLM_DISAGG_ROLE"


class DisaggWorkerEnvs(StrEnum):
    # Passed from the `disaggregated` coordinator to the forked worker fleet
    # (uvicorn workers=N) via env, then read by create_disagg_server_app in each worker.
    TLLM_DISAGG_COORDINATOR_URL = "TRTLLM_DISAGG_COORDINATOR_URL"
    TLLM_DISAGG_CONFIG_FILE = "TRTLLM_DISAGG_CONFIG_FILE"
    TLLM_DISAGG_METADATA_CONFIG_FILE = "TRTLLM_DISAGG_METADATA_CONFIG_FILE"
    TLLM_DISAGG_REQUEST_TIMEOUT = "TRTLLM_DISAGG_REQUEST_TIMEOUT"
    TLLM_DISAGG_SERVER_START_TIMEOUT = "TRTLLM_DISAGG_SERVER_START_TIMEOUT"


def _launch_disaggregated_server(disagg_config_file: str, llm_args: dict):
    # Launching the server
    instance_idx = os.environ.get(DisaggLauncherEnvs.TLLM_DISAGG_INSTANCE_IDX)
14 changes: 14 additions & 0 deletions tensorrt_llm/llmapi/disagg_utils.py
@@ -94,6 +94,16 @@ class DisaggServerConfig():
    # the orchestrator relays a string instead of materializing the token-id list
    # on its event loop. Text-only, non-harmony deployments (see _get_ctx_request).
    gen_tokids_ctxbytes: bool = False
    # Number of uvicorn disagg-server worker processes to fork on the public port.
    # >1 means a fleet of delegating servers behind one coordinator. Replaces the
    # WEB_CONCURRENCY env var (explicit config over implicit env).
    num_workers: int = 1
    # URL of an already-running coordinator (e.g. "http://host:8332"). When set,
    # this process does NOT start a coordinator -- the fleet delegates to this
    # external one. When absent and num_workers>1, an implicit coordinator is
    # started in-process. When absent and num_workers==1, a single self-contained
    # server with a local (in-process) coordinator is run.
    disagg_coordinator_url: Optional[str] = None


@dataclass
@@ -145,6 +155,8 @@ def extract_disagg_cfg(hostname: str = 'localhost',
'generation_first'] = 'context_first',
gen_strip_message_history: bool = False,
gen_tokids_ctxbytes: bool = False,
num_workers: int = 1,
disagg_coordinator_url: Optional[str] = None,
**kwargs: Any) -> DisaggServerConfig:
context_servers = context_servers or {}
generation_servers = generation_servers or {}
@@ -194,6 +206,8 @@ def extract_disagg_cfg(hostname: str = 'localhost',
    config.schedule_style = schedule_style
    config.gen_strip_message_history = gen_strip_message_history
    config.gen_tokids_ctxbytes = gen_tokids_ctxbytes
    config.num_workers = num_workers
    config.disagg_coordinator_url = disagg_coordinator_url
    return config

