optimize: Optimize RM/TM client reconnect #7783
base: 2.x
Conversation
Codecov Report: ❌ Patch coverage is …
Additional details and impacted files
@@ Coverage Diff @@
## 2.x #7783 +/- ##
============================================
+ Coverage 71.10% 71.56% +0.46%
- Complexity 797 883 +86
============================================
Files 1294 1294
Lines 49538 49574 +36
Branches 5879 5888 +9
============================================
+ Hits 35222 35477 +255
+ Misses 11406 11171 -235
- Partials 2910 2926 +16
I've addressed the resource leak issue by removing the static modifier from the reconnect timer and related resources. Am I on the right track? Let me know if further adjustments are needed! If it's convenient, please take a look ❤️. @maple525866
@Override
public void destroy() {
    if (reconnectTimer != null && timerStarted.get()) {
        reconnectTimer.shutdown();
When reconnectTimer runs shutdown(), it does not interrupt ongoing tasks, which means that reconnection may still occur after destruction.
Aside from the reconnection issue, logically speaking, there is also a race condition problem here.
When reconnectTimer runs shutdown(), it does not interrupt ongoing tasks, which means that reconnection may still occur after destruction.
Does that mean I should use reconnectTimer.shutdownNow() rather than reconnectTimer.shutdown(), so that all active tasks are stopped?
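For context, here is a minimal, self-contained sketch of the behavioral difference between the two calls (the executor below is a stand-in for illustration, not the PR's actual field):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ShutdownDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(
                () -> System.out.println("reconnect attempt"), 0, 100, TimeUnit.MILLISECONDS);
        Thread.sleep(250);
        // shutdown() stops new submissions and, by default, cancels future
        // periodic runs, but it does NOT interrupt a run that is already executing.
        // shutdownNow() additionally interrupts the worker thread, so an
        // in-flight reconnect attempt is asked to stop immediately.
        timer.shutdownNow();
        timer.awaitTermination(1, TimeUnit.SECONDS);
    }
}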
Aside from the reconnection issue, logically speaking, there is also a race condition problem here.
And does it mean I ought to use a protected final ReentrantLock mergeLock = new ReentrantLock() to guarantee atomicity, like this?
public void destroy() {
mergeLock.lock();
try {
if (reconnectTimer != null && timerStarted.get()) {
reconnectTimer.shutdown();
timerStarted.set(false);
LOGGER.info("Instance reconnect timer stopped (role: {})", transactionRole.name());
}
clientBootstrap.shutdown();
if (mergeSendExecutorService != null) {
mergeSendExecutorService.shutdown();
}
super.destroy();
} finally {
mergeLock.unlock();
}
}
protected volatile boolean isSending = false;

private final Runnable reconnectTask;
private ScheduledExecutorService reconnectTimer;
It's probably better to have a globally unique one here, otherwise there would be multiple reconnectTimers.
It's probably better to have a globally unique one here, otherwise there would be multiple reconnectTimers.
I haven't come up with a perfect solution for now. Could you please share your thoughts? ❤️
It's probably better to have a globally unique one here, otherwise there would be multiple reconnectTimers.
Or perhaps we could tackle it like this? What do you think?
private volatile ScheduledExecutorService reconnectTimer;
@Override
public void init() {
if (timerStarted.compareAndSet(false, true)) {
mergeLock.lock();
try {
this.reconnectTimer = new ScheduledThreadPoolExecutor(
1, new NamedThreadFactory("Reconnect-Timer-" + transactionRole.name(), 1));
this.reconnectTimer.scheduleAtFixedRate(
this.reconnectTask, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
LOGGER.info("Instance reconnect timer started (role: {})", transactionRole.name());
        } finally {
mergeLock.unlock();
}
}
if (this.isEnableClientBatchSendRequest()) {
mergeSendExecutorService = new ThreadPoolExecutor(
MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
}
super.init();
clientBootstrap.start();
}
this.reconnectTimer = new ScheduledThreadPoolExecutor(
        1, new NamedThreadFactory("Reconnect-Timer-" + transactionRole.name(), 1));
this.reconnectTimer.scheduleAtFixedRate(
        this.reconnectTask, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
reconnectTask is still null at this point; you should initialize reconnectTask before creating the thread pool. Also, why not reuse timerExecutor instead of creating a new scheduled thread pool?
Thanks for your detailed feedback! Here are the optimizations I've made in response to your comments:
I've initialized reconnectTask in the constructor ahead of time and adjusted the logic to reuse the existing timerExecutor.
@Override
public void init() {
mergeLock.lock();
try {
if (timerStarted.compareAndSet(false, true)) {
timerExecutor.scheduleAtFixedRate(
reconnectTask, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
LOGGER.info("Reconnect timer started (role: {})", transactionRole.name());
}
if (this.isEnableClientBatchSendRequest()) {
mergeSendExecutorService = new ThreadPoolExecutor(
MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
}
super.init();
clientBootstrap.start();
} finally {
mergeLock.unlock();
}
}
public AbstractNettyRemotingClient(
NettyClientConfig nettyClientConfig,
ThreadPoolExecutor messageExecutor,
NettyPoolKey.TransactionRole transactionRole) {
super(messageExecutor);
this.transactionRole = transactionRole;
clientBootstrap = new NettyClientBootstrap(nettyClientConfig, transactionRole);
clientBootstrap.setChannelHandlers(new ClientHandler(), new ChannelEventHandler(this));
clientChannelManager = new NettyClientChannelManager(
new NettyPoolableFactory(this, clientBootstrap), getPoolKeyFunction(), nettyClientConfig);
this.reconnectTask = () -> {
if (!timerStarted.get()) {
return;
}
try {
String serviceGroup = getTransactionServiceGroup();
if (StringUtils.isNotBlank(serviceGroup)) {
clientChannelManager.reconnect(serviceGroup);
}
} catch (Throwable t) {
LOGGER.error("Reconnect task failed for role: {}", transactionRole.name(), t);
}
};
}
Am I on the right track? I'd really appreciate any feedback, and let me know if you have any further questions!
This looks much better. Good job.
public void init() {
    timerExecutor.scheduleAtFixedRate(
            () -> {
                try {
                    clientChannelManager.reconnect(getTransactionServiceGroup());
                } catch (Exception ex) {
                    LOGGER.warn("reconnect server failed. {}", ex.getMessage());
                }
            },
            SCHEDULE_DELAY_MILLS,
            SCHEDULE_INTERVAL_MILLS,
            TimeUnit.MILLISECONDS);
    if (this.isEnableClientBatchSendRequest()) {
        mergeSendExecutorService = new ThreadPoolExecutor(
                MAX_MERGE_SEND_THREAD,
                MAX_MERGE_SEND_THREAD,
                KEEP_ALIVE_TIME,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
        mergeLock.lock();
        try {
            if (timerStarted.compareAndSet(false, true)) {
                timerExecutor.scheduleAtFixedRate(
I don't think mergeLock should be used here, because it's originally designed for merging and sending data; not using it can ensure single responsibility.
That means I need to use another lock, such as private final ReentrantLock reconnectLock = new ReentrantLock();, instead of mergeLock. Am I on the right track? I'd really appreciate any feedback, and let me know if you have any further questions!
Yes, it needs to be replaced with another lock instead of using the original mergeLock.
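A minimal sketch of that separation (reconnectLock is the new name under discussion; the other fields come from this PR):

// Dedicated lock for the reconnect-timer lifecycle; mergeLock stays
// reserved for merged-send state, preserving single responsibility.
private final ReentrantLock reconnectLock = new ReentrantLock();

@Override
public void init() {
    reconnectLock.lock();
    try {
        if (timerStarted.compareAndSet(false, true)) {
            timerExecutor.scheduleAtFixedRate(
                    reconnectTask, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
        }
    } finally {
        reconnectLock.unlock();
    }
    // batch-send setup, super.init(), and clientBootstrap.start() unchanged
}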
public void destroy() {
    clientBootstrap.shutdown();
    if (mergeSendExecutorService != null) {
        mergeSendExecutorService.shutdown();
        mergeLock.lock();
        try {
            if (timerStarted.compareAndSet(true, false)) {
                LOGGER.info("Reconnect timer stopped (role: {})", transactionRole.name());
ditto
…ubator-seata into feature/reconnect
Pull request overview
This PR optimizes the RM/TM client reconnect logic to prevent duplicate timer creation by introducing an AtomicBoolean flag (timerStarted) with ReentrantLock protection to ensure thread-safe initialization and destruction.
Key Changes:
- Added timerStarted flag and reconnectLock to prevent duplicate timer scheduling
- Refactored reconnect logic into a reusable reconnectTask created in the constructor
- Protected both init() and destroy() methods with locks to ensure thread safety
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 14 comments.
| File | Description |
|---|---|
| core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java | Implements the core optimization with thread-safe timer management and reconnect task refactoring |
| core/src/test/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClientTest.java | Adds comprehensive test coverage for new reconnect logic, timer lifecycle, and edge cases |
| changes/zh-cn/2.x.md | Documents the optimization in Chinese changelog |
| changes/en-us/2.x.md | Documents the optimization in English changelog |
Runnable reconnectTask = (Runnable) reconnectTaskField.get(testClient);
assertNotNull(reconnectTask, "fail to init reconnectTask");
} catch (Exception e) {
    fail("test failed:" + e.getMessage());
Copilot AI (Dec 24, 2025)
The assertion message contains mixed English and Chinese characters ("test failed:"). For consistency with the rest of the codebase which uses English, the colon should be the ASCII colon character instead of the full-width Chinese colon.
| fail("test failed:" + e.getMessage()); | |
| fail("test failed: " + e.getMessage()); |
managerField.set(client, mockManager);

client.init();
Thread.sleep(100);
Copilot AI (Dec 24, 2025)
The test sleeps for 100ms which may not be sufficient for the reconnect timer to execute, especially on slower CI systems. This could lead to flaky tests. Consider either increasing the sleep duration or using a more reliable synchronization mechanism to verify the timer behavior.
Suggested change:
-    Thread.sleep(100);
+    TimeUnit.SECONDS.sleep(1);
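Alternatively, a small polling helper would make the wait condition-based rather than time-based; this is only a sketch (the helper name is hypothetical, and it needs java.util.function.BooleanSupplier plus JUnit 5's static fail):

// Hypothetical helper: poll until the condition holds or the deadline passes,
// instead of guessing a fixed sleep duration.
static void awaitCondition(BooleanSupplier condition, long timeoutMillis) throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (!condition.getAsBoolean()) {
        if (System.currentTimeMillis() > deadline) {
            fail("condition not met within " + timeoutMillis + " ms");
        }
        Thread.sleep(20);
    }
}
// usage: awaitCondition(() -> timerStarted.get(), 5000);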
// verify status
assertNotNull(multiClient);
Copilot AI (Dec 24, 2025)
The assertion 'assertNotNull(multiClient)' on line 1564 is not meaningful since multiClient was just created on line 1557 and would never be null at this point. Consider replacing this with a more meaningful assertion that verifies the client is properly initialized, such as checking the timerStarted flag or verifying that the timer has been scheduled.
Suggested change:
// verify status: timer should be started after init
try {
    Field timerStartedField = AbstractNettyRemotingClient.class.getDeclaredField("timerStarted");
    timerStartedField.setAccessible(true);
    AtomicBoolean timerStarted = (AtomicBoolean) timerStartedField.get(multiClient);
    assertNotNull(timerStarted);
    assertTrue(timerStarted.get());
} catch (NoSuchFieldException | IllegalAccessException e) {
    Assertions.fail("Failed to access timerStarted field on AbstractNettyRemotingClient", e);
}
Field clientBootstrapField = AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
clientBootstrapField.setAccessible(true);
NettyClientBootstrap mockBootstrap = mock(NettyClientBootstrap.class);
clientBootstrapField.set(testClient, mockBootstrap);

testClient.init();
verify(mockBootstrap, times(1)).start();
Copilot AI (Dec 24, 2025)
The testClient is initialized but not destroyed in a finally block. If an exception occurs during field access or assertions, the client's resources (timers, executors) won't be cleaned up properly. Add a finally block to ensure cleanup.
Suggested change:
try {
    Field clientBootstrapField = AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
    clientBootstrapField.setAccessible(true);
    NettyClientBootstrap mockBootstrap = mock(NettyClientBootstrap.class);
    clientBootstrapField.set(testClient, mockBootstrap);
    testClient.init();
    verify(mockBootstrap, times(1)).start();
} finally {
    testClient.destroy();
}
TestClientWithEmptyServiceGroup testClient = new TestClientWithEmptyServiceGroup(clientConfig, messageExecutor);
Field reconnectTaskField = AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
reconnectTaskField.setAccessible(true);
Runnable reconnectTask = (Runnable) reconnectTaskField.get(testClient);

assertDoesNotThrow(reconnectTask::run, "serviceGroup is null");
}

@Test
public void testReconnectTaskThrowException() throws Exception {
    TestNettyRemotingClientWithReconnectException exceptionClient =
            new TestNettyRemotingClientWithReconnectException(clientConfig, messageExecutor);
    Field reconnectTaskField = AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
    reconnectTaskField.setAccessible(true);
    Runnable reconnectTask = (Runnable) reconnectTaskField.get(exceptionClient);

    assertDoesNotThrow(reconnectTask::run, "reconnectTask exception");
Copilot AI (Dec 24, 2025)
The testClient is created but never destroyed. Even though the reconnect task is run directly without calling init(), the client still has resources (like the reconnectLock and transactionRole) that should be cleaned up. Add proper cleanup to avoid potential resource leaks.
Suggested change:
final TestClientWithEmptyServiceGroup testClient =
        new TestClientWithEmptyServiceGroup(clientConfig, messageExecutor);
Field reconnectTaskField = AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
reconnectTaskField.setAccessible(true);
Runnable reconnectTask = (Runnable) reconnectTaskField.get(testClient);
try {
    assertDoesNotThrow(reconnectTask::run, "serviceGroup is null");
} finally {
    testClient.destroy();
}
}

@Test
public void testReconnectTaskThrowException() throws Exception {
    final TestNettyRemotingClientWithReconnectException exceptionClient =
            new TestNettyRemotingClientWithReconnectException(clientConfig, messageExecutor);
    Field reconnectTaskField = AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
    reconnectTaskField.setAccessible(true);
    Runnable reconnectTask = (Runnable) reconnectTaskField.get(exceptionClient);
    try {
        assertDoesNotThrow(reconnectTask::run, "reconnectTask exception");
    } finally {
        exceptionClient.destroy();
    }
}
// Mock clientChannelManager
NettyClientChannelManager mockChannelManager = mock(NettyClientChannelManager.class);
Field transactionRoleField = AbstractNettyRemotingClient.class.getDeclaredField("transactionRole");
transactionRoleField.setAccessible(true);

TestReconnectTaskClient client1 = new TestReconnectTaskClient(clientConfig, messageExecutor, testServiceGroup);
Field timerStartedField = AbstractNettyRemotingClient.class.getDeclaredField("timerStarted");
timerStartedField.setAccessible(true);
AtomicBoolean timerStarted1 = (AtomicBoolean) timerStartedField.get(client1);
timerStarted1.set(false);
Field reconnectTaskField = AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
reconnectTaskField.setAccessible(true);
Runnable reconnectTask1 = (Runnable) reconnectTaskField.get(client1);

reconnectTask1.run();
verify(mockChannelManager, never()).reconnect(anyString());

TestReconnectTaskClient client2 = new TestReconnectTaskClient(clientConfig, messageExecutor, "");
AtomicBoolean timerStarted2 = (AtomicBoolean) timerStartedField.get(client2);
timerStarted2.set(true);
Field channelManagerField = AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
channelManagerField.setAccessible(true);
channelManagerField.set(client2, mockChannelManager);

// verify reconnect
Runnable reconnectTask2 = (Runnable) reconnectTaskField.get(client2);
reconnectTask2.run();
verify(mockChannelManager, never()).reconnect(anyString());

TestReconnectTaskClient client3 = new TestReconnectTaskClient(clientConfig, messageExecutor, testServiceGroup);
AtomicBoolean timerStarted3 = (AtomicBoolean) timerStartedField.get(client3);
timerStarted3.set(true);
channelManagerField.set(client3, mockChannelManager);

Runnable reconnectTask3 = (Runnable) reconnectTaskField.get(client3);
reconnectTask3.run();
verify(mockChannelManager, times(1)).reconnect(testServiceGroup);

TestReconnectTaskClient client4 = new TestReconnectTaskClient(clientConfig, messageExecutor, testServiceGroup);
AtomicBoolean timerStarted4 = (AtomicBoolean) timerStartedField.get(client4);
timerStarted4.set(true);
RuntimeException testEx = new RuntimeException("test-reconnect-error");
doThrow(testEx).when(mockChannelManager).reconnect(testServiceGroup);
channelManagerField.set(client4, mockChannelManager);

Runnable reconnectTask4 = (Runnable) reconnectTaskField.get(client4);
assertDoesNotThrow(reconnectTask4::run);
Copilot AI (Dec 24, 2025)
The test creates multiple client instances (client1, client2, client3, client4) but doesn't properly clean them up. Each client has scheduled timers and potentially other resources that should be destroyed. Add proper cleanup in a finally block or use @AfterEach to ensure all test clients are destroyed.
Suggested change:
TestReconnectTaskClient client1 = null;
TestReconnectTaskClient client2 = null;
TestReconnectTaskClient client3 = null;
TestReconnectTaskClient client4 = null;
try {
    // Mock clientChannelManager
    NettyClientChannelManager mockChannelManager = mock(NettyClientChannelManager.class);
    Field transactionRoleField = AbstractNettyRemotingClient.class.getDeclaredField("transactionRole");
    transactionRoleField.setAccessible(true);
    client1 = new TestReconnectTaskClient(clientConfig, messageExecutor, testServiceGroup);
    Field timerStartedField = AbstractNettyRemotingClient.class.getDeclaredField("timerStarted");
    timerStartedField.setAccessible(true);
    AtomicBoolean timerStarted1 = (AtomicBoolean) timerStartedField.get(client1);
    timerStarted1.set(false);
    Field reconnectTaskField = AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
    reconnectTaskField.setAccessible(true);
    Runnable reconnectTask1 = (Runnable) reconnectTaskField.get(client1);
    reconnectTask1.run();
    verify(mockChannelManager, never()).reconnect(anyString());
    client2 = new TestReconnectTaskClient(clientConfig, messageExecutor, "");
    AtomicBoolean timerStarted2 = (AtomicBoolean) timerStartedField.get(client2);
    timerStarted2.set(true);
    Field channelManagerField = AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
    channelManagerField.setAccessible(true);
    channelManagerField.set(client2, mockChannelManager);
    // verify reconnect
    Runnable reconnectTask2 = (Runnable) reconnectTaskField.get(client2);
    reconnectTask2.run();
    verify(mockChannelManager, never()).reconnect(anyString());
    client3 = new TestReconnectTaskClient(clientConfig, messageExecutor, testServiceGroup);
    AtomicBoolean timerStarted3 = (AtomicBoolean) timerStartedField.get(client3);
    timerStarted3.set(true);
    channelManagerField.set(client3, mockChannelManager);
    Runnable reconnectTask3 = (Runnable) reconnectTaskField.get(client3);
    reconnectTask3.run();
    verify(mockChannelManager, times(1)).reconnect(testServiceGroup);
    client4 = new TestReconnectTaskClient(clientConfig, messageExecutor, testServiceGroup);
    AtomicBoolean timerStarted4 = (AtomicBoolean) timerStartedField.get(client4);
    timerStarted4.set(true);
    RuntimeException testEx = new RuntimeException("test-reconnect-error");
    doThrow(testEx).when(mockChannelManager).reconnect(testServiceGroup);
    channelManagerField.set(client4, mockChannelManager);
    Runnable reconnectTask4 = (Runnable) reconnectTaskField.get(client4);
    assertDoesNotThrow(reconnectTask4::run);
} finally {
    if (client1 != null) {
        client1.destroy();
    }
    if (client2 != null) {
        client2.destroy();
    }
    if (client3 != null) {
        client3.destroy();
    }
    if (client4 != null) {
        client4.destroy();
    }
    messageExecutor.shutdownNow();
}
TestNettyRemotingClientWithReconnectException exceptionClient =
        new TestNettyRemotingClientWithReconnectException(clientConfig, messageExecutor);
Field reconnectTaskField = AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
reconnectTaskField.setAccessible(true);
Runnable reconnectTask = (Runnable) reconnectTaskField.get(exceptionClient);

assertDoesNotThrow(reconnectTask::run, "reconnectTask exception");
Copilot AI (Dec 24, 2025)
The exceptionClient is created but never destroyed. Add proper cleanup in a finally block to ensure resources are released even if the test fails.
Suggested change:
TestNettyRemotingClientWithReconnectException exceptionClient = null;
try {
    exceptionClient = new TestNettyRemotingClientWithReconnectException(clientConfig, messageExecutor);
    Field reconnectTaskField = AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
    reconnectTaskField.setAccessible(true);
    Runnable reconnectTask = (Runnable) reconnectTaskField.get(exceptionClient);
    assertDoesNotThrow(reconnectTask::run, "reconnectTask exception");
} finally {
    if (exceptionClient != null) {
        exceptionClient.destroy();
    }
}
if (this.isEnableClientBatchSendRequest()) {
    mergeSendExecutorService = new ThreadPoolExecutor(
            MAX_MERGE_SEND_THREAD,
            MAX_MERGE_SEND_THREAD,
            KEEP_ALIVE_TIME,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(),
            new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
    mergeSendExecutorService.submit(new MergedSendRunnable());
}
Copilot AI (Dec 24, 2025)
The mergeSendExecutorService can be initialized multiple times if init() is called multiple times. While the timerStarted flag prevents duplicate timer scheduling, there's no similar protection for mergeSendExecutorService. This could lead to thread pool leaks if init() is called multiple times without calling destroy() in between. Consider adding a check to prevent re-initialization of the executor service, similar to the timer protection.
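One possible guard, sketched here under the assumption that init() is already serialized by the lock discussed above, is a simple null check mirroring the timerStarted pattern:

// Create the merged-send pool only once, even if init() runs again
// without an intervening destroy().
if (this.isEnableClientBatchSendRequest() && mergeSendExecutorService == null) {
    mergeSendExecutorService = new ThreadPoolExecutor(
            MAX_MERGE_SEND_THREAD,
            MAX_MERGE_SEND_THREAD,
            KEEP_ALIVE_TIME,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(),
            new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
    mergeSendExecutorService.submit(new MergedSendRunnable());
}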
@Test
public void testInitAndDestroyMultipleTimes() {
    TestNettyRemotingClient multiClient = new TestNettyRemotingClient(clientConfig, messageExecutor);

    try {
        // initiate first
        multiClient.init();

        // verify status
        assertNotNull(multiClient);

        multiClient.destroy();
        multiClient.destroy();

    } finally {
        // clean
        try {
            multiClient.destroy();
        } catch (Exception e) {
            // ignore
        }
    }
}
Copilot AI (Dec 24, 2025)
The test verifies that calling destroy() multiple times doesn't cause issues, but it doesn't verify that subsequent calls to init() after destroy() work correctly. This is important since the timerStarted flag might prevent re-initialization. Consider adding a test that calls init(), destroy(), init() sequence to ensure the client can be reinitialized properly.
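A sketch of such a test (the test name is hypothetical; it assumes destroy() resets timerStarted to false, as this PR's destroy() does via compareAndSet):

@Test
public void testReinitAfterDestroy() throws Exception {
    TestNettyRemotingClient client = new TestNettyRemotingClient(clientConfig, messageExecutor);
    try {
        client.init();
        client.destroy();
        // A second init() should succeed only if destroy() reset timerStarted.
        assertDoesNotThrow(client::init);

        Field timerStartedField = AbstractNettyRemotingClient.class.getDeclaredField("timerStarted");
        timerStartedField.setAccessible(true);
        AtomicBoolean timerStarted = (AtomicBoolean) timerStartedField.get(client);
        assertTrue(timerStarted.get(), "timer should be running again after re-init");
    } finally {
        client.destroy();
    }
}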
@Test
public void testReconnectTask() throws Exception {
    String testServiceGroup = "test-group";
    NettyClientConfig clientConfig = new NettyClientConfig();
    ThreadPoolExecutor messageExecutor =
            new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    // Mock clientChannelManager
    NettyClientChannelManager mockChannelManager = mock(NettyClientChannelManager.class);
    Field transactionRoleField = AbstractNettyRemotingClient.class.getDeclaredField("transactionRole");
    transactionRoleField.setAccessible(true);

    TestReconnectTaskClient client1 = new TestReconnectTaskClient(clientConfig, messageExecutor, testServiceGroup);
    Field timerStartedField = AbstractNettyRemotingClient.class.getDeclaredField("timerStarted");
    timerStartedField.setAccessible(true);
    AtomicBoolean timerStarted1 = (AtomicBoolean) timerStartedField.get(client1);
    timerStarted1.set(false);
    Field reconnectTaskField = AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
    reconnectTaskField.setAccessible(true);
    Runnable reconnectTask1 = (Runnable) reconnectTaskField.get(client1);

    reconnectTask1.run();
    verify(mockChannelManager, never()).reconnect(anyString());

    TestReconnectTaskClient client2 = new TestReconnectTaskClient(clientConfig, messageExecutor, "");
    AtomicBoolean timerStarted2 = (AtomicBoolean) timerStartedField.get(client2);
    timerStarted2.set(true);
    Field channelManagerField = AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
    channelManagerField.setAccessible(true);
    channelManagerField.set(client2, mockChannelManager);

    // verify reconnect
    Runnable reconnectTask2 = (Runnable) reconnectTaskField.get(client2);
    reconnectTask2.run();
    verify(mockChannelManager, never()).reconnect(anyString());

    TestReconnectTaskClient client3 = new TestReconnectTaskClient(clientConfig, messageExecutor, testServiceGroup);
    AtomicBoolean timerStarted3 = (AtomicBoolean) timerStartedField.get(client3);
    timerStarted3.set(true);
    channelManagerField.set(client3, mockChannelManager);

    Runnable reconnectTask3 = (Runnable) reconnectTaskField.get(client3);
    reconnectTask3.run();
    verify(mockChannelManager, times(1)).reconnect(testServiceGroup);

    TestReconnectTaskClient client4 = new TestReconnectTaskClient(clientConfig, messageExecutor, testServiceGroup);
    AtomicBoolean timerStarted4 = (AtomicBoolean) timerStartedField.get(client4);
    timerStarted4.set(true);
    RuntimeException testEx = new RuntimeException("test-reconnect-error");
    doThrow(testEx).when(mockChannelManager).reconnect(testServiceGroup);
    channelManagerField.set(client4, mockChannelManager);

    Runnable reconnectTask4 = (Runnable) reconnectTaskField.get(client4);
    assertDoesNotThrow(reconnectTask4::run);
}
Copilot AI (Dec 24, 2025)
This test creates four separate client instances but uses the same mockChannelManager across all of them. This means the verify() calls are checking the cumulative behavior across all clients, not the individual client behavior. Each client should have its own mock instance to properly isolate the test cases and verify the expected behavior for each scenario independently.
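A sketch of that isolation for the client3 scenario (mockManager3 is a new, hypothetical mock; the other names reuse the test above):

// Each scenario gets its own mock, so verify() counts only this client's calls.
NettyClientChannelManager mockManager3 = mock(NettyClientChannelManager.class);
channelManagerField.set(client3, mockManager3);
Runnable reconnectTask3 = (Runnable) reconnectTaskField.get(client3);
reconnectTask3.run();
verify(mockManager3, times(1)).reconnect(testServiceGroup);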
Could you please take another look ❤️ @funky-eyes
protected volatile boolean isSending = false;

private final Runnable reconnectTask;
private AtomicBoolean timerStarted = new AtomicBoolean(false);
I'm not quite clear on the purpose of this PR. Currently, timerStarted is an instance variable and not shared, which means both RM and TM will each create their own reconnectTask. The reconnectTask itself is also an instance variable with no sharing.
For the old logic, what is the point of adding timerStarted? Wouldn't it be better to unify the reconnect thread pool for both RM and TM into a single one, so that there is only one task per entity (one for RM and one for TM) instead of starting two separate thread pools?
OK, I have unified the RM/TM reconnect thread pools into a single static shared one, SHARED_RECONNECT_EXECUTOR (eliminating the redundancy), clarified timerStarted's purpose (it now controls the per-instance task lifecycle, so it is no longer meaningless),
and retained one reconnect task per entity (RM/TM) as you suggested.
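The exact code isn't quoted in this thread, but a sketch matching that description could look like the following (SHARED_RECONNECT_EXECUTOR is the name from the comment above; reconnectFuture is a hypothetical per-instance handle added here for cancellation):

// One shared scheduler for all client instances (RM and TM alike).
private static final ScheduledExecutorService SHARED_RECONNECT_EXECUTOR =
        new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("Shared-Reconnect-Timer", 1));

// Per-instance handle so each RM/TM cancels only its own task.
private volatile ScheduledFuture<?> reconnectFuture;

@Override
public void init() {
    if (timerStarted.compareAndSet(false, true)) {
        reconnectFuture = SHARED_RECONNECT_EXECUTOR.scheduleAtFixedRate(
                reconnectTask, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
    }
    // ... batch-send setup, super.init(), clientBootstrap.start()
}

@Override
public void destroy() {
    if (timerStarted.compareAndSet(true, false) && reconnectFuture != null) {
        reconnectFuture.cancel(false); // stop this instance's task; the shared pool lives on
    }
    // ... remaining shutdown logic
}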
Ⅰ. Describe what this PR did
Optimized the RM/TM client reconnect logic to prevent duplicate timer creation.
Ⅱ. Does this pull request fix one issue?
fixes #5338
Ⅲ. Why don't you add test cases (unit test/integration test)?
Ⅳ. Describe how to verify it
Ⅴ. Special notes for reviews