-
Notifications
You must be signed in to change notification settings - Fork 8.9k
optimize: Optimize RM/TM client reconnect #7783
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: 2.x
Are you sure you want to change the base?
Changes from 37 commits
871a6d7
5bda6c2
b8f436d
38c21d7
ae81603
713c172
d7ca2d6
e90a2fc
b98592b
e004dd3
8ab62d9
1a1d4b2
90864b2
dc0b49a
29c865f
e1653f5
2183a20
715e35c
3b261b8
9a71eec
6d24bf8
1faddba
f1f4e76
57f6dae
1374211
a39988e
b75b01c
e832b3a
b9ae7a6
070122f
3248451
dd736a9
f1c7560
7c705dc
5f183c0
0f28ad7
c194948
57d48ed
abf9b75
f9da76e
bf60ee4
003d4ab
ba272ac
4a6e225
eb78486
c3921e7
4f5302d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -67,6 +67,7 @@ | |
| import java.util.concurrent.ThreadPoolExecutor; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.TimeoutException; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.concurrent.locks.Condition; | ||
| import java.util.concurrent.locks.ReentrantLock; | ||
| import java.util.function.Function; | ||
|
|
@@ -96,6 +97,10 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting | |
| protected final Condition mergeCondition = mergeLock.newCondition(); | ||
| protected volatile boolean isSending = false; | ||
|
|
||
| private final Runnable reconnectTask; | ||
| private AtomicBoolean timerStarted = new AtomicBoolean(false); | ||
| private final ReentrantLock reconnectLock = new ReentrantLock(); | ||
|
|
||
| /** | ||
| * When sending message type is {@link MergeMessage}, will be stored to mergeMsgMap. | ||
| */ | ||
|
|
@@ -120,29 +125,30 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting | |
|
|
||
| @Override | ||
| public void init() { | ||
| timerExecutor.scheduleAtFixedRate( | ||
| () -> { | ||
| try { | ||
| clientChannelManager.reconnect(getTransactionServiceGroup()); | ||
| } catch (Exception ex) { | ||
| LOGGER.warn("reconnect server failed. {}", ex.getMessage()); | ||
| } | ||
| }, | ||
| SCHEDULE_DELAY_MILLS, | ||
| SCHEDULE_INTERVAL_MILLS, | ||
| TimeUnit.MILLISECONDS); | ||
| if (this.isEnableClientBatchSendRequest()) { | ||
| mergeSendExecutorService = new ThreadPoolExecutor( | ||
| MAX_MERGE_SEND_THREAD, | ||
| MAX_MERGE_SEND_THREAD, | ||
| KEEP_ALIVE_TIME, | ||
| TimeUnit.MILLISECONDS, | ||
| new LinkedBlockingQueue<>(), | ||
| new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD)); | ||
| mergeSendExecutorService.submit(new MergedSendRunnable()); | ||
| reconnectLock.lock(); | ||
| try { | ||
| if (timerStarted.compareAndSet(false, true)) { | ||
| timerExecutor.scheduleAtFixedRate( | ||
| reconnectTask, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS); | ||
| LOGGER.info("Reconnect timer started (role: {})", transactionRole.name()); | ||
| } | ||
|
|
||
| if (this.isEnableClientBatchSendRequest()) { | ||
| mergeSendExecutorService = new ThreadPoolExecutor( | ||
| MAX_MERGE_SEND_THREAD, | ||
| MAX_MERGE_SEND_THREAD, | ||
| KEEP_ALIVE_TIME, | ||
| TimeUnit.MILLISECONDS, | ||
| new LinkedBlockingQueue<>(), | ||
| new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD)); | ||
| mergeSendExecutorService.submit(new MergedSendRunnable()); | ||
| } | ||
|
||
|
|
||
| super.init(); | ||
| clientBootstrap.start(); | ||
| } finally { | ||
| reconnectLock.unlock(); | ||
| } | ||
| super.init(); | ||
| clientBootstrap.start(); | ||
| } | ||
|
|
||
| public AbstractNettyRemotingClient( | ||
|
|
@@ -155,6 +161,19 @@ public AbstractNettyRemotingClient( | |
| clientBootstrap.setChannelHandlers(new ClientHandler(), new ChannelEventHandler(this)); | ||
| clientChannelManager = new NettyClientChannelManager( | ||
| new NettyPoolableFactory(this, clientBootstrap), getPoolKeyFunction(), nettyClientConfig); | ||
| this.reconnectTask = () -> { | ||
| if (!timerStarted.get()) { | ||
| return; | ||
| } | ||
| try { | ||
| String serviceGroup = getTransactionServiceGroup(); | ||
| if (StringUtils.isNotBlank(serviceGroup)) { | ||
| clientChannelManager.reconnect(serviceGroup); | ||
| } | ||
| } catch (Throwable t) { | ||
| LOGGER.error("Reconnect task failed for role: {}", transactionRole.name(), t); | ||
| } | ||
| }; | ||
|
||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -271,11 +290,19 @@ public void destroyChannel(String serverAddress, Channel channel) { | |
|
|
||
| @Override | ||
| public void destroy() { | ||
| clientBootstrap.shutdown(); | ||
| if (mergeSendExecutorService != null) { | ||
| mergeSendExecutorService.shutdown(); | ||
| reconnectLock.lock(); | ||
| try { | ||
| if (timerStarted.compareAndSet(true, false)) { | ||
| LOGGER.info("Reconnect timer stopped (role: {})", transactionRole.name()); | ||
| } | ||
| clientBootstrap.shutdown(); | ||
| if (mergeSendExecutorService != null) { | ||
| mergeSendExecutorService.shutdown(); | ||
| } | ||
| super.destroy(); | ||
|
||
| } finally { | ||
| reconnectLock.unlock(); | ||
| } | ||
| super.destroy(); | ||
| } | ||
|
|
||
| public void setTransactionMessageHandler(TransactionMessageHandler transactionMessageHandler) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not quite clear on the purpose of this PR. Currently, `timerStarted` is an instance variable and not shared, which means both RM and TM will each create their own `reconnectTask`. The `reconnectTask` itself is also an instance variable with no sharing.

For the old logic, what is the point of adding `timerStarted`? Wouldn't it be better to unify the reconnect thread pool for both RM and TM into a single one, so that there is only one task per entity (one for RM and one for TM) instead of starting two separate thread pools?