diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index 907cba953fb21..23acdc43b5736 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.OffsetFetchRequestData;
 import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup;
@@ -71,7 +72,8 @@ public Builder(String groupId,
                        boolean requireStable,
                        List<TopicPartition> partitions,
                        boolean throwOnFetchStableOffsetsUnsupported) {
-            super(ApiKeys.OFFSET_FETCH);
+            // This constructor only supports topic names, so it is limited to version 9 and below.
+            super(ApiKeys.OFFSET_FETCH, ApiKeys.OFFSET_FETCH.oldestVersion(), (short) 9);
 
             OffsetFetchRequestData.OffsetFetchRequestGroup group =
                 new OffsetFetchRequestData.OffsetFetchRequestGroup()
@@ -103,7 +105,8 @@ public Builder(String groupId,
         public Builder(Map<String, List<TopicPartition>> groupIdToTopicPartitionMap,
                        boolean requireStable,
                        boolean throwOnFetchStableOffsetsUnsupported) {
-            super(ApiKeys.OFFSET_FETCH);
+            // This constructor only supports topic names, so it is limited to version 9 and below.
+            super(ApiKeys.OFFSET_FETCH, ApiKeys.OFFSET_FETCH.oldestVersion(), (short) 9);
 
             List<OffsetFetchRequestGroup> groups = new ArrayList<>();
             for (Entry<String, List<TopicPartition>> entry : groupIdToTopicPartitionMap.entrySet()) {
@@ -134,6 +137,12 @@ public Builder(Map<String, List<TopicPartition>> groupIdToTopicPartitionMap,
             this.throwOnFetchStableOffsetsUnsupported = throwOnFetchStableOffsetsUnsupported;
         }
 
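+        // Unlike the builders above, this builder accepts a fully constructed
+        // OffsetFetchRequestData and also supports versions that use topic ids (10+).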
+        public Builder(OffsetFetchRequestData data, boolean throwOnFetchStableOffsetsUnsupported) {
+            super(ApiKeys.OFFSET_FETCH);
+            this.data = data;
+            this.throwOnFetchStableOffsetsUnsupported = throwOnFetchStableOffsetsUnsupported;
+        }
+
         @Override
         public OffsetFetchRequest build(short version) {
             if (data.groups().size() > 1 && version < 8) {
@@ -350,4 +359,8 @@ public boolean isAllPartitionsForGroup(String groupId) {
     public OffsetFetchRequestData data() {
         return data;
     }
+
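+    /**
+     * Returns whether the given version identifies topics by topic id (version 10 and above).
+     */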
+    public static boolean useTopicIds(short version) {
+        return version >= 10;
+    }
 }
diff --git a/clients/src/main/resources/common/message/OffsetFetchRequest.json b/clients/src/main/resources/common/message/OffsetFetchRequest.json
index 88f5b568d724c..0fac6ad1c573f 100644
--- a/clients/src/main/resources/common/message/OffsetFetchRequest.json
+++ b/clients/src/main/resources/common/message/OffsetFetchRequest.json
@@ -38,8 +38,11 @@
   //
   // Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). It adds
   // the MemberId and MemberEpoch fields. Those are filled in and validated when the new consumer protocol is used.
-  "validVersions": "1-9",
+  //
+  // Version 10 adds support for topic ids (KIP-848).
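+  // It is marked as unstable and is only exposed when unstable api versions are
+  // enabled on the broker.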
+  "validVersions": "1-10",
   "flexibleVersions": "6+",
+  "latestVersionUnstable": true,
   "fields": [
     { "name": "GroupId", "type": "string", "versions": "0-7", "entityType": "groupId",
       "about": "The group to fetch offsets for." },
@@ -60,8 +63,10 @@
         "about": "The member epoch if using the new consumer protocol (KIP-848)." },
       { "name": "Topics", "type": "[]OffsetFetchRequestTopics", "versions": "8+", "nullableVersions": "8+",
         "about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [
-        { "name": "Name", "type": "string", "versions": "8+", "entityType": "topicName",
+        { "name": "Name", "type": "string", "versions": "8-9", "entityType": "topicName", "ignorable": true,
           "about": "The topic name."},
+        { "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true,
+          "about": "The topic ID." },
         { "name": "PartitionIndexes", "type": "[]int32", "versions": "8+",
           "about": "The partition indexes we would like to fetch offsets for." }
       ]}
diff --git a/clients/src/main/resources/common/message/OffsetFetchResponse.json b/clients/src/main/resources/common/message/OffsetFetchResponse.json
index 9f0a5157cc424..c55466cdda72f 100644
--- a/clients/src/main/resources/common/message/OffsetFetchResponse.json
+++ b/clients/src/main/resources/common/message/OffsetFetchResponse.json
@@ -38,7 +38,9 @@
   // Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). The response is
   // the same as version 8 but can return STALE_MEMBER_EPOCH and UNKNOWN_MEMBER_ID errors when the new consumer group
   // protocol is used.
-  "validVersions": "1-9",
+  //
+  // Version 10 adds support for topic ids (KIP-848).
+  "validVersions": "1-10",
   "flexibleVersions": "6+",
   // Supported errors:
   // - GROUP_AUTHORIZATION_FAILED (version 0+)
@@ -49,6 +51,7 @@
   // - UNSTABLE_OFFSET_COMMIT (version 7+)
   // - UNKNOWN_MEMBER_ID (version 9+)
   // - STALE_MEMBER_EPOCH (version 9+)
+  // - UNKNOWN_TOPIC_ID (version 10+)
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true,
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
@@ -78,8 +81,10 @@
         "about": "The group ID." },
       { "name": "Topics", "type": "[]OffsetFetchResponseTopics", "versions": "8+",
         "about": "The responses per topic.", "fields": [
-        { "name": "Name", "type": "string", "versions": "8+", "entityType": "topicName",
+        { "name": "Name", "type": "string", "versions": "8-9", "entityType": "topicName", "ignorable": true,
           "about": "The topic name." },
+        { "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true,
+          "about": "The topic ID." },
         { "name": "Partitions", "type": "[]OffsetFetchResponsePartitions", "versions": "8+",
           "about": "The responses per partition.", "fields": [
           { "name": "PartitionIndex", "type": "int32", "versions": "8+",
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6a22963ac7d6a..b257ebfccd03c 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1027,6 +1027,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     offsetFetchRequest: OffsetFetchRequestData.OffsetFetchRequestGroup,
     requireStable: Boolean
   ): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = {
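+    // From version 10 onwards, topics in the response are identified by their topic id.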
+    val useTopicIds = OffsetFetchRequest.useTopicIds(requestContext.apiVersion)
+
     groupCoordinator.fetchAllOffsets(
       requestContext,
       offsetFetchRequest,
@@ -1040,13 +1042,33 @@ class KafkaApis(val requestChannel: RequestChannel,
         offsetFetchResponse
       } else {
         // Clients are not allowed to see offsets for topics that are not authorized for Describe.
-        val (authorizedOffsets, _) = authHelper.partitionSeqByAuthorized(
+        val authorizedNames = authHelper.filterByAuthorized(
           requestContext,
           DESCRIBE,
           TOPIC,
           offsetFetchResponse.topics.asScala
         )(_.name)
-        offsetFetchResponse.setTopics(authorizedOffsets.asJava)
+
+        val topics = new mutable.ArrayBuffer[OffsetFetchResponseData.OffsetFetchResponseTopics]
+        offsetFetchResponse.topics.forEach { topic =>
+          if (authorizedNames.contains(topic.name)) {
+            if (useTopicIds) {
+              // If the topic id is not provided by the group coordinator, we look it
+              // up in the metadata cache.
+              if (topic.topicId == Uuid.ZERO_UUID) {
+                topic.setTopicId(metadataCache.getTopicId(topic.name))
+              }
+              // If the topic id is still unknown, we skip the topic because the
+              // response cannot be serialized without it.
+              if (topic.topicId != Uuid.ZERO_UUID) {
+                topics += topic
+              }
+            } else {
+              topics += topic
+            }
+          }
+        }
+        offsetFetchResponse.setTopics(topics.asJava)
       }
     }
   }
@@ -1056,14 +1078,53 @@ class KafkaApis(val requestChannel: RequestChannel,
     offsetFetchRequest: OffsetFetchRequestData.OffsetFetchRequestGroup,
     requireStable: Boolean
   ): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = {
+    val useTopicIds = OffsetFetchRequest.useTopicIds(requestContext.apiVersion)
+
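+    // Resolve topic names from the topic ids provided in the request. Topics with an
+    // unknown topic id keep an empty name and are rejected below with UNKNOWN_TOPIC_ID.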
+    if (useTopicIds) {
+      offsetFetchRequest.topics.forEach { topic =>
+        if (topic.topicId != Uuid.ZERO_UUID) {
+          metadataCache.getTopicName(topic.topicId).ifPresent(name => topic.setName(name))
+        }
+      }
+    }
+
     // Clients are not allowed to see offsets for topics that are not authorized for Describe.
-    val (authorizedTopics, unauthorizedTopics) = authHelper.partitionSeqByAuthorized(
+    val authorizedTopicNames = authHelper.filterByAuthorized(
       requestContext,
       DESCRIBE,
       TOPIC,
       offsetFetchRequest.topics.asScala
     )(_.name)
 
+    val authorizedTopics = new mutable.ArrayBuffer[OffsetFetchRequestData.OffsetFetchRequestTopics]
+    val errorTopics = new mutable.ArrayBuffer[OffsetFetchResponseData.OffsetFetchResponseTopics]
+
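+    // Builds a response topic with the provided error set on all requested partitions.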
+    def buildErrorResponse(
+      topic: OffsetFetchRequestData.OffsetFetchRequestTopics,
+      error: Errors
+    ): OffsetFetchResponseData.OffsetFetchResponseTopics = {
+      val topicResponse = new OffsetFetchResponseData.OffsetFetchResponseTopics()
+        .setTopicId(topic.topicId)
+        .setName(topic.name)
+      topic.partitionIndexes.forEach { partitionIndex =>
+        topicResponse.partitions.add(new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+          .setPartitionIndex(partitionIndex)
+          .setCommittedOffset(-1)
+          .setErrorCode(error.code))
+      }
+      topicResponse
+    }
+
+    offsetFetchRequest.topics.forEach { topic =>
+      if (useTopicIds && topic.name.isEmpty) {
+        errorTopics += buildErrorResponse(topic, Errors.UNKNOWN_TOPIC_ID)
+      } else if (!authorizedTopicNames.contains(topic.name)) {
+        errorTopics += buildErrorResponse(topic, Errors.TOPIC_AUTHORIZATION_FAILED)
+      } else {
+        authorizedTopics += topic
+      }
+    }
+
     groupCoordinator.fetchOffsets(
       requestContext,
       new OffsetFetchRequestData.OffsetFetchRequestGroup()
@@ -1081,19 +1142,10 @@ class KafkaApis(val requestChannel: RequestChannel,
         offsetFetchResponse
       } else {
         val topics = new util.ArrayList[OffsetFetchResponseData.OffsetFetchResponseTopics](
-          offsetFetchResponse.topics.size + unauthorizedTopics.size
+          offsetFetchResponse.topics.size + errorTopics.size
         )
         topics.addAll(offsetFetchResponse.topics)
-        unauthorizedTopics.foreach { topic =>
-          val topicResponse = new OffsetFetchResponseData.OffsetFetchResponseTopics().setName(topic.name)
-          topic.partitionIndexes.forEach { partitionIndex =>
-            topicResponse.partitions.add(new OffsetFetchResponseData.OffsetFetchResponsePartitions()
-              .setPartitionIndex(partitionIndex)
-              .setCommittedOffset(-1)
-              .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code))
-          }
-          topics.add(topicResponse)
-        }
+        topics.addAll(errorTopics.asJava)
         offsetFetchResponse.setTopics(topics)
       }
     }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index c644043168438..2d52dd6301ac5 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -8136,17 +8136,37 @@ class KafkaApisTest extends Logging {
   @ParameterizedTest
   @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
   def testHandleOffsetFetchWithMultipleGroups(version: Short): Unit = {
+    val foo = "foo"
+    val bar = "bar"
+    val fooId = Uuid.randomUuid()
+    addTopicToMetadataCache(foo, topicId = fooId, numPartitions = 2)
+
     def makeRequest(version: Short): RequestChannel.Request = {
-      val groups = Map(
-        "group-1" -> List(
-          new TopicPartition("foo", 0),
-          new TopicPartition("foo", 1)
-        ).asJava,
-        "group-2" -> null,
-        "group-3" -> null,
-        "group-4" -> null,
-      ).asJava
-      buildRequest(new OffsetFetchRequest.Builder(groups, false, false).build(version))
+      buildRequest(
+        new OffsetFetchRequest.Builder(
+          new OffsetFetchRequestData()
+            .setGroups(List(
+              new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                .setGroupId("group-1")
+                .setTopics(List(
+                  new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                    .setName(foo)
+                    .setTopicId(fooId)
+                    .setPartitionIndexes(List[Integer](0, 1).asJava)
+                ).asJava),
+              new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                .setGroupId("group-2")
+                .setTopics(null),
+              new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                .setGroupId("group-3")
+                .setTopics(null),
+              new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                .setGroupId("group-4")
+                .setTopics(null),
+            ).asJava),
+          false
+        ).build(version)
+      )
     }
 
     if (version < 8) {
@@ -8162,6 +8182,7 @@ class KafkaApisTest extends Logging {
           .setGroupId("group-1")
           .setTopics(List(
             new OffsetFetchRequestData.OffsetFetchRequestTopics()
+              .setTopicId(if (version >= 10) fooId else Uuid.ZERO_UUID)
               .setName("foo")
               .setPartitionIndexes(List[Integer](0, 1).asJava)).asJava),
         false
@@ -8194,13 +8215,14 @@ class KafkaApisTest extends Logging {
         false
       )).thenReturn(group4Future)
       kafkaApis = createKafkaApis()
-      kafkaApis.handleOffsetFetchRequest(requestChannelRequest)
+      kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
       val group1Response = new OffsetFetchResponseData.OffsetFetchResponseGroup()
         .setGroupId("group-1")
         .setTopics(List(
           new OffsetFetchResponseData.OffsetFetchResponseTopics()
-            .setName("foo")
+            .setTopicId(fooId)
+            .setName(foo)
             .setPartitions(List(
               new OffsetFetchResponseData.OffsetFetchResponsePartitions()
                 .setPartitionIndex(0)
@@ -8213,11 +8235,30 @@ class KafkaApisTest extends Logging {
             ).asJava)
         ).asJava)
 
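+      // The response is serialized with the request version, so it only carries the
+      // topic id from version 10 onwards and only the topic name before that.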
+      val expectedGroup1Response = new OffsetFetchResponseData.OffsetFetchResponseGroup()
+        .setGroupId("group-1")
+        .setTopics(List(
+          new OffsetFetchResponseData.OffsetFetchResponseTopics()
+            .setTopicId(if (version >= 10) fooId else Uuid.ZERO_UUID)
+            .setName(if (version < 10) foo else "")
+            .setPartitions(List(
+              new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                .setPartitionIndex(0)
+                .setCommittedOffset(100)
+                .setCommittedLeaderEpoch(1),
+              new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                .setPartitionIndex(1)
+                .setCommittedOffset(200)
+                .setCommittedLeaderEpoch(2)
+            ).asJava)
+        ).asJava)
+
       val group2Response = new OffsetFetchResponseData.OffsetFetchResponseGroup()
         .setGroupId("group-2")
         .setTopics(List(
           new OffsetFetchResponseData.OffsetFetchResponseTopics()
-            .setName("bar")
+            .setName(bar)
             .setPartitions(List(
               new OffsetFetchResponseData.OffsetFetchResponsePartitions()
                 .setPartitionIndex(0)
@@ -8242,7 +8283,7 @@ class KafkaApisTest extends Logging {
         .setGroupId("group-4")
         .setErrorCode(Errors.INVALID_GROUP_ID.code)
 
-      val expectedGroups = List(group1Response, group2Response, group3Response, group4Response)
+      val expectedGroups = List(expectedGroup1Response, group2Response, group3Response, group4Response)
 
       group1Future.complete(group1Response)
       group2Future.complete(group2Response)
@@ -8250,13 +8291,161 @@ class KafkaApisTest extends Logging {
       group4Future.complete(group4Response)
 
       val response = verifyNoThrottling[OffsetFetchResponse](requestChannelRequest)
-      assertEquals(expectedGroups.toSet, response.data.groups().asScala.toSet)
+      assertEquals(expectedGroups.toSet, response.data.groups.asScala.toSet)
     }
   }
 
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
+  def testHandleOffsetFetchWithUnknownTopicIds(version: Short): Unit = {
+    // This test only applies to versions that support topic ids.
+    if (version < 10) return
+
+    val foo = "foo"
+    val bar = "bar"
+    val fooId = Uuid.randomUuid()
+    val barId = Uuid.randomUuid()
+    addTopicToMetadataCache(foo, topicId = fooId, numPartitions = 2)
+
+    def makeRequest(version: Short): RequestChannel.Request = {
+      buildRequest(
+        new OffsetFetchRequest.Builder(
+          new OffsetFetchRequestData()
+            .setGroups(List(
+              new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                .setGroupId("group-1")
+                .setTopics(List(
+                  new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                    .setName(foo)
+                    .setTopicId(fooId)
+                    .setPartitionIndexes(List[Integer](0).asJava),
+                  // bar does not exist so it must return UNKNOWN_TOPIC_ID.
+                  new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                    .setName(bar)
+                    .setTopicId(barId)
+                    .setPartitionIndexes(List[Integer](0).asJava)
+                ).asJava),
+              new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                .setGroupId("group-2")
+                .setTopics(null)
+            ).asJava),
+          false
+        ).build(version)
+      )
+    }
+
+    val requestChannelRequest = makeRequest(version)
+
+    val group1Future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
+    when(groupCoordinator.fetchOffsets(
+      requestChannelRequest.context,
+      new OffsetFetchRequestData.OffsetFetchRequestGroup()
+        .setGroupId("group-1")
+        .setTopics(List(
+          new OffsetFetchRequestData.OffsetFetchRequestTopics()
+            .setTopicId(fooId)
+            .setName("foo")
+            .setPartitionIndexes(List[Integer](0).asJava)).asJava),
+      false
+    )).thenReturn(group1Future)
+
+    val group2Future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
+    when(groupCoordinator.fetchAllOffsets(
+      requestChannelRequest.context,
+      new OffsetFetchRequestData.OffsetFetchRequestGroup()
+        .setGroupId("group-2")
+        .setTopics(null),
+      false
+    )).thenReturn(group2Future)
+
+    kafkaApis = createKafkaApis()
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    val group1Response = new OffsetFetchResponseData.OffsetFetchResponseGroup()
+      .setGroupId("group-1")
+      .setTopics(List(
+        new OffsetFetchResponseData.OffsetFetchResponseTopics()
+          .setTopicId(fooId)
+          .setName(foo)
+          .setPartitions(List(
+            new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+              .setPartitionIndex(0)
+              .setCommittedOffset(100)
+              .setCommittedLeaderEpoch(1)
+          ).asJava)
+      ).asJava)
+
+    val group2Response = new OffsetFetchResponseData.OffsetFetchResponseGroup()
+      .setGroupId("group-2")
+      .setTopics(List(
+        new OffsetFetchResponseData.OffsetFetchResponseTopics()
+          .setName(foo)
+          .setPartitions(List(
+            new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+              .setPartitionIndex(0)
+              .setCommittedOffset(100)
+              .setCommittedLeaderEpoch(1)
+          ).asJava),
+        // bar does not exist so it must be filtered out.
+        new OffsetFetchResponseData.OffsetFetchResponseTopics()
+          .setName(bar)
+          .setPartitions(List(
+            new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+              .setPartitionIndex(0)
+              .setCommittedOffset(100)
+              .setCommittedLeaderEpoch(1)
+          ).asJava)
+      ).asJava)
+
+    val expectedResponse = new OffsetFetchResponseData()
+      .setGroups(List(
+        new OffsetFetchResponseData.OffsetFetchResponseGroup()
+          .setGroupId("group-1")
+          .setTopics(List(
+            new OffsetFetchResponseData.OffsetFetchResponseTopics()
+              .setTopicId(fooId)
+              .setPartitions(List(
+                new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                  .setPartitionIndex(0)
+                  .setCommittedOffset(100)
+                  .setCommittedLeaderEpoch(1)
+              ).asJava),
+            new OffsetFetchResponseData.OffsetFetchResponseTopics()
+              .setTopicId(barId)
+              .setPartitions(List(
+                new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                  .setPartitionIndex(0)
+                  .setCommittedOffset(-1)
+                  .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
+              ).asJava)
+          ).asJava),
+        new OffsetFetchResponseData.OffsetFetchResponseGroup()
+          .setGroupId("group-2")
+          .setTopics(List(
+            new OffsetFetchResponseData.OffsetFetchResponseTopics()
+              .setTopicId(fooId)
+              .setPartitions(List(
+                new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                  .setPartitionIndex(0)
+                  .setCommittedOffset(100)
+                  .setCommittedLeaderEpoch(1)
+              ).asJava)
+          ).asJava)
+      ).asJava)
+
+    group1Future.complete(group1Response)
+    group2Future.complete(group2Response)
+
+    val response = verifyNoThrottling[OffsetFetchResponse](requestChannelRequest)
+    assertEquals(expectedResponse, response.data)
+  }
+
   @ParameterizedTest
   @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
   def testHandleOffsetFetchWithSingleGroup(version: Short): Unit = {
+    // The single-group builder does not support topic ids.
+    if (version >= 10) return
+
     def makeRequest(version: Short): RequestChannel.Request = {
       buildRequest(new OffsetFetchRequest.Builder(
         "group-1",
@@ -8331,10 +8520,14 @@ class KafkaApisTest extends Logging {
   @ParameterizedTest
   @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
   def testHandleOffsetFetchAllOffsetsWithSingleGroup(version: Short): Unit = {
-    // Version 0 gets offsets from Zookeeper. Version 1 does not support fetching all
-    // offsets request. We are not interested in testing these here.
+    // Version 1 does not support fetching all offsets. We are not
+    // interested in testing it here.
     if (version < 2) return
 
+    val foo = "foo"
+    val fooId = Uuid.randomUuid()
+    addTopicToMetadataCache(foo, topicId = fooId, numPartitions = 2)
+
     def makeRequest(version: Short): RequestChannel.Request = {
       buildRequest(new OffsetFetchRequest.Builder(
         "group-1",
@@ -8361,7 +8554,7 @@ class KafkaApisTest extends Logging {
       .setGroupId("group-1")
       .setTopics(List(
         new OffsetFetchResponseData.OffsetFetchResponseTopics()
-          .setName("foo")
+          .setName(foo)
           .setPartitions(List(
             new OffsetFetchResponseData.OffsetFetchResponsePartitions()
               .setPartitionIndex(0)
@@ -8376,7 +8569,25 @@ class KafkaApisTest extends Logging {
 
     val expectedOffsetFetchResponse = if (version >= 8) {
       new OffsetFetchResponseData()
-        .setGroups(List(group1Response).asJava)
+        .setGroups(List(
+          new OffsetFetchResponseData.OffsetFetchResponseGroup()
+            .setGroupId("group-1")
+            .setTopics(List(
+              new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                .setName(if (version < 10) foo else "")
+                .setTopicId(if (version >= 10) fooId else Uuid.ZERO_UUID)
+                .setPartitions(List(
+                  new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                    .setPartitionIndex(0)
+                    .setCommittedOffset(100)
+                    .setCommittedLeaderEpoch(1),
+                  new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                    .setPartitionIndex(1)
+                    .setCommittedOffset(200)
+                    .setCommittedLeaderEpoch(2)
+                ).asJava)
+            ).asJava)
+        ).asJava)
     } else {
       new OffsetFetchResponseData()
         .setTopics(List(
@@ -8401,25 +8612,61 @@ class KafkaApisTest extends Logging {
     assertEquals(expectedOffsetFetchResponse, response.data)
   }
 
-  @Test
-  def testHandleOffsetFetchAuthorization(): Unit = {
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
+  def testHandleOffsetFetchAuthorization(version: Short): Unit = {
+    // We don't test the non-batched API here.
+    if (version < 8) return
+
+    val foo = "foo"
+    val bar = "bar"
+    val fooId = Uuid.randomUuid()
+    val barId = Uuid.randomUuid()
+    addTopicToMetadataCache(foo, topicId = fooId, numPartitions = 2)
+    addTopicToMetadataCache(bar, topicId = barId, numPartitions = 2)
+
     def makeRequest(version: Short): RequestChannel.Request = {
-      val groups = Map(
-        "group-1" -> List(
-          new TopicPartition("foo", 0),
-          new TopicPartition("bar", 0)
-        ).asJava,
-        "group-2" -> List(
-          new TopicPartition("foo", 0),
-          new TopicPartition("bar", 0)
-        ).asJava,
-        "group-3" -> null,
-        "group-4" -> null,
-      ).asJava
-      buildRequest(new OffsetFetchRequest.Builder(groups, false, false).build(version))
+      buildRequest(
+        new OffsetFetchRequest.Builder(
+          new OffsetFetchRequestData()
+            .setGroups(List(
+              new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                .setGroupId("group-1")
+                .setTopics(List(
+                  new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                    .setName(foo)
+                    .setTopicId(fooId)
+                    .setPartitionIndexes(List[Integer](0).asJava),
+                  new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                    .setName(bar)
+                    .setTopicId(barId)
+                    .setPartitionIndexes(List[Integer](0).asJava)
+                ).asJava),
+              new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                .setGroupId("group-2")
+                .setTopics(List(
+                  new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                    .setName(foo)
+                    .setTopicId(fooId)
+                    .setPartitionIndexes(List[Integer](0).asJava),
+                  new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                    .setName(bar)
+                    .setTopicId(barId)
+                    .setPartitionIndexes(List[Integer](0).asJava)
+                ).asJava),
+              new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                .setGroupId("group-3")
+                .setTopics(null),
+              new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                .setGroupId("group-4")
+                .setTopics(null),
+            ).asJava),
+          false
+        ).build(version)
+      )
     }
 
-    val requestChannelRequest = makeRequest(ApiKeys.OFFSET_FETCH.latestVersion)
+    val requestChannelRequest = makeRequest(version)
 
     val authorizer: Authorizer = mock(classOf[Authorizer])
 
@@ -8449,7 +8696,8 @@ class KafkaApisTest extends Logging {
       new OffsetFetchRequestData.OffsetFetchRequestGroup()
         .setGroupId("group-1")
         .setTopics(List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
-          .setName("bar")
+          .setName(bar)
+          .setTopicId(if (version >= 10) barId else Uuid.ZERO_UUID)
           .setPartitionIndexes(List[Integer](0).asJava)).asJava),
       false
     )).thenReturn(group1Future)
@@ -8470,7 +8718,8 @@ class KafkaApisTest extends Logging {
       .setGroupId("group-1")
       .setTopics(List(
         new OffsetFetchResponseData.OffsetFetchResponseTopics()
-          .setName("bar")
+          .setName(bar)
+          .setTopicId(barId)
           .setPartitions(List(
             new OffsetFetchResponseData.OffsetFetchResponsePartitions()
               .setPartitionIndex(0)
@@ -8484,7 +8733,8 @@ class KafkaApisTest extends Logging {
       .setTopics(List(
         // foo should be filtered out.
         new OffsetFetchResponseData.OffsetFetchResponseTopics()
-          .setName("foo")
+          .setName(foo)
+          .setTopicId(fooId)
           .setPartitions(List(
             new OffsetFetchResponseData.OffsetFetchResponsePartitions()
               .setPartitionIndex(0)
@@ -8492,7 +8742,8 @@ class KafkaApisTest extends Logging {
               .setCommittedLeaderEpoch(1)
           ).asJava),
         new OffsetFetchResponseData.OffsetFetchResponseTopics()
-          .setName("bar")
+          .setName(bar)
+          .setTopicId(barId)
           .setPartitions(List(
             new OffsetFetchResponseData.OffsetFetchResponsePartitions()
               .setPartitionIndex(0)
@@ -8508,7 +8759,8 @@ class KafkaApisTest extends Logging {
           .setGroupId("group-1")
           .setTopics(List(
             new OffsetFetchResponseData.OffsetFetchResponseTopics()
-              .setName("bar")
+              .setName(if (version < 10) bar else "")
+              .setTopicId(if (version >= 10) barId else Uuid.ZERO_UUID)
               .setPartitions(List(
                 new OffsetFetchResponseData.OffsetFetchResponsePartitions()
                   .setPartitionIndex(0)
@@ -8516,7 +8768,8 @@ class KafkaApisTest extends Logging {
                   .setCommittedLeaderEpoch(1)
               ).asJava),
             new OffsetFetchResponseData.OffsetFetchResponseTopics()
-              .setName("foo")
+              .setName(if (version < 10) foo else "")
+              .setTopicId(if (version >= 10) fooId else Uuid.ZERO_UUID)
               .setPartitions(List(
                 new OffsetFetchResponseData.OffsetFetchResponsePartitions()
                   .setPartitionIndex(0)
@@ -8533,7 +8786,8 @@ class KafkaApisTest extends Logging {
           .setGroupId("group-3")
           .setTopics(List(
             new OffsetFetchResponseData.OffsetFetchResponseTopics()
-              .setName("bar")
+              .setName(if (version < 10) bar else "")
+              .setTopicId(if (version >= 10) barId else Uuid.ZERO_UUID)
               .setPartitions(List(
                 new OffsetFetchResponseData.OffsetFetchResponsePartitions()
                   .setPartitionIndex(0)
@@ -8554,23 +8808,55 @@ class KafkaApisTest extends Logging {
     assertEquals(expectedOffsetFetchResponse, response.data)
   }
 
-  @Test
-  def testHandleOffsetFetchWithUnauthorizedTopicAndTopLevelError(): Unit = {
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
+  def testHandleOffsetFetchWithUnauthorizedTopicAndTopLevelError(version: Short): Unit = {
+    // We don't test the non-batched API here.
+    if (version < 8) return
+
+    val foo = "foo"
+    val bar = "bar"
+    val fooId = Uuid.randomUuid()
+    val barId = Uuid.randomUuid()
+    addTopicToMetadataCache(foo, topicId = fooId, numPartitions = 2)
+    addTopicToMetadataCache(bar, topicId = barId, numPartitions = 2)
+
     def makeRequest(version: Short): RequestChannel.Request = {
-      val groups = Map(
-        "group-1" -> List(
-          new TopicPartition("foo", 0),
-          new TopicPartition("bar", 0)
-        ).asJava,
-        "group-2" -> List(
-          new TopicPartition("foo", 0),
-          new TopicPartition("bar", 0)
-        ).asJava
-      ).asJava
-      buildRequest(new OffsetFetchRequest.Builder(groups, false, false).build(version))
+      buildRequest(
+        new OffsetFetchRequest.Builder(
+          new OffsetFetchRequestData()
+            .setGroups(List(
+              new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                .setGroupId("group-1")
+                .setTopics(List(
+                  new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                    .setName(foo)
+                    .setTopicId(fooId)
+                    .setPartitionIndexes(List[Integer](0).asJava),
+                  new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                    .setName(bar)
+                    .setTopicId(barId)
+                    .setPartitionIndexes(List[Integer](0).asJava)
+                ).asJava),
+              new OffsetFetchRequestData.OffsetFetchRequestGroup()
+                .setGroupId("group-2")
+                .setTopics(List(
+                  new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                    .setName(foo)
+                    .setTopicId(fooId)
+                    .setPartitionIndexes(List[Integer](0).asJava),
+                  new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                    .setName(bar)
+                    .setTopicId(barId)
+                    .setPartitionIndexes(List[Integer](0).asJava)
+                ).asJava)
+            ).asJava),
+          false
+        ).build(version)
+      )
     }
 
-    val requestChannelRequest = makeRequest(ApiKeys.OFFSET_FETCH.latestVersion)
+    val requestChannelRequest = makeRequest(version)
 
     val authorizer: Authorizer = mock(classOf[Authorizer])
 
@@ -8598,7 +8884,8 @@ class KafkaApisTest extends Logging {
       new OffsetFetchRequestData.OffsetFetchRequestGroup()
         .setGroupId("group-1")
         .setTopics(List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
-          .setName("bar")
+          .setName(bar)
+          .setTopicId(if (version >= 10) barId else Uuid.ZERO_UUID)
           .setPartitionIndexes(List[Integer](0).asJava)).asJava),
       false
     )).thenReturn(group1Future)
@@ -8609,7 +8896,8 @@ class KafkaApisTest extends Logging {
       new OffsetFetchRequestData.OffsetFetchRequestGroup()
         .setGroupId("group-2")
         .setTopics(List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
-          .setName("bar")
+          .setName(bar)
+          .setTopicId(if (version >= 10) barId else Uuid.ZERO_UUID)
           .setPartitionIndexes(List[Integer](0).asJava)).asJava),
       false
     )).thenReturn(group1Future)
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
index 2b50071a7f771..eb5f1b07b8216 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.coordinator.group;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.StaleMemberEpochException;
@@ -739,7 +740,9 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchOffsets(
 
         request.topics().forEach(topic -> {
             final OffsetFetchResponseData.OffsetFetchResponseTopics topicResponse =
-                new OffsetFetchResponseData.OffsetFetchResponseTopics().setName(topic.name());
+                new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                    .setTopicId(topic.topicId())
+                    .setName(topic.name());
             topicResponses.add(topicResponse);
 
             final TimelineHashMap<Integer, OffsetAndMetadata> topicOffsets = groupOffsets == null ?
@@ -809,7 +812,11 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets(
                 final TimelineHashMap<Integer, OffsetAndMetadata> topicOffsets = topicEntry.getValue();
 
                 final OffsetFetchResponseData.OffsetFetchResponseTopics topicResponse =
-                    new OffsetFetchResponseData.OffsetFetchResponseTopics().setName(topic);
+                    new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                        // It is set to zero for now but will be set to the persisted
+                        // topic id stored alongside the committed offset, if present.
+                        .setTopicId(Uuid.ZERO_UUID)
+                        .setName(topic);
                 topicResponses.add(topicResponse);
 
                 topicOffsets.entrySet(lastCommittedOffset).forEach(partitionEntry -> {
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index 6f788d84fd009..90d1a74122f88 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -1765,6 +1765,44 @@ public void testFetchOffsetsWithUnknownGroup() {
         assertEquals(expectedResponse, context.fetchOffsets("group", request, Long.MAX_VALUE));
     }
 
+    @Test
+    public void testFetchOffsetsWithTopicIds() {
+        Uuid fooId = Uuid.randomUuid();
+        Uuid barId = Uuid.randomUuid();
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+        context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", true);
+
+        context.commitOffset("group", "foo", 0, 100L, 1);
+        context.commitOffset("group", "bar", 0, 200L, 1);
+
+        List<OffsetFetchRequestData.OffsetFetchRequestTopics> request = Arrays.asList(
+            new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                .setName("foo")
+                .setTopicId(fooId)
+                .setPartitionIndexes(List.of(0)),
+            new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                .setName("bar")
+                .setTopicId(barId)
+                .setPartitionIndexes(List.of(0))
+        );
+
+        assertEquals(List.of(
+            new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                .setName("foo")
+                .setTopicId(fooId)
+                .setPartitions(List.of(
+                    mkOffsetPartitionResponse(0, 100L, 1, "metadata")
+                )),
+            new OffsetFetchResponseData.OffsetFetchResponseTopics()
+                .setName("bar")
+                .setTopicId(barId)
+                .setPartitions(List.of(
+                    mkOffsetPartitionResponse(0, 200L, 1, "metadata")
+                ))
+        ), context.fetchOffsets("group", request, Long.MAX_VALUE));
+    }
+
     @Test
     public void testFetchOffsetsAtDifferentCommittedOffset() {
         OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();