Skip to content

Commit 9b54407

Browse files
committed
Polishing.
Dispose ClusterCommandExecutor before disposing connection provider. Extract RedisClient disposal to method. Use AtomicBoolean guard instead of state and Lifecycle to dispose LettucePoolingConnectionProvider. See #3100 Original pull request: #3164
1 parent 461343e commit 9b54407

File tree

3 files changed

+93
-102
lines changed

3 files changed

+93
-102
lines changed

src/main/java/org/springframework/data/redis/connection/jedis/JedisConnectionFactory.java

Lines changed: 37 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -761,38 +761,16 @@ public void stop() {
761761
if (this.state.compareAndSet(State.STARTED, State.STOPPING)) {
762762

763763
if (getUsePool() && !isRedisClusterAware()) {
764-
if (this.pool != null) {
765-
try {
766-
this.pool.close();
767-
this.pool = null;
768-
} catch (Exception ex) {
769-
log.warn("Cannot properly close Jedis pool", ex);
770-
}
771-
}
772-
}
773-
774-
ClusterCommandExecutor clusterCommandExecutor = this.clusterCommandExecutor;
775-
776-
if (clusterCommandExecutor != null) {
777-
try {
778-
clusterCommandExecutor.destroy();
779-
this.clusterCommandExecutor = null;
780-
} catch (Exception ex) {
781-
throw new RuntimeException(ex);
782-
}
764+
dispose(pool);
765+
pool = null;
783766
}
784767

785-
if (this.cluster != null) {
768+
dispose(clusterCommandExecutor);
769+
clusterCommandExecutor = null;
786770

787-
this.topologyProvider = null;
788-
789-
try {
790-
this.cluster.close();
791-
this.cluster = null;
792-
} catch (Exception ex) {
793-
log.warn("Cannot properly close Jedis cluster", ex);
794-
}
795-
}
771+
dispose(cluster);
772+
topologyProvider = null;
773+
cluster = null;
796774

797775
this.state.set(State.STOPPED);
798776
}
@@ -882,6 +860,36 @@ public void destroy() {
882860
state.set(State.DESTROYED);
883861
}
884862

863+
private void dispose(@Nullable ClusterCommandExecutor commandExecutor) {
864+
if (commandExecutor != null) {
865+
try {
866+
commandExecutor.destroy();
867+
} catch (Exception ex) {
868+
log.warn("Cannot properly close cluster command executor", ex);
869+
}
870+
}
871+
}
872+
873+
private void dispose(@Nullable JedisCluster cluster) {
874+
if (cluster != null) {
875+
try {
876+
cluster.close();
877+
} catch (Exception ex) {
878+
log.warn("Cannot properly close Jedis cluster", ex);
879+
}
880+
}
881+
}
882+
883+
private void dispose(@Nullable Pool<Jedis> pool) {
884+
if (pool != null) {
885+
try {
886+
pool.close();
887+
} catch (Exception ex) {
888+
log.warn("Cannot properly close Jedis pool", ex);
889+
}
890+
}
891+
}
892+
885893
@Override
886894
public RedisConnection getConnection() {
887895

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -974,33 +974,23 @@ public void stop() {
974974

975975
resetConnection();
976976

977+
dispose(clusterCommandExecutor);
978+
clusterCommandExecutor = null;
979+
977980
dispose(connectionProvider);
978981
connectionProvider = null;
979982

980983
dispose(reactiveConnectionProvider);
981984
reactiveConnectionProvider = null;
982985

983-
dispose(clusterCommandExecutor);
984-
clusterCommandExecutor = null;
985-
986-
if (client != null) {
987-
try {
988-
Duration quietPeriod = clientConfiguration.getShutdownQuietPeriod();
989-
Duration timeout = clientConfiguration.getShutdownTimeout();
990-
991-
client.shutdown(quietPeriod.toMillis(), timeout.toMillis(), TimeUnit.MILLISECONDS);
992-
client = null;
993-
} catch (Exception ex) {
994-
if (log.isWarnEnabled()) {
995-
log.warn(ClassUtils.getShortName(client.getClass()) + " did not shut down gracefully.", ex);
996-
}
997-
}
998-
}
986+
dispose(client);
987+
client = null;
999988
}
1000989

1001990
state.set(State.STOPPED);
1002991
}
1003992

993+
1004994
@Override
1005995
public boolean isRunning() {
1006996
return State.STARTED.equals(this.state.get());
@@ -1016,11 +1006,22 @@ public void afterPropertiesSet() {
10161006

10171007
@Override
10181008
public void destroy() {
1019-
stop();
10201009

1010+
stop();
10211011
this.state.set(State.DESTROYED);
10221012
}
10231013

1014+
private void dispose(@Nullable ClusterCommandExecutor commandExecutor) {
1015+
1016+
if (commandExecutor != null) {
1017+
try {
1018+
commandExecutor.destroy();
1019+
} catch (Exception ex) {
1020+
log.warn("Cannot properly close cluster command executor", ex);
1021+
}
1022+
}
1023+
}
1024+
10241025
private void dispose(@Nullable LettuceConnectionProvider connectionProvider) {
10251026

10261027
if (connectionProvider instanceof DisposableBean disposableBean) {
@@ -1034,12 +1035,18 @@ private void dispose(@Nullable LettuceConnectionProvider connectionProvider) {
10341035
}
10351036
}
10361037

1037-
private void dispose(@Nullable ClusterCommandExecutor commandExecutor) {
1038-
if (commandExecutor != null) {
1038+
private void dispose(@Nullable AbstractRedisClient client) {
1039+
1040+
if (client != null) {
10391041
try {
1040-
commandExecutor.destroy();
1042+
Duration quietPeriod = clientConfiguration.getShutdownQuietPeriod();
1043+
Duration timeout = clientConfiguration.getShutdownTimeout();
1044+
1045+
client.shutdown(quietPeriod.toMillis(), timeout.toMillis(), TimeUnit.MILLISECONDS);
10411046
} catch (Exception ex) {
1042-
log.warn("Cannot properly close cluster command executor", ex);
1047+
if (log.isWarnEnabled()) {
1048+
log.warn(ClassUtils.getShortName(client.getClass()) + " did not shut down gracefully.", ex);
1049+
}
10431050
}
10441051
}
10451052
}

src/main/java/org/springframework/data/redis/connection/lettuce/LettucePoolingConnectionProvider.java

Lines changed: 28 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,14 @@
3030
import java.util.concurrent.CompletableFuture;
3131
import java.util.concurrent.CompletionStage;
3232
import java.util.concurrent.ConcurrentHashMap;
33+
import java.util.concurrent.atomic.AtomicBoolean;
3334

34-
import java.util.concurrent.atomic.AtomicReference;
3535
import org.apache.commons.logging.Log;
3636
import org.apache.commons.logging.LogFactory;
3737
import org.apache.commons.pool2.impl.GenericObjectPool;
3838
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
39+
3940
import org.springframework.beans.factory.DisposableBean;
40-
import org.springframework.context.SmartLifecycle;
4141
import org.springframework.data.redis.connection.PoolException;
4242
import org.springframework.util.Assert;
4343

@@ -62,12 +62,11 @@
6262
* @since 2.0
6363
* @see #getConnection(Class)
6464
*/
65-
class LettucePoolingConnectionProvider implements LettuceConnectionProvider, RedisClientProvider, DisposableBean,
66-
SmartLifecycle {
65+
class LettucePoolingConnectionProvider implements LettuceConnectionProvider, RedisClientProvider, DisposableBean {
6766

6867
private static final Log log = LogFactory.getLog(LettucePoolingConnectionProvider.class);
6968

70-
private final AtomicReference<State> state = new AtomicReference<>(State.CREATED);
69+
private final AtomicBoolean disposed = new AtomicBoolean();
7170
private final LettuceConnectionProvider connectionProvider;
7271
private final GenericObjectPoolConfig<StatefulConnection<?, ?>> poolConfig;
7372
private final Map<StatefulConnection<?, ?>, GenericObjectPool<StatefulConnection<?, ?>>> poolRef = new ConcurrentHashMap<>(
@@ -81,10 +80,6 @@ class LettucePoolingConnectionProvider implements LettuceConnectionProvider, Red
8180
private final Map<Class<?>, AsyncPool<StatefulConnection<?, ?>>> asyncPools = new ConcurrentHashMap<>(32);
8281
private final BoundedPoolConfig asyncPoolConfig;
8382

84-
enum State {
85-
CREATED, STARTING, STARTED, STOPPING, STOPPED, DESTROYED;
86-
}
87-
8883
LettucePoolingConnectionProvider(LettuceConnectionProvider connectionProvider,
8984
LettucePoolingClientConfiguration clientConfiguration) {
9085

@@ -215,51 +210,43 @@ public CompletableFuture<Void> releaseAsync(StatefulConnection<?, ?> connection)
215210

216211
@Override
217212
public void destroy() throws Exception {
218-
stop();
219-
state.set(State.DESTROYED);
220-
}
221-
222213

223-
@Override
224-
public void start() {
225-
state.set(State.STARTED);
226-
}
214+
if (!disposed.compareAndSet(false, true)) {
215+
return;
216+
}
227217

228-
@Override
229-
public void stop() {
230-
if (state.compareAndSet(State.STARTED, State.STOPPING)) {
231-
List<CompletableFuture<?>> futures = new ArrayList<>();
232-
if (!poolRef.isEmpty() || !asyncPoolRef.isEmpty()) {
233-
log.warn("LettucePoolingConnectionProvider contains unreleased connections");
234-
}
218+
List<CompletableFuture<?>> futures = new ArrayList<>();
219+
if (!poolRef.isEmpty() || !asyncPoolRef.isEmpty()) {
220+
log.warn("LettucePoolingConnectionProvider contains unreleased connections");
221+
}
235222

236-
if (!inProgressAsyncPoolRef.isEmpty()) {
223+
if (!inProgressAsyncPoolRef.isEmpty()) {
237224

238-
log.warn("LettucePoolingConnectionProvider has active connection retrievals");
239-
inProgressAsyncPoolRef.forEach((k, v) -> futures.add(k.thenApply(StatefulConnection::closeAsync)));
240-
}
225+
log.warn("LettucePoolingConnectionProvider has active connection retrievals");
226+
inProgressAsyncPoolRef.forEach((k, v) -> futures.add(k.thenApply(StatefulConnection::closeAsync)));
227+
}
241228

242-
if (!poolRef.isEmpty()) {
229+
if (!poolRef.isEmpty()) {
243230

244-
poolRef.forEach((connection, pool) -> pool.returnObject(connection));
245-
poolRef.clear();
246-
}
231+
poolRef.forEach((connection, pool) -> pool.returnObject(connection));
232+
poolRef.clear();
233+
}
247234

248-
if (!asyncPoolRef.isEmpty()) {
235+
if (!asyncPoolRef.isEmpty()) {
249236

250-
asyncPoolRef.forEach((connection, pool) -> futures.add(pool.release(connection)));
251-
asyncPoolRef.clear();
252-
}
237+
asyncPoolRef.forEach((connection, pool) -> futures.add(pool.release(connection)));
238+
asyncPoolRef.clear();
239+
}
253240

254-
pools.forEach((type, pool) -> pool.close());
241+
pools.forEach((type, pool) -> pool.close());
255242

256-
CompletableFuture
243+
CompletableFuture
257244
.allOf(futures.stream().map(it -> it.exceptionally(LettuceFutureUtils.ignoreErrors()))
258-
.toArray(CompletableFuture[]::new)) //
245+
.toArray(CompletableFuture[]::new)) //
259246
.thenCompose(ignored -> {
260247

261248
CompletableFuture[] poolClose = asyncPools.values().stream().map(AsyncPool::closeAsync)
262-
.map(it -> it.exceptionally(LettuceFutureUtils.ignoreErrors())).toArray(CompletableFuture[]::new);
249+
.map(it -> it.exceptionally(LettuceFutureUtils.ignoreErrors())).toArray(CompletableFuture[]::new);
263250

264251
return CompletableFuture.allOf(poolClose);
265252
}) //
@@ -269,18 +256,7 @@ public void stop() {
269256
}) //
270257
.join();
271258

272-
pools.clear();
273-
}
274-
state.set(State.STOPPED);
275-
}
276-
277-
@Override
278-
public boolean isRunning() {
279-
return State.STARTED.equals(this.state.get());
259+
pools.clear();
280260
}
281261

282-
@Override
283-
public boolean isAutoStartup() {
284-
return true;
285-
}
286262
}

0 commit comments

Comments
 (0)