Skip to content

Conversation

@lokidundun
Copy link
Contributor

@lokidundun lokidundun commented Nov 14, 2025

Ⅰ. Describe what this PR did

Optimized RM/TM client reconnect logic to prevent duplicate timer creation .

Ⅱ. Does this pull request fix one issue?

fixes #5338

Ⅲ. Why don't you add test cases (unit test/integration test)?

Ⅳ. Describe how to verify it

Ⅴ. Special notes for reviews

@codecov
Copy link

codecov bot commented Nov 14, 2025

Codecov Report

❌ Patch coverage is 88.88889% with 4 lines in your changes missing coverage. Please review.
✅ Project coverage is 71.56%. Comparing base (b1db26b) to head (003d4ab).
⚠️ Report is 8 commits behind head on 2.x.

Files with missing lines Patch % Lines
...ta/core/rpc/netty/AbstractNettyRemotingClient.java 88.88% 0 Missing and 4 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##                2.x    #7783      +/-   ##
============================================
+ Coverage     71.10%   71.56%   +0.46%     
- Complexity      797      883      +86     
============================================
  Files          1294     1294              
  Lines         49538    49574      +36     
  Branches       5879     5888       +9     
============================================
+ Hits          35222    35477     +255     
+ Misses        11406    11171     -235     
- Partials       2910     2926      +16     
Files with missing lines Coverage Δ
...ta/core/rpc/netty/AbstractNettyRemotingClient.java 77.90% <88.88%> (-2.34%) ⬇️

... and 27 files with indirect coverage changes

Impacted file tree graph

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@lokidundun
Copy link
Contributor Author

I've addressed the resource leak issue by removing the static modifier from the reconnect timer and related resources.Am I on the right track?Let me know if further adjustments are needed!If you are convenient,please take a look❤️. @maple525866

