
Conversation


@ST-XX ST-XX commented Nov 26, 2025

Motivation

💡 If this PR is a Cherry-Pick, the PR title must follow the format: add the [Cherry-Pick] label at the very beginning and append the original PR ID at the end, e.g. [Cherry-Pick][CI] Add check trigger and logic(#5191)

Modifications

Fix a router.log traceback caused by the router port being typed as str instead of int.
Fix a race condition on the is_fetching flag that allowed multiple fetch requests to be submitted concurrently.
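The port issue can be sketched as follows. This is illustrative argparse code, not the actual RouterArgs definition in fastdeploy/router/router.py, and the exact failure mode in router.log is assumed:

```python
import argparse

parser = argparse.ArgumentParser()
# With type=str (the pre-fix behavior), downstream code would receive the
# port as "8000" rather than 8000, which is the likely source of the
# traceback reported in router.log. type=int converts the CLI string once,
# up front, so every consumer sees a proper integer.
parser.add_argument("--port", type=int, default=8000)

args = parser.parse_args(["--port", "9000"])
assert isinstance(args.port, int)
```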

Usage or Command

import unittest
import time
import threading
from concurrent.futures import ThreadPoolExecutor

class TestRaceCondition(unittest.TestCase):
    def test_fetch_request_race_condition(self):
        """
        Goal: demonstrate that under the original logic, _fetch_request
        gets submitted to the thread pool multiple times.
        """

        # 1. Set up the simulated environment
        is_fetching = False
        execution_count = 0  # how many times _fetch_request actually ran
        submit_count = 0     # how many times it was submitted to the pool
        count_lock = threading.Lock()  # shared lock so counting stays accurate

        # Create a real thread pool
        executor = ThreadPoolExecutor(max_workers=4)

        # 2. Simulate the original _fetch_request
        def _fetch_request():
            nonlocal is_fetching, execution_count

            # Key point: simulate thread start-up / scheduling delay.
            # We force the worker to wait before it touches is_fetching;
            # as long as this delay exceeds one main-loop iteration,
            # the bug reproduces.
            time.sleep(0.1)

            # Original logic: the flag is set inside the worker
            is_fetching = True

            # Simulate the business logic
            with count_lock:  # must be the shared lock; a fresh Lock() here would protect nothing
                execution_count += 1

            # Simulate the work taking time
            time.sleep(0.05)

            # Reset when done
            is_fetching = False

        # 3. Simulate the main loop
        running = True
        loop_counter = 0

        print("Starting simulated main loop...")

        # A few iterations are enough to reproduce the problem
        while running and loop_counter < 5:
            # Mirrors the check in the original code:
            # if self.cfg.scheduler_config.splitwise_role != "mixed": ...

            if not is_fetching:
                print(f"[Loop {loop_counter}] is_fetching is False, submitting task!")
                submit_count += 1
                executor.submit(_fetch_request)
            else:
                print(f"[Loop {loop_counter}] Normal: is_fetching is True, skipping.")

            # Main-loop interval (usually very short; 0.02 s here).
            # Because 0.02 < the worker's start-up delay (0.1), the main
            # loop gets through several more iterations before the worker
            # flips the flag to True.
            time.sleep(0.02)
            loop_counter += 1

        # Wait for all tasks to finish
        executor.shutdown(wait=True)

        print("\nResults:")
        print(f"Main-loop submissions: {submit_count}")
        print(f"Actual executions: {execution_count}")

        # 4. Assertions
        # With correct logic: 1 submission, 1 execution.
        # With the bug, the submission count exceeds 1.
        self.assertGreater(submit_count, 1, "Bug reproduced: the task was submitted multiple times!")
        self.assertGreater(execution_count, 1, "Bug reproduced: the task executed concurrently!")

if __name__ == '__main__':
    unittest.main()
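For contrast, here is a minimal sketch of the same loop with the fix applied, i.e. the flag is raised immediately before submit() rather than inside the worker. This is a simulation mirroring the test above, not the actual common_engine.py code; run_fixed_loop is a hypothetical helper:

```python
import time
import threading
from concurrent.futures import ThreadPoolExecutor

def run_fixed_loop(iterations=5):
    """Simulate the scheduler loop with the fix: is_fetching is flipped
    to True before submit(), so the loop can never observe a stale False
    while a freshly submitted worker is still starting up."""
    is_fetching = False
    submit_count = 0
    execution_count = 0
    count_lock = threading.Lock()
    executor = ThreadPoolExecutor(max_workers=4)

    def _fetch_request():
        nonlocal is_fetching, execution_count
        time.sleep(0.1)       # same start-up delay that triggered the bug
        with count_lock:
            execution_count += 1
        time.sleep(0.5)       # simulated work, long enough to outlast the loop
        is_fetching = False   # reset only after the work is done

    for _ in range(iterations):
        if not is_fetching:
            is_fetching = True  # set BEFORE submit(): this closes the race window
            submit_count += 1
            executor.submit(_fetch_request)
        time.sleep(0.02)

    executor.shutdown(wait=True)
    return submit_count, execution_count
```

With the flag raised before submission, the five loop iterations produce exactly one submission and one execution.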

bash start_v1_tp1.sh

Accuracy Tests

Checklist

  • Add at least one tag in the PR title.
    • Tag list: [[FDConfig],[APIServer],[Engine], [Scheduler], [PD Disaggregation], [Executor], [Graph Optimization], [Speculative Decoding], [RL], [Models], [Quantization], [Loader], [OP], [KVCache], [DataProcessor], [BugFix], [Docs], [CI], [Optimization], [Feature], [Benchmark], [Others], [XPU], [HPU], [GCU], [DCU], [Iluvatar], [Metax]]
    • You can add new tags based on the PR content, but the semantics must be clear.
  • Format your code and run pre-commit before committing.
  • Add unit tests. Please write the reason in this PR if no unit tests.
  • Provide accuracy results.
  • If the current PR is submitting to the release branch, make sure the PR has been submitted to the develop branch, then cherry-pick it to the release branch with the [Cherry-Pick] PR tag.


paddle-bot bot commented Nov 26, 2025

Thanks for your contribution!


codecov-commenter commented Nov 26, 2025

Codecov Report

❌ Patch coverage is 50.00000% with 1 line in your changes missing coverage. Please review.
⚠️ Please upload report for BASE (develop@f25ee3a). Learn more about missing BASE report.

Files with missing lines             Patch %   Lines
fastdeploy/engine/common_engine.py   0.00%     1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             develop    #5238   +/-   ##
==========================================
  Coverage           ?   60.53%           
==========================================
  Files              ?      320           
  Lines              ?    39054           
  Branches           ?     5871           
==========================================
  Hits               ?    23642           
  Misses             ?    13549           
  Partials           ?     1863           
Flag   Coverage Δ
GPU    60.53% <50.00%> (?)


@ST-XX changed the title from [BugFix] RouterArgs port str -> int to [BugFix] race condition [is_fetching] causing multiple fetch requests on Nov 27, 2025
juncaipeng previously approved these changes Nov 27, 2025

@juncaipeng juncaipeng left a comment


LGTM


Copilot AI left a comment


Pull request overview

This PR addresses a race condition in the request scheduling logic where multiple fetch requests could be submitted concurrently due to the is_fetching flag being set inside the worker thread instead of before task submission. The fix correctly moves the flag assignment to occur immediately before the submit() call, eliminating the race window.

Key changes:

  • Fixed race condition by setting is_fetching = True before submit() instead of inside the worker thread
  • Corrected port type from str to int in router configuration for proper type consistency
  • Removed duplicate paddle.to_tensor() call in resource manager

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.

  • fastdeploy/router/router.py: Changed the port parameter type from str to int for type correctness
  • fastdeploy/engine/sched/resource_manager_v1.py: Removed a duplicate paddle.to_tensor() call that was creating the same tensor twice
  • fastdeploy/engine/common_engine.py: Fixed the race condition by moving the is_fetching = True assignment from inside _fetch_request() to before thread pool submission, preventing multiple concurrent fetch requests. Applied to both the "mixed" and non-"mixed" execution paths
Comments suppressed due to low confidence (1)

fastdeploy/engine/common_engine.py:817

  • Critical bug: If get_request_pool.submit(_fetch_request) raises a RuntimeError after is_fetching = True is set, the exception handler at lines 812-817 will either break or re-raise the exception without resetting is_fetching back to False. This will permanently block all future fetch request submissions. You need to reset is_fetching = False before the break statement (line 815) and before the raise statement (line 817) to prevent the flag from being stuck in the True state.
                            is_fetching = True
                            get_request_pool.submit(_fetch_request)
                        except RuntimeError as e:
                            if "shutdown" in str(e):
                                llm_logger.info("Thread pool shutdown detected, exiting scheduler loop")
                                break
                            else:
                                raise


Comment on lines +799 to 801
is_fetching = True
get_request_pool.submit(_fetch_request)


Copilot AI Nov 27, 2025


Critical bug: If get_request_pool.submit(_fetch_request) at line 800 throws an exception after is_fetching = True is set at line 799, the exception will be caught by the general exception handler (lines 859-861 in the full context), which logs the error but doesn't reset is_fetching back to False. This will permanently block all future fetch request submissions. Consider wrapping lines 799-800 in a try-except block that ensures is_fetching is reset to False if the submission fails, similar to how the "mixed" role path handles this at lines 809-817.

Suggested change
- is_fetching = True
- get_request_pool.submit(_fetch_request)
+ try:
+     is_fetching = True
+     get_request_pool.submit(_fetch_request)
+ except RuntimeError as e:
+     is_fetching = False
+     if "shutdown" in str(e):
+         llm_logger.info("Thread pool shutdown detected, exiting scheduler loop")
+         break
+     else:
+         self.llm_logger.error(f"fetching request error {e} {str(traceback.format_exc())}")
+ except Exception as e:
+     is_fetching = False
+     self.llm_logger.error(f"fetching request error {e} {str(traceback.format_exc())}")

