Skip to content

Multi node pipeline executor #4070

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/main/java/redis/clients/jedis/ClusterPipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.time.Duration;
import java.util.Set;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.providers.ClusterConnectionProvider;
import redis.clients.jedis.util.IOUtils;
Expand Down Expand Up @@ -40,6 +41,12 @@ public ClusterPipeline(ClusterConnectionProvider provider, ClusterCommandObjects
this.provider = provider;
}

public ClusterPipeline(ClusterConnectionProvider provider, ClusterCommandObjects commandObjects,
ClusterPipelineExecutor executorService) {
super(commandObjects, executorService);
this.provider = provider;
}

private static ClusterCommandObjects createClusterCommandObjects(RedisProtocol protocol) {
ClusterCommandObjects cco = new ClusterCommandObjects();
if (protocol == RedisProtocol.RESP3) cco.setProtocol(protocol);
Expand Down
43 changes: 43 additions & 0 deletions src/main/java/redis/clients/jedis/ClusterPipelineExecutor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package redis.clients.jedis;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

/**
* Executor used for parallel syncing of multinode pipeline when provided in
* {@link DefaultJedisClientConfig.Builder#pipelineExecutorProvider(PipelineExecutorProvider)}
*/
public interface ClusterPipelineExecutor extends Executor, AutoCloseable {

/**
* To avoid following hte {@link JedisCluster} client lifecycle in shutting down the executor service
* provide your own implementation of this interface to {@link PipelineExecutorProvider}
*/
default void shutdown() {}

default void close() {
shutdown();
}

/**
* Wrap an executor service into a {@link ClusterPipelineExecutor} to allow clients to provide their
* desired implementation of the {@link ExecutorService} to support parallel syncing of {@link MultiNodePipelineBase}.
*
* @param executorService
* @return ClusterPipelineExecutor that will be shutdown alongside the {@link JedisCluster} client.
*/
static ClusterPipelineExecutor from(ExecutorService executorService) {
return new ClusterPipelineExecutor() {
@Override
public void execute(Runnable command) {
executorService.execute(command);
}

@Override
public void shutdown() {
executorService.shutdown();
}
};
}

}
16 changes: 16 additions & 0 deletions src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public final class DefaultJedisClientConfig implements JedisClientConfig {

private final AuthXManager authXManager;

private final PipelineExecutorProvider pipelineExecutorProvider;

private DefaultJedisClientConfig(DefaultJedisClientConfig.Builder builder) {
this.redisProtocol = builder.redisProtocol;
this.connectionTimeoutMillis = builder.connectionTimeoutMillis;
Expand All @@ -50,6 +52,7 @@ private DefaultJedisClientConfig(DefaultJedisClientConfig.Builder builder) {
this.clientSetInfoConfig = builder.clientSetInfoConfig;
this.readOnlyForRedisClusterReplicas = builder.readOnlyForRedisClusterReplicas;
this.authXManager = builder.authXManager;
this.pipelineExecutorProvider = builder.pipelineExecutorProvider;
}

@Override
Expand Down Expand Up @@ -143,6 +146,11 @@ public boolean isReadOnlyForRedisClusterReplicas() {
return readOnlyForRedisClusterReplicas;
}

@Override
public PipelineExecutorProvider getPipelineExecutorProvider() {
return pipelineExecutorProvider;
}

public static Builder builder() {
return new Builder();
}
Expand Down Expand Up @@ -175,6 +183,8 @@ public static class Builder {

private AuthXManager authXManager = null;

private PipelineExecutorProvider pipelineExecutorProvider = PipelineExecutorProvider.DEFAULT;

private Builder() {
}

Expand Down Expand Up @@ -297,6 +307,11 @@ public Builder authXManager(AuthXManager authXManager) {
return this;
}

public Builder pipelineExecutorProvider(PipelineExecutorProvider pipelineExecutorProvider) {
this.pipelineExecutorProvider = pipelineExecutorProvider;
return this;
}

public Builder from(JedisClientConfig instance) {
this.redisProtocol = instance.getRedisProtocol();
this.connectionTimeoutMillis = instance.getConnectionTimeoutMillis();
Expand All @@ -314,6 +329,7 @@ public Builder from(JedisClientConfig instance) {
this.clientSetInfoConfig = instance.getClientSetInfoConfig();
this.readOnlyForRedisClusterReplicas = instance.isReadOnlyForRedisClusterReplicas();
this.authXManager = instance.getAuthXManager();
this.pipelineExecutorProvider = instance.getPipelineExecutorProvider();
return this;
}
}
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/redis/clients/jedis/JedisClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,15 @@ default boolean isReadOnlyForRedisClusterReplicas() {
default ClientSetInfoConfig getClientSetInfoConfig() {
return ClientSetInfoConfig.DEFAULT;
}

/**
* If different then DEFAULT this will provide an Executor implementation that will sync/close multi node pipelines
* in parallel. This replaces the deprecated internal usage of new Executor Services for every pipeline, resulting in
* high thread creation rates and impact on latency.
*
* @return PipelineExecutorProvider
*/
default PipelineExecutorProvider getPipelineExecutorProvider() {
return PipelineExecutorProvider.DEFAULT;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think there is an issue here with the multithread/multi-client/cluster cases compared to "as-is" implementation; they will need to wait each other to complete/release the executors which was not the case before.
i like it better the previous approach in this PR where we provide ExecutorService to each pipeline execution explicitly.

}
}
26 changes: 25 additions & 1 deletion src/main/java/redis/clients/jedis/JedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import redis.clients.jedis.csc.Cache;
import redis.clients.jedis.csc.CacheConfig;
import redis.clients.jedis.csc.CacheFactory;
import redis.clients.jedis.util.IOUtils;
import redis.clients.jedis.util.JedisClusterCRC16;

public class JedisCluster extends UnifiedJedis {
Expand All @@ -29,6 +30,12 @@ public class JedisCluster extends UnifiedJedis {
*/
public static final int DEFAULT_MAX_ATTEMPTS = 5;

/**
* Executor used to close MultiNodePipeline in parallel. See {@link JedisClientConfig#getPipelineExecutorProvider()}
* for mor details on configuration.
*/
private ClusterPipelineExecutor clusterPipelineExecutor;

/**
* Creates a JedisCluster instance. The provided node is used to make the first contact with the cluster.
* <p>
Expand Down Expand Up @@ -251,6 +258,8 @@ public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfi
Duration maxTotalRetriesDuration) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig), maxAttempts, maxTotalRetriesDuration,
clientConfig.getRedisProtocol());
clientConfig.getPipelineExecutorProvider()
.getClusteredPipelineExecutor().ifPresent((executor) -> this.clusterPipelineExecutor = executor);
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig,
Expand All @@ -268,13 +277,17 @@ public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfi
Duration maxTotalRetriesDuration, GenericObjectPoolConfig<Connection> poolConfig) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig), maxAttempts, maxTotalRetriesDuration,
clientConfig.getRedisProtocol());
clientConfig.getPipelineExecutorProvider()
.getClusteredPipelineExecutor().ifPresent((executor) -> this.clusterPipelineExecutor = executor);
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig, Duration topologyRefreshPeriod, int maxAttempts,
Duration maxTotalRetriesDuration) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig, topologyRefreshPeriod),
maxAttempts, maxTotalRetriesDuration, clientConfig.getRedisProtocol());
clientConfig.getPipelineExecutorProvider()
.getClusteredPipelineExecutor().ifPresent((executor) -> this.clusterPipelineExecutor = executor);
}

// Uses a fetched connection to process protocol. Should be avoided if possible.
Expand Down Expand Up @@ -334,6 +347,12 @@ private JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Durati
super(provider, maxAttempts, maxTotalRetriesDuration, protocol, clientSideCache);
}

@Override
public void close() {
super.close();
IOUtils.closeQuietly(this.clusterPipelineExecutor);
}

Comment on lines +350 to +355
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Considering JedisClientConfig could be reused among multiple clients. Closing the clusterPipelineExecutor when one of the clients completes, could cause other clients initialized from the same JedisClientConfig to fail.
I have concerns that the contract for who is managing the pipeline executor lifecycle is not clear. We can end up with the implementation of PipelineExecutorProvider which creates a new executor each time getPipelineExecutorProvider() is invoked, or another implementation which reuses the same instance.

/**
* Returns all nodes that were configured to connect to in key-value pairs ({@link Map}).<br>
* Key is the HOST:PORT and the value is the connection pool.
Expand Down Expand Up @@ -376,9 +395,14 @@ public void ssubscribe(BinaryJedisShardedPubSub jedisPubSub, final byte[]... cha

@Override
public ClusterPipeline pipelined() {
return new ClusterPipeline((ClusterConnectionProvider) provider, (ClusterCommandObjects) commandObjects);
if (clusterPipelineExecutor == null) {
Copy link
Contributor

@xrayw xrayw Apr 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just provide an overload method with executor parameter?
It makes it more flexible and easy to use. We can specify different executors for different scenarios.

like

public ClusterPipeline pipelined(Executor pipelineExecutor) {
   xxx
}

return new ClusterPipeline((ClusterConnectionProvider) provider, (ClusterCommandObjects) commandObjects);
}
return new ClusterPipeline((ClusterConnectionProvider) provider, (ClusterCommandObjects) commandObjects,
clusterPipelineExecutor);
}


/**
* @param doMulti param
* @return nothing
Expand Down
87 changes: 52 additions & 35 deletions src/main/java/redis/clients/jedis/MultiNodePipelineBase.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package redis.clients.jedis;

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;

import org.slf4j.Logger;
Expand All @@ -24,11 +23,17 @@ public abstract class MultiNodePipelineBase extends PipelineBase {
* The number of processes for {@code sync()}. If you have enough cores for client (and you have
* more than 3 cluster nodes), you may increase this number of workers.
* Suggestion:&nbsp;&le;&nbsp;cluster&nbsp;nodes.
*
* @deprecated Client using this approach are paying the thread creation cost for every pipeline sync. Clients
* should use refer to {@link JedisClientConfig#getPipelineExecutorProvider()} to provide a single Executor for
* gain in performance.
*/
public static volatile int MULTI_NODE_PIPELINE_SYNC_WORKERS = 3;

private final Map<HostAndPort, Queue<Response<?>>> pipelinedResponses;
private final Map<HostAndPort, Connection> connections;
private ClusterPipelineExecutor clusterPipelineExecutor;
private boolean useSharedExecutor = false;
private volatile boolean syncing = false;

public MultiNodePipelineBase(CommandObjects commandObjects) {
Expand All @@ -37,6 +42,14 @@ public MultiNodePipelineBase(CommandObjects commandObjects) {
connections = new LinkedHashMap<>();
}

public MultiNodePipelineBase(CommandObjects commandObjects, ClusterPipelineExecutor executorService) {
super(commandObjects);
clusterPipelineExecutor = executorService;
useSharedExecutor = clusterPipelineExecutor != null;
pipelinedResponses = new LinkedHashMap<>();
connections = new LinkedHashMap<>();
}

protected abstract HostAndPort getNodeKey(CommandArguments args);

protected abstract Connection getConnection(HostAndPort nodeKey);
Expand Down Expand Up @@ -84,44 +97,48 @@ public final void sync() {
return;
}
syncing = true;

ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS);

CountDownLatch countDownLatch = new CountDownLatch(pipelinedResponses.size());
Iterator<Map.Entry<HostAndPort, Queue<Response<?>>>> pipelinedResponsesIterator
= pipelinedResponses.entrySet().iterator();
while (pipelinedResponsesIterator.hasNext()) {
Map.Entry<HostAndPort, Queue<Response<?>>> entry = pipelinedResponsesIterator.next();
HostAndPort nodeKey = entry.getKey();
Queue<Response<?>> queue = entry.getValue();
Connection connection = connections.get(nodeKey);
executorService.submit(() -> {
try {
List<Object> unformatted = connection.getMany(queue.size());
for (Object o : unformatted) {
queue.poll().set(o);
}
} catch (JedisConnectionException jce) {
log.error("Error with connection to " + nodeKey, jce);
// cleanup the connection
pipelinedResponsesIterator.remove();
connections.remove(nodeKey);
IOUtils.closeQuietly(connection);
} finally {
countDownLatch.countDown();
}
});
}

ClusterPipelineExecutor executorService = getExecutorService();
CompletableFuture[] futures
= pipelinedResponses.entrySet().stream()
.map(response -> CompletableFuture.runAsync(() -> readCommandResponse(response), executorService))
.toArray(CompletableFuture[]::new);
CompletableFuture awaitAllCompleted = CompletableFuture.allOf(futures);
try {
countDownLatch.await();
awaitAllCompleted.get();
if (!useSharedExecutor) {
executorService.shutdown();
}
} catch (ExecutionException e) {
log.error("Failed execution.", e);
} catch (InterruptedException e) {
log.error("Thread is interrupted during sync.", e);
Thread.currentThread().interrupt();
}
syncing = false;
}

executorService.shutdownNow();
private ClusterPipelineExecutor getExecutorService() {
if (useSharedExecutor) {
return clusterPipelineExecutor;
}
return ClusterPipelineExecutor.from(
Executors.newFixedThreadPool(Math.min(this.pipelinedResponses.size(), MULTI_NODE_PIPELINE_SYNC_WORKERS)));
}

syncing = false;
private void readCommandResponse(Map.Entry<HostAndPort, Queue<Response<?>>> entry) {
HostAndPort nodeKey = entry.getKey();
Queue<Response<?>> queue = entry.getValue();
Connection connection = connections.get(nodeKey);
try {
List<Object> unformatted = connection.getMany(queue.size());
for (Object o : unformatted) {
queue.poll().set(o);
}
} catch (JedisConnectionException jce) {
log.error("Error with connection to " + nodeKey, jce);
connections.remove(nodeKey);
IOUtils.closeQuietly(connection);
}
}

@Deprecated
Expand Down
44 changes: 44 additions & 0 deletions src/main/java/redis/clients/jedis/PipelineExecutorProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package redis.clients.jedis;

import java.util.Optional;
import java.util.concurrent.Executors;

/**
* This provides a {@link ClusterPipelineExecutor} used for parallel syncing of {@link MultiNodePipelineBase}
*/
public class PipelineExecutorProvider {

static final PipelineExecutorProvider DEFAULT = new PipelineExecutorProvider();

private ClusterPipelineExecutor clusterPipelineExecutor;

/**
* Default constructor providing an empty {@link Optional} of {@link ClusterPipelineExecutor}
*/
private PipelineExecutorProvider() {}

/**
* Will provide a {@link ClusterPipelineExecutor} with the specified number of thread. The number of thread
* should be equal or higher than the number of master nodes in the cluster.
*
* @param threadCount
*/
public PipelineExecutorProvider(int threadCount) {
this.clusterPipelineExecutor = ClusterPipelineExecutor.from(Executors.newFixedThreadPool(threadCount));;
}

/**
* Allow clients to provide their own implementation of {@link ClusterPipelineExecutor}
* @param clusterPipelineExecutor
*/
public PipelineExecutorProvider(ClusterPipelineExecutor clusterPipelineExecutor) {
this.clusterPipelineExecutor = clusterPipelineExecutor;
}

/**
* @return an empty option by default, otherwise will return the configured value.
*/
Optional<ClusterPipelineExecutor> getClusteredPipelineExecutor() {
return Optional.ofNullable(clusterPipelineExecutor);
}
}
Loading