@Override
public void destroy() {
if (reconnectTimer != null && timerStarted.get()) {
reconnectTimer.shutdown();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When reconnectTimer runs shutdown(), it does not interrupt ongoing tasks, which means that reconnection may still occur after destruction.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aside from the reconnection issue, logically speaking, there is also a race condition problem here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When reconnectTimer runs shutdown(), it does not interrupt ongoing tasks, which means that reconnection may still occur after destruction.

It means that I should use the reconnectTimer.shutdownNow() not the reconnectTimer.shutdown() to stop all active tasks,right?

Aside from the reconnection issue, logically speaking, there is also a race condition problem here.

It means that I ought to use protected final ReentrantLock mergeLock = new ReentrantLock() to promise the Atomicity,like this?

public void destroy() {
    mergeLock.lock();
    try {
        if (reconnectTimer != null && timerStarted.get()) {
            reconnectTimer.shutdown();
            timerStarted.set(false);
            LOGGER.info("Instance reconnect timer stopped (role: {})", transactionRole.name());
        }
        clientBootstrap.shutdown();
        if (mergeSendExecutorService != null) {
            mergeSendExecutorService.shutdown();
        }
        super.destroy();
    } finally {
        mergeLock.unlock();
    }
}

protected volatile boolean isSending = false;

private final Runnable reconnectTask;
private ScheduledExecutorService reconnectTimer;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably better to have a globally unique one here, otherwise there would be multiple reconnectTimers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably better to have a globally unique one here, otherwise there would be multiple reconnectTimers.

I haven't come up with a perfect solution for now. Could you please share your thoughts?❤️

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably better to have a globally unique one here, otherwise there would be multiple reconnectTimers.

Or I think we can tackle like this? What do you think?

private volatile ScheduledExecutorService reconnectTimer;
    @Override
    public void init() {
        if (timerStarted.compareAndSet(false, true)) {
            mergeLock.lock();
            try {
                this.reconnectTimer = new ScheduledThreadPoolExecutor(
                        1, new NamedThreadFactory("Reconnect-Timer-" + transactionRole.name(), 1));
                this.reconnectTimer.scheduleAtFixedRate(
                        this.reconnectTask, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
                LOGGER.info("Instance reconnect timer started (role: {})", transactionRole.name());
            }finally {
                mergeLock.unlock();
            }
        }
        if (this.isEnableClientBatchSendRequest()) {
            mergeSendExecutorService = new ThreadPoolExecutor(
                    MAX_MERGE_SEND_THREAD,
                    MAX_MERGE_SEND_THREAD,
                    KEEP_ALIVE_TIME,
                    TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(),
                    new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
            mergeSendExecutorService.submit(new MergedSendRunnable());
        }
        super.init();
        clientBootstrap.start();
    }

this.reconnectTimer = new ScheduledThreadPoolExecutor(
1, new NamedThreadFactory("Reconnect-Timer-" + transactionRole.name(), 1));
this.reconnectTimer.scheduleAtFixedRate(
this.reconnectTask, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reconnectTask is still null at this point; you should initialize reconnectTask before creating the thread pool. Also, why not reuse timerExecutor instead of creating a new scheduled thread pool?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your detailed feedback! Here are the optimizations I've received and made in response to your comments:
I've initialized reconnectTask in the constructor in advance and adjusted the logic to reuse the existing timerExecutor

    @Override
    public void init() {
        mergeLock.lock();
        try {
            if (timerStarted.compareAndSet(false, true)) {
                timerExecutor.scheduleAtFixedRate(
                        reconnectTask, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
                LOGGER.info("Reconnect timer started (role: {})", transactionRole.name());
            }

            if (this.isEnableClientBatchSendRequest()) {
                mergeSendExecutorService = new ThreadPoolExecutor(
                        MAX_MERGE_SEND_THREAD,
                        MAX_MERGE_SEND_THREAD,
                        KEEP_ALIVE_TIME,
                        TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<>(),
                        new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
                mergeSendExecutorService.submit(new MergedSendRunnable());
            }

            super.init();
            clientBootstrap.start();
        } finally {
            mergeLock.unlock();
        }
    }

    public AbstractNettyRemotingClient(
            NettyClientConfig nettyClientConfig,
            ThreadPoolExecutor messageExecutor,
            NettyPoolKey.TransactionRole transactionRole) {
        super(messageExecutor);
        this.transactionRole = transactionRole;
        clientBootstrap = new NettyClientBootstrap(nettyClientConfig, transactionRole);
        clientBootstrap.setChannelHandlers(new ClientHandler(), new ChannelEventHandler(this));
        clientChannelManager = new NettyClientChannelManager(
                new NettyPoolableFactory(this, clientBootstrap), getPoolKeyFunction(), nettyClientConfig);
        this.reconnectTask = () -> {
            if (!timerStarted.get()) {
                return;
            }
            try {
                String serviceGroup = getTransactionServiceGroup();
                if (StringUtils.isNotBlank(serviceGroup)) {
                    clientChannelManager.reconnect(serviceGroup);
                }
            } catch (Throwable t) {
                LOGGER.error("Reconnect task failed for role: {}", transactionRole.name(), t);
            }
        };
    }

Am I on the right way.I’d really appreciate any feedback and let me know if you have any further questions!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks much better. Good job.

@funky-eyes funky-eyes added this to the 2.6.0 milestone Nov 17, 2025
Comment on lines 126 to 130
public void init() {
timerExecutor.scheduleAtFixedRate(
() -> {
try {
clientChannelManager.reconnect(getTransactionServiceGroup());
} catch (Exception ex) {
LOGGER.warn("reconnect server failed. {}", ex.getMessage());
}
},
SCHEDULE_DELAY_MILLS,
SCHEDULE_INTERVAL_MILLS,
TimeUnit.MILLISECONDS);
if (this.isEnableClientBatchSendRequest()) {
mergeSendExecutorService = new ThreadPoolExecutor(
MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
mergeLock.lock();
try {
if (timerStarted.compareAndSet(false, true)) {
timerExecutor.scheduleAtFixedRate(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think mergeLock should be used here, because it's originally designed for merging and sending data; not using it can ensure single responsibility.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That means I need to use another like private final ReentrantLock reconnectLock = new ReentrantLock(); instead of mergeLock .Am I on the right way.I’d really appreciate any feedback and let me know if you have any further questions!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it needs to be replaced with another lock instead of using the original mergeLock.

Comment on lines 291 to 295
public void destroy() {
clientBootstrap.shutdown();
if (mergeSendExecutorService != null) {
mergeSendExecutorService.shutdown();
mergeLock.lock();
try {
if (timerStarted.compareAndSet(true, false)) {
LOGGER.info("Reconnect timer stopped (role: {})", transactionRole.name());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

@lokidundun lokidundun closed this Dec 3, 2025
@lokidundun lokidundun reopened this Dec 3, 2025
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR optimizes the RM/TM client reconnect logic to prevent duplicate timer creation by introducing an AtomicBoolean flag (timerStarted) with ReentrantLock protection to ensure thread-safe initialization and destruction.

Key Changes:

  • Added timerStarted flag and reconnectLock to prevent duplicate timer scheduling
  • Refactored reconnect logic into a reusable reconnectTask created in the constructor
  • Protected both init() and destroy() methods with locks to ensure thread safety

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 14 comments.

File Description
core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java Implements the core optimization with thread-safe timer management and reconnect task refactoring
core/src/test/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClientTest.java Adds comprehensive test coverage for new reconnect logic, timer lifecycle, and edge cases
changes/zh-cn/2.x.md Documents the optimization in Chinese changelog
changes/en-us/2.x.md Documents the optimization in English changelog

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Runnable reconnectTask = (Runnable) reconnectTaskField.get(testClient);
assertNotNull(reconnectTask, "fail to init reconnectTask");
} catch (Exception e) {
fail("test failed:" + e.getMessage());
Copy link

Copilot AI Dec 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The assertion message contains mixed English and Chinese characters ("test failed:"). For consistency with the rest of the codebase which uses English, the colon should be the ASCII colon character instead of the full-width Chinese colon.

Suggested change
fail("test failed" + e.getMessage());
fail("test failed: " + e.getMessage());

Copilot uses AI. Check for mistakes.
managerField.set(client, mockManager);

client.init();
Thread.sleep(100);
Copy link

Copilot AI Dec 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test sleeps for 100ms which may not be sufficient for the reconnect timer to execute, especially on slower CI systems. This could lead to flaky tests. Consider either increasing the sleep duration or using a more reliable synchronization mechanism to verify the timer behavior.

Suggested change
Thread.sleep(100);
TimeUnit.SECONDS.sleep(1);

Copilot uses AI. Check for mistakes.
Comment on lines 1563 to 1564
// verify status
assertNotNull(multiClient);
Copy link

Copilot AI Dec 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The assertion 'assertNotNull(multiClient)' on line 1564 is not meaningful since multiClient was just created on line 1557 and would never be null at this point. Consider replacing this with a more meaningful assertion that verifies the client is properly initialized, such as checking the timerStarted flag or verifying that the timer has been scheduled.

Suggested change
// verify status
assertNotNull(multiClient);
// verify status: timer should be started after init
try {
Field timerStartedField = AbstractNettyRemotingClient.class.getDeclaredField("timerStarted");
timerStartedField.setAccessible(true);
AtomicBoolean timerStarted = (AtomicBoolean) timerStartedField.get(multiClient);
assertNotNull(timerStarted);
assertTrue(timerStarted.get());
} catch (NoSuchFieldException | IllegalAccessException e) {
Assertions.fail("Failed to access timerStarted field on AbstractNettyRemotingClient", e);
}

Copilot uses AI. Check for mistakes.
Comment on lines 1770 to 1776
Field clientBootstrapField = AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
clientBootstrapField.setAccessible(true);
NettyClientBootstrap mockBootstrap = mock(NettyClientBootstrap.class);
clientBootstrapField.set(testClient, mockBootstrap);

testClient.init();
verify(mockBootstrap, times(1)).start();
Copy link

Copilot AI Dec 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The testClient is initialized but not destroyed in a finally block. If an exception occurs during field access or assertions, the client's resources (timers, executors) won't be cleaned up properly. Add a finally block to ensure cleanup.

Suggested change
Field clientBootstrapField = AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
clientBootstrapField.setAccessible(true);
NettyClientBootstrap mockBootstrap = mock(NettyClientBootstrap.class);
clientBootstrapField.set(testClient, mockBootstrap);
testClient.init();
verify(mockBootstrap, times(1)).start();
try {
Field clientBootstrapField = AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
clientBootstrapField.setAccessible(true);
NettyClientBootstrap mockBootstrap = mock(NettyClientBootstrap.class);
clientBootstrapField.set(testClient, mockBootstrap);
testClient.init();
verify(mockBootstrap, times(1)).start();
} finally {
testClient.destroy();
}

Copilot uses AI. Check for mistakes.
Comment on lines 1748 to 1764
TestClientWithEmptyServiceGroup testClient = new TestClientWithEmptyServiceGroup(clientConfig, messageExecutor);
Field reconnectTaskField = AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
reconnectTaskField.setAccessible(true);
Runnable reconnectTask = (Runnable) reconnectTaskField.get(testClient);

assertDoesNotThrow(reconnectTask::run, "serviceGroup is null");
}

@Test
public void testReconnectTaskThrowException() throws Exception {
TestNettyRemotingClientWithReconnectException exceptionClient =
new TestNettyRemotingClientWithReconnectException(clientConfig, messageExecutor);
Field reconnectTaskField = AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
reconnectTaskField.setAccessible(true);
Runnable reconnectTask = (Runnable) reconnectTaskField.get(exceptionClient);

assertDoesNotThrow(reconnectTask::run, "reconnectTask exception");
Copy link

Copilot AI Dec 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The testClient is created but never destroyed. Even though the reconnect task is run directly without calling init(), the client still has resources (like the reconnectLock and transactionRole) that should be cleaned up. Add proper cleanup to avoid potential resource leaks.

Suggested change
TestClientWithEmptyServiceGroup testClient = new TestClientWithEmptyServiceGroup(clientConfig, messageExecutor);
Field reconnectTaskField = AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
reconnectTaskField.setAccessible(true);
Runnable reconnectTask = (Runnable) reconnectTaskField.get(testClient);
assertDoesNotThrow(reconnectTask::run, "serviceGroup is null");
}
@Test
public void testReconnectTaskThrowException() throws Exception {
TestNettyRemotingClientWithReconnectException exceptionClient =
new TestNettyRemotingClientWithReconnectException(clientConfig, messageExecutor);
Field reconnectTaskField = AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
reconnectTaskField.setAccessible(true);
Runnable reconnectTask = (Runnable) reconnectTaskField.get(exceptionClient);
assertDoesNotThrow(reconnectTask::run, "reconnectTask exception");
final TestClientWithEmptyServiceGroup testClient =
new TestClientWithEmptyServiceGroup(clientConfig, messageExecutor);
Field reconnectTaskField = AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
reconnectTaskField.setAccessible(true);
Runnable reconnectTask = (Runnable) reconnectTaskField.get(testClient);
try {
assertDoesNotThrow(reconnectTask::run, "serviceGroup is null");
} finally {
testClient.destroy();
}
}
@Test
public void testReconnectTaskThrowException() throws Exception {
final TestNettyRemotingClientWithReconnectException exceptionClient =
new TestNettyRemotingClientWithReconnectException(clientConfig, messageExecutor);
Field reconnectTaskField = AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
reconnectTaskField.setAccessible(true);
Runnable reconnectTask = (Runnable) reconnectTaskField.get(exceptionClient);
try {
assertDoesNotThrow(reconnectTask::run, "reconnectTask exception");
} finally {
exceptionClient.destroy();
}

Copilot uses AI. Check for mistakes.
Comment on lines +1799 to +1845
// Mock clientChannelManager
NettyClientChannelManager mockChannelManager = mock(NettyClientChannelManager.class);
Field transactionRoleField = AbstractNettyRemotingClient.class.getDeclaredField("transactionRole");
transactionRoleField.setAccessible(true);

TestReconnectTaskClient client1 = new TestReconnectTaskClient(clientConfig, messageExecutor, testServiceGroup);
Field timerStartedField = AbstractNettyRemotingClient.class.getDeclaredField("timerStarted");
timerStartedField.setAccessible(true);
AtomicBoolean timerStarted1 = (AtomicBoolean) timerStartedField.get(client1);
timerStarted1.set(false);
Field reconnectTaskField = AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
reconnectTaskField.setAccessible(true);
Runnable reconnectTask1 = (Runnable) reconnectTaskField.get(client1);

reconnectTask1.run();
verify(mockChannelManager, never()).reconnect(anyString());

TestReconnectTaskClient client2 = new TestReconnectTaskClient(clientConfig, messageExecutor, "");
AtomicBoolean timerStarted2 = (AtomicBoolean) timerStartedField.get(client2);
timerStarted2.set(true);
Field channelManagerField = AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
channelManagerField.setAccessible(true);
channelManagerField.set(client2, mockChannelManager);

// verify reconnect
Runnable reconnectTask2 = (Runnable) reconnectTaskField.get(client2);
reconnectTask2.run();
verify(mockChannelManager, never()).reconnect(anyString());

TestReconnectTaskClient client3 = new TestReconnectTaskClient(clientConfig, messageExecutor, testServiceGroup);
AtomicBoolean timerStarted3 = (AtomicBoolean) timerStartedField.get(client3);
timerStarted3.set(true);
channelManagerField.set(client3, mockChannelManager);

Runnable reconnectTask3 = (Runnable) reconnectTaskField.get(client3);
reconnectTask3.run();
verify(mockChannelManager, times(1)).reconnect(testServiceGroup);

TestReconnectTaskClient client4 = new TestReconnectTaskClient(clientConfig, messageExecutor, testServiceGroup);
AtomicBoolean timerStarted4 = (AtomicBoolean) timerStartedField.get(client4);
timerStarted4.set(true);
RuntimeException testEx = new RuntimeException("test-reconnect-error");
doThrow(testEx).when(mockChannelManager).reconnect(testServiceGroup);
channelManagerField.set(client4, mockChannelManager);

Runnable reconnectTask4 = (Runnable) reconnectTaskField.get(client4);
assertDoesNotThrow(reconnectTask4::run);
Copy link

Copilot AI Dec 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test creates multiple client instances (client1, client2, client3, client4) but doesn't properly clean them up. Each client has scheduled timers and potentially other resources that should be destroyed. Add proper cleanup in a finally block or use @AfterEach to ensure all test clients are destroyed.

Suggested change
// Mock clientChannelManager
NettyClientChannelManager mockChannelManager = mock(NettyClientChannelManager.class);
Field transactionRoleField = AbstractNettyRemotingClient.class.getDeclaredField("transactionRole");
transactionRoleField.setAccessible(true);
TestReconnectTaskClient client1 = new TestReconnectTaskClient(clientConfig, messageExecutor, testServiceGroup);
Field timerStartedField = AbstractNettyRemotingClient.class.getDeclaredField("timerStarted");
timerStartedField.setAccessible(true);
AtomicBoolean timerStarted1 = (AtomicBoolean) timerStartedField.get(client1);
timerStarted1.set(false);
Field reconnectTaskField = AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
reconnectTaskField.setAccessible(true);
Runnable reconnectTask1 = (Runnable) reconnectTaskField.get(client1);
reconnectTask1.run();
verify(mockChannelManager, never()).reconnect(anyString());
TestReconnectTaskClient client2 = new TestReconnectTaskClient(clientConfig, messageExecutor, "");
AtomicBoolean timerStarted2 = (AtomicBoolean) timerStartedField.get(client2);
timerStarted2.set(true);
Field channelManagerField = AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
channelManagerField.setAccessible(true);
channelManagerField.set(client2, mockChannelManager);
// verify reconnect
Runnable reconnectTask2 = (Runnable) reconnectTaskField.get(client2);
reconnectTask2.run();
verify(mockChannelManager, never()).reconnect(anyString());
TestReconnectTaskClient client3 = new TestReconnectTaskClient(clientConfig, messageExecutor, testServiceGroup);
AtomicBoolean timerStarted3 = (AtomicBoolean) timerStartedField.get(client3);
timerStarted3.set(true);
channelManagerField.set(client3, mockChannelManager);
Runnable reconnectTask3 = (Runnable) reconnectTaskField.get(client3);
reconnectTask3.run();
verify(mockChannelManager, times(1)).reconnect(testServiceGroup);
TestReconnectTaskClient client4 = new TestReconnectTaskClient(clientConfig, messageExecutor, testServiceGroup);
AtomicBoolean timerStarted4 = (AtomicBoolean) timerStartedField.get(client4);
timerStarted4.set(true);
RuntimeException testEx = new RuntimeException("test-reconnect-error");
doThrow(testEx).when(mockChannelManager).reconnect(testServiceGroup);
channelManagerField.set(client4, mockChannelManager);
Runnable reconnectTask4 = (Runnable) reconnectTaskField.get(client4);
assertDoesNotThrow(reconnectTask4::run);
TestReconnectTaskClient client1 = null;
TestReconnectTaskClient client2 = null;
TestReconnectTaskClient client3 = null;
TestReconnectTaskClient client4 = null;
try {
// Mock clientChannelManager
NettyClientChannelManager mockChannelManager = mock(NettyClientChannelManager.class);
Field transactionRoleField = AbstractNettyRemotingClient.class.getDeclaredField("transactionRole");
transactionRoleField.setAccessible(true);
client1 = new TestReconnectTaskClient(clientConfig, messageExecutor, testServiceGroup);
Field timerStartedField = AbstractNettyRemotingClient.class.getDeclaredField("timerStarted");
timerStartedField.setAccessible(true);
AtomicBoolean timerStarted1 = (AtomicBoolean) timerStartedField.get(client1);
timerStarted1.set(false);
Field reconnectTaskField = AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
reconnectTaskField.setAccessible(true);
Runnable reconnectTask1 = (Runnable) reconnectTaskField.get(client1);
reconnectTask1.run();
verify(mockChannelManager, never()).reconnect(anyString());
client2 = new TestReconnectTaskClient(clientConfig, messageExecutor, "");
AtomicBoolean timerStarted2 = (AtomicBoolean) timerStartedField.get(client2);
timerStarted2.set(true);
Field channelManagerField = AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
channelManagerField.setAccessible(true);
channelManagerField.set(client2, mockChannelManager);
// verify reconnect
Runnable reconnectTask2 = (Runnable) reconnectTaskField.get(client2);
reconnectTask2.run();
verify(mockChannelManager, never()).reconnect(anyString());
client3 = new TestReconnectTaskClient(clientConfig, messageExecutor, testServiceGroup);
AtomicBoolean timerStarted3 = (AtomicBoolean) timerStartedField.get(client3);
timerStarted3.set(true);
channelManagerField.set(client3, mockChannelManager);
Runnable reconnectTask3 = (Runnable) reconnectTaskField.get(client3);
reconnectTask3.run();
verify(mockChannelManager, times(1)).reconnect(testServiceGroup);
client4 = new TestReconnectTaskClient(clientConfig, messageExecutor, testServiceGroup);
AtomicBoolean timerStarted4 = (AtomicBoolean) timerStartedField.get(client4);
timerStarted4.set(true);
RuntimeException testEx = new RuntimeException("test-reconnect-error");
doThrow(testEx).when(mockChannelManager).reconnect(testServiceGroup);
channelManagerField.set(client4, mockChannelManager);
Runnable reconnectTask4 = (Runnable) reconnectTaskField.get(client4);
assertDoesNotThrow(reconnectTask4::run);
} finally {
if (client1 != null) {
client1.destroy();
}
if (client2 != null) {
client2.destroy();
}
if (client3 != null) {
client3.destroy();
}
if (client4 != null) {
client4.destroy();
}
messageExecutor.shutdownNow();
}

Copilot uses AI. Check for mistakes.
Comment on lines 1758 to 1764
TestNettyRemotingClientWithReconnectException exceptionClient =
new TestNettyRemotingClientWithReconnectException(clientConfig, messageExecutor);
Field reconnectTaskField = AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
reconnectTaskField.setAccessible(true);
Runnable reconnectTask = (Runnable) reconnectTaskField.get(exceptionClient);

assertDoesNotThrow(reconnectTask::run, "reconnectTask exception");
Copy link

Copilot AI Dec 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The exceptionClient is created but never destroyed. Add proper cleanup in a finally block to ensure resources are released even if the test fails.

Suggested change
TestNettyRemotingClientWithReconnectException exceptionClient =
new TestNettyRemotingClientWithReconnectException(clientConfig, messageExecutor);
Field reconnectTaskField = AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
reconnectTaskField.setAccessible(true);
Runnable reconnectTask = (Runnable) reconnectTaskField.get(exceptionClient);
assertDoesNotThrow(reconnectTask::run, "reconnectTask exception");
TestNettyRemotingClientWithReconnectException exceptionClient = null;
try {
exceptionClient = new TestNettyRemotingClientWithReconnectException(clientConfig, messageExecutor);
Field reconnectTaskField = AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
reconnectTaskField.setAccessible(true);
Runnable reconnectTask = (Runnable) reconnectTaskField.get(exceptionClient);
assertDoesNotThrow(reconnectTask::run, "reconnectTask exception");
} finally {
if (exceptionClient != null) {
exceptionClient.destroy();
}
}

Copilot uses AI. Check for mistakes.
Comment on lines 136 to 145
if (this.isEnableClientBatchSendRequest()) {
mergeSendExecutorService = new ThreadPoolExecutor(
MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
}
Copy link

Copilot AI Dec 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mergeSendExecutorService can be initialized multiple times if init() is called multiple times. While the timerStarted flag prevents duplicate timer scheduling, there's no similar protection for mergeSendExecutorService. This could lead to thread pool leaks if init() is called multiple times without calling destroy() in between. Consider adding a check to prevent re-initialization of the executor service, similar to the timer protection.

Copilot uses AI. Check for mistakes.
Comment on lines 1555 to 1577
@Test
public void testInitAndDestroyMultipleTimes() {
TestNettyRemotingClient multiClient = new TestNettyRemotingClient(clientConfig, messageExecutor);

try {
// initiate first
multiClient.init();

// verify status
assertNotNull(multiClient);

multiClient.destroy();
multiClient.destroy();

} finally {
// clean
try {
multiClient.destroy();
} catch (Exception e) {
// ignore
}
}
}
Copy link

Copilot AI Dec 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test verifies that calling destroy() multiple times doesn't cause issues, but it doesn't verify that subsequent calls to init() after destroy() work correctly. This is important since the timerStarted flag might prevent re-initialization. Consider adding a test that calls init(), destroy(), init() sequence to ensure the client can be reinitialized properly.

Copilot uses AI. Check for mistakes.
Comment on lines +1793 to +1846
@Test
public void testReconnectTask() throws Exception {
String testServiceGroup = "test-group";
NettyClientConfig clientConfig = new NettyClientConfig();
ThreadPoolExecutor messageExecutor =
new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
// Mock clientChannelManager
NettyClientChannelManager mockChannelManager = mock(NettyClientChannelManager.class);
Field transactionRoleField = AbstractNettyRemotingClient.class.getDeclaredField("transactionRole");
transactionRoleField.setAccessible(true);

TestReconnectTaskClient client1 = new TestReconnectTaskClient(clientConfig, messageExecutor, testServiceGroup);
Field timerStartedField = AbstractNettyRemotingClient.class.getDeclaredField("timerStarted");
timerStartedField.setAccessible(true);
AtomicBoolean timerStarted1 = (AtomicBoolean) timerStartedField.get(client1);
timerStarted1.set(false);
Field reconnectTaskField = AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
reconnectTaskField.setAccessible(true);
Runnable reconnectTask1 = (Runnable) reconnectTaskField.get(client1);

reconnectTask1.run();
verify(mockChannelManager, never()).reconnect(anyString());

TestReconnectTaskClient client2 = new TestReconnectTaskClient(clientConfig, messageExecutor, "");
AtomicBoolean timerStarted2 = (AtomicBoolean) timerStartedField.get(client2);
timerStarted2.set(true);
Field channelManagerField = AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
channelManagerField.setAccessible(true);
channelManagerField.set(client2, mockChannelManager);

// verify reconnect
Runnable reconnectTask2 = (Runnable) reconnectTaskField.get(client2);
reconnectTask2.run();
verify(mockChannelManager, never()).reconnect(anyString());

TestReconnectTaskClient client3 = new TestReconnectTaskClient(clientConfig, messageExecutor, testServiceGroup);
AtomicBoolean timerStarted3 = (AtomicBoolean) timerStartedField.get(client3);
timerStarted3.set(true);
channelManagerField.set(client3, mockChannelManager);

Runnable reconnectTask3 = (Runnable) reconnectTaskField.get(client3);
reconnectTask3.run();
verify(mockChannelManager, times(1)).reconnect(testServiceGroup);

TestReconnectTaskClient client4 = new TestReconnectTaskClient(clientConfig, messageExecutor, testServiceGroup);
AtomicBoolean timerStarted4 = (AtomicBoolean) timerStartedField.get(client4);
timerStarted4.set(true);
RuntimeException testEx = new RuntimeException("test-reconnect-error");
doThrow(testEx).when(mockChannelManager).reconnect(testServiceGroup);
channelManagerField.set(client4, mockChannelManager);

Runnable reconnectTask4 = (Runnable) reconnectTaskField.get(client4);
assertDoesNotThrow(reconnectTask4::run);
}
Copy link

Copilot AI Dec 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test creates four separate client instances but uses the same mockChannelManager across all of them. This means the verify() calls are checking the cumulative behavior across all clients, not the individual client behavior. Each client should have its own mock instance to properly isolate the test cases and verify the expected behavior for each scenario independently.

Copilot uses AI. Check for mistakes.
@lokidundun
Copy link
Contributor Author

Could you please take another look ❤️ @funky-eyes

@funky-eyes funky-eyes modified the milestones: 2.6.0, 2.7.0 Jan 7, 2026
protected volatile boolean isSending = false;

private final Runnable reconnectTask;
private AtomicBoolean timerStarted = new AtomicBoolean(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not quite clear on the purpose of this PR. Currently, timerStarted is an instance variable and not shared, which means both RM and TM will each create their own reconnectTask. The reconnectTask itself is also an instance variable with no sharing.

For the old logic, what is the point of adding timerStarted? Wouldn't it be better to unify the reconnect thread pool for both RM and TM into a single one, so that there is only one task per entity (one for RM and one for TM) instead of starting two separate thread pools?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I have unifide RM/TM reconnect thread pools into a single static shared one SHARED_RECONNECT_EXECUTOR (eliminate redundancy), clarified timerStarted's purpose (controls per-instance task lifecycle, no longer meaningless)
and retained one reconnect task per entity (RM/TM) as you suggested.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Is it necessary that both the RM client and the TM client start a timer for reconnecting to seata server ?

3 participants