Skip to content

Commit 2977cb1

Browse files
KAFKA-17747: [6/N] Replace subscription metadata with metadata hash in share group (#19796)
* Use metadata hash to replace subscription metadata. * Remove `ShareGroupPartitionMetadataKey` and `ShareGroupPartitionMetadataValue`. * Use `subscriptionTopicNames` and `metadataImage` to replace `subscriptionMetadata` in `subscribedTopicsChangeMap` function. Reviewers: Chia-Ping Tsai <[email protected]>, David Jacot <[email protected]>, Andrew Schofield <[email protected]> --------- Signed-off-by: PoAn Yang <[email protected]>
1 parent 82ea9d0 commit 2977cb1

File tree

18 files changed

+95
-1050
lines changed

18 files changed

+95
-1050
lines changed

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java

Lines changed: 0 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,12 @@
4444
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue;
4545
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
4646
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
47-
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataKey;
48-
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue;
4947
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey;
5048
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue;
5149
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey;
5250
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
5351
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
5452
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue;
55-
import org.apache.kafka.coordinator.group.modern.TopicMetadata;
5653
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
5754
import org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpression;
5855
import org.apache.kafka.coordinator.group.modern.share.ShareGroup.InitMapValue;
@@ -569,51 +566,6 @@ public static CoordinatorRecord newShareGroupMemberSubscriptionTombstoneRecord(
569566
);
570567
}
571568

