-
Notifications
You must be signed in to change notification settings - Fork 0
KAFKA-14691; Add TopicId to OffsetFetch API #4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1027,6 +1027,8 @@ class KafkaApis(val requestChannel: RequestChannel, | |
offsetFetchRequest: OffsetFetchRequestData.OffsetFetchRequestGroup, | ||
requireStable: Boolean | ||
): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = { | ||
val useTopicIds = OffsetFetchRequest.useTopicIds(requestContext.apiVersion) | ||
|
||
groupCoordinator.fetchAllOffsets( | ||
requestContext, | ||
offsetFetchRequest, | ||
|
@@ -1040,13 +1042,33 @@ class KafkaApis(val requestChannel: RequestChannel, | |
offsetFetchResponse | ||
} else { | ||
// Clients are not allowed to see offsets for topics that are not authorized for Describe. | ||
val (authorizedOffsets, _) = authHelper.partitionSeqByAuthorized( | ||
val authorizedNames = authHelper.filterByAuthorized( | ||
requestContext, | ||
DESCRIBE, | ||
TOPIC, | ||
offsetFetchResponse.topics.asScala | ||
)(_.name) | ||
offsetFetchResponse.setTopics(authorizedOffsets.asJava) | ||
|
||
val topics = new mutable.ArrayBuffer[OffsetFetchResponseData.OffsetFetchResponseTopics] | ||
offsetFetchResponse.topics.forEach { topic => | ||
if (authorizedNames.contains(topic.name)) { | ||
if (useTopicIds) { | ||
// If the topic is not provided by the group coordinator, we set it | ||
// using the metadata cache. | ||
if (topic.topicId == Uuid.ZERO_UUID) { | ||
topic.setTopicId(metadataCache.getTopicId(topic.name)) | ||
} | ||
// If we don't have the topic id at all, we skip the topic because | ||
// we can not serialize it without it. | ||
if (topic.topicId != Uuid.ZERO_UUID) { | ||
topics += topic | ||
} | ||
} else { | ||
topics += topic | ||
} | ||
} | ||
} | ||
offsetFetchResponse.setTopics(topics.asJava) | ||
} | ||
} | ||
} | ||
|
@@ -1056,14 +1078,53 @@ class KafkaApis(val requestChannel: RequestChannel, | |
offsetFetchRequest: OffsetFetchRequestData.OffsetFetchRequestGroup, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Topic ID Support Version Handling: Please verify that the implementation correctly handles both topic IDs and topic names across different API versions, especially around version 10 which introduces topic ID support. The conditional logic paths based on version checks need careful verification to ensure backward compatibility is maintained. |
||
requireStable: Boolean | ||
): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = { | ||
val useTopicIds = OffsetFetchRequest.useTopicIds(requestContext.apiVersion) | ||
|
||
if (useTopicIds) { | ||
offsetFetchRequest.topics.forEach { topic => | ||
if (topic.topicId != Uuid.ZERO_UUID) { | ||
metadataCache.getTopicName(topic.topicId).ifPresent(name => topic.setName(name)) | ||
} | ||
} | ||
} | ||
|
||
// Clients are not allowed to see offsets for topics that are not authorized for Describe. | ||
val (authorizedTopics, unauthorizedTopics) = authHelper.partitionSeqByAuthorized( | ||
val authorizedTopicNames = authHelper.filterByAuthorized( | ||
requestContext, | ||
DESCRIBE, | ||
TOPIC, | ||
offsetFetchRequest.topics.asScala | ||
)(_.name) | ||
|
||
val authorizedTopics = new mutable.ArrayBuffer[OffsetFetchRequestData.OffsetFetchRequestTopics] | ||
val errorTopics = new mutable.ArrayBuffer[OffsetFetchResponseData.OffsetFetchResponseTopics] | ||
|
||
def buildErrorResponse( | ||
topic: OffsetFetchRequestData.OffsetFetchRequestTopics, | ||
error: Errors | ||
): OffsetFetchResponseData.OffsetFetchResponseTopics = { | ||
val topicResponse = new OffsetFetchResponseData.OffsetFetchResponseTopics() | ||
.setTopicId(topic.topicId) | ||
.setName(topic.name) | ||
topic.partitionIndexes.forEach { partitionIndex => | ||
topicResponse.partitions.add(new OffsetFetchResponseData.OffsetFetchResponsePartitions() | ||
.setPartitionIndex(partitionIndex) | ||
.setCommittedOffset(-1) | ||
.setErrorCode(error.code)) | ||
} | ||
topicResponse | ||
} | ||
|
||
offsetFetchRequest.topics.forEach { topic => | ||
if (useTopicIds && topic.name.isEmpty) { | ||
errorTopics += buildErrorResponse(topic, Errors.UNKNOWN_TOPIC_ID) | ||
} else if (!authorizedTopicNames.contains(topic.name)) { | ||
errorTopics += buildErrorResponse(topic, Errors.TOPIC_AUTHORIZATION_FAILED) | ||
} else { | ||
authorizedTopics += topic | ||
} | ||
} | ||
|
||
groupCoordinator.fetchOffsets( | ||
requestContext, | ||
new OffsetFetchRequestData.OffsetFetchRequestGroup() | ||
|
@@ -1081,19 +1142,10 @@ class KafkaApis(val requestChannel: RequestChannel, | |
offsetFetchResponse | ||
} else { | ||
val topics = new util.ArrayList[OffsetFetchResponseData.OffsetFetchResponseTopics]( | ||
offsetFetchResponse.topics.size + unauthorizedTopics.size | ||
offsetFetchResponse.topics.size + errorTopics.size | ||
) | ||
topics.addAll(offsetFetchResponse.topics) | ||
unauthorizedTopics.foreach { topic => | ||
val topicResponse = new OffsetFetchResponseData.OffsetFetchResponseTopics().setName(topic.name) | ||
topic.partitionIndexes.forEach { partitionIndex => | ||
topicResponse.partitions.add(new OffsetFetchResponseData.OffsetFetchResponsePartitions() | ||
.setPartitionIndex(partitionIndex) | ||
.setCommittedOffset(-1) | ||
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)) | ||
} | ||
topics.add(topicResponse) | ||
} | ||
topics.addAll(errorTopics.asJava) | ||
offsetFetchResponse.setTopics(topics) | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Authorization Logic with Topic IDs: This section modifies authorization logic to handle topic IDs. Please confirm that authorization checks work properly when using topic IDs instead of topic names, as this is security-critical functionality.