Skip to content

Commit b5b6cdf

Browse files
committed
Temp commit: Add support for failover
1 parent 487fdd6 commit b5b6cdf

11 files changed

+467
-140
lines changed

r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/ConnectionStrategy.java

Lines changed: 85 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.asyncer.r2dbc.mysql;
1818

1919
import io.asyncer.r2dbc.mysql.client.Client;
20+
import io.asyncer.r2dbc.mysql.client.ReactorNettyClient;
2021
import io.asyncer.r2dbc.mysql.constant.CompressionAlgorithm;
2122
import io.asyncer.r2dbc.mysql.constant.SslMode;
2223
import io.netty.channel.ChannelOption;
@@ -27,13 +28,17 @@
2728
import io.netty.util.concurrent.EventExecutor;
2829
import io.netty.util.internal.logging.InternalLogger;
2930
import io.netty.util.internal.logging.InternalLoggerFactory;
31+
import io.r2dbc.spi.R2dbcNonTransientResourceException;
32+
import org.jetbrains.annotations.Nullable;
3033
import reactor.core.publisher.Mono;
3134
import reactor.netty.resources.LoopResources;
3235
import reactor.netty.tcp.TcpClient;
3336

3437
import java.net.InetSocketAddress;
3538
import java.time.Duration;
3639
import java.util.Set;
40+
import java.util.function.Function;
41+
import java.util.function.Supplier;
3742

3843
/**
3944
* An interface of a connection strategy that considers how to obtain a MySQL {@link Client} object.
@@ -50,7 +55,7 @@ interface ConnectionStrategy {
5055
*
5156
* @return a logged-in {@link Client} object.
5257
*/
53-
Mono<Client> connect();
58+
Mono<? extends Client> connect();
5459

