diff --git a/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java b/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java
index 831a46ece2..6bfddb2a4d 100644
--- a/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java
+++ b/src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java
@@ -82,6 +82,7 @@
  * @author ihaohong
  * @author Dennis Neufeld
  * @author Shyngys Sapraliyev
+ * @author Jeonggyu Choi
  */
 @SuppressWarnings({ "ConstantConditions", "deprecation" })
 public class DefaultStringRedisConnection implements StringRedisConnection, DecoratedRedisConnection {
@@ -2968,12 +2969,26 @@ public PendingMessages xPending(String key, String groupName, String consumer,
 				Converters.identityConverter());
 	}
 
+	@Override
+	public PendingMessages xPending(String key, String groupName, String consumerName,
+			org.springframework.data.domain.Range<String> range, Long count, Duration minIdleTime) {
+		return convertAndReturn(delegate.xPending(serialize(key), groupName, consumerName, range, count, minIdleTime),
+				Converters.identityConverter());
+	}
+
 	@Override
 	public PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range<String> range,
 			Long count) {
 		return convertAndReturn(delegate.xPending(serialize(key), groupName, range, count), Converters.identityConverter());
 	}
 
+	@Override
+	public PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range<String> range,
+			Long count, Duration minIdleTime) {
+		return convertAndReturn(delegate.xPending(serialize(key), groupName, range, count, minIdleTime),
+				Converters.identityConverter());
+	}
+
 	@Override
 	public PendingMessages xPending(String key, String groupName, XPendingOptions options) {
 		return convertAndReturn(delegate.xPending(serialize(key), groupName, options), Converters.identityConverter());
diff --git a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java
index 2860dc691c..520a0b4317 100644
--- a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java
+++ b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java
@@ -60,6 +60,7 @@
  * @author Dengliming
  * @author Mark John Moreno
  * @author jinkshower
+ * @author Jeonggyu Choi
  * @since 2.2
  */
 public interface ReactiveStreamCommands {
@@ -747,6 +748,27 @@ default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, Range<?
 				.map(CommandResponse::getOutput);
 	}
 
+	/**
+	 * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} within a
+	 * {@literal consumer group} and over a given {@link Duration} of idle time.
+	 *
+	 * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
+	 * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
+	 * @param range the range of messages ids to search within. Must not be {@literal null}.
+	 * @param count limit the number of results. Must not be {@literal null}.
+	 * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}.
+	 * @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline /
+	 *         transaction.
+	 * @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
+	 * @since 3.5
+	 */
+	default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, Range<?> range, Long count,
+			Duration minIdleTime) {
+		return xPending(
+				Mono.just(PendingRecordsCommand.pending(key, groupName).range(range, count).minIdleTime(minIdleTime))).next()
+				.map(CommandResponse::getOutput);
+	}
+
 	/**
 	 * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
 	 * {@link Consumer} within a {@literal consumer group}.
@@ -763,6 +785,24 @@ default Mono<PendingMessages> xPending(ByteBuffer key, Consumer consumer, Range<
 		return xPending(key, consumer.getGroup(), consumer.getName(), range, count);
 	}
 
+	/**
+	 * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
+	 * {@link Consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time.
+	 *
+	 * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
+	 * @param consumer the name of the {@link Consumer}. Must not be {@literal null}.
+	 * @param range the range of messages ids to search within. Must not be {@literal null}.
+	 * @param count limit the number of results. Must not be {@literal null}.
+	 * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}.
+	 * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction.
+	 * @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
+	 * @since 3.5
+	 */
+	default Mono<PendingMessages> xPending(ByteBuffer key, Consumer consumer, Range<?> range, Long count,
+			Duration minIdleTime) {
+		return xPending(key, consumer.getGroup(), consumer.getName(), range, count, minIdleTime);
+	}
+
 	/**
 	 * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
 	 * {@literal consumer} within a {@literal consumer group}.
@@ -783,6 +823,27 @@ default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String
 				.next().map(CommandResponse::getOutput);
 	}
 
+	/**
+	 * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
+	 * {@literal consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time.
+	 *
+	 * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
+	 * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
+	 * @param consumerName the name of the {@literal consumer}. Must not be {@literal null}.
+	 * @param range the range of messages ids to search within. Must not be {@literal null}.
+	 * @param count limit the number of results. Must not be {@literal null}.
+	 * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}.
+	 * @return pending messages for the given {@literal consumer} in given {@literal consumer group} or {@literal null}
+	 *         when used in pipeline / transaction.
+	 * @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
+	 * @since 3.5
+	 */
+	default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String consumerName, Range<?> range,
+			Long count, Duration minIdleTime) {
+		return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).consumer(consumerName).range(range, count)
+				.minIdleTime(minIdleTime))).next().map(CommandResponse::getOutput);
+	}
+
 	/**
 	 * Obtain detailed information about pending {@link PendingMessage messages} applying given {@link XPendingOptions
 	 * options}.
@@ -798,24 +859,26 @@ default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String
 	 * Value Object holding parameters for obtaining pending messages.
 	 *
 	 * @author Christoph Strobl
+	 * @author Jeonggyu Choi
 	 * @since 2.3
 	 */
 	class PendingRecordsCommand extends KeyCommand {
 
 		private final String groupName;
-		private final @Nullable String consumerName;
-		private final Range<?> range;
-		private final @Nullable Long count;
+		private final XPendingOptions options;
 
 		private PendingRecordsCommand(ByteBuffer key, String groupName, @Nullable String consumerName, Range<?> range,
-				@Nullable Long count) {
+				@Nullable Long count, @Nullable Duration minIdleTime) {
+
+			this(key, groupName, XPendingOptions.range(range, count).consumer(consumerName).minIdleTime(minIdleTime));
+		}
+
+		private PendingRecordsCommand(ByteBuffer key, String groupName, XPendingOptions options) {
 
 			super(key);
 
 			this.groupName = groupName;
-			this.consumerName = consumerName;
-			this.range = range;
-			this.count = count;
+			this.options = options;
 		}
 
 		/**
@@ -826,7 +889,7 @@ private PendingRecordsCommand(ByteBuffer key, String groupName, @Nullable String
 		 * @return new instance of {@link PendingRecordsCommand}.
 		 */
 		static PendingRecordsCommand pending(ByteBuffer key, String groupName) {
-			return new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null);
+			return new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null, null);
 		}
 
 		/**
@@ -841,7 +904,7 @@ public PendingRecordsCommand range(Range<?> range, Long count) {
 			Assert.notNull(range, "Range must not be null");
 			Assert.isTrue(count > -1, "Count must not be negative");
 
-			return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count);
+			return new PendingRecordsCommand(getKey(), groupName, XPendingOptions.range(range, count));
 		}
 
 		/**
@@ -851,7 +914,20 @@ public PendingRecordsCommand range(Range<?> range, Long count) {
 		 * @return new instance of {@link PendingRecordsCommand}.
 		 */
 		public PendingRecordsCommand consumer(String consumerName) {
-			return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count);
+			return new PendingRecordsCommand(getKey(), groupName, XPendingOptions.unbounded().consumer(consumerName));
+		}
+
+		/**
+		 * Append given minimum idle time.
+		 *
+		 * @param minIdleTime must not be {@literal null}.
+		 * @return new instance of {@link PendingRecordsCommand}.
+		 */
+		public PendingRecordsCommand minIdleTime(Duration minIdleTime) {
+
+			Assert.notNull(minIdleTime, "Idle must not be null");
+
+			return new PendingRecordsCommand(getKey(), groupName, XPendingOptions.unbounded().minIdleTime(minIdleTime));
 		}
 
 		public String getGroupName() {
@@ -863,14 +939,14 @@ public String getGroupName() {
 		 */
 		@Nullable
 		public String getConsumerName() {
-			return consumerName;
+			return options.getConsumerName();
 		}
 
 		/**
 		 * @return never {@literal null}.
 		 */
 		public Range<?> getRange() {
-			return range;
+			return options.getRange();
 		}
 
 		/**
@@ -878,21 +954,36 @@ public Range<?> getRange() {
 		 */
 		@Nullable
 		public Long getCount() {
-			return count;
+			return options.getCount();
+		}
+
+		/**
+		 * @return can be {@literal null}.
+		 */
+		@Nullable
+		public Duration getMinIdleTime() {
+			return options.getMinIdleTime();
 		}
 
 		/**
 		 * @return {@literal true} if a consumer name is present.
 		 */
 		public boolean hasConsumer() {
-			return StringUtils.hasText(consumerName);
+			return StringUtils.hasText(options.getConsumerName());
 		}
 
 		/**
 		 * @return {@literal true} count is set.
 		 */
 		public boolean isLimited() {
-			return count != null;
+			return options.getCount() != null;
+		}
+
+		/**
+		 * @return {@literal true} if idle is set.
+		 */
+		public boolean hasIdle() {
+			return options.getMinIdleTime() != null;
 		}
 	}
 
diff --git a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java
index 8385d70d34..9d2477dc96 100644
--- a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java
+++ b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java
@@ -41,6 +41,7 @@
  * @author Tugdual Grall
  * @author Dengliming
  * @author Mark John Moreno
+ * @author Jeonggyu Choi
  * @see <a href="https://redis.io/topics/streams-intro">Redis Documentation - Streams</a>
  * @since 2.2
  */
@@ -706,6 +707,25 @@ default PendingMessages xPending(byte[] key, String groupName, Range<?> range, L
 		return xPending(key, groupName, XPendingOptions.range(range, count));
 	}
 
+	/**
+	 * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} within a
+	 * {@literal consumer group} and over a given {@link Duration} of idle time.
+	 *
+	 * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
+	 * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
+	 * @param range the range of messages ids to search within. Must not be {@literal null}.
+	 * @param count limit the number of results. Must not be {@literal null}.
+	 * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}.
+	 * @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline /
+	 *         transaction.
+	 * @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
+	 * @since 3.5
+	 */
+	@Nullable
+	default PendingMessages xPending(byte[] key, String groupName, Range<?> range, Long count, Duration minIdleTime) {
+		return xPending(key, groupName, XPendingOptions.range(range, count).minIdleTime(minIdleTime));
+	}
+
 	/**
 	 * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
 	 * {@link Consumer} within a {@literal consumer group}.
@@ -723,6 +743,24 @@ default PendingMessages xPending(byte[] key, Consumer consumer, Range<?> range,
 		return xPending(key, consumer.getGroup(), consumer.getName(), range, count);
 	}
 
+	/**
+	 * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
+	 * {@link Consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time.
+	 *
+	 * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
+	 * @param consumer the name of the {@link Consumer}. Must not be {@literal null}.
+	 * @param range the range of messages ids to search within. Must not be {@literal null}.
+	 * @param count limit the number of results. Must not be {@literal null}.
+	 * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}.
+	 * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction.
+	 * @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
+	 * @since 3.5
+	 */
+	@Nullable
+	default PendingMessages xPending(byte[] key, Consumer consumer, Range<?> range, Long count, Duration minIdleTime) {
+		return xPending(key, consumer.getGroup(), consumer.getName(), range, count, minIdleTime);
+	}
+
 	/**
 	 * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
 	 * {@literal consumer} within a {@literal consumer group}.
@@ -742,6 +780,28 @@ default PendingMessages xPending(byte[] key, String groupName, String consumerNa
 		return xPending(key, groupName, XPendingOptions.range(range, count).consumer(consumerName));
 	}
 
+	/**
+	 * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
+	 * {@literal consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time.
+	 *
+	 * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
+	 * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
+	 * @param consumerName the name of the {@literal consumer}. Must not be {@literal null}.
+	 * @param range the range of messages ids to search within. Must not be {@literal null}.
+	 * @param count limit the number of results. Must not be {@literal null}.
+	 * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}.
+	 * @return pending messages for the given {@literal consumer} in given {@literal consumer group} or {@literal null}
+	 *         when used in pipeline / transaction.
+	 * @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
+	 * @since 3.5
+	 */
+	@Nullable
+	default PendingMessages xPending(byte[] key, String groupName, String consumerName, Range<?> range, Long count,
+			Duration minIdleTime) {
+		return xPending(key, groupName,
+				XPendingOptions.range(range, count).consumer(consumerName).minIdleTime(minIdleTime));
+	}
+
 	/**
 	 * Obtain detailed information about pending {@link PendingMessage messages} applying given {@link XPendingOptions
 	 * options}.
@@ -761,6 +821,7 @@ default PendingMessages xPending(byte[] key, String groupName, String consumerNa
 	 * Value Object holding parameters for obtaining pending messages.
 	 *
 	 * @author Christoph Strobl
+	 * @author Jeonggyu Choi
 	 * @since 2.3
 	 */
 	class XPendingOptions {
@@ -768,12 +829,15 @@ class XPendingOptions {
 		private final @Nullable String consumerName;
 		private final Range<?> range;
 		private final @Nullable Long count;
+		private final @Nullable Duration minIdleTime;
 
-		private XPendingOptions(@Nullable String consumerName, Range<?> range, @Nullable Long count) {
+		private XPendingOptions(@Nullable String consumerName, Range<?> range, @Nullable Long count,
+				@Nullable Duration minIdleTime) {
 
 			this.range = range;
 			this.count = count;
 			this.consumerName = consumerName;
+			this.minIdleTime = minIdleTime;
 		}
 
 		/**
@@ -782,7 +846,7 @@ private XPendingOptions(@Nullable String consumerName, Range<?> range, @Nullable
 		 * @return new instance of {@link XPendingOptions}.
 		 */
 		public static XPendingOptions unbounded() {
-			return new XPendingOptions(null, Range.unbounded(), null);
+			return new XPendingOptions(null, Range.unbounded(), null, null);
 		}
 
 		/**
@@ -795,7 +859,7 @@ public static XPendingOptions unbounded(Long count) {
 
 			Assert.isTrue(count > -1, "Count must not be negative");
 
-			return new XPendingOptions(null, Range.unbounded(), count);
+			return new XPendingOptions(null, Range.unbounded(), count, null);
 		}
 
 		/**
@@ -810,7 +874,7 @@ public static XPendingOptions range(Range<?> range, Long count) {
 			Assert.notNull(range, "Range must not be null");
 			Assert.isTrue(count > -1, "Count must not be negative");
 
-			return new XPendingOptions(null, range, count);
+			return new XPendingOptions(null, range, count, null);
 		}
 
 		/**
@@ -820,7 +884,23 @@ public static XPendingOptions range(Range<?> range, Long count) {
 		 * @return new instance of {@link XPendingOptions}.
 		 */
 		public XPendingOptions consumer(String consumerName) {
-			return new XPendingOptions(consumerName, range, count);
+
+			Assert.notNull(consumerName, "Consumer name must not be null");
+
+			return new XPendingOptions(consumerName, range, count, minIdleTime);
+		}
+
+		/**
+		 * Append given minimum idle time.
+		 *
+		 * @param minIdleTime must not be {@literal null}.
+		 * @return new instance of {@link XPendingOptions}.
+		 */
+		public XPendingOptions minIdleTime(Duration minIdleTime) {
+
+			Assert.notNull(minIdleTime, "Idle must not be null");
+
+			return new XPendingOptions(consumerName, range, count, minIdleTime);
 		}
 
 		/**
@@ -846,6 +926,26 @@ public String getConsumerName() {
 			return consumerName;
 		}
 
+		/**
+		 * @return can be {@literal null}.
+		 */
+		@Nullable
+		public Duration getMinIdleTime() {
+			return minIdleTime;
+		}
+
+		/**
+		 * @return can be {@literal null}.
+		 */
+		@Nullable
+		public Long getIdleMillis() {
+			if (minIdleTime == null) {
+				return null;
+			}
+
+			return minIdleTime.toMillis();
+		}
+
 		/**
 		 * @return {@literal true} if a consumer name is present.
 		 */
@@ -859,6 +959,13 @@ public boolean hasConsumer() {
 		public boolean isLimited() {
 			return count != null;
 		}
+
+		/**
+		 * @return {@literal true} if idle time is set.
+		 */
+		public boolean hasIdle() {
+			return minIdleTime != null;
+		}
 	}
 
 	/**
diff --git a/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java b/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java
index 1069e430c8..9bd7121afa 100644
--- a/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java
+++ b/src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java
@@ -71,6 +71,7 @@
  * @author Andrey Shlykov
  * @author ihaohong
  * @author Shyngys Sapraliyev
+ * @author Jeonggyu Choi
  * @see RedisCallback
  * @see RedisSerializer
  * @see StringRedisTemplate
@@ -3134,6 +3135,35 @@ default Long xDel(String key, String... entryIds) {
 	@Nullable
 	PendingMessagesSummary xPending(String key, String groupName);
 
+	// /**
+	// * Obtained detailed information about all pending messages for a given {@link Consumer}.
+	// *
+	// * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
+	// * @param consumer the consumer to fetch {@link PendingMessages} for. Must not be {@literal null}.
+	// * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction.
+	// * @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
+	// * @since 3.5
+	// */
+	// @Nullable
+	// default PendingMessages xPending(String key, Consumer consumer) {
+	// return xPending(key, consumer.getGroup(), consumer.getName());
+	// }
+
+	// /**
+	// * Obtained detailed information about all pending messages for a given {@literal consumer}.
+	// *
+	// * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
+	// * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
+	// * @param consumerName the consumer to fetch {@link PendingMessages} for. Must not be {@literal null}.
+	// * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction.
+	// * @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
+	// * @since 3.5
+	// */
+	// @Nullable
+	// default PendingMessages xPending(String key, String groupName, String consumerName) {
+	// return xPending(key, groupName, XPendingOptions.unbounded().consumer(consumerName));
+	// }
+
 	/**
 	 * Obtain detailed information about pending {@link PendingMessage messages} for a given
 	 * {@link org.springframework.data.domain.Range} within a {@literal consumer group}.
@@ -3152,6 +3182,64 @@ default Long xDel(String key, String... entryIds) {
 	PendingMessages xPending(String key, String groupName, String consumerName,
 			org.springframework.data.domain.Range<String> range, Long count);
 
+	/**
+	 * Obtain detailed information about pending {@link PendingMessage messages} for a given
+	 * {@link org.springframework.data.domain.Range} and {@literal consumer} within a {@literal consumer group} and over a
+	 * given {@link Duration} of idle time.
+	 *
+	 * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
+	 * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
+	 * @param consumerName the name of the {@literal consumer}. Must not be {@literal null}.
+	 * @param range the range of messages ids to search within. Must not be {@literal null}.
+	 * @param count limit the number of results. Must not be {@literal null}.
+	 * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}.
+	 * @return pending messages for the given {@literal consumer} in given {@literal consumer group} or {@literal null}
+	 *         when used in pipeline / transaction.
+	 * @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
+	 * @since 3.5
+	 */
+	@Nullable
+	PendingMessages xPending(String key, String groupName, String consumerName,
+			org.springframework.data.domain.Range<String> range, Long count, Duration minIdleTime);
+
+	/**
+	 * Obtain detailed information about pending {@link PendingMessage messages} for a given
+	 * {@link org.springframework.data.domain.Range} and {@link Consumer} within a {@literal consumer group}.
+	 *
+	 * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
+	 * @param consumer the name of the {@link Consumer}. Must not be {@literal null}.
+	 * @param range the range of messages ids to search within. Must not be {@literal null}.
+	 * @param count limit the number of results. Must not be {@literal null}.
+	 * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction.
+	 * @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
+	 * @since 3.5
+	 */
+	@Nullable
+	default PendingMessages xPending(String key, Consumer consumer, org.springframework.data.domain.Range<String> range,
+			Long count) {
+		return xPending(key, consumer.getGroup(), consumer.getName(), range, count);
+	}
+
+	/**
+	 * Obtain detailed information about pending {@link PendingMessage messages} for a given
+	 * {@link org.springframework.data.domain.Range} and {@link Consumer} within a {@literal consumer group} and over a
+	 * given {@link Duration} of idle time.
+	 *
+	 * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
+	 * @param consumer the name of the {@link Consumer}. Must not be {@literal null}.
+	 * @param range the range of messages ids to search within. Must not be {@literal null}.
+	 * @param count limit the number of results. Must not be {@literal null}.
+	 * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}.
+	 * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction.
+	 * @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
+	 * @since 3.5
+	 */
+	@Nullable
+	default PendingMessages xPending(String key, Consumer consumer, org.springframework.data.domain.Range<String> range,
+			Long count, Duration minIdleTime) {
+		return xPending(key, consumer.getGroup(), consumer.getName(), range, count, minIdleTime);
+	}
+
 	/**
 	 * Obtain detailed information about pending {@link PendingMessage messages} for a given
 	 * {@link org.springframework.data.domain.Range} within a {@literal consumer group}.
@@ -3169,6 +3257,25 @@ PendingMessages xPending(String key, String groupName, String consumerName,
 	PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range<String> range,
 			Long count);
 
+	/**
+	 * Obtain detailed information about pending {@link PendingMessage messages} for a given
+	 * {@link org.springframework.data.domain.Range} within a {@literal consumer group} and over a given {@link Duration}
+	 * of idle time.
+	 *
+	 * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
+	 * @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
+	 * @param range the range of messages ids to search within. Must not be {@literal null}.
+	 * @param count limit the number of results. Must not be {@literal null}.
+	 * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}.
+	 * @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline /
+	 *         transaction.
+	 * @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
+	 * @since 3.5
+	 */
+	@Nullable
+	PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range<String> range,
+			Long count, Duration minIdleTime);
+
 	/**
 	 * Obtain detailed information about pending {@link PendingMessage messages} applying given {@link XPendingOptions
 	 * options}.
diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java
index 9d26a6cd8d..c99ceb3c54 100644
--- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java
+++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java
@@ -48,6 +48,7 @@
 
 /**
  * @author Dengliming
+ * @author Jeonggyu Choi
  * @since 2.3
  */
 class JedisClusterStreamCommands implements RedisStreamCommands {
@@ -283,6 +284,10 @@ public PendingMessages xPending(byte[] key, String groupName, XPendingOptions op
 				pendingParams = pendingParams.consumer(consumerName);
 			}
 
+			if (options.hasIdle()) {
+				pendingParams = pendingParams.idle(options.getIdleMillis());
+			}
+
 			List<Object> response = connection.getCluster().xpending(key, group, pendingParams);
 
 			return StreamConverters.toPendingMessages(groupName, range,
diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java b/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java
index a68eef451a..2466832c29 100644
--- a/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java
+++ b/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java
@@ -55,6 +55,7 @@
  *
  * @author dengliming
  * @author Mark Paluch
+ * @author Jeonggyu Choi
  * @since 2.3
  */
 class StreamConverters {
@@ -306,6 +307,9 @@ public static XPendingParams toXPendingParams(RedisStreamCommands.XPendingOption
 		if (options.hasConsumer()) {
 			xPendingParams.consumer(options.getConsumerName());
 		}
+		if (options.hasIdle()) {
+			xPendingParams.idle(options.getIdleMillis());
+		}
 
 		return xPendingParams;
 	}
diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java
index 1f45c373cd..d98d1fd1c8 100644
--- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java
+++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java
@@ -18,6 +18,7 @@
 import io.lettuce.core.XAddArgs;
 import io.lettuce.core.XClaimArgs;
 import io.lettuce.core.XGroupCreateArgs;
+import io.lettuce.core.XPendingArgs;
 import io.lettuce.core.XReadArgs;
 import io.lettuce.core.XReadArgs.StreamOffset;
 import io.lettuce.core.cluster.api.reactive.RedisClusterReactiveCommands;
@@ -56,6 +57,7 @@
  * @author Tugdual Grall
  * @author Dengliming
  * @author Mark John Moreno
+ * @author Jeonggyu Choi
  * @since 2.2
  */
 class LettuceReactiveStreamCommands implements ReactiveStreamCommands {
@@ -235,9 +237,17 @@ public Flux<CommandResponse<PendingRecordsCommand, PendingMessages>> xPending(
 			io.lettuce.core.Limit limit = command.isLimited() ? io.lettuce.core.Limit.from(command.getCount())
 					: io.lettuce.core.Limit.unlimited();
 
-			Flux<PendingMessage> publisher = command.hasConsumer() ? cmd.xpending(command.getKey(),
-					io.lettuce.core.Consumer.from(groupName, ByteUtils.getByteBuffer(command.getConsumerName())), range, limit)
-					: cmd.xpending(command.getKey(), groupName, range, limit);
+			XPendingArgs<ByteBuffer> xPendingArgs = XPendingArgs.Builder.xpending(groupName, range, limit);
+			if (command.hasConsumer()) {
+				io.lettuce.core.Consumer<ByteBuffer> consumer = io.lettuce.core.Consumer.from(groupName,
+						ByteUtils.getByteBuffer(command.getConsumerName()));
+				xPendingArgs.consumer(consumer);
+			}
+			if (command.hasIdle()) {
+				xPendingArgs.idle(command.getMinIdleTime());
+			}
+
+			Flux<PendingMessage> publisher = cmd.xpending(command.getKey(), xPendingArgs);
 
 			return publisher.collectList().map(it -> {
 
diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java
index ad5c2281c2..4a9b42eb90 100644
--- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java
+++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java
@@ -18,6 +18,7 @@
 import io.lettuce.core.XAddArgs;
 import io.lettuce.core.XClaimArgs;
 import io.lettuce.core.XGroupCreateArgs;
+import io.lettuce.core.XPendingArgs;
 import io.lettuce.core.XReadArgs;
 import io.lettuce.core.api.async.RedisStreamAsyncCommands;
 import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;
@@ -50,6 +51,7 @@
  * @author Dejan Jankov
  * @author Dengliming
  * @author Mark John Moreno
+ * @author Jeonggyu Choi
  * @since 2.2
  */
 class LettuceStreamCommands implements RedisStreamCommands {
@@ -217,15 +219,17 @@ public PendingMessages xPending(byte[] key, String groupName, XPendingOptions op
 		io.lettuce.core.Limit limit = options.isLimited() ? io.lettuce.core.Limit.from(options.getCount())
 				: io.lettuce.core.Limit.unlimited();
 
+		XPendingArgs<byte[]> xPendingArgs = XPendingArgs.Builder.xpending(group, range, limit);
 		if (options.hasConsumer()) {
-
-			return connection.invoke()
-					.from(RedisStreamAsyncCommands::xpending, key,
-							io.lettuce.core.Consumer.from(group, LettuceConverters.toBytes(options.getConsumerName())), range, limit)
-					.get(it -> StreamConverters.toPendingMessages(groupName, options.getRange(), it));
+			io.lettuce.core.Consumer<byte[]> consumer = io.lettuce.core.Consumer.from(group,
+					LettuceConverters.toBytes(options.getConsumerName()));
+			xPendingArgs.consumer(consumer);
+		}
+		if (options.hasIdle()) {
+			xPendingArgs.idle(options.getMinIdleTime());
 		}
 
-		return connection.invoke().from(RedisStreamAsyncCommands::xpending, key, group, range, limit)
+		return connection.invoke().from(RedisStreamAsyncCommands::xpending, key, xPendingArgs)
 				.get(it -> StreamConverters.toPendingMessages(groupName, options.getRange(), it));
 	}
 
diff --git a/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java
index 3cb27d1dcd..8716a10c82 100644
--- a/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java
+++ b/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java
@@ -16,6 +16,7 @@
 package org.springframework.data.redis.core;
 
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -56,6 +57,7 @@
  * @author Marcin Zielinski
  * @author John Blum
  * @author jinkshower
+ * @author Jeonggyu Choi
  * @since 2.2
  */
 class DefaultStreamOperations<K, HK, HV> extends AbstractOperations<K, Object> implements StreamOperations<K, HK, HV> {
@@ -222,6 +224,13 @@ public PendingMessages pending(K key, String group, Range<?> range, long count)
 		return execute(connection -> connection.xPending(rawKey, group, range, count));
 	}
 
+	@Override
+	public PendingMessages pending(K key, String group, Range<?> range, long count, Duration minIdleTime) {
+
+		byte[] rawKey = rawKey(key);
+		return execute(connection -> connection.xPending(rawKey, group, range, count, minIdleTime));
+	}
+
 	@Override
 	public PendingMessages pending(K key, Consumer consumer, Range<?> range, long count) {
 
@@ -229,6 +238,13 @@ public PendingMessages pending(K key, Consumer consumer, Range<?> range, long co
 		return execute(connection -> connection.xPending(rawKey, consumer, range, count));
 	}
 
+	@Override
+	public PendingMessages pending(K key, Consumer consumer, Range<?> range, long count, Duration minIdleTime) {
+
+		byte[] rawKey = rawKey(key);
+		return execute(connection -> connection.xPending(rawKey, consumer, range, count, minIdleTime));
+	}
+
 	@Override
 	public PendingMessagesSummary pending(K key, String group) {
 
diff --git a/src/main/java/org/springframework/data/redis/core/StreamOperations.java b/src/main/java/org/springframework/data/redis/core/StreamOperations.java
index 59a0647f29..5caacc7d71 100644
--- a/src/main/java/org/springframework/data/redis/core/StreamOperations.java
+++ b/src/main/java/org/springframework/data/redis/core/StreamOperations.java
@@ -55,6 +55,7 @@
  * @author Marcin Zielinski
  * @author John Blum
  * @author jinkshower
+ * @author Jeonggyu Choi
  * @since 2.2
  */
 public interface StreamOperations<K, HK, HV> extends HashMapperProvider<HK, HV> {
@@ -376,6 +377,22 @@ default PendingMessages pending(K key, Consumer consumer) {
 	 */
 	PendingMessages pending(K key, String group, Range<?> range, long count);
 
+	/**
+	 * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} within a
+	 * {@literal consumer group} and over a given {@link Duration} of idle time.
+	 *
+	 * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
+	 * @param group the name of the {@literal consumer group}. Must not be {@literal null}.
+	 * @param range the range of messages ids to search within. Must not be {@literal null}.
+	 * @param count limit the number of results. Must not be {@literal null}.
+	 * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}.
+	 * @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline /
+	 *         transaction.
+	 * @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
+	 * @since 3.5
+	 */
+	PendingMessages pending(K key, String group, Range<?> range, long count, Duration minIdleTime);
+
 	/**
 	 * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
 	 * {@link Consumer} within a {@literal consumer group}.
@@ -390,6 +407,21 @@ default PendingMessages pending(K key, Consumer consumer) {
 	 */
 	PendingMessages pending(K key, Consumer consumer, Range<?> range, long count);
 
+	/**
+	 * Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
+	 * {@link Consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time.
+	 *
+	 * @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
+	 * @param consumer the name of the {@link Consumer}. Must not be {@literal null}.
+	 * @param range the range of messages ids to search within. Must not be {@literal null}.
+	 * @param count limit the number of results. Must not be {@literal null}.
+	 * @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}.
+	 * @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction.
+	 * @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
+	 * @since 3.5
+	 */
+	PendingMessages pending(K key, Consumer consumer, Range<?> range, long count, Duration minIdleTime);
+
 	/**
 	 * Get the length of a stream.
 	 *
diff --git a/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java
index 573f105247..a0e0001d65 100644
--- a/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java
@@ -139,6 +139,7 @@
  * @author Shyngys Sapraliyev
  * @author Roman Osadchuk
  * @author Tihomir Mateev
+ * @author Jeonggyu Choi
  */
 public abstract class AbstractConnectionIntegrationTests {
 
@@ -4102,16 +4103,97 @@ void xPendingShouldLoadEmptyOverviewCorrectly() {
 		assertThat(info.getPendingMessagesPerConsumer()).isEmpty();
 	}
 
+	// @Test // GH-2046
+	// @EnabledOnCommand("XADD")
+	// void xPendingShouldLoadPendingMessagesForConsumer() {
+	//
+	// actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
+	// actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group"));
+	// actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
+	// StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));
+	//
+	// actual.add(connection.xPending(KEY_1, Consumer.from("my-group", "my-consumer")));
+	//
+	// List<Object> results = getResults();
+	// assertThat(results).hasSize(4);
+	// PendingMessages pending = (PendingMessages) results.get(3);
+	//
+	// assertThat(pending.size()).isOne();
+	// assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer");
+	// assertThat(pending.get(0).getGroupName()).isEqualTo("my-group");
+	// assertThat(pending.get(0).getTotalDeliveryCount()).isOne();
+	// assertThat(pending.get(0).getIdAsString()).isNotNull();
+	// }
+
+	// @Test // GH-2046
+	// @EnabledOnCommand("XADD")
+	// void xPendingShouldLoadEmptyPendingMessagesForNotExistingConsumer() {
+	//
+	// actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
+	// actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group"));
+	// actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
+	// StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));
+	//
+	// actual.add(connection.xPending(KEY_1, Consumer.from("my-group", "my-consumer2")));
+	//
+	// List<Object> results = getResults();
+	// assertThat(results).hasSize(4);
+	// PendingMessages pending = (PendingMessages) results.get(3);
+	//
+	// assertThat(pending.size()).isZero();
+	// }
+
+	// @Test // GH-2046
+	// @EnabledOnCommand("XADD")
+	// void xPendingShouldLoadPendingMessagesForGroupNameAndConsumerName() {
+	//
+	// actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
+	// actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group"));
+	// actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
+	// StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));
+	//
+	// actual.add(connection.xPending(KEY_1, "my-group", "my-consumer"));
+	//
+	// List<Object> results = getResults();
+	// assertThat(results).hasSize(4);
+	// PendingMessages pending = (PendingMessages) results.get(3);
+	//
+	// assertThat(pending.size()).isOne();
+	// assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer");
+	// assertThat(pending.get(0).getGroupName()).isEqualTo("my-group");
+	// assertThat(pending.get(0).getTotalDeliveryCount()).isOne();
+	// assertThat(pending.get(0).getIdAsString()).isNotNull();
+	// }
+
+	// @Test // GH-2046
+	// @EnabledOnCommand("XADD")
+	// void xPendingShouldLoadEmptyPendingMessagesForNonExistingConsumerName() {
+	//
+	// actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
+	// actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group"));
+	// actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
+	// StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));
+	//
+	// actual.add(connection.xPending(KEY_1, "my-group", "my-consumer-2"));
+	//
+	// List<Object> results = getResults();
+	// assertThat(results).hasSize(4);
+	// PendingMessages pending = (PendingMessages) results.get(3);
+	//
+	// assertThat(pending.size()).isZero();
+	// }
+
 	@Test // DATAREDIS-1084
 	@EnabledOnCommand("XADD")
-	public void xPendingShouldLoadPendingMessages() {
+	public void xPendingShouldLoadPendingMessagesForConsumerNameWithRange() {
 
 		actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
 		actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group"));
 		actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
 				StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));
 
-		actual.add(connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.unbounded(), 10L));
+		actual.add(
+				connection.xPending(KEY_1, "my-group", "my-consumer", org.springframework.data.domain.Range.unbounded(), 10L));
 
 		List<Object> results = getResults();
 		assertThat(results).hasSize(4);
@@ -4124,16 +4206,36 @@ public void xPendingShouldLoadPendingMessages() {
 		assertThat(pending.get(0).getIdAsString()).isNotNull();
 	}
 
-	@Test // DATAREDIS-1207
+	@Test // DATAREDIS-1084
 	@EnabledOnCommand("XADD")
-	public void xPendingShouldWorkWithBoundedRange() {
+	public void xPendingShouldLoadEmptyPendingMessagesForNonExistingConsumerNameWithRange() {
 
 		actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
 		actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group"));
 		actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
 				StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));
 
-		actual.add(connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.closed("0-0", "+"), 10L));
+		actual.add(connection.xPending(KEY_1, "my-group", "my-consumer-2",
+				org.springframework.data.domain.Range.unbounded(), 10L));
+
+		List<Object> results = getResults();
+		assertThat(results).hasSize(4);
+		PendingMessages pending = (PendingMessages) results.get(3);
+
+		assertThat(pending.size()).isZero();
+	}
+
+	@Test // GH-2046
+	@EnabledOnCommand("XADD")
+	void xPendingShouldLoadPendingMessagesForIdle() {
+
+		actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
+		actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group"));
+		actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
+				StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));
+
+		actual.add(connection.xPending(KEY_1, "my-group", "my-consumer", org.springframework.data.domain.Range.unbounded(),
+				10L, Duration.ZERO));
 
 		List<Object> results = getResults();
 		assertThat(results).hasSize(4);
@@ -4146,17 +4248,37 @@ public void xPendingShouldWorkWithBoundedRange() {
 		assertThat(pending.get(0).getIdAsString()).isNotNull();
 	}
 
-	@Test // DATAREDIS-1084
+	@Test // GH-2046
 	@EnabledOnCommand("XADD")
-	public void xPendingShouldLoadPendingMessagesForConsumer() {
+	void xPendingShouldLoadEmptyPendingMessagesForNonOverIdle() {
 
 		actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
 		actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group"));
 		actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
 				StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));
 
-		actual.add(
-				connection.xPending(KEY_1, "my-group", "my-consumer", org.springframework.data.domain.Range.unbounded(), 10L));
+		Duration nonOverIdle = Duration.ofSeconds(10);
+		actual.add(connection.xPending(KEY_1, "my-group", "my-consumer", org.springframework.data.domain.Range.unbounded(),
+				10L, nonOverIdle));
+
+		List<Object> results = getResults();
+		assertThat(results).hasSize(4);
+		PendingMessages pending = (PendingMessages) results.get(3);
+
+		assertThat(pending.size()).isZero();
+	}
+
+	@Test // GH-2046
+	@EnabledOnCommand("XADD")
+	void xPendingShouldLoadPendingMessagesForConsumerWithRange() {
+
+		actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
+		actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group"));
+		actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
+				StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));
+
+		actual.add(connection.xPending(KEY_1, Consumer.from("my-group", "my-consumer"),
+				org.springframework.data.domain.Range.unbounded(), 10L));
 
 		List<Object> results = getResults();
 		assertThat(results).hasSize(4);
@@ -4169,16 +4291,16 @@ public void xPendingShouldLoadPendingMessagesForConsumer() {
 		assertThat(pending.get(0).getIdAsString()).isNotNull();
 	}
 
-	@Test // DATAREDIS-1084
+	@Test // GH-2046
 	@EnabledOnCommand("XADD")
-	public void xPendingShouldLoadPendingMessagesForNonExistingConsumer() {
+	void xPendingShouldLoadEmptyPendingMessagesForNonExistingConsumerWithRange() {
 
 		actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
 		actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group"));
 		actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
 				StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));
 
-		actual.add(connection.xPending(KEY_1, "my-group", "my-consumer-2",
+		actual.add(connection.xPending(KEY_1, Consumer.from("my-group", "my-consumer-2"),
 				org.springframework.data.domain.Range.unbounded(), 10L));
 
 		List<Object> results = getResults();
@@ -4188,9 +4310,96 @@ public void xPendingShouldLoadPendingMessagesForNonExistingConsumer() {
 		assertThat(pending.size()).isZero();
 	}
 
+	@Test // GH-2046
+	@EnabledOnCommand("XADD")
+	void xPendingShouldLoadPendingMessagesForIdleWithConsumer() {
+
+		actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
+		actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group"));
+		actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
+				StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));
+
+		actual.add(connection.xPending(KEY_1, Consumer.from("my-group", "my-consumer"),
+				org.springframework.data.domain.Range.unbounded(), 10L, Duration.ZERO));
+
+		List<Object> results = getResults();
+		assertThat(results).hasSize(4);
+		PendingMessages pending = (PendingMessages) results.get(3);
+
+		assertThat(pending.size()).isOne();
+		assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer");
+		assertThat(pending.get(0).getGroupName()).isEqualTo("my-group");
+		assertThat(pending.get(0).getTotalDeliveryCount()).isOne();
+		assertThat(pending.get(0).getIdAsString()).isNotNull();
+	}
+
+	@Test // GH-2046
+	@EnabledOnCommand("XADD")
+	void xPendingShouldLoadEmptyPendingMessagesForNonOverIdleWithConsumer() {
+
+		actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
+		actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group"));
+		actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
+				StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));
+
+		Duration nonOverIdle = Duration.ofSeconds(10);
+		actual.add(connection.xPending(KEY_1, Consumer.from("my-group", "my-consumer"),
+				org.springframework.data.domain.Range.unbounded(), 10L, nonOverIdle));
+
+		List<Object> results = getResults();
+		assertThat(results).hasSize(4);
+		PendingMessages pending = (PendingMessages) results.get(3);
+
+		assertThat(pending.size()).isZero();
+	}
+
+	@Test // DATAREDIS-1207
+	@EnabledOnCommand("XADD")
+	public void xPendingShouldWorkWithBoundedRange() {
+
+		actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
+		actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group"));
+		actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
+				StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));
+
+		actual.add(connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.closed("0-0", "+"), 10L));
+
+		List<Object> results = getResults();
+		assertThat(results).hasSize(4);
+		PendingMessages pending = (PendingMessages) results.get(3);
+
+		assertThat(pending.size()).isOne();
+		assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer");
+		assertThat(pending.get(0).getGroupName()).isEqualTo("my-group");
+		assertThat(pending.get(0).getTotalDeliveryCount()).isOne();
+		assertThat(pending.get(0).getIdAsString()).isNotNull();
+	}
+
 	@Test // DATAREDIS-1084
 	@EnabledOnCommand("XADD")
-	void xPendingShouldLoadEmptyPendingMessages() {
+	public void xPendingShouldLoadPendingMessagesForGroupNameWithRange() {
+
+		actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
+		actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group"));
+		actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
+				StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));
+
+		actual.add(connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.unbounded(), 10L));
+
+		List<Object> results = getResults();
+		assertThat(results).hasSize(4);
+		PendingMessages pending = (PendingMessages) results.get(3);
+
+		assertThat(pending.size()).isOne();
+		assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer");
+		assertThat(pending.get(0).getGroupName()).isEqualTo("my-group");
+		assertThat(pending.get(0).getTotalDeliveryCount()).isOne();
+		assertThat(pending.get(0).getIdAsString()).isNotNull();
+	}
+
+	@Test // DATAREDIS-1084
+	@EnabledOnCommand("XADD")
+	void xPendingShouldLoadEmptyPendingMessagesForGroupNameWithRange() {
 
 		actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
 		actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group"));
@@ -4204,6 +4413,49 @@ void xPendingShouldLoadEmptyPendingMessages() {
 		assertThat(pending.size()).isZero();
 	}
 
+	@Test // GH-2046
+	@EnabledOnCommand("XADD")
+	void xPendingShouldLoadPendingMessagesForIdleWithGroupName() {
+
+		actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
+		actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group"));
+		actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
+				StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));
+
+		actual.add(
+				connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.unbounded(), 10L, Duration.ZERO));
+
+		List<Object> results = getResults();
+		assertThat(results).hasSize(4);
+		PendingMessages pending = (PendingMessages) results.get(3);
+
+		assertThat(pending.size()).isOne();
+		assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer");
+		assertThat(pending.get(0).getGroupName()).isEqualTo("my-group");
+		assertThat(pending.get(0).getTotalDeliveryCount()).isOne();
+		assertThat(pending.get(0).getIdAsString()).isNotNull();
+	}
+
+	@Test // GH-2046
+	@EnabledOnCommand("XADD")
+	void xPendingShouldLoadEmptyPendingMessagesForNonOverIdleWithGroupName() {
+
+		actual.add(connection.xAdd(KEY_1, Collections.singletonMap(KEY_2, VALUE_2)));
+		actual.add(connection.xGroupCreate(KEY_1, ReadOffset.from("0"), "my-group"));
+		actual.add(connection.xReadGroupAsString(Consumer.from("my-group", "my-consumer"),
+				StreamOffset.create(KEY_1, ReadOffset.lastConsumed())));
+
+		Duration nonOverIdle = Duration.ofSeconds(10);
+		actual.add(
+				connection.xPending(KEY_1, "my-group", org.springframework.data.domain.Range.unbounded(), 10L, nonOverIdle));
+
+		List<Object> results = getResults();
+		assertThat(results).hasSize(4);
+		PendingMessages pending = (PendingMessages) results.get(3);
+
+		assertThat(pending.size()).isZero();
+	}
+
 	@Test // DATAREDIS-1084
 	@EnabledOnCommand("XADD")
 	public void xClaim() throws InterruptedException {
diff --git a/src/test/java/org/springframework/data/redis/connection/ReactiveStreamCommandsUnitTests.java b/src/test/java/org/springframework/data/redis/connection/ReactiveStreamCommandsUnitTests.java
index dedaf0c84d..b5445247ef 100644
--- a/src/test/java/org/springframework/data/redis/connection/ReactiveStreamCommandsUnitTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/ReactiveStreamCommandsUnitTests.java
@@ -28,6 +28,7 @@
  * Unit tests for {@link ReactiveStreamCommands}.
  *
  * @author jinkshower
+ * @author Jeonggyu Choi
  */
 class ReactiveStreamCommandsUnitTests {
 
@@ -53,4 +54,14 @@ void pendingRecordsCommandRangeShouldThrowExceptionWhenCountIsNegative() {
 
 		assertThatIllegalArgumentException().isThrownBy(() -> command.range(range, -1L));
 	}
+
+	@Test // GH-2046
+	void pendingRecordsCommandIdleShouldThrowExceptionWhenIdleIsNull() {
+		ByteBuffer key = ByteBuffer.wrap("my-stream".getBytes());
+		String groupName = "my-group";
+
+		PendingRecordsCommand command = PendingRecordsCommand.pending(key, groupName);
+
+		assertThatIllegalArgumentException().isThrownBy(() -> command.minIdleTime(null));
+	}
 }
diff --git a/src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java b/src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java
index 74579d57a6..6d332b9aae 100644
--- a/src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/RedisStreamCommandsUnitTests.java
@@ -26,6 +26,7 @@
  * Unit tests for {@link RedisStreamCommands}.
  *
  * @author jinkshower
+ * @author Jeonggyu Choi
  */
 class RedisStreamCommandsUnitTests {
 
@@ -46,4 +47,18 @@ void xPendingOptionsRangeShouldThrowExceptionWhenCountIsNegative() {
 
 		assertThatIllegalArgumentException().isThrownBy(() -> XPendingOptions.range(range, -1L));
 	}
+
+	@Test // GH-2046
+	void xPendingOptionsIdleShouldThrowExceptionWhenIdleIsNull() {
+		XPendingOptions xPendingOptions = XPendingOptions.unbounded();
+
+		assertThatIllegalArgumentException().isThrownBy(() -> xPendingOptions.minIdleTime(null));
+	}
+
+	@Test // GH-2046
+	void xPendingOptionsConsumerShouldThrowExceptionWhenConsumerNameIsNull() {
+		XPendingOptions xPendingOptions = XPendingOptions.unbounded();
+
+		assertThatIllegalArgumentException().isThrownBy(() -> xPendingOptions.consumer(null));
+	}
 }
diff --git a/src/test/java/org/springframework/data/redis/connection/jedis/StreamConvertersUnitTest.java b/src/test/java/org/springframework/data/redis/connection/jedis/StreamConvertersUnitTest.java
new file mode 100644
index 0000000000..5d75615fee
--- /dev/null
+++ b/src/test/java/org/springframework/data/redis/connection/jedis/StreamConvertersUnitTest.java
@@ -0,0 +1,30 @@
+package org.springframework.data.redis.connection.jedis;
+
+import static org.assertj.core.api.Assertions.*;
+
+import redis.clients.jedis.params.XPendingParams;
+
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.data.redis.connection.RedisStreamCommands.XPendingOptions;
+
+/**
+ * @author Jeonggyu Choi
+ */
+class StreamConvertersUnitTest {
+
+	@Test // GH-2046
+	void shouldConvertIdle() throws NoSuchFieldException, IllegalAccessException {
+		XPendingOptions options = XPendingOptions.unbounded(5L).minIdleTime(Duration.of(1, ChronoUnit.HOURS));
+
+		XPendingParams xPendingParams = StreamConverters.toXPendingParams(options);
+
+		Field idle = XPendingParams.class.getDeclaredField("idle");
+		idle.setAccessible(true);
+		Long idleValue = (Long) idle.get(xPendingParams);
+		assertThat(idleValue).isEqualTo(Duration.of(1, ChronoUnit.HOURS).toMillis());
+	}
+}
diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java
index b181ef6a60..e8107855e1 100644
--- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java
+++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java
@@ -21,6 +21,7 @@
 import reactor.test.StepVerifier;
 
 import java.time.Duration;
+import java.time.temporal.ChronoUnit;
 import java.util.Collections;
 
 import org.assertj.core.data.Offset;
@@ -44,6 +45,7 @@
  * @author Christoph Strobl
  * @author Tugdual Grall
  * @author Dengliming
+ * @author Jeonggyu Choi
  */
 @EnabledOnCommand("XADD")
 public class LettuceReactiveStreamCommandsIntegrationTests extends LettuceReactiveCommandsTestSupport {
@@ -340,6 +342,156 @@ void xPendingShouldLoadPendingMessagesForNonExistingConsumer() {
 				}).verifyComplete();
 	}
 
+	@ParameterizedRedisTest // GH-2046
+	void xPendingShouldLoadPendingMessagesForGroupAndIdle() {
+
+		String initialMessage = nativeCommands.xadd(KEY_1, KEY_1, VALUE_1);
+		nativeCommands.xgroupCreate(XReadArgs.StreamOffset.from(KEY_1, initialMessage), "my-group");
+
+		nativeCommands.xadd(KEY_1, KEY_2, VALUE_2);
+
+		connection.streamCommands()
+				.xReadGroup(Consumer.from("my-group", "my-consumer"),
+						StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed()))
+				.then().as(StepVerifier::create).verifyComplete();
+
+		Duration exceededIdle = Duration.of(1, ChronoUnit.MILLIS);
+
+		connection.streamCommands().xPending(KEY_1_BBUFFER, "my-group", Range.open("-", "+"), 10L, exceededIdle)
+				.delaySubscription(Duration.ofMillis(100)).as(StepVerifier::create).assertNext(it -> {
+					assertThat(it.size()).isOne();
+					assertThat(it.get(0).getConsumerName()).isEqualTo("my-consumer");
+					assertThat(it.get(0).getGroupName()).isEqualTo("my-group");
+					assertThat(it.get(0).getTotalDeliveryCount()).isOne();
+					assertThat(it.get(0).getIdAsString()).isNotNull();
+				}).verifyComplete();
+	}
+
+	@ParameterizedRedisTest // GH-2046
+	void xPendingShouldLoadEmptyPendingMessagesForGroupAndIdleWhenDurationNotExceeded() {
+
+		String initialMessage = nativeCommands.xadd(KEY_1, KEY_1, VALUE_1);
+		nativeCommands.xgroupCreate(XReadArgs.StreamOffset.from(KEY_1, initialMessage), "my-group");
+
+		nativeCommands.xadd(KEY_1, KEY_2, VALUE_2);
+
+		connection.streamCommands()
+				.xReadGroup(Consumer.from("my-group", "my-consumer"),
+						StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed()))
+				.then().as(StepVerifier::create).verifyComplete();
+
+		Duration notExceededIdle = Duration.ofMinutes(10);
+
+		connection.streamCommands().xPending(KEY_1_BBUFFER, "my-group", Range.open("-", "+"), 10L, notExceededIdle)
+				.delaySubscription(Duration.ofMillis(100))
+
+				.as(StepVerifier::create).assertNext(it -> {
+					assertThat(it.isEmpty()).isTrue();
+				}).verifyComplete();
+	}
+
+	@ParameterizedRedisTest // GH-2046
+	void xPendingShouldLoadPendingMessagesForGroupNameAndConsumerNameAndIdle() {
+
+		String initialMessage = nativeCommands.xadd(KEY_1, KEY_1, VALUE_1);
+		nativeCommands.xgroupCreate(XReadArgs.StreamOffset.from(KEY_1, initialMessage), "my-group");
+
+		nativeCommands.xadd(KEY_1, KEY_2, VALUE_2);
+
+		connection.streamCommands()
+				.xReadGroup(Consumer.from("my-group", "my-consumer"),
+						StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed()))
+				.then().as(StepVerifier::create).verifyComplete();
+
+		Duration exceededIdle = Duration.ofMillis(1);
+
+		connection.streamCommands()
+				.xPending(KEY_1_BBUFFER, "my-group", "my-consumer", Range.open("-", "+"), 10L, exceededIdle)
+				.delaySubscription(Duration.ofMillis(100))
+
+				.as(StepVerifier::create).assertNext(it -> {
+					assertThat(it.size()).isOne();
+					assertThat(it.get(0).getConsumerName()).isEqualTo("my-consumer");
+					assertThat(it.get(0).getGroupName()).isEqualTo("my-group");
+					assertThat(it.get(0).getTotalDeliveryCount()).isOne();
+					assertThat(it.get(0).getIdAsString()).isNotNull();
+				}).verifyComplete();
+	}
+
+	@ParameterizedRedisTest // GH-2046
+	void xPendingShouldLoadEmptyPendingMessagesForGroupNameAndConsumerNameAndIdleWhenDurationNotExceeded() {
+
+		String initialMessage = nativeCommands.xadd(KEY_1, KEY_1, VALUE_1);
+		nativeCommands.xgroupCreate(XReadArgs.StreamOffset.from(KEY_1, initialMessage), "my-group");
+
+		nativeCommands.xadd(KEY_1, KEY_2, VALUE_2);
+
+		connection.streamCommands()
+				.xReadGroup(Consumer.from("my-group", "my-consumer"),
+						StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed()))
+				.then().as(StepVerifier::create).verifyComplete();
+
+		Duration notExceededIdle = Duration.ofMinutes(10);
+
+		connection.streamCommands()
+				.xPending(KEY_1_BBUFFER, "my-group", "my-consumer", Range.open("-", "+"), 10L, notExceededIdle)
+				.delaySubscription(Duration.ofMillis(100))
+
+				.as(StepVerifier::create).assertNext(it -> {
+					assertThat(it.isEmpty()).isTrue();
+				}).verifyComplete();
+	}
+
+	@ParameterizedRedisTest // GH-2046
+	void xPendingShouldLoadPendingMessageesForConsumerAndIdle() {
+
+		String initialMessage = nativeCommands.xadd(KEY_1, KEY_1, VALUE_1);
+		nativeCommands.xgroupCreate(XReadArgs.StreamOffset.from(KEY_1, initialMessage), "my-group");
+
+		nativeCommands.xadd(KEY_1, KEY_2, VALUE_2);
+
+		connection.streamCommands()
+				.xReadGroup(Consumer.from("my-group", "my-consumer"),
+						StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed()))
+				.then().as(StepVerifier::create).verifyComplete();
+
+		Duration exceededIdle = Duration.ofMillis(1);
+
+		connection.streamCommands()
+				.xPending(KEY_1_BBUFFER, Consumer.from("my-group", "my-consumer"), Range.open("-", "+"), 10L, exceededIdle)
+				.delaySubscription(Duration.ofMillis(100))
+
+				.as(StepVerifier::create).assertNext(it -> {
+					assertThat(it.size()).isOne();
+					assertThat(it.get(0).getConsumerName()).isEqualTo("my-consumer");
+					assertThat(it.get(0).getGroupName()).isEqualTo("my-group");
+					assertThat(it.get(0).getTotalDeliveryCount()).isOne();
+					assertThat(it.get(0).getIdAsString()).isNotNull();
+				}).verifyComplete();
+	}
+
+	@ParameterizedRedisTest // GH-2046
+	void xPendingShouldLoadEmptyPendingMessagesForConsumerAndIdleWhenDurationNotExceeded() {
+
+		String initialMessage = nativeCommands.xadd(KEY_1, KEY_1, VALUE_1);
+		nativeCommands.xgroupCreate(XReadArgs.StreamOffset.from(KEY_1, initialMessage), "my-group");
+
+		nativeCommands.xadd(KEY_1, KEY_2, VALUE_2);
+
+		connection.streamCommands()
+				.xReadGroup(Consumer.from("my-group", "my-consumer"),
+						StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed()))
+				.then().as(StepVerifier::create).verifyComplete();
+
+		Duration notExceededIdle = Duration.ofMinutes(10);
+
+		connection.streamCommands()
+				.xPending(KEY_1_BBUFFER, Consumer.from("my-group", "my-consumer"), Range.open("-", "+"), 10L, notExceededIdle)
+				.delaySubscription(Duration.ofMillis(100)).as(StepVerifier::create).assertNext(it -> {
+					assertThat(it.isEmpty()).isTrue();
+				}).verifyComplete();
+	}
+
 	@ParameterizedRedisTest // DATAREDIS-1084
 	void xPendingShouldLoadEmptyPendingMessages() {