572-
/**
573-
* Creates a ShareGroupPartitionMetadata record.
574-
*
575-
* @param groupId The group id.
576-
* @param newSubscriptionMetadata The subscription metadata.
577-
* @return The record.
578-
*/
579-
public static CoordinatorRecord newShareGroupSubscriptionMetadataRecord(
580-
String groupId,
581-
Map<String, TopicMetadata> newSubscriptionMetadata
582-
) {
583-
ShareGroupPartitionMetadataValue value = new ShareGroupPartitionMetadataValue();
584-
newSubscriptionMetadata.forEach((topicName, topicMetadata) ->
585-
value.topics().add(new ShareGroupPartitionMetadataValue.TopicMetadata()
586-
.setTopicId(topicMetadata.id())
587-
.setTopicName(topicMetadata.name())
588-
.setNumPartitions(topicMetadata.numPartitions())
589-
)
590-
);
591-
592-
return CoordinatorRecord.record(
593-
new ShareGroupPartitionMetadataKey()
594-
.setGroupId(groupId),
595-
new ApiMessageAndVersion(
596-
value,
597-
(short) 0
598-
)
599-
);
600-
}
601-
602-
/**
603-
* Creates a ShareGroupPartitionMetadata tombstone.
604-
*
605-
* @param groupId The group id.
606-
* @return The record.
607-
*/
608-
public static CoordinatorRecord newShareGroupSubscriptionMetadataTombstoneRecord(
609-
String groupId
610-
) {
611-
return CoordinatorRecord.tombstone(
612-
new ShareGroupPartitionMetadataKey()
613-
.setGroupId(groupId)
614-
);
615-
}
616-
617569
/**
618570
* Creates a ShareGroupMetadata record.
619571
*

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,6 @@
9595
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue;
9696
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
9797
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
98-
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataKey;
99-
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue;
10098
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey;
10199
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue;
102100
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey;
@@ -1234,13 +1232,6 @@ public void replay(
12341232
);
12351233
break;
12361234

1237-
case SHARE_GROUP_PARTITION_METADATA:
1238-
groupMetadataManager.replay(
1239-
(ShareGroupPartitionMetadataKey) key,
1240-
(ShareGroupPartitionMetadataValue) Utils.messageOrNull(value)
1241-
);
1242-
break;
1243-
12441235
case SHARE_GROUP_MEMBER_METADATA:
12451236
groupMetadataManager.replay(
12461237
(ShareGroupMemberMetadataKey) key,

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java

Lines changed: 41 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,6 @@
115115
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue;
116116
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
117117
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
118-
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataKey;
119-
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue;
120118
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey;
121119
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue;
122120
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey;
@@ -141,7 +139,6 @@
141139
import org.apache.kafka.coordinator.group.modern.ModernGroup;
142140
import org.apache.kafka.coordinator.group.modern.SubscriptionCount;
143141
import org.apache.kafka.coordinator.group.modern.TargetAssignmentBuilder;
144-
import org.apache.kafka.coordinator.group.modern.TopicMetadata;
145142
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
146143
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
147144
import org.apache.kafka.coordinator.group.modern.consumer.CurrentAssignmentBuilder;
@@ -235,7 +232,6 @@
235232
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionRecord;
236233
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord;
237234
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord;
238-
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord;
239235
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord;
240236
import static org.apache.kafka.coordinator.group.Utils.assignmentToString;
241237
import static org.apache.kafka.coordinator.group.Utils.ofSentinel;
@@ -2541,18 +2537,18 @@ private CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<In
25412537
) || initializedAssignmentPending(group);
25422538

25432539
int groupEpoch = group.groupEpoch();
2544-
Map<String, TopicMetadata> subscriptionMetadata = group.subscriptionMetadata();
2540+
Map<String, SubscriptionCount> subscribedTopicNamesMap = group.subscribedTopicNames();
25452541
SubscriptionType subscriptionType = group.subscriptionType();
25462542

25472543
if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
25482544
// The subscription metadata is updated in two cases:
25492545
// 1) The member has updated its subscriptions;
25502546
// 2) The refresh deadline has been reached.
2551-
Map<String, SubscriptionCount> subscribedTopicNamesMap = group.computeSubscribedTopicNames(member, updatedMember);
2552-
subscriptionMetadata = group.computeSubscriptionMetadata(
2547+
subscribedTopicNamesMap = group.computeSubscribedTopicNames(member, updatedMember);
2548+
long groupMetadataHash = ModernGroup.computeMetadataHash(
25532549
subscribedTopicNamesMap,
2554-
metadataImage.topics(),
2555-
metadataImage.cluster()
2550+
topicHashCache,
2551+
metadataImage
25562552
);
25572553

25582554
int numMembers = group.numMembers();
@@ -2565,17 +2561,16 @@ private CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<In
25652561
numMembers
25662562
);
25672563

2568-
if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
2569-
log.info("[GroupId {}] Computed new subscription metadata: {}.",
2570-
groupId, subscriptionMetadata);
2564+
if (groupMetadataHash != group.metadataHash()) {
2565+
log.info("[GroupId {}] Computed new metadata hash: {}.",
2566+
groupId, groupMetadataHash);
25712567
bumpGroupEpoch = true;
2572-
records.add(newShareGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
25732568
}
25742569

25752570
if (bumpGroupEpoch) {
25762571
groupEpoch += 1;
2577-
records.add(newShareGroupEpochRecord(groupId, groupEpoch, 0));
2578-
log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch);
2572+
records.add(newShareGroupEpochRecord(groupId, groupEpoch, groupMetadataHash));
2573+
log.info("[GroupId {}] Bumped group epoch to {} with metadata hash {}.", groupId, groupEpoch, groupMetadataHash);
25792574
}
25802575

25812576
group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch);
@@ -2631,7 +2626,7 @@ private CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<In
26312626
records,
26322627
Map.entry(
26332628
response,
2634-
maybeCreateInitializeShareGroupStateRequest(groupId, groupEpoch, subscriptionMetadata, records)
2629+
maybeCreateInitializeShareGroupStateRequest(groupId, groupEpoch, subscribedTopicNamesMap.keySet(), records)
26352630
)
26362631
);
26372632
}
@@ -2664,13 +2659,13 @@ private boolean initializedAssignmentPending(ShareGroup group) {
26642659
* Computes the diff between the subscribed metadata and the initialized share topic
26652660
* partitions corresponding to a share group.
26662661
*
2667-
* @param groupId The share group id for which diff is being calculated
2668-
* @param subscriptionMetadata The subscription metadata corresponding to the share group.
2662+
* @param groupId The share group id for which diff is being calculated
2663+
* @param subscriptionTopicNames The subscription topic names to the share group.
26692664
* @return A map of topic partitions which are subscribed by the share group but not initialized yet.
26702665
*/
26712666
// Visibility for testing
2672-
Map<Uuid, InitMapValue> subscribedTopicsChangeMap(String groupId, Map<String, TopicMetadata> subscriptionMetadata) {
2673-
if (subscriptionMetadata == null || subscriptionMetadata.isEmpty()) {
2667+
Map<Uuid, InitMapValue> subscribedTopicsChangeMap(String groupId, Set<String> subscriptionTopicNames) {
2668+
if (subscriptionTopicNames == null || subscriptionTopicNames.isEmpty()) {
26742669
return Map.of();
26752670
}
26762671

@@ -2689,18 +2684,20 @@ Map<Uuid, InitMapValue> subscribedTopicsChangeMap(String groupId, Map<String, To
26892684
.filter(entry -> curTimestamp - entry.getValue().timestamp() < delta)
26902685
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
26912686
);
2692-
26932687
// Here will add any topics which are subscribed but not initialized and initializing
26942688
// topics whose timestamp indicates that they are older than delta elapsed.
2695-
subscriptionMetadata.forEach((topicName, topicMetadata) -> {
2696-
Set<Integer> alreadyInitializedPartSet = alreadyInitialized.containsKey(topicMetadata.id()) ? alreadyInitialized.get(topicMetadata.id()).partitions() : Set.of();
2697-
if (alreadyInitializedPartSet.isEmpty() || alreadyInitializedPartSet.size() < topicMetadata.numPartitions()) {
2698-
Set<Integer> partitionSet = IntStream.range(0, topicMetadata.numPartitions()).boxed().collect(Collectors.toSet());
2699-
partitionSet.removeAll(alreadyInitializedPartSet);
2700-
// alreadyInitialized contains all initialized topics and initializing topics which are less than delta old
2701-
// which means we are putting subscribed topics which are unseen or initializing for more than delta. But, we
2702-
// are also updating the timestamp here which means, old initializing will not be included repeatedly.
2703-
topicPartitionChangeMap.computeIfAbsent(topicMetadata.id(), k -> new InitMapValue(topicMetadata.name(), partitionSet, curTimestamp));
2689+
subscriptionTopicNames.forEach(topicName -> {
2690+
TopicImage topicImage = metadataImage.topics().getTopic(topicName);
2691+
if (topicImage != null) {
2692+
Set<Integer> alreadyInitializedPartSet = alreadyInitialized.containsKey(topicImage.id()) ? alreadyInitialized.get(topicImage.id()).partitions() : Set.of();
2693+
if (alreadyInitializedPartSet.isEmpty() || alreadyInitializedPartSet.size() < topicImage.partitions().size()) {
2694+
Set<Integer> partitionSet = IntStream.range(0, topicImage.partitions().size()).boxed()
2695+
.filter(p -> !alreadyInitializedPartSet.contains(p)).collect(Collectors.toSet());
2696+
// alreadyInitialized contains all initialized topics and initializing topics which are less than delta old
2697+
// which means we are putting subscribed topics which are unseen or initializing for more than delta. But, we
2698+
// are also updating the timestamp here which means, old initializing will not be included repeatedly.
2699+
topicPartitionChangeMap.computeIfAbsent(topicImage.id(), k -> new InitMapValue(topicImage.name(), partitionSet, curTimestamp));
2700+
}
27042701
}
27052702
});
27062703
return topicPartitionChangeMap;
@@ -2710,22 +2707,22 @@ Map<Uuid, InitMapValue> subscribedTopicsChangeMap(String groupId, Map<String, To
27102707
* Based on the diff between the subscribed topic partitions and the initialized topic partitions,
27112708
* created initialize request for the non-initialized ones.
27122709
*
2713-
* @param groupId The share group id for which partitions need to be initialized.
2714-
* @param groupEpoch The group epoch of the share group.
2715-
* @param subscriptionMetadata The subscription metadata for the share group.
2710+
* @param groupId The share group id for which partitions need to be initialized.
2711+
* @param groupEpoch The group epoch of the share group.
2712+
* @param subscriptionTopicNames The subscription topic names for the share group.
27162713
* @return An optional representing the persister initialize request.
27172714
*/
27182715
private Optional<InitializeShareGroupStateParameters> maybeCreateInitializeShareGroupStateRequest(
27192716
String groupId,
27202717
int groupEpoch,
2721-
Map<String, TopicMetadata> subscriptionMetadata,
2718+
Set<String> subscriptionTopicNames,
27222719
List<CoordinatorRecord> records
27232720
) {
2724-
if (subscriptionMetadata == null || subscriptionMetadata.isEmpty() || metadataImage.isEmpty()) {
2721+
if (subscriptionTopicNames == null || subscriptionTopicNames.isEmpty() || metadataImage.isEmpty()) {
27252722
return Optional.empty();
27262723
}
27272724

2728-
Map<Uuid, InitMapValue> topicPartitionChangeMap = subscribedTopicsChangeMap(groupId, subscriptionMetadata);
2725+
Map<Uuid, InitMapValue> topicPartitionChangeMap = subscribedTopicsChangeMap(groupId, subscriptionTopicNames);
27292726

27302727
// Nothing to initialize.
27312728
if (topicPartitionChangeMap.isEmpty()) {
@@ -4078,21 +4075,20 @@ private <T> CoordinatorResult<T, CoordinatorRecord> shareGroupFenceMember(
40784075
records.add(newShareGroupMemberSubscriptionTombstoneRecord(group.groupId(), member.memberId()));
40794076

40804077
// We update the subscription metadata without the leaving member.
4081-
Map<String, TopicMetadata> subscriptionMetadata = group.computeSubscriptionMetadata(
4078+
long groupMetadataHash = ModernGroup.computeMetadataHash(
40824079
group.computeSubscribedTopicNames(member, null),
4083-
metadataImage.topics(),
4084-
metadataImage.cluster()
4080+
topicHashCache,
4081+
metadataImage
40854082
);
40864083

4087-
if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
4088-
log.info("[GroupId {}] Computed new subscription metadata: {}.",
4089-
group.groupId(), subscriptionMetadata);
4090-
records.add(newShareGroupSubscriptionMetadataRecord(group.groupId(), subscriptionMetadata));
4084+
if (groupMetadataHash != group.metadataHash()) {
4085+
log.info("[GroupId {}] Computed new metadata hash: {}.",
4086+
group.groupId(), groupMetadataHash);
40914087
}
40924088

40934089
// We bump the group epoch.
40944090
int groupEpoch = group.groupEpoch() + 1;
4095-
records.add(newShareGroupEpochRecord(group.groupId(), groupEpoch, 0));
4091+
records.add(newShareGroupEpochRecord(group.groupId(), groupEpoch, groupMetadataHash));
40964092

40974093
cancelGroupSessionTimeout(group.groupId(), member.memberId());
40984094

@@ -5357,6 +5353,7 @@ public void replay(
53575353
if (value != null) {
53585354
ShareGroup shareGroup = getOrMaybeCreatePersistedShareGroup(groupId, true);
53595355
shareGroup.setGroupEpoch(value.epoch());
5356+
shareGroup.setMetadataHash(value.metadataHash());
53605357
} else {
53615358
ShareGroup shareGroup = getOrMaybeCreatePersistedShareGroup(groupId, false);
53625359
if (!shareGroup.members().isEmpty()) {
@@ -5537,32 +5534,6 @@ public void replay(
55375534
}
55385535
}
55395536

5540-
/**
5541-
* Replays ShareGroupPartitionMetadataKey/Value to update the hard state of
5542-
* the share group. It updates the subscription metadata of the share
5543-
* group.
5544-
*
5545-
* @param key A ShareGroupPartitionMetadataKey key.
5546-
* @param value A ShareGroupPartitionMetadataValue record.
5547-
*/
5548-
public void replay(
5549-
ShareGroupPartitionMetadataKey key,
5550-
ShareGroupPartitionMetadataValue value
5551-
) {
5552-
String groupId = key.groupId();
5553-
ShareGroup group = getOrMaybeCreatePersistedShareGroup(groupId, false);
5554-
5555-
if (value != null) {
5556-
Map<String, TopicMetadata> subscriptionMetadata = new HashMap<>();
5557-
value.topics().forEach(topicMetadata ->
5558-
subscriptionMetadata.put(topicMetadata.topicName(), TopicMetadata.fromRecord(topicMetadata))
5559-
);
5560-
group.setSubscriptionMetadata(subscriptionMetadata);
5561-
} else {
5562-
group.setSubscriptionMetadata(Map.of());
5563-
}
5564-
}
5565-
55665537
/**
55675538
* Replays ShareGroupTargetAssignmentMemberKey/Value to update the hard state of
55685539
* the share group. It updates the target assignment of the member or deletes it.

0 commit comments

Comments
 (0)