Skip to content

Commit 7c9144a

Browse files
committedApr 5, 2024··
Temp commit: Add support for failover
1 parent e5bf1ac commit 7c9144a

11 files changed

+464
-146
lines changed
 

‎r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/ConnectionStrategy.java

Lines changed: 85 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.asyncer.r2dbc.mysql;
1818

1919
import io.asyncer.r2dbc.mysql.client.Client;
20+
import io.asyncer.r2dbc.mysql.client.ReactorNettyClient;
2021
import io.asyncer.r2dbc.mysql.internal.util.StringUtils;
2122
import io.netty.channel.ChannelOption;
2223
import io.netty.resolver.AddressResolver;
@@ -26,13 +27,17 @@
2627
import io.netty.util.concurrent.EventExecutor;
2728
import io.netty.util.internal.logging.InternalLogger;
2829
import io.netty.util.internal.logging.InternalLoggerFactory;
30+
import io.r2dbc.spi.R2dbcNonTransientResourceException;
31+
import org.jetbrains.annotations.Nullable;
2932
import reactor.core.publisher.Mono;
3033
import reactor.netty.resources.LoopResources;
3134
import reactor.netty.tcp.TcpClient;
3235

3336
import java.net.InetSocketAddress;
3437
import java.time.Duration;
3538
import java.time.ZoneId;
39+
import java.util.function.Function;
40+
import java.util.function.Supplier;
3641

3742
/**
3843
* An interface of a connection strategy that considers how to obtain a MySQL {@link Client} object.
@@ -49,7 +54,7 @@ interface ConnectionStrategy {
4954
*
5055
* @return a logged-in {@link Client} object.
5156
*/
52-
Mono<Client> connect();
57+
Mono<? extends Client> connect();
5358