5560
/**
5661
* Creates a general-purpose {@link TcpClient} with the given {@link SocketClientConfiguration}.
@@ -88,7 +93,7 @@ static TcpClient createTcpClient(SocketClientConfiguration configuration, boolea
8893
* @param configuration a configuration that affects login behavior.
8994
* @return a logged-in {@link Client} object.
9095
*/
91-
static Mono<Client> login(
96+
static Mono<ReactorNettyClient> login(
9297
TcpClient tcpClient,
9398
Credential credential,
9499
MySqlConnectionConfiguration configuration
@@ -108,33 +113,91 @@ static Mono<Client> login(
108113
configuration.retrieveConnectionZoneId()
109114
);
110115

111-
return Client.connect(tcpClient, ssl, context).flatMap(client ->
116+
return ReactorNettyClient.connect(tcpClient, ssl, context).flatMap(client ->
112117
QueryFlow.login(client, sslMode, loginDb, credential, compressionAlgorithms, zstdLevel));
113118
}
114-
}
115-
116-
/**
117-
* Resolves the {@link InetSocketAddress} to IP address, randomly pick one if it resolves to multiple IP addresses.
118-
*
119-
* @since 1.2.0
120-
*/
121-
final class BalancedResolverGroup extends AddressResolverGroup<InetSocketAddress> {
122119

123-
BalancedResolverGroup() {
120+
/**
121+
* Creates an exception that indicates a retry failure.
122+
*
123+
* @param message the message of the exception.
124+
* @param cause the last exception that caused the retry.
125+
* @return a retry failure exception.
126+
*/
127+
static R2dbcNonTransientResourceException retryFail(String message, @Nullable Throwable cause) {
128+
return new R2dbcNonTransientResourceException(
129+
message,
130+
"H1000",
131+
9000,
132+
cause
133+
);
124134
}
125135

126-
public static final BalancedResolverGroup INSTANCE;
136+
/**
137+
* Connect and login to a MySQL server with a specific TCP socket address.
138+
*
139+
* @since 1.2.0
140+
*/
141+
final class InetConnectFunction implements Function<Supplier<InetSocketAddress>, Mono<ReactorNettyClient>> {
142+
143+
private final boolean balancedDns;
144+
145+
private final boolean tcpKeepAlive;
146+
147+
private final boolean tcpNoDelay;
127148

128-
static {
129-
INSTANCE = new BalancedResolverGroup();
130-
Runtime.getRuntime().addShutdownHook(new Thread(
131-
INSTANCE::close,
132-
"R2DBC-MySQL-BalancedResolverGroup-ShutdownHook"
133-
));
149+
private final Credential credential;
150+
151+
private final MySqlConnectionConfiguration configuration;
152+
153+
InetConnectFunction(
154+
boolean balancedDns,
155+
boolean tcpKeepAlive,
156+
boolean tcpNoDelay,
157+
Credential credential,
158+
MySqlConnectionConfiguration configuration
159+
) {
160+
this.balancedDns = balancedDns;
161+
this.tcpKeepAlive = tcpKeepAlive;
162+
this.tcpNoDelay = tcpNoDelay;
163+
this.credential = credential;
164+
this.configuration = configuration;
165+
}
166+
167+
@Override
168+
public Mono<ReactorNettyClient> apply(Supplier<InetSocketAddress> address) {
169+
TcpClient cc = ConnectionStrategy.createTcpClient(configuration.getClient(), balancedDns)
170+
.option(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
171+
.option(ChannelOption.TCP_NODELAY, tcpNoDelay)
172+
.remoteAddress(address);
173+
174+
return ConnectionStrategy.login(cc, credential, configuration);
175+
}
134176
}
135177

136-
@Override
137-
protected AddressResolver<InetSocketAddress> newResolver(EventExecutor executor) {
138-
return new RoundRobinInetAddressResolver(executor, new DefaultNameResolver(executor)).asAddressResolver();
178+
/**
179+
* Resolves the {@link InetSocketAddress} to IP address, randomly pick one if it resolves to multiple IP addresses.
180+
*
181+
* @since 1.2.0
182+
*/
183+
final class BalancedResolverGroup extends AddressResolverGroup<InetSocketAddress> {
184+
185+
BalancedResolverGroup() {
186+
}
187+
188+
public static final BalancedResolverGroup INSTANCE;
189+
190+
static {
191+
INSTANCE = new BalancedResolverGroup();
192+
Runtime.getRuntime().addShutdownHook(new Thread(
193+
INSTANCE::close,
194+
"R2DBC-MySQL-BalancedResolverGroup-ShutdownHook"
195+
));
196+
}
197+
198+
@Override
199+
protected AddressResolver<InetSocketAddress> newResolver(EventExecutor executor) {
200+
return new RoundRobinInetAddressResolver(executor, new DefaultNameResolver(executor)).asAddressResolver();
201+
}
139202
}
140203
}

r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MultiHostsConnectionStrategy.java

Lines changed: 103 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,17 @@
1717
package io.asyncer.r2dbc.mysql;
1818

1919
import io.asyncer.r2dbc.mysql.client.Client;
20+
import io.asyncer.r2dbc.mysql.client.FailoverClient;
21+
import io.asyncer.r2dbc.mysql.client.ReactorNettyClient;
2022
import io.asyncer.r2dbc.mysql.constant.ProtocolDriver;
2123
import io.asyncer.r2dbc.mysql.internal.NodeAddress;
2224
import io.asyncer.r2dbc.mysql.internal.util.InternalArrays;
23-
import io.netty.channel.ChannelOption;
2425
import io.netty.resolver.DefaultNameResolver;
2526
import io.netty.resolver.NameResolver;
2627
import io.netty.util.concurrent.Future;
27-
import io.r2dbc.spi.R2dbcNonTransientResourceException;
28-
import org.jetbrains.annotations.Nullable;
2928
import reactor.core.publisher.Flux;
3029
import reactor.core.publisher.Mono;
3130
import reactor.netty.resources.LoopResources;
32-
import reactor.netty.tcp.TcpClient;
3331
import reactor.netty.tcp.TcpResources;
3432

3533
import java.net.InetAddress;
@@ -46,101 +44,152 @@
4644
*/
4745
final class MultiHostsConnectionStrategy implements ConnectionStrategy {
4846

49-
private final Mono<Client> client;
47+
private final Mono<? extends Client> client;
5048

5149
MultiHostsConnectionStrategy(
52-
TcpSocketConfiguration tcp,
5350
MySqlConnectionConfiguration configuration,
54-
boolean shuffle
51+
List<NodeAddress> addresses,
52+
ProtocolDriver driver,
53+
int retriesAllDown,
54+
boolean shuffle,
55+
boolean tcpKeepAlive,
56+
boolean tcpNoDelay
5557
) {
56-
this.client = Mono.defer(() -> {
57-
if (ProtocolDriver.DNS_SRV.equals(tcp.getDriver())) {
58+
Mono<ReactorNettyClient> client = configuration.getCredential().flatMap(credential -> {
59+
if (ProtocolDriver.DNS_SRV.equals(driver)) {
60+
logger.debug("Resolve hosts via DNS SRV: {}", addresses);
61+
5862
LoopResources resources = configuration.getClient().getLoopResources();
5963
LoopResources loopResources = resources == null ? TcpResources.get() : resources;
60-
61-
return resolveAllHosts(loopResources, tcp.getAddresses(), shuffle)
62-
.flatMap(addresses -> connectHost(addresses, tcp, configuration, false, shuffle, 0));
64+
InetConnectFunction login = new InetConnectFunction(
65+
false,
66+
tcpKeepAlive,
67+
tcpNoDelay,
68+
credential,
69+
configuration
70+
);
71+
72+
return resolveAllHosts(loopResources, addresses, shuffle).flatMap(addrs -> {
73+
logger.debug("Connect to multiple addresses: {}", addrs);
74+
75+
return connectHost(
76+
addrs,
77+
login,
78+
shuffle,
79+
0,
80+
retriesAllDown
81+
);
82+
});
6383
} else {
64-
List<NodeAddress> availableHosts = copyAvailableAddresses(tcp.getAddresses(), shuffle);
84+
List<NodeAddress> availableHosts = copyAvailableAddresses(addresses, shuffle);
85+
logger.debug("Connect to multiple hosts: {}", availableHosts);
86+
6587
int size = availableHosts.size();
66-
InetSocketAddress[] addresses = new InetSocketAddress[availableHosts.size()];
88+
InetSocketAddress[] array = new InetSocketAddress[availableHosts.size()];
6789

6890
for (int i = 0; i < size; i++) {
69-
NodeAddress address = availableHosts.get(i);
70-
addresses[i] = InetSocketAddress.createUnresolved(address.getHost(), address.getPort());
91+
array[i] = availableHosts.get(i).toUnresolved();
7192
}
7293

73-
return connectHost(InternalArrays.asImmutableList(addresses), tcp, configuration, true, shuffle, 0);
94+
List<InetSocketAddress> addrs = InternalArrays.asImmutableList(array);
95+
InetConnectFunction login = new InetConnectFunction(
96+
true,
97+
tcpKeepAlive,
98+
tcpNoDelay,
99+
credential,
100+
configuration
101+
);
102+
103+
return connectHost(
104+
addrs,
105+
login,
106+
shuffle,
107+
0,
108+
retriesAllDown
109+
);
74110
}
75111
});
112+
113+
this.client = client.map(c -> new FailoverClient(c, client));
76114
}
77115

78116
@Override
79-
public Mono<Client> connect() {
117+
public Mono<? extends Client> connect() {
80118
return client;
81119
}
82120

83-
private Mono<Client> connectHost(
121+
private static Mono<ReactorNettyClient> connectHost(
84122
List<InetSocketAddress> addresses,
85-
TcpSocketConfiguration tcp,
86-
MySqlConnectionConfiguration configuration,
87-
boolean balancedDns,
123+
InetConnectFunction login,
88124
boolean shuffle,
89-
int attempts
125+
int attempts,
126+
int maxAttempts
90127
) {
91128
Iterator<InetSocketAddress> iter = addresses.iterator();
92129

93130
if (!iter.hasNext()) {
94-
return Mono.error(fail("Fail to establish connection: no available host", null));
131+
return Mono.error(ConnectionStrategy.retryFail("Fail to establish connection: no available host", null));
95132
}
96133

97-
return attemptConnect(iter.next(), tcp, configuration, balancedDns).onErrorResume(t ->
98-
resumeConnect(t, addresses, iter, tcp, configuration, balancedDns, shuffle, attempts));
134+
InetSocketAddress address = iter.next();
135+
136+
return login.apply(() -> address).onErrorResume(error -> resumeConnect(
137+
error,
138+
address,
139+
addresses,
140+
iter,
141+
login,
142+
shuffle,
143+
attempts,
144+
maxAttempts
145+
));
99146
}
100147

101-
private Mono<Client> resumeConnect(
148+
private static Mono<ReactorNettyClient> resumeConnect(
102149
Throwable t,
150+
InetSocketAddress failed,
103151
List<InetSocketAddress> addresses,
104152
Iterator<InetSocketAddress> iter,
105-
TcpSocketConfiguration tcp,
106-
MySqlConnectionConfiguration configuration,
107-
boolean balancedDns,
153+
InetConnectFunction login,
108154
boolean shuffle,
109-
int attempts
155+
int attempts,
156+
int maxAttempts
110157
) {
158+
logger.warn("Fail to connect to {}", failed, t);
159+
111160
if (!iter.hasNext()) {
112161
// The last host failed to connect
113-
if (attempts >= tcp.getRetriesAllDown()) {
114-
return Mono.error(fail(
115-
"Fail to establish connection, retried " + attempts + " times: " + t.getMessage(), t));
162+
if (attempts >= maxAttempts) {
163+
return Mono.error(ConnectionStrategy.retryFail(
164+
"Fail to establish connections, retried " + attempts + " times", t));
116165
}
117166

118-
logger.warn("All hosts failed to establish connections, auto-try again after 250ms.");
167+
logger.warn("All hosts failed to establish connections, auto-try again after 250ms.", t);
119168

120169
// Ignore waiting error, e.g. interrupted, scheduler rejected
121170
return Mono.delay(Duration.ofMillis(250))
122171
.onErrorComplete()
123-
.then(Mono.defer(() -> connectHost(addresses, tcp, configuration, balancedDns, shuffle, attempts + 1)));
172+
.then(Mono.defer(() -> connectHost(
173+
addresses,
174+
login,
175+
shuffle,
176+
attempts + 1,
177+
maxAttempts
178+
)));
124179
}
125180

126-
return attemptConnect(iter.next(), tcp, configuration, balancedDns).onErrorResume(tt ->
127-
resumeConnect(tt, addresses, iter, tcp, configuration, balancedDns, shuffle, attempts));
128-
}
129-
130-
private Mono<Client> attemptConnect(
131-
InetSocketAddress address,
132-
TcpSocketConfiguration tcp,
133-
MySqlConnectionConfiguration configuration,
134-
boolean balancedDns
135-
) {
136-
return configuration.getCredential().flatMap(credential -> {
137-
TcpClient tcpClient = ConnectionStrategy.createTcpClient(configuration.getClient(), balancedDns)
138-
.option(ChannelOption.SO_KEEPALIVE, tcp.isTcpKeepAlive())
139-
.option(ChannelOption.TCP_NODELAY, tcp.isTcpNoDelay())
140-
.remoteAddress(() -> address);
141-
142-
return ConnectionStrategy.login(tcpClient, credential, configuration);
143-
}).doOnError(e -> logger.warn("Fail to connect: ", e));
181+
InetSocketAddress address = iter.next();
182+
183+
return login.apply(() -> address).onErrorResume(error -> resumeConnect(
184+
error,
185+
address,
186+
addresses,
187+
iter,
188+
login,
189+
shuffle,
190+
attempts,
191+
maxAttempts
192+
));
144193
}
145194

146195
private static Mono<List<InetSocketAddress>> resolveAllHosts(
@@ -199,13 +248,4 @@ private static List<NodeAddress> copyAvailableAddresses(List<NodeAddress> addres
199248

200249
return InternalArrays.asImmutableList(addresses.toArray(new NodeAddress[0]));
201250
}
202-
203-
private static R2dbcNonTransientResourceException fail(String message, @Nullable Throwable cause) {
204-
return new R2dbcNonTransientResourceException(
205-
message,
206-
"H1000",
207-
9000,
208-
cause
209-
);
210-
}
211251
}

0 commit comments

Comments
 (0)