diff --git a/pom.xml b/pom.xml
index 384157aed5..22f3c32166 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
 
 	<groupId>org.springframework.data</groupId>
 	<artifactId>spring-data-redis</artifactId>
-	<version>3.2.0-SNAPSHOT</version>
+	<version>3.2.x-2503-SNAPSHOT</version>
 
 	<name>Spring Data Redis</name>
 	<description>Spring Data module for Redis</description>
diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnectionFactory.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnectionFactory.java
index b25a6e24fe..51741a0083 100644
--- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnectionFactory.java
+++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnectionFactory.java
@@ -34,6 +34,7 @@
 import java.util.LinkedHashSet;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.net.ssl.HostnameVerifier;
 import javax.net.ssl.SSLParameters;
@@ -42,9 +43,9 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
-
 import org.springframework.beans.factory.DisposableBean;
 import org.springframework.beans.factory.InitializingBean;
+import org.springframework.context.SmartLifecycle;
 import org.springframework.dao.DataAccessException;
 import org.springframework.dao.InvalidDataAccessApiUsageException;
 import org.springframework.dao.InvalidDataAccessResourceUsageException;
@@ -85,7 +86,7 @@
  * @see JedisClientConfiguration
  * @see Jedis
  */
-public class JedisConnectionFactory implements InitializingBean, DisposableBean, RedisConnectionFactory {
+public class JedisConnectionFactory implements RedisConnectionFactory, InitializingBean, DisposableBean, SmartLifecycle {
 
 	private final static Log log = LogFactory.getLog(JedisConnectionFactory.class);
 	private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION = new PassThroughExceptionTranslationStrategy(
@@ -104,8 +105,11 @@ public class JedisConnectionFactory implements InitializingBean, DisposableBean,
 	private @Nullable ClusterTopologyProvider topologyProvider;
 	private @Nullable ClusterCommandExecutor clusterCommandExecutor;
 
-	private boolean initialized;
-	private boolean destroyed;
+	enum State {
+		CREATED, STARTING, STARTED, STOPPING, STOPPED, DESTROYED;
+	}
+
+	private AtomicReference<State> state = new AtomicReference<>(State.CREATED);
 
 	/**
 	 * Constructs a new {@link JedisConnectionFactory} instance with default settings (default connection pooling).
@@ -287,24 +291,80 @@ protected JedisConnection postProcessConnection(JedisConnection connection) {
 		return connection;
 	}
 
-	public void afterPropertiesSet() {
+	@Override
+	public void start() {
 
-		clientConfig = createClientConfig(getDatabase(), getRedisUsername(), getRedisPassword());
+		State current = state.getAndUpdate(state -> {
+			if (State.CREATED.equals(state) || State.STOPPED.equals(state)) {
+				return State.STARTING;
+			}
+			return state;
+		});
 
-		if (getUsePool() && !isRedisClusterAware()) {
-			this.pool = createPool();
+		if (State.CREATED.equals(current) || State.STOPPED.equals(current)) {
+
+			if (getUsePool() && !isRedisClusterAware()) {
+				this.pool = createPool();
+			}
+
+			if (isRedisClusterAware()) {
+
+				this.cluster = createCluster();
+				this.topologyProvider = createTopologyProvider(this.cluster);
+				this.clusterCommandExecutor = new ClusterCommandExecutor(this.topologyProvider,
+						new JedisClusterConnection.JedisClusterNodeResourceProvider(this.cluster, this.topologyProvider),
+						EXCEPTION_TRANSLATION);
+			}
+
+			state.set(State.STARTED);
 		}
+	}
 
-		if (isRedisClusterAware()) {
+	@Override
+	public void stop() {
+
+		if (state.compareAndSet(State.STARTED, State.STOPPING)) {
+			if (getUsePool() && !isRedisClusterAware()) {
+				if (pool != null) {
+					try {
+						this.pool.close();
+					} catch (Exception ex) {
+						log.warn("Cannot properly close Jedis pool", ex);
+					}
+					this.pool = null;
+				}
+			}
+
+			if(this.clusterCommandExecutor != null) {
+				try {
+					this.clusterCommandExecutor.destroy();
+				} catch (Exception e) {
+					throw new RuntimeException(e);
+				}
+			}
 
-			this.cluster = createCluster();
-			this.topologyProvider = createTopologyProvider(this.cluster);
-			this.clusterCommandExecutor = new ClusterCommandExecutor(this.topologyProvider,
-					new JedisClusterConnection.JedisClusterNodeResourceProvider(this.cluster, this.topologyProvider),
-					EXCEPTION_TRANSLATION);
+			if (this.cluster != null) {
+
+				this.topologyProvider = null;
+
+				try {
+					cluster.close();
+				} catch (Exception ex) {
+					log.warn("Cannot properly close Jedis cluster", ex);
+				}
+			}
+			state.set(State.STOPPED);
 		}
+	}
+
+	@Override
+	public boolean isRunning() {
+		return State.STARTED.equals(state.get());
+	}
 
-		this.initialized = true;
+	@Override
+	public void afterPropertiesSet() {
+		clientConfig = createClientConfig(getDatabase(), getRedisUsername(), getRedisPassword());
 	}
 
 	JedisClientConfig createSentinelClientConfig(SentinelConfiguration sentinelConfiguration) {
@@ -415,32 +475,8 @@ protected JedisCluster createCluster(RedisClusterConfiguration clusterConfig,
 
 	public void destroy() {
 
-		if (getUsePool() && pool != null) {
-
-			try {
-				pool.destroy();
-			} catch (Exception ex) {
-				log.warn("Cannot properly close Jedis pool", ex);
-			}
-			pool = null;
-		}
-
-		if (cluster != null) {
-
-			try {
-				cluster.close();
-			} catch (Exception ex) {
-				log.warn("Cannot properly close Jedis cluster", ex);
-			}
-
-			try {
-				clusterCommandExecutor.destroy();
-			} catch (Exception ex) {
-				log.warn("Cannot properly close cluster command executor", ex);
-			}
-		}
-
-		this.destroyed = true;
+		stop();
+		state.set(State.DESTROYED);
 	}
 
 	public RedisConnection getConnection() {
@@ -866,8 +902,19 @@ private MutableJedisClientConfiguration getMutableConfiguration() {
 	}
 
 	private void assertInitialized() {
-		Assert.state(this.initialized, "JedisConnectionFactory was not initialized through afterPropertiesSet()");
-		Assert.state(!this.destroyed, "JedisConnectionFactory was destroyed and cannot be used anymore");
+
+		State current = state.get();
+
+		if (State.STARTED.equals(current)) {
+			return;
+		}
+
+		switch (current) {
+			case CREATED, STOPPED -> throw new IllegalStateException(String.format("JedisConnectionFactory has been %s. Use start() to initialize it", current));
+			case DESTROYED -> throw new IllegalStateException(
+					"JedisConnectionFactory was destroyed and cannot be used anymore");
+			default -> throw new IllegalStateException(String.format("JedisConnectionFactory is %s", current));
+		}
 	}
 
 	/**
diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java
index 6ddeb7f350..2a77891a24 100644
--- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java
+++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java
@@ -26,11 +26,13 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 import org.springframework.beans.factory.DisposableBean;
 import org.springframework.beans.factory.InitializingBean;
+import org.springframework.context.SmartLifecycle;
 import org.springframework.dao.DataAccessException;
 import org.springframework.dao.InvalidDataAccessApiUsageException;
 import org.springframework.data.redis.ExceptionTranslationStrategy;
@@ -116,8 +118,8 @@
  * @author Andrea Como
  * @author Chris Bono
  */
-public class LettuceConnectionFactory
-		implements InitializingBean, DisposableBean, RedisConnectionFactory, ReactiveRedisConnectionFactory {
+public class LettuceConnectionFactory implements RedisConnectionFactory, ReactiveRedisConnectionFactory,
+		InitializingBean, DisposableBean, SmartLifecycle {
 
 	private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION = new PassThroughExceptionTranslationStrategy(
 			LettuceExceptionConverter.INSTANCE);
@@ -144,8 +146,11 @@ public class LettuceConnectionFactory
 
 	private @Nullable ClusterCommandExecutor clusterCommandExecutor;
 
-	private boolean initialized;
-	private boolean destroyed;
+	enum State {
+		CREATED, STARTING, STARTED, STOPPING, STOPPED, DESTROYED;
+	}
+
+	private AtomicReference<State> state = new AtomicReference<>(State.CREATED);
 
 	/**
 	 * Constructs a new {@link LettuceConnectionFactory} instance with default settings.
@@ -333,33 +338,78 @@ public static RedisConfiguration createRedisConfiguration(RedisURI redisUri) {
 		return LettuceConverters.createRedisStandaloneConfiguration(redisUri);
 	}
 
-	public void afterPropertiesSet() {
+	@Override
+	public void start() {
+
+		State current = state.getAndUpdate(state -> {
+			if (State.CREATED.equals(state) || State.STOPPED.equals(state)) {
+				return State.STARTING;
+			}
+			return state;
+		});
 
-		this.client = createClient();
+		if (State.CREATED.equals(current) || State.STOPPED.equals(current)) {
 
-		this.connectionProvider = new ExceptionTranslatingConnectionProvider(createConnectionProvider(client, CODEC));
-		this.reactiveConnectionProvider = new ExceptionTranslatingConnectionProvider(
-				createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC));
+			this.client = createClient();
 
-		if (isClusterAware()) {
+			this.connectionProvider = new ExceptionTranslatingConnectionProvider(createConnectionProvider(client, CODEC));
+			this.reactiveConnectionProvider = new ExceptionTranslatingConnectionProvider(
+					createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC));
+
+			if (isClusterAware()) {
+
+				this.clusterCommandExecutor = new ClusterCommandExecutor(
+						new LettuceClusterTopologyProvider((RedisClusterClient) client),
+						new LettuceClusterConnection.LettuceClusterNodeResourceProvider(this.connectionProvider),
+						EXCEPTION_TRANSLATION);
+			}
+
+			state.set(State.STARTED);
 
-			this.clusterCommandExecutor = new ClusterCommandExecutor(
-					new LettuceClusterTopologyProvider((RedisClusterClient) client),
-					new LettuceClusterConnection.LettuceClusterNodeResourceProvider(this.connectionProvider),
-					EXCEPTION_TRANSLATION);
+			if (getEagerInitialization() && getShareNativeConnection()) {
+				initConnection();
+			}
 		}
+	}
+
+	@Override
+	public void stop() {
 
-		this.initialized = true;
+		if (state.compareAndSet(State.STARTED, State.STOPPING)) {
+			resetConnection();
+			dispose(connectionProvider);
+			dispose(reactiveConnectionProvider);
+			try {
+				Duration quietPeriod = clientConfiguration.getShutdownQuietPeriod();
+				Duration timeout = clientConfiguration.getShutdownTimeout();
+				client.shutdown(quietPeriod.toMillis(), timeout.toMillis(), TimeUnit.MILLISECONDS);
+				state.set(State.STOPPED);
+			} catch (Exception e) {
 
-		if (getEagerInitialization() && getShareNativeConnection()) {
-			initConnection();
+				if (log.isWarnEnabled()) {
+					log.warn((client != null ? ClassUtils.getShortName(client.getClass()) : "LettuceClient")
+							+ " did not shut down gracefully.", e);
+				}
+			}
+			state.set(State.STOPPED);
 		}
 	}
 
-	public void destroy() {
+	@Override
+	public boolean isRunning() {
+		return State.STARTED.equals(state.get());
+	}
 
-		resetConnection();
+	@Override
+	public void afterPropertiesSet() {
+		// customization hook. initialization happens in start
+	}
 
+	@Override
+	public void destroy() {
+
+		stop();
+		client = null;
 		if (clusterCommandExecutor != null) {
 
 			try {
@@ -368,23 +418,7 @@ public void destroy() {
 				log.warn("Cannot properly close cluster command executor", ex);
 			}
 		}
-
-		dispose(connectionProvider);
-		dispose(reactiveConnectionProvider);
-
-		try {
-			Duration quietPeriod = clientConfiguration.getShutdownQuietPeriod();
-			Duration timeout = clientConfiguration.getShutdownTimeout();
-			client.shutdown(quietPeriod.toMillis(), timeout.toMillis(), TimeUnit.MILLISECONDS);
-		} catch (Exception e) {
-
-			if (log.isWarnEnabled()) {
-				log.warn((client != null ? ClassUtils.getShortName(client.getClass()) : "LettuceClient")
-						+ " did not shut down gracefully.", e);
-			}
-		}
-
-		this.destroyed = true;
+		state.set(State.DESTROYED);
 	}
 
 	private void dispose(LettuceConnectionProvider connectionProvider) {
@@ -532,8 +566,6 @@ public void initConnection() {
 	 */
 	public void resetConnection() {
 
-		assertInitialized();
-
 		Optionals.toStream(Optional.ofNullable(connection), Optional.ofNullable(reactiveConnection))
 				.forEach(SharedConnection::resetConnection);
 
@@ -1267,8 +1299,19 @@ private RedisClient createBasicClient() {
 	}
 
 	private void assertInitialized() {
-		Assert.state(this.initialized, "LettuceConnectionFactory was not initialized through afterPropertiesSet()");
-		Assert.state(!this.destroyed, "LettuceConnectionFactory was destroyed and cannot be used anymore");
+
+		State current = state.get();
+
+		if (State.STARTED.equals(current)) {
+			return;
+		}
+
+		switch (current) {
+			case CREATED, STOPPED -> throw new IllegalStateException(String.format("LettuceConnectionFactory has been %s. Use start() to initialize it", current));
+			case DESTROYED -> throw new IllegalStateException(
+					"LettuceConnectionFactory was destroyed and cannot be used anymore");
+			default -> throw new IllegalStateException(String.format("LettuceConnectionFactory is %s", current));
+		}
 	}
 
 	private static void applyToAll(RedisURI source, Consumer<RedisURI> action) {
diff --git a/src/main/java/org/springframework/data/redis/support/collections/RedisCollectionFactoryBean.java b/src/main/java/org/springframework/data/redis/support/collections/RedisCollectionFactoryBean.java
index 38edda7a4b..3c0f8ca537 100644
--- a/src/main/java/org/springframework/data/redis/support/collections/RedisCollectionFactoryBean.java
+++ b/src/main/java/org/springframework/data/redis/support/collections/RedisCollectionFactoryBean.java
@@ -16,10 +16,11 @@
 package org.springframework.data.redis.support.collections;
 
 import org.springframework.beans.factory.BeanNameAware;
-import org.springframework.beans.factory.FactoryBean;
 import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.SmartFactoryBean;
 import org.springframework.data.redis.connection.DataType;
 import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.util.Lazy;
 import org.springframework.lang.Nullable;
 import org.springframework.util.Assert;
 import org.springframework.util.StringUtils;
@@ -30,8 +31,9 @@
  * Otherwise uses the provided type (default is list).
  *
  * @author Costin Leau
+ * @author Christoph Strobl
  */
-public class RedisCollectionFactoryBean implements InitializingBean, BeanNameAware, FactoryBean<RedisStore> {
+public class RedisCollectionFactoryBean implements SmartFactoryBean<RedisStore>, BeanNameAware, InitializingBean {
 
 	/**
 	 * Collection types supported by this factory.
@@ -73,13 +75,15 @@ public DataType dataType() {
 		abstract DataType dataType();
 	}
 
-	private @Nullable RedisStore store;
+	private @Nullable Lazy<RedisStore> store;
 	private @Nullable CollectionType type = null;
 	private @Nullable RedisTemplate<String, ?> template;
 	private @Nullable String key;
 	private @Nullable String beanName;
 
+	@Override
 	public void afterPropertiesSet() {
+
 		if (!StringUtils.hasText(key)) {
 			key = beanName;
 		}
@@ -87,19 +91,23 @@ public void afterPropertiesSet() {
 		Assert.hasText(key, "Collection key is required - no key or bean name specified");
 		Assert.notNull(template, "Redis template is required");
 
-		DataType dt = template.type(key);
+		store = Lazy.of(() -> {
+
+			DataType dt = template.type(key);
 
-		// can't create store
-		Assert.isTrue(!DataType.STRING.equals(dt), "Cannot create store on keys of type 'string'");
+			// can't create store
+			Assert.isTrue(!DataType.STRING.equals(dt), "Cannot create store on keys of type 'string'");
 
-		store = createStore(dt);
+			RedisStore tmp = createStore(dt);
 
-		if (store == null) {
-			if (type == null) {
-				type = CollectionType.LIST;
+			if (tmp == null) {
+				if (type == null) {
+					type = CollectionType.LIST;
+				}
+				tmp = createStore(type.dataType());
 			}
-			store = createStore(type.dataType());
-		}
+			return tmp;
+		});
 	}
 
 	@SuppressWarnings("unchecked")
@@ -123,18 +131,17 @@ private RedisStore createStore(DataType dt) {
 		return null;
 	}
 
+	@Override
 	public RedisStore getObject() {
-		return store;
+		return store.get();
 	}
 
+	@Override
 	public Class<?> getObjectType() {
-		return (store != null ? store.getClass() : RedisStore.class);
-	}
-
-	public boolean isSingleton() {
-		return true;
+		return (store != null ? store.get().getClass() : RedisStore.class);
 	}
 
+	@Override
 	public void setBeanName(String name) {
 		this.beanName = name;
 	}
diff --git a/src/test/java/org/springframework/data/redis/ConnectionFactoryTracker.java b/src/test/java/org/springframework/data/redis/ConnectionFactoryTracker.java
index d63b83e583..489fab2c03 100644
--- a/src/test/java/org/springframework/data/redis/ConnectionFactoryTracker.java
+++ b/src/test/java/org/springframework/data/redis/ConnectionFactoryTracker.java
@@ -21,6 +21,7 @@
 import java.util.Set;
 
 import org.springframework.beans.factory.DisposableBean;
+import org.springframework.context.SmartLifecycle;
 import org.springframework.data.redis.connection.RedisConnectionFactory;
 
 /**
@@ -40,6 +41,12 @@ public static void add(RedisConnectionFactory factory) {
 			throw new UnsupportedOperationException("Cannot track managed resource");
 		}
 
+		if(factory instanceof SmartLifecycle smartLifecycle) {
+			if(!smartLifecycle.isRunning() && smartLifecycle.isAutoStartup()) {
+				smartLifecycle.start();
+			}
+		}
+
 		connFactories.add(factory);
 	}
 
diff --git a/src/test/java/org/springframework/data/redis/connection/jedis/JedisAclIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/jedis/JedisAclIntegrationTests.java
index 133b6269ab..0fd16e36e5 100644
--- a/src/test/java/org/springframework/data/redis/connection/jedis/JedisAclIntegrationTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/jedis/JedisAclIntegrationTests.java
@@ -21,19 +21,20 @@
 import java.util.Collections;
 
 import org.junit.jupiter.api.Test;
-
-import org.springframework.data.redis.connection.RedisConnection;
+import org.springframework.data.redis.ConnectionFactoryTracker;
 import org.springframework.data.redis.connection.RedisSentinelConfiguration;
 import org.springframework.data.redis.connection.RedisSentinelConnection;
 import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
 import org.springframework.data.redis.test.condition.EnabledOnRedisAvailable;
 import org.springframework.data.redis.test.condition.EnabledOnRedisSentinelAvailable;
 import org.springframework.data.redis.test.condition.EnabledOnRedisVersion;
+import org.springframework.data.redis.util.ConnectionVerifier;
 
 /**
  * Integration tests for Redis 6 ACL.
  *
  * @author Mark Paluch
+ * @author Christoph Strobl
  */
 @EnabledOnRedisVersion("6.0")
 @EnabledOnRedisAvailable(6382)
@@ -45,15 +46,11 @@ void shouldConnectWithDefaultAuthentication() {
 		RedisStandaloneConfiguration standaloneConfiguration = new RedisStandaloneConfiguration("localhost", 6382);
 		standaloneConfiguration.setPassword("foobared");
 
-		JedisConnectionFactory connectionFactory = new JedisConnectionFactory(standaloneConfiguration);
-		connectionFactory.afterPropertiesSet();
-
-		RedisConnection connection = connectionFactory.getConnection();
-
-		assertThat(connection.ping()).isEqualTo("PONG");
-		connection.close();
-
-		connectionFactory.destroy();
+		ConnectionVerifier.create(new JedisConnectionFactory(standaloneConfiguration)) //
+				.execute(connection -> {
+					assertThat(connection.ping()).isEqualTo("PONG");
+				}) //
+				.verifyAndClose();
 	}
 
 	@Test // DATAREDIS-1046
@@ -63,15 +60,11 @@ void shouldConnectStandaloneWithAclAuthentication() {
 		standaloneConfiguration.setUsername("spring");
 		standaloneConfiguration.setPassword("data");
 
-		JedisConnectionFactory connectionFactory = new JedisConnectionFactory(standaloneConfiguration);
-		connectionFactory.afterPropertiesSet();
-
-		RedisConnection connection = connectionFactory.getConnection();
-
-		assertThat(connection.ping()).isEqualTo("PONG");
-		connection.close();
-
-		connectionFactory.destroy();
+		ConnectionVerifier.create(new JedisConnectionFactory(standaloneConfiguration)) //
+				.execute(connection -> {
+					assertThat(connection.ping()).isEqualTo("PONG");
+				}) //
+				.verifyAndClose();
 	}
 
 	@Test // DATAREDIS-1145
@@ -86,11 +79,11 @@ void shouldConnectSentinelWithAclAuthentication() throws IOException {
 
 		JedisConnectionFactory connectionFactory = new JedisConnectionFactory(sentinelConfiguration);
 		connectionFactory.afterPropertiesSet();
+		ConnectionFactoryTracker.add(connectionFactory);
 
-		RedisSentinelConnection connection = connectionFactory.getSentinelConnection();
-
-		assertThat(connection.masters()).isNotEmpty();
-		connection.close();
+		try (RedisSentinelConnection connection = connectionFactory.getSentinelConnection()) {
+			assertThat(connection.masters()).isNotEmpty();
+		}
 
 		connectionFactory.destroy();
 	}
@@ -102,15 +95,13 @@ void shouldConnectStandaloneWithAclAuthenticationAndPooling() {
 		standaloneConfiguration.setUsername("spring");
 		standaloneConfiguration.setPassword("data");
 
-		JedisConnectionFactory connectionFactory = new JedisConnectionFactory(standaloneConfiguration);
-		connectionFactory.setUsePool(true);
-		connectionFactory.afterPropertiesSet();
-
-		RedisConnection connection = connectionFactory.getConnection();
+		JedisConnectionFactory connectionFactory = new JedisConnectionFactory(standaloneConfiguration,
+				JedisClientConfiguration.builder().usePooling().build());
 
-		assertThat(connection.ping()).isEqualTo("PONG");
-		connection.close();
-
-		connectionFactory.destroy();
+		ConnectionVerifier.create(connectionFactory) //
+				.execute(connection -> {
+					assertThat(connection.ping()).isEqualTo("PONG");
+				}) //
+				.verifyAndClose();
 	}
 }
diff --git a/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionFactoryIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionFactoryIntegrationTests.java
index f5e4c94207..61aabedfa4 100644
--- a/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionFactoryIntegrationTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionFactoryIntegrationTests.java
@@ -29,6 +29,7 @@
  * Integration tests for {@link JedisConnectionFactory}.
  *
  * @author Mark Paluch
+ * @author Christoph Strobl
  */
 class JedisConnectionFactoryIntegrationTests {
 
@@ -49,8 +50,11 @@ void shouldInitializeWithStandaloneConfiguration() {
 				new RedisStandaloneConfiguration(SettingsUtils.getHost(), SettingsUtils.getPort()),
 				JedisClientConfiguration.defaultConfiguration());
 		factory.afterPropertiesSet();
+		factory.start();
 
-		assertThat(factory.getConnection().ping()).isEqualTo("PONG");
+		try (RedisConnection connection = factory.getConnection()) {
+			assertThat(connection.ping()).isEqualTo("PONG");
+		}
 	}
 
 	@Test // DATAREDIS-575
@@ -60,9 +64,32 @@ void connectionAppliesClientName() {
 				new RedisStandaloneConfiguration(SettingsUtils.getHost(), SettingsUtils.getPort()),
 				JedisClientConfiguration.builder().clientName("clientName").build());
 		factory.afterPropertiesSet();
+		factory.start();
 
 		RedisConnection connection = factory.getConnection();
 
 		assertThat(connection.getClientName()).isEqualTo("clientName");
 	}
+
+	@Test // GH-2503
+	void startStopStartConnectionFactory() {
+
+		factory = new JedisConnectionFactory(
+				new RedisStandaloneConfiguration(SettingsUtils.getHost(), SettingsUtils.getPort()),
+				JedisClientConfiguration.defaultConfiguration());
+		factory.afterPropertiesSet();
+
+		factory.start();
+		assertThat(factory.isRunning()).isTrue();
+
+		factory.stop();
+		assertThat(factory.isRunning()).isFalse();
+		assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() -> factory.getConnection());
+
+		factory.start();
+		assertThat(factory.isRunning()).isTrue();
+		try (RedisConnection connection = factory.getConnection()) {
+			assertThat(connection.ping()).isEqualTo("PONG");
+		}
+	}
 }
diff --git a/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionFactorySentinelIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionFactorySentinelIntegrationTests.java
index ba6be53c4e..f9a8d7b4a0 100644
--- a/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionFactorySentinelIntegrationTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionFactorySentinelIntegrationTests.java
@@ -21,7 +21,6 @@
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
-
 import org.springframework.data.redis.connection.RedisConnection;
 import org.springframework.data.redis.connection.RedisSentinelConfiguration;
 import org.springframework.data.redis.connection.RedisSentinelConnection;
@@ -60,14 +59,16 @@ void shouldConnectDataNodeCorrectly() {
 
 		factory = new JedisConnectionFactory(configuration);
 		factory.afterPropertiesSet();
+		factory.start();
+
+		try (RedisConnection connection = factory.getConnection()) {
 
-		RedisConnection connection = factory.getConnection();
-		connection.flushAll();
-		connection.set("key5".getBytes(), "value5".getBytes());
+			connection.serverCommands().flushAll();
+			connection.stringCommands().set("key5".getBytes(), "value5".getBytes());
 
-		connection.select(0);
-		assertThat(connection.exists("key5".getBytes())).isFalse();
-		connection.close();
+			connection.select(0);
+			assertThat(connection.keyCommands().exists("key5".getBytes())).isFalse();
+		}
 	}
 
 	@Test // GH-2103
@@ -79,10 +80,11 @@ void shouldConnectSentinelNodeCorrectly() throws IOException {
 
 		factory = new JedisConnectionFactory(configuration);
 		factory.afterPropertiesSet();
+		factory.start();
 
-		RedisSentinelConnection sentinelConnection = factory.getSentinelConnection();
-		assertThat(sentinelConnection.masters()).isNotNull();
-		sentinelConnection.close();
+		try (RedisSentinelConnection sentinelConnection = factory.getSentinelConnection()) {
+			assertThat(sentinelConnection.masters()).isNotNull();
+		}
 	}
 
 	@Test // DATAREDIS-574, DATAREDIS-765
@@ -94,11 +96,13 @@ void shouldInitializeWithSentinelConfiguration() {
 
 		factory = new JedisConnectionFactory(SENTINEL_CONFIG, clientConfiguration);
 		factory.afterPropertiesSet();
+		factory.start();
 
-		RedisConnection connection = factory.getConnection();
+		try (RedisConnection connection = factory.getConnection()) {
 
-		assertThat(factory.getUsePool()).isTrue();
-		assertThat(connection.getClientName()).isEqualTo("clientName");
+			assertThat(factory.getUsePool()).isTrue();
+			assertThat(connection.getClientName()).isEqualTo("clientName");
+		}
 	}
 
 	@Test // DATAREDIS-324
@@ -106,8 +110,11 @@ void shouldSendCommandCorrectlyViaConnectionFactoryUsingSentinel() {
 
 		factory = new JedisConnectionFactory(SENTINEL_CONFIG);
 		factory.afterPropertiesSet();
+		factory.start();
 
-		assertThat(factory.getConnection().ping()).isEqualTo("PONG");
+		try (RedisConnection connection = factory.getConnection()) {
+			assertThat(connection.ping()).isEqualTo("PONG");
+		}
 	}
 
 	@Test // DATAREDIS-552
@@ -116,18 +123,25 @@ void getClientNameShouldEqualWithFactorySetting() {
 		factory = new JedisConnectionFactory(SENTINEL_CONFIG);
 		factory.setClientName("clientName");
 		factory.afterPropertiesSet();
+		factory.start();
 
-		assertThat(factory.getConnection().getClientName()).isEqualTo("clientName");
+		try (RedisConnection connection = factory.getConnection()) {
+			assertThat(connection.serverCommands().getClientName()).isEqualTo("clientName");
+		}
 	}
 
 	@Test // DATAREDIS-1127
-	void shouldNotFailOnFirstSentinelDown() {
+	void shouldNotFailOnFirstSentinelDown() throws IOException {
 
 		RedisSentinelConfiguration oneDownSentinelConfig = new RedisSentinelConfiguration().master("mymaster")
 				.sentinel("127.0.0.1", 1).sentinel("127.0.0.1", 26379);
 
 		factory = new JedisConnectionFactory(oneDownSentinelConfig);
 		factory.afterPropertiesSet();
-		assertThat(factory.getSentinelConnection().isOpen()).isTrue();
+		factory.start();
+
+		try (RedisSentinelConnection sentinelConnection = factory.getSentinelConnection()) {
+			assertThat(sentinelConnection.isOpen()).isTrue();
+		}
 	}
 }
diff --git a/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionFactoryUnitTests.java b/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionFactoryUnitTests.java
index f5b558bb72..1fdbe738a0 100644
--- a/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionFactoryUnitTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionFactoryUnitTests.java
@@ -26,6 +26,7 @@
 import java.security.NoSuchAlgorithmException;
 import java.time.Duration;
 import java.time.temporal.ChronoUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.net.ssl.HttpsURLConnection;
 import javax.net.ssl.SSLContext;
@@ -39,6 +40,7 @@
 import org.springframework.data.redis.connection.RedisPassword;
 import org.springframework.data.redis.connection.RedisSentinelConfiguration;
 import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
+import org.springframework.data.redis.connection.jedis.JedisConnectionFactory.State;
 import org.springframework.test.util.ReflectionTestUtils;
 
 /**
@@ -62,6 +64,7 @@ void shouldInitSentinelPoolWhenSentinelConfigPresent() {
 
 		connectionFactory = initSpyedConnectionFactory(SINGLE_SENTINEL_CONFIG, new JedisPoolConfig());
 		connectionFactory.afterPropertiesSet();
+		connectionFactory.start();
 
 		verify(connectionFactory, times(1)).createRedisSentinelPool(eq(SINGLE_SENTINEL_CONFIG));
 		verify(connectionFactory, never()).createRedisPool();
@@ -72,6 +75,7 @@ void shouldInitJedisPoolWhenNoSentinelConfigPresent() {
 
 		connectionFactory = initSpyedConnectionFactory((RedisSentinelConfiguration) null, new JedisPoolConfig());
 		connectionFactory.afterPropertiesSet();
+		connectionFactory.start();
 
 		verify(connectionFactory, times(1)).createRedisPool();
 		verify(connectionFactory, never()).createRedisSentinelPool(any(RedisSentinelConfiguration.class));
@@ -90,6 +94,7 @@ void shouldInitConnectionCorrectlyWhenClusterConfigPresent() {
 
 		connectionFactory = initSpyedConnectionFactory(CLUSTER_CONFIG, new JedisPoolConfig());
 		connectionFactory.afterPropertiesSet();
+		connectionFactory.start();
 
 		verify(connectionFactory, times(1)).createCluster(eq(CLUSTER_CONFIG), any(GenericObjectPoolConfig.class));
 		verify(connectionFactory, never()).createRedisPool();
@@ -101,6 +106,7 @@ void shouldCloseClusterCorrectlyOnFactoryDestruction() throws IOException {
 		JedisCluster clusterMock = mock(JedisCluster.class);
 		JedisConnectionFactory factory = new JedisConnectionFactory();
 		ReflectionTestUtils.setField(factory, "cluster", clusterMock);
+		ReflectionTestUtils.setField(factory, "state", new AtomicReference(State.STARTED));
 
 		factory.destroy();
 
@@ -321,6 +327,16 @@ void getConnectionShouldFailIfNotInitialized() {
 		assertThatIllegalStateException().isThrownBy(connectionFactory::getSentinelConnection);
 	}
 
+	@Test // GH-2503
+	void afterPropertiesSetDoesNotTriggerConnectionInitialization() {
+
+		JedisConnectionFactory connectionFactory = new JedisConnectionFactory();
+		connectionFactory.afterPropertiesSet();
+
+		assertThat(connectionFactory.isRunning()).isFalse();
+		assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() -> connectionFactory.getConnection());
+	}
+
 	private JedisConnectionFactory initSpyedConnectionFactory(RedisSentinelConfiguration sentinelConfig,
 			JedisPoolConfig poolConfig) {
 
diff --git a/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionIntegrationTests.java
index 24601dfa84..b531db79ed 100644
--- a/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionIntegrationTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionIntegrationTests.java
@@ -32,7 +32,6 @@
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
-
 import org.springframework.dao.InvalidDataAccessApiUsageException;
 import org.springframework.data.redis.RedisConnectionFailureException;
 import org.springframework.data.redis.SettingsUtils;
@@ -46,6 +45,7 @@
 import org.springframework.data.redis.connection.ReturnType;
 import org.springframework.data.redis.connection.StringRedisConnection.StringTuple;
 import org.springframework.data.redis.test.condition.EnabledOnRedisSentinelAvailable;
+import org.springframework.data.redis.util.ConnectionVerifier;
 import org.springframework.test.context.ContextConfiguration;
 import org.springframework.test.context.junit.jupiter.SpringExtension;
 import org.springframework.test.util.ReflectionTestUtils;
@@ -99,12 +99,13 @@ public void testEvalShaArrayBytes() {
 
 	@Test
 	void testCreateConnectionWithDb() {
+
 		JedisConnectionFactory factory2 = new JedisConnectionFactory();
 		factory2.setDatabase(1);
-		factory2.afterPropertiesSet();
-		// No way to really verify we are in the selected DB
-		factory2.getConnection().ping();
-		factory2.destroy();
+
+		ConnectionVerifier.create(factory2) //
+				.execute(RedisConnection::ping) //
+				.verifyAndClose();
 	}
 
 	@Test // DATAREDIS-714
@@ -113,6 +114,7 @@ void testCreateConnectionWithDbFailure() {
 		JedisConnectionFactory factory2 = new JedisConnectionFactory();
 		factory2.setDatabase(77);
 		factory2.afterPropertiesSet();
+		factory2.start();
 
 		try {
 			assertThatExceptionOfType(RedisConnectionFailureException.class).isThrownBy(factory2::getConnection);
@@ -132,11 +134,16 @@ void testClosePool() {
 		factory2.setHostName(SettingsUtils.getHost());
 		factory2.setPort(SettingsUtils.getPort());
 		factory2.afterPropertiesSet();
+		factory2.start();
 
-		RedisConnection conn2 = factory2.getConnection();
-		conn2.close();
-		factory2.getConnection();
-		factory2.destroy();
+		try {
+
+			RedisConnection conn2 = factory2.getConnection();
+			conn2.close();
+			factory2.getConnection();
+		} finally {
+			factory2.destroy();
+		}
 	}
 
 	@Test
@@ -330,15 +337,17 @@ void testPoolNPE() {
 		factory2.setHostName(SettingsUtils.getHost());
 		factory2.setPort(SettingsUtils.getPort());
 		factory2.afterPropertiesSet();
+		factory2.start();
 
-		RedisConnection conn = factory2.getConnection();
-		try {
+		try (RedisConnection conn = factory2.getConnection()) {
 			conn.get(null);
-		} catch (Exception e) {}
-		conn.close();
-		// Make sure we don't end up with broken connection
-		factory2.getConnection().dbSize();
-		factory2.destroy();
+		} catch (Exception e) {
+
+		} finally {
+			// Make sure we don't end up with broken connection
+			factory2.getConnection().dbSize();
+			factory2.destroy();
+		}
 	}
 
 	@Test // GH-2356
@@ -351,19 +360,17 @@ void closeWithFailureShouldReleaseConnection() {
 		factory.setUsePool(true);
 		factory.setHostName(SettingsUtils.getHost());
 		factory.setPort(SettingsUtils.getPort());
-		factory.afterPropertiesSet();
-
-		RedisConnection conn = factory.getConnection();
-
-		JedisSubscription subscriptionMock = mock(JedisSubscription.class);
-		doThrow(new IllegalStateException()).when(subscriptionMock).close();
-		ReflectionTestUtils.setField(conn, "subscription", subscriptionMock);
-
-		conn.close();
 
-		// Make sure we don't end up with broken connection
-		factory.getConnection().dbSize();
-		factory.destroy();
+		ConnectionVerifier.create(factory) //
+				.execute(connection -> {
+					JedisSubscription subscriptionMock = mock(JedisSubscription.class);
+					doThrow(new IllegalStateException()).when(subscriptionMock).close();
+					ReflectionTestUtils.setField(connection, "subscription", subscriptionMock);
+				}) //
+				.verifyAndRun(connectionFactory -> {
+					connectionFactory.getConnection().dbSize();
+					connectionFactory.destroy();
+				});
 	}
 
 	@SuppressWarnings("unchecked")
diff --git a/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionPipelineIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionPipelineIntegrationTests.java
index 74f3139bd8..9eeb5e4c95 100644
--- a/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionPipelineIntegrationTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionPipelineIntegrationTests.java
@@ -23,11 +23,10 @@
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
-
 import org.springframework.dao.InvalidDataAccessApiUsageException;
 import org.springframework.data.redis.SettingsUtils;
 import org.springframework.data.redis.connection.AbstractConnectionPipelineIntegrationTests;
-import org.springframework.data.redis.connection.RedisConnection;
+import org.springframework.data.redis.util.ConnectionVerifier;
 import org.springframework.test.context.ContextConfiguration;
 import org.springframework.test.context.junit.jupiter.SpringExtension;
 
@@ -68,12 +67,13 @@ public void testClosePoolPipelinedDbSelect() {
 		factory2.setHostName(SettingsUtils.getHost());
 		factory2.setPort(SettingsUtils.getPort());
 		factory2.setDatabase(1);
-		factory2.afterPropertiesSet();
-		RedisConnection conn2 = factory2.getConnection();
-		conn2.openPipeline();
-		conn2.close();
-		factory2.getConnection();
-		factory2.destroy();
+
+		ConnectionVerifier.create(factory2) //
+				.execute(conn2 -> conn2.openPipeline()) //
+				.verifyAndRun(connectionFactory -> {
+					connectionFactory.getConnection();
+					connectionFactory.destroy();
+				});
 	}
 
 	// Unsupported Ops
diff --git a/src/test/java/org/springframework/data/redis/connection/jedis/extension/JedisConnectionFactoryExtension.java b/src/test/java/org/springframework/data/redis/connection/jedis/extension/JedisConnectionFactoryExtension.java
index bba8ce1fb8..3b7e07cf4f 100644
--- a/src/test/java/org/springframework/data/redis/connection/jedis/extension/JedisConnectionFactoryExtension.java
+++ b/src/test/java/org/springframework/data/redis/connection/jedis/extension/JedisConnectionFactoryExtension.java
@@ -64,6 +64,7 @@ public class JedisConnectionFactoryExtension implements ParameterResolver {
 				CLIENT_CONFIGURATION);
 
 		factory.afterPropertiesSet();
+		factory.start();
 		ShutdownQueue.register(factory);
 
 		return factory;
@@ -75,6 +76,7 @@ public class JedisConnectionFactoryExtension implements ParameterResolver {
 				CLIENT_CONFIGURATION);
 
 		factory.afterPropertiesSet();
+		factory.start();
 		ShutdownQueue.register(factory);
 
 		return factory;
@@ -86,6 +88,7 @@ public class JedisConnectionFactoryExtension implements ParameterResolver {
 				CLIENT_CONFIGURATION);
 
 		factory.afterPropertiesSet();
+		factory.start();
 		ShutdownQueue.register(factory);
 
 		return factory;
diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceAclIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceAclIntegrationTests.java
index 612ed09199..be92112ea6 100644
--- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceAclIntegrationTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceAclIntegrationTests.java
@@ -24,7 +24,6 @@
 import java.util.Collections;
 
 import org.junit.jupiter.api.Test;
-import org.springframework.data.redis.connection.RedisConnection;
 import org.springframework.data.redis.connection.RedisSentinelConfiguration;
 import org.springframework.data.redis.connection.RedisSentinelConnection;
 import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
@@ -33,6 +32,7 @@
 import org.springframework.data.redis.test.condition.EnabledOnRedisAvailable;
 import org.springframework.data.redis.test.condition.EnabledOnRedisSentinelAvailable;
 import org.springframework.data.redis.test.extension.LettuceTestClientResources;
+import org.springframework.data.redis.util.ConnectionVerifier;
 
 /**
  * Integration tests for Redis 6 ACL.
@@ -50,16 +50,17 @@ void shouldConnectWithDefaultAuthentication() {
 		RedisStandaloneConfiguration standaloneConfiguration = new RedisStandaloneConfiguration("localhost", 6382);
 		standaloneConfiguration.setPassword("foobared");
 
-		LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory(standaloneConfiguration);
-		connectionFactory.setClientResources(LettuceTestClientResources.getSharedClientResources());
-		connectionFactory.afterPropertiesSet();
-
-		RedisConnection connection = connectionFactory.getConnection();
+		LettuceClientConfiguration clientConfiguration = LettuceClientConfiguration.builder()
+				.clientResources(LettuceTestClientResources.getSharedClientResources()).build();
 
-		assertThat(connection.ping()).isEqualTo("PONG");
-		connection.close();
+		LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory(standaloneConfiguration,
+				clientConfiguration);
 
-		connectionFactory.destroy();
+		ConnectionVerifier.create(connectionFactory) //
+				.execute(connection -> {
+					assertThat(connection.ping()).isEqualTo("PONG");
+				}) //
+				.verifyAndClose();
 	}
 
 	@Test // DATAREDIS-1046
@@ -69,16 +70,17 @@ void shouldConnectStandaloneWithAclAuthentication() {
 		standaloneConfiguration.setUsername("spring");
 		standaloneConfiguration.setPassword("data");
 
-		LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory(standaloneConfiguration);
-		connectionFactory.setClientResources(LettuceTestClientResources.getSharedClientResources());
-		connectionFactory.afterPropertiesSet();
-
-		RedisConnection connection = connectionFactory.getConnection();
+		LettuceClientConfiguration clientConfiguration = LettuceClientConfiguration.builder()
+				.clientResources(LettuceTestClientResources.getSharedClientResources()).build();
 
-		assertThat(connection.ping()).isEqualTo("PONG");
-		connection.close();
+		LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory(standaloneConfiguration,
+				clientConfiguration);
 
-		connectionFactory.destroy();
+		ConnectionVerifier.create(connectionFactory) //
+				.execute(connection -> {
+					assertThat(connection.ping()).isEqualTo("PONG");
+				}) //
+				.verifyAndClose();
 	}
 
 	@Test // DATAREDIS-1145
@@ -96,13 +98,13 @@ void shouldConnectSentinelWithAuthentication() throws IOException {
 
 		LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory(sentinelConfiguration, configuration);
 		connectionFactory.afterPropertiesSet();
+		connectionFactory.start();
 
-		RedisSentinelConnection connection = connectionFactory.getSentinelConnection();
-
-		assertThat(connection.masters()).isNotEmpty();
-		connection.close();
-
-		connectionFactory.destroy();
+		try (RedisSentinelConnection connection = connectionFactory.getSentinelConnection()) {
+			assertThat(connection.masters()).isNotEmpty();
+		} finally {
+			connectionFactory.destroy();
+		}
 	}
 
 	@Test // DATAREDIS-1046
@@ -113,15 +115,16 @@ void shouldConnectMasterReplicaWithAclAuthentication() {
 		masterReplicaConfiguration.setUsername("spring");
 		masterReplicaConfiguration.setPassword("data");
 
-		LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory(masterReplicaConfiguration);
-		connectionFactory.setClientResources(LettuceTestClientResources.getSharedClientResources());
-		connectionFactory.afterPropertiesSet();
-
-		RedisConnection connection = connectionFactory.getConnection();
+		LettuceClientConfiguration clientConfiguration = LettuceClientConfiguration.builder()
+				.clientResources(LettuceTestClientResources.getSharedClientResources()).build();
 
-		assertThat(connection.ping()).isEqualTo("PONG");
-		connection.close();
+		LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory(masterReplicaConfiguration,
+				clientConfiguration);
 
-		connectionFactory.destroy();
+		ConnectionVerifier.create(connectionFactory) //
+				.execute(connection -> {
+					assertThat(connection.ping()).isEqualTo("PONG");
+				}) //
+				.verifyAndClose();
 	}
 }
diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnectionTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnectionTests.java
index 59f5bc608a..4b6b50d3e3 100644
--- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnectionTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnectionTests.java
@@ -67,6 +67,7 @@
 import org.springframework.data.redis.test.extension.LettuceExtension;
 import org.springframework.data.redis.test.extension.LettuceTestClientResources;
 import org.springframework.data.redis.test.util.HexStringUtils;
+import org.springframework.data.redis.util.ConnectionVerifier;
 
 /**
  * @author Christoph Strobl
@@ -156,14 +157,11 @@ private static LettuceConnectionFactory createConnectionFactory() {
 	void shouldCreateConnectionWithPooling() {
 
 		LettuceConnectionFactory factory = createConnectionFactory();
-		factory.afterPropertiesSet();
-
-		RedisConnection connection = factory.getConnection();
-
-		assertThat(connection.ping()).isEqualTo("PONG");
-		connection.close();
 
-		factory.destroy();
+		ConnectionVerifier.create(factory) //
+				.execute(connection -> {
+					assertThat(connection.ping()).isEqualTo("PONG");
+				}).verifyAndClose();
 	}
 
 	@Test // DATAREDIS-775
@@ -171,13 +169,17 @@ void shouldCreateClusterConnectionWithPooling() {
 
 		LettuceConnectionFactory factory = createConnectionFactory();
 		factory.afterPropertiesSet();
+		try {
 
-		RedisClusterConnection clusterConnection = factory.getClusterConnection();
+			factory.start();
+			RedisClusterConnection clusterConnection = factory.getClusterConnection();
 
-		assertThat(clusterConnection.ping(ClusterTestVariables.CLUSTER_NODE_1)).isEqualTo("PONG");
-		clusterConnection.close();
+			assertThat(clusterConnection.ping(ClusterTestVariables.CLUSTER_NODE_1)).isEqualTo("PONG");
+			clusterConnection.close();
+		} finally {
+			factory.destroy();
+		}
 
-		factory.destroy();
 	}
 
 	@Test // DATAREDIS-315
diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterKeyspaceNotificationsTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterKeyspaceNotificationsTests.java
index 035153ab02..ed3c4d31ed 100644
--- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterKeyspaceNotificationsTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterKeyspaceNotificationsTests.java
@@ -68,6 +68,7 @@ static void beforeAll() throws Exception {
 		factory = new CustomLettuceConnectionFactory(SettingsUtils.clusterConfiguration());
 		factory.setClientResources(LettuceTestClientResources.getSharedClientResources());
 		factory.afterPropertiesSet();
+		factory.start();
 	}
 
 	@BeforeEach
diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryTests.java
index 1be106f79b..3f5d348408 100644
--- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryTests.java
@@ -71,6 +71,7 @@ void setUp() {
 		factory.setClientResources(LettuceTestClientResources.getSharedClientResources());
 		factory.afterPropertiesSet();
 		factory.setShutdownTimeout(0);
+		factory.start();
 		connection = new DefaultStringRedisConnection(factory.getConnection());
 	}
 
@@ -212,7 +213,7 @@ private static LettuceConnectionFactory newConnectionFactory(Consumer<LettuceCon
 		connectionFactory.setClientResources(LettuceTestClientResources.getSharedClientResources());
 		connectionFactory.setShutdownTimeout(0);
 		customizer.accept(connectionFactory);
-		connectionFactory.afterPropertiesSet();
+		connectionFactory.start();
 
 		return connectionFactory;
 	}
@@ -264,9 +265,9 @@ void testResetAndInitConnection() {
 
 	@Test
 	void testGetConnectionException() {
-		factory.resetConnection();
+		factory.stop();
 		factory.setHostName("fakeHost");
-		factory.afterPropertiesSet();
+		factory.start();
 		try {
 			factory.getConnection();
 			fail("Expected connection failure exception");
@@ -277,7 +278,7 @@ void testGetConnectionException() {
 	void testGetConnectionNotSharedBadHostname() {
 		factory.setShareNativeConnection(false);
 		factory.setHostName("fakeHost");
-		factory.afterPropertiesSet();
+		factory.start();
 		factory.getConnection();
 	}
 
@@ -285,7 +286,7 @@ void testGetConnectionNotSharedBadHostname() {
 	void testGetSharedConnectionNotShared() {
 		factory.setShareNativeConnection(false);
 		factory.setHostName("fakeHost");
-		factory.afterPropertiesSet();
+		factory.start();
 		assertThat(factory.getSharedConnection()).isNull();
 	}
 
@@ -295,7 +296,7 @@ void dbIndexShouldBePropagatedCorrectly() {
 		LettuceConnectionFactory factory = new LettuceConnectionFactory();
 		factory.setClientResources(LettuceTestClientResources.getSharedClientResources());
 		factory.setDatabase(2);
-		factory.afterPropertiesSet();
+		factory.start();
 
 		ConnectionFactoryTracker.add(factory);
 
@@ -319,7 +320,7 @@ void factoryWorksWithoutClientResources() {
 
 		LettuceConnectionFactory factory = new LettuceConnectionFactory();
 		factory.setShutdownTimeout(0);
-		factory.afterPropertiesSet();
+		factory.start();
 
 		ConnectionFactoryTracker.add(factory);
 
@@ -337,7 +338,7 @@ void factoryShouldReturnReactiveConnectionWhenCorrectly() {
 
 		LettuceConnectionFactory factory = new LettuceConnectionFactory();
 		factory.setClientResources(LettuceTestClientResources.getSharedClientResources());
-		factory.afterPropertiesSet();
+		factory.start();
 
 		ConnectionFactoryTracker.add(factory);
 
@@ -355,7 +356,7 @@ void factoryCreatesPooledConnections() {
 
 		LettuceConnectionFactory factory = new LettuceConnectionFactory(new RedisStandaloneConfiguration(), configuration);
 		factory.setShareNativeConnection(false);
-		factory.afterPropertiesSet();
+		factory.start();
 
 		ConnectionFactoryTracker.add(factory);
 
@@ -385,7 +386,7 @@ void connectsThroughRedisSocket() {
 
 		LettuceConnectionFactory factory = new LettuceConnectionFactory(SettingsUtils.socketConfiguration(), configuration);
 		factory.setShareNativeConnection(false);
-		factory.afterPropertiesSet();
+		factory.start();
 
 		RedisConnection connection = factory.getConnection();
 		assertThat(connection.ping()).isEqualTo("PONG");
@@ -407,7 +408,7 @@ void factoryUsesElastiCacheMasterReplicaConnections() {
 				SettingsUtils.getHost()).node(SettingsUtils.getHost(), SettingsUtils.getPort() + 1);
 
 		LettuceConnectionFactory factory = new LettuceConnectionFactory(elastiCache, configuration);
-		factory.afterPropertiesSet();
+		factory.start();
 
 		RedisConnection connection = factory.getConnection();
 
@@ -432,7 +433,7 @@ void pubSubDoesNotSupportMasterReplicaConnections() {
 
 		LettuceConnectionFactory factory = new LettuceConnectionFactory(elastiCache,
 				LettuceTestClientConfiguration.defaultConfiguration());
-		factory.afterPropertiesSet();
+		factory.start();
 
 		RedisConnection connection = factory.getConnection();
 
@@ -456,7 +457,7 @@ void factoryUsesElastiCacheMasterWithoutMaster() {
 				SettingsUtils.getHost(), SettingsUtils.getPort() + 1);
 
 		LettuceConnectionFactory factory = new LettuceConnectionFactory(elastiCache, configuration);
-		factory.afterPropertiesSet();
+		factory.start();
 
 		RedisConnection connection = factory.getConnection();
 
@@ -484,7 +485,7 @@ void factoryUsesMasterReplicaConnections() {
 
 		LettuceConnectionFactory factory = new LettuceConnectionFactory(SettingsUtils.standaloneConfiguration(),
 				configuration);
-		factory.afterPropertiesSet();
+		factory.start();
 
 		RedisConnection connection = factory.getConnection();
 
@@ -506,7 +507,7 @@ void connectionAppliesClientName() {
 
 		LettuceConnectionFactory factory = new LettuceConnectionFactory(new RedisStandaloneConfiguration(), configuration);
 		factory.setShareNativeConnection(false);
-		factory.afterPropertiesSet();
+		factory.start();
 
 		ConnectionFactoryTracker.add(factory);
 
@@ -522,7 +523,7 @@ void getClientNameShouldEqualWithFactorySetting() {
 		LettuceConnectionFactory factory = new LettuceConnectionFactory(new RedisStandaloneConfiguration());
 		factory.setClientResources(LettuceTestClientResources.getSharedClientResources());
 		factory.setClientName("clientName");
-		factory.afterPropertiesSet();
+		factory.start();
 
 		ConnectionFactoryTracker.add(factory);
 
@@ -542,7 +543,7 @@ void shouldInitializeMasterReplicaConnectionsEagerly() {
 
 		LettuceConnectionFactory factory = new LettuceConnectionFactory(elastiCache, configuration);
 		factory.setEagerInitialization(true);
-		factory.afterPropertiesSet();
+		factory.start();
 
 		assertThat(factory.getSharedConnection()).isNotNull();
 		assertThat(factory.getSharedClusterConnection()).isNull();
@@ -560,7 +561,7 @@ void shouldInitializeClusterConnectionsEagerly() {
 		LettuceConnectionFactory factory = new LettuceConnectionFactory(SettingsUtils.clusterConfiguration(),
 				configuration);
 		factory.setEagerInitialization(true);
-		factory.afterPropertiesSet();
+		factory.start();
 
 		assertThat(factory.getSharedConnection()).isNull();
 		assertThat(factory.getSharedClusterConnection()).isNotNull();
@@ -569,4 +570,17 @@ void shouldInitializeClusterConnectionsEagerly() {
 		factory.destroy();
 	}
 
+	@Test // GH-2503
+	void startStopStartConnectionFactory() {
+
+		assertThat(factory.isRunning()).isTrue();
+		factory.stop();
+		assertThat(factory.isRunning()).isFalse();
+		assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() -> factory.getConnection());
+		factory.start();
+		assertThat(factory.isRunning()).isTrue();
+		try (RedisConnection connection = factory.getConnection()) {
+			assertThat(connection.ping()).isEqualTo("PONG");
+		}
+	}
 }
diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryUnitTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryUnitTests.java
index 1c68c5d3ad..209e7cd578 100644
--- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryUnitTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactoryUnitTests.java
@@ -412,6 +412,7 @@ void verifyPeerOptionShouldBeSetCorrectlyOnClient() {
 				LettuceTestClientConfiguration.builder().useSsl().disablePeerVerification().build());
 		connectionFactory.afterPropertiesSet();
 		ConnectionFactoryTracker.add(connectionFactory);
+		connectionFactory.start();
 
 		AbstractRedisClient client = (AbstractRedisClient) getField(connectionFactory, "client");
 		assertThat(client).isInstanceOf(RedisClient.class);
@@ -429,6 +430,7 @@ void startTLSOptionShouldBeSetCorrectlyOnClient() {
 				LettuceTestClientConfiguration.builder().useSsl().startTls().build());
 		connectionFactory.afterPropertiesSet();
 		ConnectionFactoryTracker.add(connectionFactory);
+		connectionFactory.start();
 
 		AbstractRedisClient client = (AbstractRedisClient) getField(connectionFactory, "client");
 		assertThat(client).isInstanceOf(RedisClient.class);
@@ -867,6 +869,7 @@ protected AbstractRedisClient createClient() {
 		};
 
 		connectionFactory.afterPropertiesSet();
+		connectionFactory.start();
 
 		new DirectFieldAccessor(connectionFactory).setPropertyValue("client", clientMock);
 
@@ -898,6 +901,7 @@ protected AbstractRedisClient createClient() {
 
 		connectionFactory.setValidateConnection(true);
 		connectionFactory.afterPropertiesSet();
+		connectionFactory.start();
 
 		try (RedisConnection connection = connectionFactory.getConnection()) {
 			connection.ping();
@@ -927,6 +931,7 @@ protected AbstractRedisClient createClient() {
 
 		connectionFactory.setValidateConnection(true);
 		connectionFactory.afterPropertiesSet();
+		connectionFactory.start();
 
 		connectionFactory.getConnection().close();
 
@@ -954,6 +959,7 @@ protected LettuceConnectionProvider doCreateConnectionProvider(AbstractRedisClie
 		connectionFactory.setEagerInitialization(true);
 
 		connectionFactory.afterPropertiesSet();
+		connectionFactory.start();
 
 		verify(connectionProviderMock, times(2)).getConnection(StatefulConnection.class);
 	}
@@ -974,6 +980,7 @@ protected LettuceConnectionProvider doCreateConnectionProvider(AbstractRedisClie
 		};
 		connectionFactory.setClientResources(LettuceTestClientResources.getSharedClientResources());
 		connectionFactory.afterPropertiesSet();
+		connectionFactory.start();
 
 		assertThatExceptionOfType(RedisConnectionFailureException.class)
 				.isThrownBy(() -> connectionFactory.getConnection().ping()).withCauseInstanceOf(PoolException.class);
@@ -994,6 +1001,7 @@ protected LettuceConnectionProvider doCreateConnectionProvider(AbstractRedisClie
 
 		connectionFactory.setClientResources(getSharedClientResources());
 		connectionFactory.afterPropertiesSet();
+		connectionFactory.start();
 		connectionFactory.destroy();
 
 		verify((DisposableBean) connectionProviderMock, times(2)).destroy();
@@ -1079,6 +1087,7 @@ protected LettuceConnectionProvider doCreateConnectionProvider(AbstractRedisClie
 		};
 		connectionFactory.setClientResources(getSharedClientResources());
 		connectionFactory.afterPropertiesSet();
+		connectionFactory.start();
 
 		LettuceReactiveRedisConnection reactiveConnection = connectionFactory.getReactiveConnection();
 
@@ -1091,12 +1100,14 @@ void getNativeClientShouldReturnClient() {
 		LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory();
 		connectionFactory.setClientResources(getSharedClientResources());
 		connectionFactory.afterPropertiesSet();
+		connectionFactory.start();
 
 		assertThat(connectionFactory.getNativeClient()).isInstanceOf(RedisClient.class);
 
 		connectionFactory = new LettuceConnectionFactory(clusterConfig);
 		connectionFactory.setClientResources(getSharedClientResources());
 		connectionFactory.afterPropertiesSet();
+		connectionFactory.start();
 
 		assertThat(connectionFactory.getRequiredNativeClient()).isInstanceOf(RedisClusterClient.class);
 	}
@@ -1107,7 +1118,7 @@ void getNativeClientShouldFailIfNotInitialized() {
 		LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory();
 
 		assertThatIllegalStateException().isThrownBy(connectionFactory::getRequiredNativeClient)
-				.withMessageContaining("was not initialized through");
+				.withMessageContaining("Use start() to initialize");
 	}
 
 	@Test // GH-2057
@@ -1226,6 +1237,16 @@ void createFullRedisSentinelConfiguration() {
 		assertThat(configuration).isEqualTo(expected);
 	}
 
+	@Test // GH-2503
+	void afterPropertiesSetDoesNotTriggerConnectionInitialization() {
+
+		LettuceConnectionFactory connectionFactory = new LettuceConnectionFactory();
+		connectionFactory.afterPropertiesSet();
+
+		assertThat(connectionFactory.isRunning()).isFalse();
+		assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() -> connectionFactory.getConnection());
+	}
+
 	static class CustomRedisConfiguration implements RedisConfiguration, WithHostAndPort {
 
 		private String hostName;
diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionIntegrationTests.java
index b18e5283ee..360178415a 100644
--- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionIntegrationTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionIntegrationTests.java
@@ -125,6 +125,7 @@ void testCloseNonPooledConnectionNotShared() {
 		factory2.setShutdownTimeout(0);
 		factory2.setShareNativeConnection(false);
 		factory2.afterPropertiesSet();
+		factory2.start();
 		RedisConnection connection = factory2.getConnection();
 		// Use the connection to make sure the channel is initialized, else nothing happens on close
 		connection.ping();
@@ -134,7 +135,10 @@ void testCloseNonPooledConnectionNotShared() {
 			connection.set("foo".getBytes(), "bar".getBytes());
 			fail("Exception should be thrown trying to use a closed connection");
 		} catch (RedisSystemException e) {}
+		finally {
+
 		factory2.destroy();
+		}
 	}
 
 	@Test
@@ -153,6 +157,7 @@ public void testMove() {
 		factory2.setShutdownTimeout(0);
 		factory2.setDatabase(1);
 		factory2.afterPropertiesSet();
+		factory2.start();
 		StringRedisConnection conn2 = new DefaultStringRedisConnection(factory2.getConnection());
 		try {
 			assertThat(conn2.get("foo")).isEqualTo("bar");
diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionPipelineIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionPipelineIntegrationTests.java
index 8fc9e52cc0..aca02152c5 100644
--- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionPipelineIntegrationTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionPipelineIntegrationTests.java
@@ -57,6 +57,7 @@ public void testMove() {
 				LettuceTestClientConfiguration.builder().build());
 		factory2.setDatabase(1);
 		factory2.afterPropertiesSet();
+		factory2.start();
 		StringRedisConnection conn2 = new DefaultStringRedisConnection(factory2.getConnection());
 		try {
 			assertThat(conn2.get("foo")).isEqualTo("bar");
diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionTransactionIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionTransactionIntegrationTests.java
index 331212e696..13f88fe454 100644
--- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionTransactionIntegrationTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionTransactionIntegrationTests.java
@@ -55,6 +55,8 @@ public void testMove() {
 		factory2.setShutdownTimeout(0);
 		factory2.setDatabase(1);
 		factory2.afterPropertiesSet();
+		factory2.start();
+
 		StringRedisConnection conn2 = new DefaultStringRedisConnection(factory2.getConnection());
 		try {
 			assertThat(conn2.get("foo")).isEqualTo("bar");
diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceSentinelIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceSentinelIntegrationTests.java
index 5391381699..562a902244 100644
--- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceSentinelIntegrationTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceSentinelIntegrationTests.java
@@ -120,14 +120,19 @@ void shouldUseSpecifiedDatabase() {
 		connectionFactory.setShareNativeConnection(false);
 		connectionFactory.setDatabase(5);
 		connectionFactory.afterPropertiesSet();
+		connectionFactory.start();
+
+		try(RedisConnection directConnection = connectionFactory.getConnection()) {
+
+			assertThat(directConnection.exists("foo".getBytes())).isFalse();
+			directConnection.select(0);
+
+			assertThat(directConnection.exists("foo".getBytes())).isTrue();
+		} finally {
+			connectionFactory.destroy();
+		}
 
-		RedisConnection directConnection = connectionFactory.getConnection();
-		assertThat(directConnection.exists("foo".getBytes())).isFalse();
-		directConnection.select(0);
 
-		assertThat(directConnection.exists("foo".getBytes())).isTrue();
-		directConnection.close();
-		connectionFactory.destroy();
 	}
 
 	@Test // DATAREDIS-973
@@ -144,16 +149,19 @@ void reactiveShouldUseSpecifiedDatabase() {
 		connectionFactory.setShareNativeConnection(false);
 		connectionFactory.setDatabase(5);
 		connectionFactory.afterPropertiesSet();
+		connectionFactory.start();
 
-		LettuceReactiveRedisConnection reactiveConnection = connectionFactory.getReactiveConnection();
+		try(LettuceReactiveRedisConnection reactiveConnection = connectionFactory.getReactiveConnection()) {
 
-		reactiveConnection.keyCommands().exists(ByteBuffer.wrap("foo".getBytes())) //
-				.as(StepVerifier::create) //
-				.expectNext(false) //
-				.verifyComplete();
+			reactiveConnection.keyCommands().exists(ByteBuffer.wrap("foo".getBytes())) //
+					.as(StepVerifier::create) //
+					.expectNext(false) //
+					.verifyComplete();
+
+		} finally {
+			connectionFactory.destroy();
+		}
 
-		reactiveConnection.close();
-		connectionFactory.destroy();
 	}
 
 	@Test
@@ -253,16 +261,14 @@ void factoryUsesMasterReplicaConnections() {
 
 		LettuceConnectionFactory factory = new LettuceConnectionFactory(SENTINEL_CONFIG, configuration);
 		factory.afterPropertiesSet();
+		factory.start();
 
-		RedisConnection connection = factory.getConnection();
+		try(RedisConnection connection = factory.getConnection()) {
 
-		try {
 			assertThat(connection.ping()).isEqualTo("PONG");
 			assertThat(connection.info().getProperty("role")).isEqualTo("slave");
 		} finally {
-			connection.close();
+			factory.destroy();
 		}
-
-		factory.destroy();
 	}
 }
diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/extension/LettuceConnectionFactoryExtension.java b/src/test/java/org/springframework/data/redis/connection/lettuce/extension/LettuceConnectionFactoryExtension.java
index b11dfed222..ff9e0be5de 100644
--- a/src/test/java/org/springframework/data/redis/connection/lettuce/extension/LettuceConnectionFactoryExtension.java
+++ b/src/test/java/org/springframework/data/redis/connection/lettuce/extension/LettuceConnectionFactoryExtension.java
@@ -66,6 +66,8 @@ public class LettuceConnectionFactoryExtension implements ParameterResolver {
 		ManagedLettuceConnectionFactory factory = new ManagedLettuceConnectionFactory(
 				SettingsUtils.standaloneConfiguration(), configuration);
 		factory.afterPropertiesSet();
+		factory.start();
+
 		ShutdownQueue.register(factory);
 
 		return factory;
@@ -78,6 +80,7 @@ public class LettuceConnectionFactoryExtension implements ParameterResolver {
 		ManagedLettuceConnectionFactory factory = new ManagedLettuceConnectionFactory(SettingsUtils.sentinelConfiguration(),
 				configuration);
 		factory.afterPropertiesSet();
+		factory.start();
 		ShutdownQueue.register(factory);
 
 		return factory;
@@ -90,6 +93,7 @@ public class LettuceConnectionFactoryExtension implements ParameterResolver {
 		ManagedLettuceConnectionFactory factory = new ManagedLettuceConnectionFactory(SettingsUtils.clusterConfiguration(),
 				configuration);
 		factory.afterPropertiesSet();
+		factory.start();
 		ShutdownQueue.register(factory);
 
 		return factory;
@@ -102,6 +106,7 @@ public class LettuceConnectionFactoryExtension implements ParameterResolver {
 		ManagedLettuceConnectionFactory factory = new ManagedLettuceConnectionFactory(
 				SettingsUtils.standaloneConfiguration(), configuration);
 		factory.afterPropertiesSet();
+		factory.start();
 		ShutdownQueue.register(factory);
 
 		return factory;
@@ -114,6 +119,7 @@ public class LettuceConnectionFactoryExtension implements ParameterResolver {
 		ManagedLettuceConnectionFactory factory = new ManagedLettuceConnectionFactory(SettingsUtils.sentinelConfiguration(),
 				configuration);
 		factory.afterPropertiesSet();
+		factory.start();
 		ShutdownQueue.register(factory);
 
 		return factory;
@@ -126,6 +132,7 @@ public class LettuceConnectionFactoryExtension implements ParameterResolver {
 		ManagedLettuceConnectionFactory factory = new ManagedLettuceConnectionFactory(SettingsUtils.clusterConfiguration(),
 				configuration);
 		factory.afterPropertiesSet();
+		factory.start();
 		ShutdownQueue.register(factory);
 
 		return factory;
diff --git a/src/test/java/org/springframework/data/redis/core/AbstractOperationsTestParams.java b/src/test/java/org/springframework/data/redis/core/AbstractOperationsTestParams.java
index 123ae9054d..f3bc58c96f 100644
--- a/src/test/java/org/springframework/data/redis/core/AbstractOperationsTestParams.java
+++ b/src/test/java/org/springframework/data/redis/core/AbstractOperationsTestParams.java
@@ -67,7 +67,6 @@ public static Collection<Object[]> testParams(RedisConnectionFactory connectionF
 		ObjectFactory<byte[]> rawFactory = new RawObjectFactory();
 		ObjectFactory<Person> personFactory = new PersonObjectFactory();
 
-
 		RedisTemplate<String, String> stringTemplate = new StringRedisTemplate();
 		stringTemplate.setConnectionFactory(connectionFactory);
 		stringTemplate.afterPropertiesSet();
diff --git a/src/test/java/org/springframework/data/redis/core/DefaultReactiveHashOperationsIntegrationTests.java b/src/test/java/org/springframework/data/redis/core/DefaultReactiveHashOperationsIntegrationTests.java
index 6d42fc64d8..b0aa6c085f 100644
--- a/src/test/java/org/springframework/data/redis/core/DefaultReactiveHashOperationsIntegrationTests.java
+++ b/src/test/java/org/springframework/data/redis/core/DefaultReactiveHashOperationsIntegrationTests.java
@@ -82,6 +82,7 @@ public static Collection<Object[]> testParams() {
 		lettuceConnectionFactory.setPort(SettingsUtils.getPort());
 		lettuceConnectionFactory.setHostName(SettingsUtils.getHost());
 		lettuceConnectionFactory.afterPropertiesSet();
+		lettuceConnectionFactory.start();
 
 		RedisSerializationContext<String, String> serializationContext = RedisSerializationContext
 				.fromSerializer(StringRedisSerializer.UTF_8);
diff --git a/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerFailureIntegrationTests.java b/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerFailureIntegrationTests.java
index 72931fff5d..f2df685c88 100644
--- a/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerFailureIntegrationTests.java
+++ b/src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerFailureIntegrationTests.java
@@ -66,6 +66,7 @@ void setUp() {
 
 		connectionFactory = new JedisConnectionFactory(configuration);
 		connectionFactory.afterPropertiesSet();
+		connectionFactory.start();
 
 		container = new RedisMessageListenerContainer();
 		container.setConnectionFactory(connectionFactory);
diff --git a/src/test/java/org/springframework/data/redis/repository/RedisRepositoryClusterIntegrationTests.java b/src/test/java/org/springframework/data/redis/repository/RedisRepositoryClusterIntegrationTests.java
index 40c2b3a3ef..2bf3b60b47 100644
--- a/src/test/java/org/springframework/data/redis/repository/RedisRepositoryClusterIntegrationTests.java
+++ b/src/test/java/org/springframework/data/redis/repository/RedisRepositoryClusterIntegrationTests.java
@@ -27,10 +27,12 @@
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.FilterType;
 import org.springframework.data.redis.connection.RedisClusterConfiguration;
+import org.springframework.data.redis.connection.RedisConnectionFactory;
 import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.data.redis.repository.configuration.EnableRedisRepositories;
 import org.springframework.data.redis.test.condition.EnabledOnRedisClusterAvailable;
+import org.springframework.lang.NonNullApi;
 import org.springframework.test.context.ContextConfiguration;
 import org.springframework.test.context.junit.jupiter.SpringExtension;
 
@@ -54,17 +56,19 @@ class RedisRepositoryClusterIntegrationTests extends RedisRepositoryIntegrationT
 	static class Config {
 
 		@Bean
-		RedisTemplate<?, ?> redisTemplate() {
-
-			RedisClusterConfiguration clusterConfig = new RedisClusterConfiguration(CLUSTER_NODES);
-			JedisConnectionFactory connectionFactory = new JedisConnectionFactory(clusterConfig);
-
-			connectionFactory.afterPropertiesSet();
+		RedisTemplate<?, ?> redisTemplate(RedisConnectionFactory connectionFactory) {
 
 			RedisTemplate<byte[], byte[]> template = new RedisTemplate<>();
 			template.setConnectionFactory(connectionFactory);
 
 			return template;
 		}
+
+		@Bean
+		RedisConnectionFactory connectionFactory() {
+			RedisClusterConfiguration clusterConfig = new RedisClusterConfiguration(CLUSTER_NODES);
+			JedisConnectionFactory connectionFactory = new JedisConnectionFactory(clusterConfig);
+			return connectionFactory;
+		}
 	}
 }
diff --git a/src/test/java/org/springframework/data/redis/repository/RedisRepositoryIntegrationTests.java b/src/test/java/org/springframework/data/redis/repository/RedisRepositoryIntegrationTests.java
index 3f9510960a..13c850d02e 100644
--- a/src/test/java/org/springframework/data/redis/repository/RedisRepositoryIntegrationTests.java
+++ b/src/test/java/org/springframework/data/redis/repository/RedisRepositoryIntegrationTests.java
@@ -30,6 +30,7 @@
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.FilterType;
 import org.springframework.data.convert.ConfigurableTypeInformationMapper;
+import org.springframework.data.redis.connection.RedisConnectionFactory;
 import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
 import org.springframework.data.redis.core.RedisOperations;
 import org.springframework.data.redis.core.RedisTemplate;
@@ -61,10 +62,12 @@ public class RedisRepositoryIntegrationTests extends RedisRepositoryIntegrationT
 	static class Config {
 
 		@Bean
-		RedisTemplate<?, ?> redisTemplate() {
+		RedisConnectionFactory connectionFactory() {
+			return new JedisConnectionFactory();
+		}
 
-			JedisConnectionFactory connectionFactory = new JedisConnectionFactory();
-			connectionFactory.afterPropertiesSet();
+		@Bean
+		RedisTemplate<?, ?> redisTemplate(RedisConnectionFactory connectionFactory) {
 
 			RedisTemplate<String, String> template = new RedisTemplate<>();
 			template.setDefaultSerializer(StringRedisSerializer.UTF_8);
diff --git a/src/test/java/org/springframework/data/redis/util/ConnectionVerifier.java b/src/test/java/org/springframework/data/redis/util/ConnectionVerifier.java
new file mode 100644
index 0000000000..3effdb5ba0
--- /dev/null
+++ b/src/test/java/org/springframework/data/redis/util/ConnectionVerifier.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2023 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.redis.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.context.SmartLifecycle;
+import org.springframework.dao.DataAccessResourceFailureException;
+import org.springframework.data.redis.connection.RedisConnection;
+import org.springframework.data.redis.connection.RedisConnectionFactory;
+
+/**
+ * @author Christoph Strobl
+ */
+public class ConnectionVerifier<T extends RedisConnectionFactory> {
+
+	private final T connectionFactory;
+	private final List<Consumer<RedisConnection>> steps = new ArrayList<>(3);
+
+	private Consumer<T> initFactoryFunction = this::initializeFactoryIfRequired;
+
+	ConnectionVerifier(T connectionFactory) {
+		this.connectionFactory = connectionFactory;
+	}
+
+	public static <V extends RedisConnectionFactory> ConnectionVerifier<V> create(V connectionFactory) {
+		return new ConnectionVerifier<>(connectionFactory);
+	}
+
+	public ConnectionVerifier<T> initializeFactory(Consumer<T> initFunction) {
+
+		this.initFactoryFunction = initFunction;
+		return this;
+	}
+
+	public ConnectionVerifier<T> execute(Consumer<RedisConnection> connectionConsumer) {
+		this.steps.add(connectionConsumer);
+		return this;
+	}
+
+	public void verify() {
+		verifyAndRun(it -> {});
+	}
+
+	public void verifyAndClose() {
+		verifyAndRun(this::disposeFactoryIfNeeded);
+	}
+
+	public void verifyAndRun(Consumer<T> disposeFunction) {
+
+		initFactoryFunction.accept(connectionFactory);
+
+		try (RedisConnection connection = connectionFactory.getConnection()) {
+			steps.forEach(step -> step.accept(connection));
+		} finally {
+			disposeFunction.accept(connectionFactory);
+		}
+	}
+
+	private void initializeFactoryIfRequired(T factory) {
+
+		if (factory instanceof InitializingBean initializingBean) {
+			try {
+				initializingBean.afterPropertiesSet();
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+		}
+		if (factory instanceof SmartLifecycle smartLifecycle) {
+			if (smartLifecycle.isAutoStartup() && !smartLifecycle.isRunning()) {
+				smartLifecycle.start();
+			}
+		}
+	}
+
+	private void disposeFactoryIfNeeded(T it) {
+
+		if (it instanceof DisposableBean bean) {
+			try {
+				bean.destroy();
+			} catch (Exception e) {
+				throw new DataAccessResourceFailureException("Cannot close resource", e);
+			}
+		} else if (it instanceof Closeable closeable) {
+			try {
+				closeable.close();
+			} catch (IOException e) {
+				throw new DataAccessResourceFailureException("Cannot close resource", e);
+			}
+		} else if (it instanceof SmartLifecycle smartLifecycle && smartLifecycle.isRunning()) {
+			smartLifecycle.stop();
+		}
+	}
+}