5459
/**
5560
* Creates a general-purpose {@link TcpClient} with the given {@link SocketClientConfiguration}.
@@ -87,7 +92,7 @@ static TcpClient createTcpClient(SocketClientConfiguration configuration, boolea
8792
* @param configuration a configuration that affects login behavior.
8893
* @return a logged-in {@link Client} object.
8994
*/
90-
static Mono<Client> connectWithInit(
95+
static Mono<ReactorNettyClient> connectWithInit(
9196
TcpClient tcpClient,
9297
Credential credential,
9398
MySqlConnectionConfiguration configuration
@@ -110,7 +115,7 @@ static Mono<Client> connectWithInit(
110115
configuration.isPreserveInstants(),
111116
connectionTimeZone
112117
);
113-
}).flatMap(context -> Client.connect(tcpClient, configuration.getSsl(), context)).flatMap(client -> {
118+
}).flatMap(ctx -> ReactorNettyClient.connect(tcpClient, configuration.getSsl(), ctx)).flatMap(client -> {
114119
// Lazy init database after handshake/login
115120
MySqlSslConfiguration ssl = configuration.getSsl();
116121
String loginDb = configuration.isCreateDatabaseIfNotExist() ? "" : configuration.getDatabase();
@@ -126,30 +131,88 @@ static Mono<Client> connectWithInit(
126131
).then(Mono.just(client)).onErrorResume(e -> client.forceClose().then(Mono.error(e)));
127132
});
128133
}
129-
}
130-
131-
/**
132-
* Resolves the {@link InetSocketAddress} to IP address, randomly pick one if it resolves to multiple IP addresses.
133-
*
134-
* @since 1.2.0
135-
*/
136-
final class BalancedResolverGroup extends AddressResolverGroup<InetSocketAddress> {
137134

138-
BalancedResolverGroup() {
135+
/**
136+
* Creates an exception that indicates a retry failure.
137+
*
138+
* @param message the message of the exception.
139+
* @param cause the last exception that caused the retry.
140+
* @return a retry failure exception.
141+
*/
142+
static R2dbcNonTransientResourceException retryFail(String message, @Nullable Throwable cause) {
143+
return new R2dbcNonTransientResourceException(
144+
message,
145+
"H1000",
146+
9000,
147+
cause
148+
);
139149
}
140150

141-
public static final BalancedResolverGroup INSTANCE;
151+
/**
152+
* Connect and login to a MySQL server with a specific TCP socket address.
153+
*
154+
* @since 1.2.0
155+
*/
156+
final class InetConnectFunction implements Function<Supplier<InetSocketAddress>, Mono<ReactorNettyClient>> {
157+
158+
private final boolean balancedDns;
159+
160+
private final boolean tcpKeepAlive;
161+
162+
private final boolean tcpNoDelay;
163+
164+
private final Credential credential;
142165

143-
static {
144-
INSTANCE = new BalancedResolverGroup();
145-
Runtime.getRuntime().addShutdownHook(new Thread(
146-
INSTANCE::close,
147-
"R2DBC-MySQL-BalancedResolverGroup-ShutdownHook"
148-
));
166+
private final MySqlConnectionConfiguration configuration;
167+
168+
InetConnectFunction(
169+
boolean balancedDns,
170+
boolean tcpKeepAlive,
171+
boolean tcpNoDelay,
172+
Credential credential,
173+
MySqlConnectionConfiguration configuration
174+
) {
175+
this.balancedDns = balancedDns;
176+
this.tcpKeepAlive = tcpKeepAlive;
177+
this.tcpNoDelay = tcpNoDelay;
178+
this.credential = credential;
179+
this.configuration = configuration;
180+
}
181+
182+
@Override
183+
public Mono<ReactorNettyClient> apply(Supplier<InetSocketAddress> address) {
184+
TcpClient client = ConnectionStrategy.createTcpClient(configuration.getClient(), balancedDns)
185+
.option(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
186+
.option(ChannelOption.TCP_NODELAY, tcpNoDelay)
187+
.remoteAddress(address);
188+
189+
return ConnectionStrategy.connectWithInit(client, credential, configuration);
190+
}
149191
}
150192

151-
@Override
152-
protected AddressResolver<InetSocketAddress> newResolver(EventExecutor executor) {
153-
return new RoundRobinInetAddressResolver(executor, new DefaultNameResolver(executor)).asAddressResolver();
193+
/**
194+
* Resolves the {@link InetSocketAddress} to IP address, randomly pick one if it resolves to multiple IP addresses.
195+
*
196+
* @since 1.2.0
197+
*/
198+
final class BalancedResolverGroup extends AddressResolverGroup<InetSocketAddress> {
199+
200+
BalancedResolverGroup() {
201+
}
202+
203+
public static final BalancedResolverGroup INSTANCE;
204+
205+
static {
206+
INSTANCE = new BalancedResolverGroup();
207+
Runtime.getRuntime().addShutdownHook(new Thread(
208+
INSTANCE::close,
209+
"R2DBC-MySQL-BalancedResolverGroup-ShutdownHook"
210+
));
211+
}
212+
213+
@Override
214+
protected AddressResolver<InetSocketAddress> newResolver(EventExecutor executor) {
215+
return new RoundRobinInetAddressResolver(executor, new DefaultNameResolver(executor)).asAddressResolver();
216+
}
154217
}
155218
}

‎r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/InitFlow.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.asyncer.r2dbc.mysql.cache.PrepareCache;
2323
import io.asyncer.r2dbc.mysql.client.Client;
2424
import io.asyncer.r2dbc.mysql.client.FluxExchangeable;
25+
import io.asyncer.r2dbc.mysql.client.ReactorNettyClient;
2526
import io.asyncer.r2dbc.mysql.codec.Codecs;
2627
import io.asyncer.r2dbc.mysql.codec.CodecsBuilder;
2728
import io.asyncer.r2dbc.mysql.constant.CompressionAlgorithm;
@@ -75,7 +76,7 @@
7576
import java.util.function.Function;
7677

7778
/**
78-
* A message flow utility that can initializes the session of {@link Client}.
79+
* A message flow utility that can initializes the session of {@link ReactorNettyClient}.
7980
* <p>
8081
* It should not use server-side prepared statements, because {@link PrepareCache} will be initialized after the session
8182
* is initialized.
@@ -117,9 +118,9 @@ final class InitFlow {
117118
};
118119

119120
/**
120-
* Initializes handshake and login a {@link Client}.
121+
* Initializes handshake and login a {@link ReactorNettyClient}.
121122
*
122-
* @param client the {@link Client} to exchange messages with.
123+
* @param client the {@link ReactorNettyClient} to exchange messages with.
123124
* @param sslMode the {@link SslMode} defines SSL capability and behavior.
124125
* @param database the database that will be connected.
125126
* @param user the user that will be login.
@@ -128,7 +129,7 @@ final class InitFlow {
128129
* @param zstdCompressionLevel the zstd compression level.
129130
* @return a {@link Mono} that indicates the initialization is done, or an error if the initialization failed.
130131
*/
131-
static Mono<Void> initHandshake(Client client, SslMode sslMode, String database, String user,
132+
static Mono<Void> initHandshake(ReactorNettyClient client, SslMode sslMode, String database, String user,
132133
@Nullable CharSequence password, Set<CompressionAlgorithm> compressionAlgorithms, int zstdCompressionLevel) {
133134
return client.exchange(new HandshakeExchangeable(
134135
client,
@@ -488,7 +489,7 @@ final class HandshakeExchangeable extends FluxExchangeable<Void> {
488489
private final Sinks.Many<SubsequenceClientMessage> requests = Sinks.many().unicast()
489490
.onBackpressureBuffer(Queues.<SubsequenceClientMessage>one().get());
490491

491-
private final Client client;
492+
private final ReactorNettyClient client;
492493

493494
private final SslMode sslMode;
494495

@@ -511,7 +512,7 @@ final class HandshakeExchangeable extends FluxExchangeable<Void> {
511512

512513
private boolean sslCompleted;
513514

514-
HandshakeExchangeable(Client client, SslMode sslMode, String database, String user,
515+
HandshakeExchangeable(ReactorNettyClient client, SslMode sslMode, String database, String user,
515516
@Nullable CharSequence password, Set<CompressionAlgorithm> compressions,
516517
int zstdCompressionLevel) {
517518
this.client = client;

‎r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MultiHostsConnectionStrategy.java

Lines changed: 104 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,17 @@
1717
package io.asyncer.r2dbc.mysql;
1818

1919
import io.asyncer.r2dbc.mysql.client.Client;
20+
import io.asyncer.r2dbc.mysql.client.FailoverClient;
21+
import io.asyncer.r2dbc.mysql.client.ReactorNettyClient;
2022
import io.asyncer.r2dbc.mysql.constant.ProtocolDriver;
2123
import io.asyncer.r2dbc.mysql.internal.NodeAddress;
2224
import io.asyncer.r2dbc.mysql.internal.util.InternalArrays;
23-
import io.netty.channel.ChannelOption;
2425
import io.netty.resolver.DefaultNameResolver;
2526
import io.netty.resolver.NameResolver;
2627
import io.netty.util.concurrent.Future;
27-
import io.r2dbc.spi.R2dbcNonTransientResourceException;
28-
import org.jetbrains.annotations.Nullable;
2928
import reactor.core.publisher.Flux;
3029
import reactor.core.publisher.Mono;
3130
import reactor.netty.resources.LoopResources;
32-
import reactor.netty.tcp.TcpClient;
3331
import reactor.netty.tcp.TcpResources;
3432

3533
import java.net.InetAddress;
@@ -46,105 +44,153 @@
4644
*/
4745
final class MultiHostsConnectionStrategy implements ConnectionStrategy {
4846

49-
private final Mono<Client> client;
47+
private final Mono<? extends Client> client;
5048

5149
MultiHostsConnectionStrategy(
52-
TcpSocketConfiguration tcp,
5350
MySqlConnectionConfiguration configuration,
54-
boolean shuffle
51+
List<NodeAddress> addresses,
52+
ProtocolDriver driver,
53+
int retriesAllDown,
54+
boolean shuffle,
55+
boolean tcpKeepAlive,
56+
boolean tcpNoDelay
5557
) {
56-
this.client = Mono.defer(() -> {
57-
if (ProtocolDriver.DNS_SRV.equals(tcp.getDriver())) {
58+
Mono<ReactorNettyClient> client = configuration.getCredential().flatMap(credential -> {
59+
if (ProtocolDriver.DNS_SRV.equals(driver)) {
60+
logger.debug("Resolve hosts via DNS SRV: {}", addresses);
61+
5862
LoopResources resources = configuration.getClient().getLoopResources();
5963
LoopResources loopResources = resources == null ? TcpResources.get() : resources;
60-
61-
return resolveAllHosts(loopResources, tcp.getAddresses(), shuffle)
62-
.flatMap(addresses -> connectHost(addresses, tcp, configuration, false, shuffle, 0));
64+
InetConnectFunction login = new InetConnectFunction(
65+
false,
66+
tcpKeepAlive,
67+
tcpNoDelay,
68+
credential,
69+
configuration
70+
);
71+
72+
return resolveAllHosts(loopResources, addresses, shuffle).flatMap(addrs -> {
73+
logger.debug("Connect to multiple addresses: {}", addrs);
74+
75+
return connectHost(
76+
addrs,
77+
login,
78+
shuffle,
79+
0,
80+
retriesAllDown
81+
);
82+
});
6383
} else {
64-
List<NodeAddress> availableHosts = copyAvailableAddresses(tcp.getAddresses(), shuffle);
84+
List<NodeAddress> availableHosts = copyAvailableAddresses(addresses, shuffle);
85+
logger.debug("Connect to multiple hosts: {}", availableHosts);
86+
6587
int size = availableHosts.size();
66-
InetSocketAddress[] addresses = new InetSocketAddress[availableHosts.size()];
88+
InetSocketAddress[] array = new InetSocketAddress[availableHosts.size()];
6789

6890
for (int i = 0; i < size; i++) {
69-
NodeAddress address = availableHosts.get(i);
70-
addresses[i] = InetSocketAddress.createUnresolved(address.getHost(), address.getPort());
91+
array[i] = availableHosts.get(i).toUnresolved();
7192
}
7293

73-
return connectHost(InternalArrays.asImmutableList(addresses), tcp, configuration, true, shuffle, 0);
94+
List<InetSocketAddress> addrs = InternalArrays.asImmutableList(array);
95+
InetConnectFunction login = new InetConnectFunction(
96+
true,
97+
tcpKeepAlive,
98+
tcpNoDelay,
99+
credential,
100+
configuration
101+
);
102+
103+
return connectHost(
104+
addrs,
105+
login,
106+
shuffle,
107+
0,
108+
retriesAllDown
109+
);
74110
}
75111
});
112+
113+
this.client = client.map(c -> new FailoverClient(c, client));
76114
}
77115

78116
@Override
79-
public Mono<Client> connect() {
117+
public Mono<? extends Client> connect() {
80118
return client;
81119
}
82120

83-
private Mono<Client> connectHost(
121+
private static Mono<ReactorNettyClient> connectHost(
84122
List<InetSocketAddress> addresses,
85-
TcpSocketConfiguration tcp,
86-
MySqlConnectionConfiguration configuration,
87-
boolean balancedDns,
123+
InetConnectFunction login,
88124
boolean shuffle,
89-
int attempts
125+
int attempts,
126+
int maxAttempts
90127
) {
91128
Iterator<InetSocketAddress> iter = addresses.iterator();
92129

93130
if (!iter.hasNext()) {
94-
return Mono.error(fail("Fail to establish connection: no available host", null));
131+
return Mono.error(ConnectionStrategy.retryFail("Fail to establish connection: no available host", null));
95132
}
96133

97-
return configuration.getCredential().flatMap(credential -> attemptConnect(
98-
iter.next(), credential, tcp, configuration, balancedDns
99-
).onErrorResume(t -> resumeConnect(
100-
t, addresses, iter, credential, tcp, configuration, balancedDns, shuffle, attempts
101-
)));
134+
135+
InetSocketAddress address = iter.next();
136+
137+
return login.apply(() -> address).onErrorResume(error -> resumeConnect(
138+
error,
139+
address,
140+
addresses,
141+
iter,
142+
login,
143+
shuffle,
144+
attempts,
145+
maxAttempts
146+
));
102147
}
103148

104-
private Mono<Client> resumeConnect(
149+
private static Mono<ReactorNettyClient> resumeConnect(
105150
Throwable t,
151+
InetSocketAddress failed,
106152
List<InetSocketAddress> addresses,
107153
Iterator<InetSocketAddress> iter,
108-
Credential credential,
109-
TcpSocketConfiguration tcp,
110-
MySqlConnectionConfiguration configuration,
111-
boolean balancedDns,
154+
InetConnectFunction login,
112155
boolean shuffle,
113-
int attempts
156+
int attempts,
157+
int maxAttempts
114158
) {
159+
logger.warn("Fail to connect to {}", failed, t);
160+
115161
if (!iter.hasNext()) {
116162
// The last host failed to connect
117-
if (attempts >= tcp.getRetriesAllDown()) {
118-
return Mono.error(fail(
119-
"Fail to establish connection, retried " + attempts + " times: " + t.getMessage(), t));
163+
if (attempts >= maxAttempts) {
164+
return Mono.error(ConnectionStrategy.retryFail(
165+
"Fail to establish connections, retried " + attempts + " times", t));
120166
}
121167

122-
logger.warn("All hosts failed to establish connections, auto-try again after 250ms.");
168+
logger.warn("All hosts failed to establish connections, auto-try again after 250ms.", t);
123169

124170
// Ignore waiting error, e.g. interrupted, scheduler rejected
125171
return Mono.delay(Duration.ofMillis(250))
126172
.onErrorComplete()
127-
.then(Mono.defer(() -> connectHost(addresses, tcp, configuration, balancedDns, shuffle, attempts + 1)));
173+
.then(Mono.defer(() -> connectHost(
174+
addresses,
175+
login,
176+
shuffle,
177+
attempts + 1,
178+
maxAttempts
179+
)));
128180
}
129181

130-
return attemptConnect(iter.next(), credential, tcp, configuration, balancedDns).onErrorResume(tt ->
131-
resumeConnect(tt, addresses, iter, credential, tcp, configuration, balancedDns, shuffle, attempts));
132-
}
133-
134-
private Mono<Client> attemptConnect(
135-
InetSocketAddress address,
136-
Credential credential,
137-
TcpSocketConfiguration tcp,
138-
MySqlConnectionConfiguration configuration,
139-
boolean balancedDns
140-
) {
141-
TcpClient tcpClient = ConnectionStrategy.createTcpClient(configuration.getClient(), balancedDns)
142-
.option(ChannelOption.SO_KEEPALIVE, tcp.isTcpKeepAlive())
143-
.option(ChannelOption.TCP_NODELAY, tcp.isTcpNoDelay())
144-
.remoteAddress(() -> address);
145-
146-
return ConnectionStrategy.connectWithInit(tcpClient, credential, configuration)
147-
.doOnError(e -> logger.warn("Fail to connect: ", e));
182+
InetSocketAddress address = iter.next();
183+
184+
return login.apply(() -> address).onErrorResume(error -> resumeConnect(
185+
error,
186+
address,
187+
addresses,
188+
iter,
189+
login,
190+
shuffle,
191+
attempts,
192+
maxAttempts
193+
));
148194
}
149195

150196
private static Mono<List<InetSocketAddress>> resolveAllHosts(
@@ -203,13 +249,4 @@ private static List<NodeAddress> copyAvailableAddresses(List<NodeAddress> addres
203249

204250
return InternalArrays.asImmutableList(addresses.toArray(new NodeAddress[0]));
205251
}
206-
207-
private static R2dbcNonTransientResourceException fail(String message, @Nullable Throwable cause) {
208-
return new R2dbcNonTransientResourceException(
209-
message,
210-
"H1000",
211-
9000,
212-
cause
213-
);
214-
}
215252
}

‎r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfiguration.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ public final class MySqlConnectionConfiguration {
6464

6565
private final MySqlSslConfiguration ssl;
6666

67+
private final boolean autoReconnect;
68+
6769
private final boolean preserveInstants;
6870

6971
private final String connectionTimeZone;
@@ -110,6 +112,7 @@ private MySqlConnectionConfiguration(
110112
SocketClientConfiguration client,
111113
SocketConfiguration socket,
112114
MySqlSslConfiguration ssl,
115+
boolean autoReconnect,
113116
ZeroDateOption zeroDateOption,
114117
boolean preserveInstants,
115118
String connectionTimeZone,
@@ -127,6 +130,7 @@ private MySqlConnectionConfiguration(
127130
this.client = requireNonNull(client, "client must not be null");
128131
this.socket = requireNonNull(socket, "socket must not be null");
129132
this.ssl = requireNonNull(ssl, "ssl must not be null");
133+
this.autoReconnect = autoReconnect;
130134
this.preserveInstants = preserveInstants;
131135
this.connectionTimeZone = requireNonNull(connectionTimeZone, "connectionTimeZone must not be null");
132136
this.forceConnectionTimeZoneToSession = forceConnectionTimeZoneToSession;
@@ -169,6 +173,10 @@ MySqlSslConfiguration getSsl() {
169173
return ssl;
170174
}
171175

176+
boolean isAutoReconnect() {
177+
return autoReconnect;
178+
}
179+
172180
ZeroDateOption getZeroDateOption() {
173181
return zeroDateOption;
174182
}
@@ -272,6 +280,7 @@ public boolean equals(Object o) {
272280
return client.equals(that.client) &&
273281
socket.equals(that.socket) &&
274282
ssl.equals(that.ssl) &&
283+
autoReconnect == that.autoReconnect &&
275284
preserveInstants == that.preserveInstants &&
276285
connectionTimeZone.equals(that.connectionTimeZone) &&
277286
forceConnectionTimeZoneToSession == that.forceConnectionTimeZoneToSession &&
@@ -298,6 +307,7 @@ public int hashCode() {
298307
return Objects.hash(
299308
client,
300309
socket, ssl,
310+
autoReconnect,
301311
preserveInstants,
302312
connectionTimeZone,
303313
forceConnectionTimeZoneToSession,
@@ -320,6 +330,7 @@ public String toString() {
320330
return "MySqlConnectionConfiguration{client=" + client +
321331
", socket=" + socket +
322332
", ssl=" + ssl +
333+
", autoReconnect=" + autoReconnect +
323334
", preserveInstants=" + preserveInstants +
324335
", connectionTimeZone='" + connectionTimeZone + '\'' +
325336
", forceConnectionTimeZoneToSession=" + forceConnectionTimeZoneToSession +
@@ -357,6 +368,8 @@ public static final class Builder {
357368

358369
private final MySqlSslConfiguration.Builder ssl = new MySqlSslConfiguration.Builder();
359370

371+
private boolean autoReconnect;
372+
360373
@Nullable
361374
private String database;
362375

@@ -434,6 +447,7 @@ public MySqlConnectionConfiguration build() {
434447
client.build(),
435448
socket,
436449
ssl.build(preferredSsl),
450+
autoReconnect,
437451
zeroDateOption,
438452
preserveInstants,
439453
connectionTimeZone,
@@ -600,6 +614,23 @@ public Builder protocol(HaProtocol protocol) {
600614
return this;
601615
}
602616

617+
/**
618+
* Configures whether to perform failover reconnection. Default is {@code false}.
619+
* <p>
620+
* It is not recommended due to it may lead to unexpected results. For example, it may recover a transaction
621+
* state from a failed server node to an available node, the user can not aware of it, and continuing to execute
622+
* more queries in the transaction will lead to unexpected inconsistencies.
623+
*
624+
* @param enabled {@code true} to enable failover reconnection.
625+
* @return {@link Builder this}
626+
* @see <a href="https://dev.mysql.com/doc/connector-j/en/connector-j-config-failover.html">JDBC Failover</a>
627+
* @since 1.2.0
628+
*/
629+
public Builder autoReconnect(boolean enabled) {
630+
this.autoReconnect = enabled;
631+
return this;
632+
}
633+
603634
/**
604635
* Configures the connection timeout. Default no timeout.
605636
*

‎r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProvider.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,18 @@ public final class MySqlConnectionFactoryProvider implements ConnectionFactoryPr
6666
*/
6767
public static final Option<String> UNIX_SOCKET = Option.valueOf("unixSocket");
6868

69+
/**
70+
* Option to whether to perform failover reconnection. Default to {@code false}.
71+
* <p>
72+
* It is not recommended due to it may lead to unexpected results. For example, it may recover a transaction state
73+
* from a failed server node to an available node, the user can not aware of it, and continuing to execute more
74+
* queries in the transaction will lead to unexpected inconsistencies or errors. Or, user set a self-defined
75+
* variable in the session, it may not be recovered to the new node due to the driver can not aware of it.
76+
*
77+
* @since 1.2.0
78+
*/
79+
public static final Option<Boolean> AUTO_RECONNECT = Option.valueOf("autoReconnect");
80+
6981
/**
7082
* Option to set the time zone conversion. Default to {@code true} means enable conversion between JVM and
7183
* {@link #CONNECTION_TIME_ZONE}.
@@ -361,6 +373,8 @@ static MySqlConnectionConfiguration setup(ConnectionFactoryOptions options) {
361373

362374
mapper.optional(FORCE_CONNECTION_TIME_ZONE_TO_SESSION).asBoolean()
363375
.to(builder::forceConnectionTimeZoneToSession);
376+
mapper.optional(AUTO_RECONNECT).asBoolean()
377+
.to(builder::autoReconnect);
364378
mapper.optional(TCP_KEEP_ALIVE).asBoolean()
365379
.to(builder::tcpKeepAlive);
366380
mapper.optional(TCP_NO_DELAY).asBoolean()

‎r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/SingleHostConnectionStrategy.java

Lines changed: 54 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@
1717
package io.asyncer.r2dbc.mysql;
1818

1919
import io.asyncer.r2dbc.mysql.client.Client;
20+
import io.asyncer.r2dbc.mysql.client.ReactorNettyClient;
2021
import io.asyncer.r2dbc.mysql.internal.NodeAddress;
21-
import io.netty.channel.ChannelOption;
2222
import reactor.core.publisher.Mono;
23-
import reactor.netty.tcp.TcpClient;
23+
24+
import java.time.Duration;
2425

2526
/**
2627
* An implementation of {@link ConnectionStrategy} that connects to a single host. It can be wrapped to a
@@ -30,23 +31,66 @@ final class SingleHostConnectionStrategy implements ConnectionStrategy {
3031

3132
private final Mono<Client> client;
3233

33-
SingleHostConnectionStrategy(TcpSocketConfiguration socket, MySqlConnectionConfiguration configuration) {
34+
SingleHostConnectionStrategy(
35+
MySqlConnectionConfiguration configuration,
36+
NodeAddress address,
37+
boolean tcpKeepAlive,
38+
boolean tcpNoDelay
39+
) {
3440
this.client = configuration.getCredential().flatMap(credential -> {
35-
NodeAddress address = socket.getFirstAddress();
36-
3741
logger.debug("Connect to a single host: {}", address);
3842

39-
TcpClient tcpClient = ConnectionStrategy.createTcpClient(configuration.getClient(), true)
40-
.option(ChannelOption.SO_KEEPALIVE, socket.isTcpKeepAlive())
41-
.option(ChannelOption.TCP_NODELAY, socket.isTcpNoDelay())
42-
.remoteAddress(address::toUnresolved);
43+
InetConnectFunction login = new InetConnectFunction(
44+
true,
45+
tcpKeepAlive,
46+
tcpNoDelay,
47+
credential,
48+
configuration
49+
);
4350

44-
return ConnectionStrategy.connectWithInit(tcpClient, credential, configuration);
51+
return connectHost(login, address, 0, 3);
4552
});
4653
}
4754

4855
@Override
4956
public Mono<Client> connect() {
5057
return client;
5158
}
59+
60+
private static Mono<ReactorNettyClient> connectHost(
61+
InetConnectFunction login,
62+
NodeAddress address,
63+
int attempts,
64+
int maxAttempts
65+
) {
66+
return login.apply(address::toUnresolved)
67+
.onErrorResume(t -> resumeConnect(t, address, login, attempts, maxAttempts));
68+
}
69+
70+
private static Mono<ReactorNettyClient> resumeConnect(
71+
Throwable t,
72+
NodeAddress address,
73+
InetConnectFunction login,
74+
int attempts,
75+
int maxAttempts
76+
) {
77+
logger.warn("Fail to connect to {}", address, t);
78+
79+
if (attempts >= maxAttempts) {
80+
return Mono.error(ConnectionStrategy.retryFail(
81+
"Fail to establish connection, retried " + attempts + " times", t));
82+
}
83+
84+
logger.warn("Failed to establish connection, auto-try again after 250ms.", t);
85+
86+
// Ignore waiting error, e.g. interrupted, scheduler rejected
87+
return Mono.delay(Duration.ofMillis(250))
88+
.onErrorComplete()
89+
.then(Mono.defer(() -> connectHost(
90+
login,
91+
address,
92+
attempts + 1,
93+
maxAttempts
94+
)));
95+
}
5296
}

‎r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/TcpSocketConfiguration.java

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -219,16 +219,48 @@ public ConnectionStrategy strategy(MySqlConnectionConfiguration configuration) {
219219
case REPLICATION:
220220
ConnectionStrategy.logger.warn(
221221
"R2DBC Connection cannot be set to read-only, replication protocol will use the first host");
222-
return new SingleHostConnectionStrategy(this, configuration);
222+
return new MultiHostsConnectionStrategy(
223+
configuration,
224+
Collections.singletonList(getFirstAddress()),
225+
driver,
226+
retriesAllDown,
227+
false,
228+
tcpKeepAlive,
229+
tcpNoDelay
230+
);
223231
case SEQUENTIAL:
224-
return new MultiHostsConnectionStrategy(this, configuration, false);
232+
return new MultiHostsConnectionStrategy(
233+
configuration,
234+
addresses,
235+
driver,
236+
retriesAllDown,
237+
false,
238+
tcpKeepAlive,
239+
tcpNoDelay
240+
);
225241
case LOAD_BALANCE:
226-
return new MultiHostsConnectionStrategy(this, configuration, true);
242+
return new MultiHostsConnectionStrategy(
243+
configuration,
244+
addresses,
245+
driver,
246+
retriesAllDown,
247+
true,
248+
tcpKeepAlive,
249+
tcpNoDelay
250+
);
227251
default:
228252
if (ProtocolDriver.MYSQL == driver && addresses.size() == 1) {
229-
return new SingleHostConnectionStrategy(this, configuration);
253+
return new SingleHostConnectionStrategy(configuration, getFirstAddress(), tcpKeepAlive, tcpNoDelay);
230254
} else {
231-
return new MultiHostsConnectionStrategy(this, configuration, false);
255+
return new MultiHostsConnectionStrategy(
256+
configuration,
257+
addresses,
258+
driver,
259+
retriesAllDown,
260+
false,
261+
tcpKeepAlive,
262+
tcpNoDelay
263+
);
232264
}
233265
}
234266
}

‎r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/Client.java

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package io.asyncer.r2dbc.mysql.client;
1818

1919
import io.asyncer.r2dbc.mysql.ConnectionContext;
20-
import io.asyncer.r2dbc.mysql.MySqlSslConfiguration;
2120
import io.asyncer.r2dbc.mysql.message.client.ClientMessage;
2221
import io.asyncer.r2dbc.mysql.message.server.ServerMessage;
2322
import io.netty.buffer.ByteBufAllocator;
@@ -26,12 +25,9 @@
2625
import reactor.core.publisher.Flux;
2726
import reactor.core.publisher.Mono;
2827
import reactor.core.publisher.SynchronousSink;
29-
import reactor.netty.tcp.TcpClient;
3028

3129
import java.util.function.BiConsumer;
3230

33-
import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.requireNonNull;
34-
3531
/**
3632
* An abstraction that wraps the networking part of exchanging methods.
3733
*/
@@ -99,31 +95,4 @@ public interface Client {
9995
* @return if connection is valid
10096
*/
10197
boolean isConnected();
102-
103-
/**
104-
* Sends a signal to the connection, which means server does not support SSL.
105-
*/
106-
void sslUnsupported();
107-
108-
/**
109-
* Sends a signal to {@link Client this}, which means login has succeeded.
110-
*/
111-
void loginSuccess();
112-
113-
/**
114-
* Connects to a MySQL server using the provided {@link TcpClient} and {@link MySqlSslConfiguration}.
115-
*
116-
* @param tcpClient the configured TCP client
117-
* @param ssl the SSL configuration
118-
* @param context the connection context
119-
* @return A {@link Mono} that will emit a connected {@link Client}.
120-
* @throws IllegalArgumentException if {@code tcpClient}, {@code ssl} or {@code context} is {@code null}.
121-
*/
122-
static Mono<Client> connect(TcpClient tcpClient, MySqlSslConfiguration ssl, ConnectionContext context) {
123-
requireNonNull(tcpClient, "tcpClient must not be null");
124-
requireNonNull(ssl, "ssl must not be null");
125-
requireNonNull(context, "context must not be null");
126-
127-
return tcpClient.connect().map(conn -> new ReactorNettyClient(conn, ssl, context));
128-
}
12998
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright 2024 asyncer.io projects
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.asyncer.r2dbc.mysql.client;
18+
19+
import io.asyncer.r2dbc.mysql.ConnectionContext;
20+
import io.asyncer.r2dbc.mysql.message.client.ClientMessage;
21+
import io.asyncer.r2dbc.mysql.message.server.ServerMessage;
22+
import io.netty.buffer.ByteBufAllocator;
23+
import reactor.core.publisher.Flux;
24+
import reactor.core.publisher.Mono;
25+
import reactor.core.publisher.SynchronousSink;
26+
27+
import java.util.concurrent.atomic.AtomicReference;
28+
import java.util.function.BiConsumer;
29+
30+
/**
31+
* An implementation of {@link Client} that supports failover.
32+
*/
33+
public final class FailoverClient implements Client {
34+
35+
private final Mono<ReactorNettyClient> failover;
36+
37+
private final AtomicReference<ReactorNettyClient> client;
38+
39+
public FailoverClient(ReactorNettyClient client, Mono<ReactorNettyClient> failover) {
40+
this.client = new AtomicReference<>(client);
41+
this.failover = failover;
42+
}
43+
44+
private Mono<ReactorNettyClient> reconnectIfNecessary() {
45+
return Mono.defer(() -> {
46+
ReactorNettyClient client = this.client.get();
47+
48+
if (client.isChannelOpen() || client.isClosingOrClosed()) {
49+
// Open, or closed by user
50+
return Mono.just(client);
51+
}
52+
53+
return this.failover.flatMap(c -> {
54+
if (this.client.compareAndSet(client, c)) {
55+
// TODO: re-init session variables, transaction state, clear prepared cache, etc.
56+
return Mono.just(c);
57+
}
58+
59+
// Reconnected by other thread, close this one and retry
60+
return c.forceClose().then(reconnectIfNecessary());
61+
});
62+
});
63+
}
64+
65+
@Override
66+
public <T> Flux<T> exchange(ClientMessage request, BiConsumer<ServerMessage, SynchronousSink<T>> handler) {
67+
return reconnectIfNecessary().flatMapMany(c -> c.exchange(request, handler));
68+
}
69+
70+
@Override
71+
public <T> Flux<T> exchange(FluxExchangeable<T> exchangeable) {
72+
return reconnectIfNecessary().flatMapMany(c -> c.exchange(exchangeable));
73+
}
74+
75+
@Override
76+
public Mono<Void> close() {
77+
return Mono.fromSupplier(this.client::get).flatMap(ReactorNettyClient::close);
78+
}
79+
80+
@Override
81+
public Mono<Void> forceClose() {
82+
return Mono.fromSupplier(this.client::get).flatMap(ReactorNettyClient::forceClose);
83+
}
84+
85+
@Override
86+
public ByteBufAllocator getByteBufAllocator() {
87+
return this.client.get().getByteBufAllocator();
88+
}
89+
90+
@Override
91+
public ConnectionContext getContext() {
92+
return this.client.get().getContext();
93+
}
94+
95+
@Override
96+
public boolean isConnected() {
97+
return this.client.get().isConnected();
98+
}
99+
}

‎r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import reactor.core.publisher.SynchronousSink;
4242
import reactor.netty.Connection;
4343
import reactor.netty.FutureMono;
44+
import reactor.netty.tcp.TcpClient;
4445
import reactor.util.context.Context;
4546
import reactor.util.context.ContextView;
4647

@@ -54,7 +55,7 @@
5455
/**
5556
* An implementation of client based on the Reactor Netty project.
5657
*/
57-
final class ReactorNettyClient implements Client {
58+
public final class ReactorNettyClient implements Client {
5859

5960
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ReactorNettyClient.class);
6061

@@ -250,12 +251,10 @@ public boolean isConnected() {
250251
return state < ST_CLOSED && connection.channel().isOpen();
251252
}
252253

253-
@Override
254254
public void sslUnsupported() {
255255
connection.channel().pipeline().fireUserEventTriggered(SslState.UNSUPPORTED);
256256
}
257257

258-
@Override
259258
public void loginSuccess() {
260259
if (context.getCapability().isCompression()) {
261260
connection.channel().pipeline().fireUserEventTriggered(PacketEvent.USE_COMPRESSION);
@@ -264,6 +263,14 @@ public void loginSuccess() {
264263
}
265264
}
266265

266+
boolean isClosingOrClosed() {
267+
return state >= ST_CLOSING;
268+
}
269+
270+
boolean isChannelOpen() {
271+
return connection.channel().isOpen();
272+
}
273+
267274
private static void resetSequence(Connection connection) {
268275
connection.channel().pipeline().fireUserEventTriggered(PacketEvent.RESET_SEQUENCE);
269276
}
@@ -324,6 +331,27 @@ private void handleClose() {
324331
}
325332
}
326333

334+
/**
335+
* Connects to a MySQL server using the provided {@link TcpClient} and {@link MySqlSslConfiguration}.
336+
*
337+
* @param tcpClient the configured TCP client
338+
* @param ssl the SSL configuration
339+
* @param context the connection context
340+
* @return A {@link Mono} that will emit a connected {@link Client}.
341+
* @throws IllegalArgumentException if {@code tcpClient}, {@code ssl} or {@code context} is {@code null}.
342+
*/
343+
public static Mono<ReactorNettyClient> connect(
344+
TcpClient tcpClient,
345+
MySqlSslConfiguration ssl,
346+
ConnectionContext context
347+
) {
348+
requireNonNull(tcpClient, "tcpClient must not be null");
349+
requireNonNull(ssl, "ssl must not be null");
350+
requireNonNull(context, "context must not be null");
351+
352+
return tcpClient.connect().map(conn -> new ReactorNettyClient(conn, ssl, context));
353+
}
354+
327355
private final class ResponseSubscriber implements CoreSubscriber<Object> {
328356

329357
private final ResponseSink sink;

‎r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/constant/HaProtocol.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@ public enum HaProtocol {
6262
* <p>
6363
* Using: I want to use the first node for read-write if connection is set to read-write, and other nodes if
6464
* connection is set to read-only. R2DBC can not set a {@link io.r2dbc.spi.Connection Connection} to read-only mode.
65-
* So it will always use the first host. R2DBC does not recommend this mutability. Perhaps in the future, R2DBC will
66-
* support using read-only mode to create a connection instead of modifying an existing connection.
65+
* So it will always use the first host. Perhaps in the future, R2DBC will support using read-only mode to create a
66+
* connection instead of modifying an existing connection.
6767
* <p>
6868
* Reconnect: I want to reconnect to the current node if the current node is unavailable and
6969
* {@code autoReconnect=true}.

0 commit comments

Comments
 (0)
Please sign in to comment.