diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
index 2b50071a7f771..e50ec78298bac 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
@@ -198,12 +198,19 @@ public OffsetMetadataManager build() {
     private final TimelineHashMap<String, TimelineHashSet<Long>> openTransactionsByGroup;
 
     private class Offsets {
+        /**
+         * Whether to keep a group's entry in offsetsByGroup even after its last offset is removed.
+         * The pending transactional offsets use this to track the groups with open transactions.
+         */
+        private final boolean preserveGroups;
+
         /**
          * The offsets keyed by group id, topic name and partition id.
          */
         private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>>> offsetsByGroup;
 
-        private Offsets() {
+        private Offsets(boolean preserveGroups) {
+            this.preserveGroups = preserveGroups;
             this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
         }
 
@@ -256,7 +263,7 @@ private OffsetAndMetadata remove(
             if (partitionOffsets.isEmpty())
                 topicOffsets.remove(topic);
 
-            if (topicOffsets.isEmpty())
+            if (!preserveGroups && topicOffsets.isEmpty())
                 offsetsByGroup.remove(groupId);
 
             return removedValue;
@@ -278,7 +285,7 @@ private OffsetAndMetadata remove(
         this.groupMetadataManager = groupMetadataManager;
         this.config = config;
         this.metrics = metrics;
-        this.offsets = new Offsets();
+        this.offsets = new Offsets(false);
         this.pendingTransactionalOffsets = new TimelineHashMap<>(snapshotRegistry, 0);
         this.openTransactionsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
     }
@@ -995,7 +1002,7 @@ public void replay(
                 // offsets store. Pending offsets there are moved to the main store when
                 // the transaction is committed; or removed when the transaction is aborted.
                 pendingTransactionalOffsets
-                    .computeIfAbsent(producerId, __ -> new Offsets())
+                    .computeIfAbsent(producerId, __ -> new Offsets(true))
                     .put(
                         groupId,
                         topic,
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index 6f788d84fd009..65d8f5514dec1 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -2593,6 +2593,72 @@ public void testCleanupExpiredOffsetsWithPendingTransactionalOffsets() {
         assertEquals(List.of(), records);
     }
 
+    @Test
+    public void testCleanupExpiredOffsetsWithDeletedPendingTransactionalOffsets() {
+        GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
+        Group group = mock(Group.class);
+
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder()
+            .withGroupMetadataManager(groupMetadataManager)
+            .withOffsetsRetentionMinutes(1)
+            .build();
+
+        long commitTimestamp = context.time.milliseconds();
+
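+        // Commit a regular offset for foo-0 and a transactional offset (producer id 10) for foo-1.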
+        context.commitOffset("group-id", "foo", 0, 100L, 0, commitTimestamp);
+        context.commitOffset(10L, "group-id", "foo", 1, 101L, 0, commitTimestamp + 500);
+
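+        // Offsets expire based on their commit timestamp; the group is not subscribed to "foo".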
+        when(groupMetadataManager.group("group-id")).thenReturn(group);
+        when(group.offsetExpirationCondition()).thenReturn(Optional.of(
+            new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs)));
+        when(group.isSubscribedToTopic("foo")).thenReturn(false);
+
+        // Delete the pending transactional offset.
+        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection =
+            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(List.of(
+                new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+                    .setName("foo")
+                    .setPartitions(List.of(
+                        new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(1)
+                    ))
+            ).iterator());
+        CoordinatorResult<OffsetDeleteResponseData, CoordinatorRecord> result = context.deleteOffsets(
+            new OffsetDeleteRequestData()
+                .setGroupId("group-id")
+                .setTopics(requestTopicCollection)
+        );
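+        // A tombstone is written for foo-1 even though its offset is still pending in the transaction.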
+        List<CoordinatorRecord> expectedRecords = List.of(
+            GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 1)
+        );
+        assertEquals(expectedRecords, result.records());
+
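+        // Advance the clock by the retention period (1 minute) so the foo-0 offset expires.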
+        context.time.sleep(Duration.ofMinutes(1).toMillis());
+
+        // The group should not be deleted because it has a pending transaction.
+        expectedRecords = List.of(
+            GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 0)
+        );
+        List<CoordinatorRecord> records = new ArrayList<>();
+        assertFalse(context.cleanupExpiredOffsets("group-id", records));
+        assertEquals(expectedRecords, records);
+
+        // Commit the ongoing transaction.
+        context.replayEndTransactionMarker(10L, TransactionResult.COMMIT);
+
+        // With the transaction committed, the group should now be deletable.
+        // Recommit the foo-0 offset and let it expire again.
+        context.commitOffset("group-id", "foo", 0, 100L, 0, commitTimestamp);
+        context.time.sleep(Duration.ofMinutes(1).toMillis());
+
+        records = new ArrayList<>();
+        assertTrue(context.cleanupExpiredOffsets("group-id", records));
+        assertEquals(expectedRecords, records);
+    }
+
     private static OffsetFetchResponseData.OffsetFetchResponsePartitions mkOffsetPartitionResponse(
         int partition,
         long offset,