Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ def update_state_after_alloc(self, request: "Request",
"""
Update KVConnector state after block allocation.
"""
if num_external_tokens == 0:
return
self._lmcache_engine.update_state_after_alloc(request,
num_external_tokens)

Expand Down
18 changes: 7 additions & 11 deletions vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,14 @@ def update_state_after_alloc(self, request: "Request",
if params.get("remote_block_ids"):
if all(p in params for p in ("remote_engine_id", "remote_host",
"remote_port")):
# If remote_blocks and num_external_tokens = 0, we have
# a full prefix cache hit on the D worker. We need to call
# send_notif in _read_blocks to free the memory on the P.
local_block_ids = (blocks.get_unhashed_block_ids()
if num_external_tokens > 0 else [])
# Get unhashed blocks to pull from remote.
self._reqs_need_recv[request.request_id] = (
request, blocks.get_unhashed_block_ids())
request, local_block_ids)
Comment on lines +241 to +248
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@robertgshaw2-redhat I'm still not sure that this part or the change to always call update_state_after_alloc is needed. I'd already added logic for this case in get_num_new_matched_tokens above:

# NOTE: if count is 0 here, we have less than block_size
# tokens to pull after subtracting the local prefix cache hit.
# The remote only sends fully computed blocks, so there is
# nothing to transfer but we still need to notify the
# prefill worker so that the remote blocks are freed.
if all(p in params for p in ("remote_engine_id", "remote_host",
"remote_port")):
self._reqs_need_recv[request.request_id] = (request, [])

I can see that the other two fixes below in build_connector_meta and _read_blocks are of course needed though.

If you think it's better to have this logic in this method then we can remove it from the other one. But again I feel it's logically clearer to not call update_state_after_alloc if 0 was returned from get_num_new_matched_tokens.

Copy link
Collaborator Author

@robertgshaw2-redhat robertgshaw2-redhat May 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that get_num_new_matched_tokens should be a pure function. Adding a side effect to it is surprising given the name of the method and the fact that we will have different behavior depending on what happens if the request is or is not able to be scheduled. This issue is actually causing a bug right now.

  • If allocate_slots returns None, the request will remain in the waiting queue. this will cause us to add the requests to reqs_need_recv more than one and as a result we will call read_blocks twice which will do a double free on the P worker side. Similarly this will happen if the request is preempted (it will get re-added to waiting). This is because we are not properly updating the request to have do_remote_prefill=False when it is added to reqs_need_recv from the get_num_new_matched_tokens function.

This is all just evidence that putting a side effect into this function is not a good idea. The update_state_after_alloc is where we should handle everything related to reqs_need_recv so we have a single place where all the logic is handled.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed those lines from get_num_new_matched_tokens

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@robertgshaw2-redhat that makes sense, I agree about the pure function thing. I did also notice the fact that this could result in a double free on the P worker side in the case that it can't be scheduled, which isn't ideal (though I think would probably be harmless).

But to me, thinking from the pov of a generic connector interface, it still feels a bit odd given the connector isn't offering any tokens. I guess we should very clearly document the semantics and expectations for the interface.

A related quirk is that in the async load case, I think currently update_state_after_alloc will be called twice for a request (a second time once the request moves out of WAITING_FOR_REMOTE_KVS).

else:
logger.warning(
"Got invalid KVTransferParams: %s. This "
Expand All @@ -259,15 +264,6 @@ def build_connector_meta(
# Loop through scheduled reqs and convert to ReqMeta.
for req_id, (req, block_ids) in self._reqs_need_recv.items():
assert req.kv_transfer_params is not None
# For the case where there are no remote blocks to pull
# (block_ids is empty), we don't need to schedule
# an async read on the worker side.
if not block_ids:
logger.debug(
"Skipping adding request %s to NixlConnectorMetadata, "
"as there are no remote blocks to pull", req_id)
continue

meta.add_new_req(
request_id=req_id,
local_block_ids=block_ids,
Expand Down Expand Up @@ -731,7 +727,7 @@ def _read_blocks(
# just notify P worker that we have the blocks we need.
num_local_blocks = len(local_block_ids)
if num_local_blocks == 0:
self.nixl_wrapper.send_notif(dst_engine_id,
self.nixl_wrapper.send_notif(self._remote_agents[dst_engine_id],
notif_msg=request_id.encode("utf-8"))
return

Expand Down
6 changes: 3 additions & 3 deletions vllm/v1/core/sched/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,11 +414,11 @@ def schedule(self) -> SchedulerOutput:
# The request cannot be scheduled.
break

# KVConnector: update internal state after allocation.
# KVTransfer: the connector uses this info to determine
# if a load is needed. Note that
# This information is used to determine if a load is
# needed for this request.
if num_external_computed_tokens:
assert self.connector is not None
if self.connector is not None:
self.connector.update_state_after_alloc(
request,
new_computed_blocks + new_blocks,
Expand Down