-
Notifications
You must be signed in to change notification settings - Fork 3.9k
Multi node pipeline executor #4070
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
c196d7a
a50568a
1e9a4fb
8205d3a
5d3398f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
package redis.clients.jedis; | ||
|
||
import java.util.concurrent.Executor; | ||
import java.util.concurrent.ExecutorService; | ||
|
||
/** | ||
* Executor used for parallel syncing of multinode pipeline when provided in | ||
* {@link DefaultJedisClientConfig.Builder#pipelineExecutorProvider(PipelineExecutorProvider)} | ||
*/ | ||
public interface ClusterPipelineExecutor extends Executor, AutoCloseable { | ||
|
||
/** | ||
* To avoid following hte {@link JedisCluster} client lifecycle in shutting down the executor service | ||
* provide your own implementation of this interface to {@link PipelineExecutorProvider} | ||
*/ | ||
default void shutdown() {} | ||
|
||
default void close() { | ||
shutdown(); | ||
} | ||
|
||
/** | ||
* Wrap an executor service into a {@link ClusterPipelineExecutor} to allow clients to provide their | ||
* desired implementation of the {@link ExecutorService} to support parallel syncing of {@link MultiNodePipelineBase}. | ||
* | ||
* @param executorService | ||
* @return ClusterPipelineExecutor that will be shutdown alongside the {@link JedisCluster} client. | ||
*/ | ||
static ClusterPipelineExecutor from(ExecutorService executorService) { | ||
return new ClusterPipelineExecutor() { | ||
@Override | ||
public void execute(Runnable command) { | ||
executorService.execute(command); | ||
} | ||
|
||
@Override | ||
public void shutdown() { | ||
executorService.shutdown(); | ||
} | ||
}; | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,7 @@ | |
import redis.clients.jedis.csc.Cache; | ||
import redis.clients.jedis.csc.CacheConfig; | ||
import redis.clients.jedis.csc.CacheFactory; | ||
import redis.clients.jedis.util.IOUtils; | ||
import redis.clients.jedis.util.JedisClusterCRC16; | ||
|
||
public class JedisCluster extends UnifiedJedis { | ||
|
@@ -29,6 +30,12 @@ public class JedisCluster extends UnifiedJedis { | |
*/ | ||
public static final int DEFAULT_MAX_ATTEMPTS = 5; | ||
|
||
/** | ||
* Executor used to close MultiNodePipeline in parallel. See {@link JedisClientConfig#getPipelineExecutorProvider()} | ||
* for mor details on configuration. | ||
*/ | ||
private ClusterPipelineExecutor clusterPipelineExecutor; | ||
|
||
/** | ||
* Creates a JedisCluster instance. The provided node is used to make the first contact with the cluster. | ||
* <p> | ||
|
@@ -251,6 +258,8 @@ public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfi | |
Duration maxTotalRetriesDuration) { | ||
this(new ClusterConnectionProvider(clusterNodes, clientConfig), maxAttempts, maxTotalRetriesDuration, | ||
clientConfig.getRedisProtocol()); | ||
clientConfig.getPipelineExecutorProvider() | ||
.getClusteredPipelineExecutor().ifPresent((executor) -> this.clusterPipelineExecutor = executor); | ||
} | ||
|
||
public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, | ||
|
@@ -268,13 +277,17 @@ public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfi | |
Duration maxTotalRetriesDuration, GenericObjectPoolConfig<Connection> poolConfig) { | ||
this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig), maxAttempts, maxTotalRetriesDuration, | ||
clientConfig.getRedisProtocol()); | ||
clientConfig.getPipelineExecutorProvider() | ||
.getClusteredPipelineExecutor().ifPresent((executor) -> this.clusterPipelineExecutor = executor); | ||
} | ||
|
||
public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, | ||
GenericObjectPoolConfig<Connection> poolConfig, Duration topologyRefreshPeriod, int maxAttempts, | ||
Duration maxTotalRetriesDuration) { | ||
this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig, topologyRefreshPeriod), | ||
maxAttempts, maxTotalRetriesDuration, clientConfig.getRedisProtocol()); | ||
clientConfig.getPipelineExecutorProvider() | ||
.getClusteredPipelineExecutor().ifPresent((executor) -> this.clusterPipelineExecutor = executor); | ||
} | ||
|
||
// Uses a fetched connection to process protocol. Should be avoided if possible. | ||
|
@@ -334,6 +347,12 @@ private JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Durati | |
super(provider, maxAttempts, maxTotalRetriesDuration, protocol, clientSideCache); | ||
} | ||
|
||
@Override | ||
public void close() { | ||
super.close(); | ||
IOUtils.closeQuietly(this.clusterPipelineExecutor); | ||
} | ||
|
||
Comment on lines
+350
to
+355
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Considering |
||
/** | ||
* Returns all nodes that were configured to connect to in key-value pairs ({@link Map}).<br> | ||
* Key is the HOST:PORT and the value is the connection pool. | ||
|
@@ -376,9 +395,14 @@ public void ssubscribe(BinaryJedisShardedPubSub jedisPubSub, final byte[]... cha | |
|
||
@Override | ||
public ClusterPipeline pipelined() { | ||
return new ClusterPipeline((ClusterConnectionProvider) provider, (ClusterCommandObjects) commandObjects); | ||
if (clusterPipelineExecutor == null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we just provide an overload method with executor parameter? like public ClusterPipeline pipelined(Executor pipelineExecutor) {
xxx
} |
||
return new ClusterPipeline((ClusterConnectionProvider) provider, (ClusterCommandObjects) commandObjects); | ||
} | ||
return new ClusterPipeline((ClusterConnectionProvider) provider, (ClusterCommandObjects) commandObjects, | ||
clusterPipelineExecutor); | ||
} | ||
|
||
|
||
/** | ||
* @param doMulti param | ||
* @return nothing | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
package redis.clients.jedis; | ||
|
||
import java.util.Optional; | ||
import java.util.concurrent.Executors; | ||
|
||
/** | ||
* This provides a {@link ClusterPipelineExecutor} used for parallel syncing of {@link MultiNodePipelineBase} | ||
*/ | ||
public class PipelineExecutorProvider { | ||
|
||
static final PipelineExecutorProvider DEFAULT = new PipelineExecutorProvider(); | ||
|
||
private ClusterPipelineExecutor clusterPipelineExecutor; | ||
|
||
/** | ||
* Default constructor providing an empty {@link Optional} of {@link ClusterPipelineExecutor} | ||
*/ | ||
private PipelineExecutorProvider() {} | ||
|
||
/** | ||
* Will provide a {@link ClusterPipelineExecutor} with the specified number of thread. The number of thread | ||
* should be equal or higher than the number of master nodes in the cluster. | ||
* | ||
* @param threadCount | ||
*/ | ||
public PipelineExecutorProvider(int threadCount) { | ||
this.clusterPipelineExecutor = ClusterPipelineExecutor.from(Executors.newFixedThreadPool(threadCount));; | ||
} | ||
|
||
/** | ||
* Allow clients to provide their own implementation of {@link ClusterPipelineExecutor} | ||
* @param clusterPipelineExecutor | ||
*/ | ||
public PipelineExecutorProvider(ClusterPipelineExecutor clusterPipelineExecutor) { | ||
this.clusterPipelineExecutor = clusterPipelineExecutor; | ||
} | ||
|
||
/** | ||
* @return an empty option by default, otherwise will return the configured value. | ||
*/ | ||
Optional<ClusterPipelineExecutor> getClusteredPipelineExecutor() { | ||
return Optional.ofNullable(clusterPipelineExecutor); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think there is an issue here with the multithread/multi-client/cluster cases compared to "as-is" implementation; they will need to wait each other to complete/release the executors which was not the case before.
i like it better the previous approach in this PR where we provide
ExecutorService
to each pipeline execution explicitly.