diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
index 2b50071a7f771..ab3cd89c63fe5 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
@@ -194,9 +194,16 @@ public OffsetMetadataManager build() {
 
     /**
      * The open transactions (producer ids) keyed by group.
+     * Tracks whether groups have any open transactions.
      */
     private final TimelineHashMap<String, TimelineHashSet<Long>> openTransactionsByGroup;
 
+    /**
+     * The open transactions (producer ids) keyed by group id, topic name and partition id.
+     * Tracks whether partitions have any pending transactional offsets.
+     */
+    private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>>> openTransactionsByGroupTopicAndPartition;
+
     private class Offsets {
         /**
          * The offsets keyed by group id, topic name and partition id.
@@ -281,6 +288,7 @@ private OffsetAndMetadata remove(
         this.offsets = new Offsets();
         this.pendingTransactionalOffsets = new TimelineHashMap<>(snapshotRegistry, 0);
         this.openTransactionsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.openTransactionsByGroupTopicAndPartition = new TimelineHashMap<>(snapshotRegistry, 0);
     }
 
     /**
@@ -650,24 +658,18 @@ public int deleteAllOffsets(
         // Delete all the pending transactional offsets too. Here we only write a tombstone
         // if the topic-partition was not in the main storage because we don't need to write
         // two consecutive tombstones.
-        TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId);
-        if (openTransactions != null) {
-            openTransactions.forEach(producerId -> {
-                Offsets pendingOffsets = pendingTransactionalOffsets.get(producerId);
-                if (pendingOffsets != null) {
-                    TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> pendingGroupOffsets =
-                        pendingOffsets.offsetsByGroup.get(groupId);
-                    if (pendingGroupOffsets != null) {
-                        pendingGroupOffsets.forEach((topic, offsetsByPartition) -> {
-                            offsetsByPartition.keySet().forEach(partition -> {
-                                if (!hasCommittedOffset(groupId, topic, partition)) {
-                                    records.add(GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition));
-                                    numDeletedOffsets.getAndIncrement();
-                                }
-                            });
-                        });
-                    }
-                }
+        TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
+            openTransactionsByGroupTopicAndPartition.get(groupId);
+        if (openTransactionsByTopic != null) {
+            openTransactionsByTopic.forEach((topic, openTransactionsByPartition) -> {
+                openTransactionsByPartition.forEach((partition, producerIds) -> {
+                    producerIds.forEach(producerId -> {
+                        if (!hasCommittedOffset(groupId, topic, partition)) {
+                            records.add(GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition));
+                            numDeletedOffsets.getAndIncrement();
+                        }
+                    });
+                });
             });
         }
 
@@ -685,17 +687,15 @@ boolean hasPendingTransactionalOffsets(
         String topic,
         int partition
     ) {
-        final TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId);
-        if (openTransactions == null) return false;
+        TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
+            openTransactionsByGroupTopicAndPartition.get(groupId);
+        if (openTransactionsByTopic == null) return false;
 
-        for (Long producerId : openTransactions) {
-            Offsets offsets = pendingTransactionalOffsets.get(producerId);
-            if (offsets != null && offsets.get(groupId, topic, partition) != null) {
-                return true;
-            }
-        }
+        TimelineHashMap<Integer, TimelineHashSet<Long>> openTransactionsByPartition = openTransactionsByTopic.get(topic);
+        if (openTransactionsByPartition == null) return false;
 
-        return false;
+        TimelineHashSet<Long> openTransactions = openTransactionsByPartition.get(partition);
+        return openTransactions != null && !openTransactions.isEmpty();
     }
 
     /**
@@ -1005,6 +1005,11 @@ public void replay(
                 openTransactionsByGroup
                     .computeIfAbsent(groupId, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
                     .add(producerId);
+                openTransactionsByGroupTopicAndPartition
+                    .computeIfAbsent(groupId, __ -> new TimelineHashMap<>(snapshotRegistry, 1))
+                    .computeIfAbsent(topic, __ -> new TimelineHashMap<>(snapshotRegistry, 1))
+                    .computeIfAbsent(partition, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
+                    .add(producerId);
             }
         } else {
             if (offsets.remove(groupId, topic, partition) != null) {
@@ -1012,14 +1017,29 @@ public void replay(
             }
 
             // Remove all the pending offset commits related to the tombstone.
-            TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId);
-            if (openTransactions != null) {
-                openTransactions.forEach(openProducerId -> {
-                    Offsets pendingOffsets = pendingTransactionalOffsets.get(openProducerId);
-                    if (pendingOffsets != null) {
-                        pendingOffsets.remove(groupId, topic, partition);
+            TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
+                openTransactionsByGroupTopicAndPartition.get(groupId);
+            if (openTransactionsByTopic != null) {
+                TimelineHashMap<Integer, TimelineHashSet<Long>> openTransactionsByPartition = openTransactionsByTopic.get(topic);
+                if (openTransactionsByPartition != null) {
+                    TimelineHashSet<Long> openTransactions = openTransactionsByPartition.get(partition);
+                    if (openTransactions != null) {
+                        openTransactions.forEach(openProducerId -> {
+                            Offsets pendingOffsets = pendingTransactionalOffsets.get(openProducerId);
+                            if (pendingOffsets != null) {
+                                pendingOffsets.remove(groupId, topic, partition);
+                            }
+                        });
+
+                        openTransactionsByPartition.remove(partition);
+                        if (openTransactionsByPartition.isEmpty()) {
+                            openTransactionsByTopic.remove(topic);
+                        }
+                        if (openTransactionsByTopic.isEmpty()) {
+                            openTransactionsByGroupTopicAndPartition.remove(groupId);
+                        }
                     }
-                });
+                }
             }
         }
     }
@@ -1031,6 +1051,7 @@ public void replay(
      * @param result        The result of the transaction.
      * @throws RuntimeException if the transaction can not be completed.
      */
+    @SuppressWarnings("NPathComplexity")
     public void replayEndTransactionMarker(
         long producerId,
         TransactionResult result
@@ -1043,14 +1064,39 @@ public void replayEndTransactionMarker(
             return;
         }
 
-        pendingOffsets.offsetsByGroup.keySet().forEach(groupId -> {
-            TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId);
-            if (openTransactions != null) {
-                openTransactions.remove(producerId);
-                if (openTransactions.isEmpty()) {
+        pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> {
+            TimelineHashSet<Long> groupTransactions = openTransactionsByGroup.get(groupId);
+            if (groupTransactions != null) {
+                groupTransactions.remove(producerId);
+                if (groupTransactions.isEmpty()) {
                     openTransactionsByGroup.remove(groupId);
                 }
             }
+
+            TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
+                openTransactionsByGroupTopicAndPartition.get(groupId);
+            if (openTransactionsByTopic == null) return;
+
+            topicOffsets.forEach((topic, partitionOffsets) -> {
+                TimelineHashMap<Integer, TimelineHashSet<Long>> openTransactionsByPartition = openTransactionsByTopic.get(topic);
+                if (openTransactionsByPartition == null) return;
+
+                partitionOffsets.keySet().forEach(partitionId -> {
+                    TimelineHashSet<Long> partitionTransactions = openTransactionsByPartition.get(partitionId);
+                    if (partitionTransactions != null) {
+                        partitionTransactions.remove(producerId);
+                        if (partitionTransactions.isEmpty()) {
+                            openTransactionsByPartition.remove(partitionId);
+                        }
+                        if (openTransactionsByPartition.isEmpty()) {
+                            openTransactionsByTopic.remove(topic);
+                        }
+                        if (openTransactionsByTopic.isEmpty()) {
+                            openTransactionsByGroupTopicAndPartition.remove(groupId);
+                        }
+                    }
+                });
+            });
         });
 
         if (result == TransactionResult.COMMIT) {