Skip to content

Commit fcb722d

Browse files
authored
KAFKA-18687: Setting the subscriptionMetadata during conversion to consumer group (#19790)
When a consumer protocol static member replaces an existing member in a classic group, it's not necessary to recompute the assignment. However, it happens anyway. In [ConsumerGroup.fromClassicGroup](https://github.com/apache/kafka/blob/0ff4dafb7de4e24ddb7961d52e50e728f2eee4eb/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java#L1140), we don't set the group's subscriptionMetadata. Later in the consumer group heartbeat, we [call updateSubscriptionMetadata](https://github.com/apache/kafka/blob/0ff4dafb7de4e24ddb7961d52e50e728f2eee4eb/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L1748), which [notices that the group's subscriptionMetadata needs an update](https://github.com/apache/kafka/blob/0ff4dafb7de4e24ddb7961d52e50e728f2eee4eb/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L2757) and bumps the epoch. Since the epoch is bumped, we [recompute the assignment](https://github.com/apache/kafka/blob/0ff4dafb7de4e24ddb7961d52e50e728f2eee4eb/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L1766). As a fix, this patch sets the subscriptionMetadata in ConsumerGroup.fromClassicGroup. Reviewers: Sean Quah <[email protected]>, David Jacot <[email protected]>
1 parent 77aff85 commit fcb722d

File tree

4 files changed

+49
-43
lines changed

4 files changed

+49
-43
lines changed

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1355,7 +1355,8 @@ ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, List<Coordinator
13551355
snapshotRegistry,
13561356
metrics,
13571357
classicGroup,
1358-
metadataImage.topics()
1358+
metadataImage.topics(),
1359+
metadataImage.cluster()
13591360
);
13601361
} catch (SchemaException e) {
13611362
log.warn("Cannot upgrade classic group " + classicGroup.groupId() +

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.kafka.coordinator.group.modern.ModernGroup;
4444
import org.apache.kafka.coordinator.group.modern.ModernGroupMember;
4545
import org.apache.kafka.coordinator.group.modern.SubscriptionCount;
46+
import org.apache.kafka.image.ClusterImage;
4647
import org.apache.kafka.image.TopicsImage;
4748
import org.apache.kafka.timeline.SnapshotRegistry;
4849
import org.apache.kafka.timeline.TimelineHashMap;
@@ -1129,7 +1130,8 @@ public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup(
11291130
* @param snapshotRegistry The SnapshotRegistry.
11301131
* @param metrics The GroupCoordinatorMetricsShard.
11311132
* @param classicGroup The converted classic group.
1132-
* @param topicsImage The TopicsImage for topic id and topic name conversion.
1133+
* @param topicsImage The current metadata for all available topics.
1134+
* @param clusterImage The current metadata for the Kafka cluster.
11331135
* @return The created ConsumerGroup.
11341136
*
11351137
* @throws SchemaException if any member's subscription or assignment cannot be deserialized.
@@ -1139,7 +1141,8 @@ public static ConsumerGroup fromClassicGroup(
11391141
SnapshotRegistry snapshotRegistry,
11401142
GroupCoordinatorMetricsShard metrics,
11411143
ClassicGroup classicGroup,
1142-
TopicsImage topicsImage
1144+
TopicsImage topicsImage,
1145+
ClusterImage clusterImage
11431146
) {
11441147
String groupId = classicGroup.groupId();
11451148
ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
@@ -1195,6 +1198,12 @@ public static ConsumerGroup fromClassicGroup(
11951198
consumerGroup.updateMember(newMember);
11961199
});
11971200

1201+
consumerGroup.setSubscriptionMetadata(consumerGroup.computeSubscriptionMetadata(
1202+
consumerGroup.subscribedTopicNames(),
1203+
topicsImage,
1204+
clusterImage
1205+
));
1206+
11981207
return consumerGroup;
11991208
}
12001209

@@ -1210,6 +1219,8 @@ public void createConsumerGroupRecords(
12101219
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId(), consumerGroupMember))
12111220
);
12121221

1222+
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId(), subscriptionMetadata()));
1223+
12131224
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId(), groupEpoch(), 0));
12141225

12151226
members().forEach((consumerGroupMemberId, consumerGroupMember) ->

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java

Lines changed: 27 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -10239,6 +10239,10 @@ memberId2, new MemberAssignmentImpl(mkAssignment(
1023910239

1024010240
// Create the new consumer group with member 1.
1024110241
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
10242+
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(
10243+
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1),
10244+
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
10245+
)),
1024210246
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0, 0),
1024310247
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()),
1024410248
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 0),
@@ -10247,12 +10251,6 @@ memberId2, new MemberAssignmentImpl(mkAssignment(
1024710251
// Member 2 joins the new consumer group.
1024810252
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2),
1024910253

10250-
// The subscription metadata hasn't been updated during the conversion, so a new one is computed.
10251-
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(
10252-
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1),
10253-
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
10254-
)),
10255-
1025610254
// Newly joining member 2 bumps the group epoch. A new target assignment is computed.
1025710255
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0),
1025810256
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, assignor.targetPartitions(memberId2)),
@@ -10454,6 +10452,11 @@ memberId3, new MemberAssignmentImpl(mkAssignment(
1045410452
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
1045510453
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2),
1045610454

10455+
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(
10456+
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2),
10457+
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
10458+
)),
10459+
1045710460
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0, 0),
1045810461
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()),
1045910462
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, expectedMember2.assignedPartitions()),
@@ -10466,12 +10469,6 @@ memberId3, new MemberAssignmentImpl(mkAssignment(
1046610469
// Member 3 joins the new consumer group.
1046710470
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember3),
1046810471

10469-
// The subscription metadata hasn't been updated during the conversion, so a new one is computed.
10470-
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(
10471-
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2),
10472-
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
10473-
)),
10474-
1047510472
// Newly joining member 3 bumps the group epoch. A new target assignment is computed.
1047610473
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0),
1047710474
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, assignor.targetPartitions(memberId1)),
@@ -10659,7 +10656,7 @@ public void testConsumerGroupHeartbeatToClassicGroupFromExistingStaticMember() {
1065910656
);
1066010657

1066110658
group.transitionTo(PREPARING_REBALANCE);
10662-
group.transitionTo(COMPLETING_REBALANCE);
10659+
group.initNextGeneration();
1066310660
group.transitionTo(STABLE);
1066410661

1066510662
context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group, assignments));
@@ -10681,8 +10678,8 @@ public void testConsumerGroupHeartbeatToClassicGroupFromExistingStaticMember() {
1068110678

1068210679
ConsumerGroupMember expectedClassicMember = new ConsumerGroupMember.Builder(memberId)
1068310680
.setInstanceId(instanceId)
10684-
.setMemberEpoch(0)
10685-
.setPreviousMemberEpoch(0)
10681+
.setMemberEpoch(group.generationId())
10682+
.setPreviousMemberEpoch(group.generationId())
1068610683
.setClientId(DEFAULT_CLIENT_ID)
1068710684
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
1068810685
.setSubscribedTopicNames(List.of(fooTopicName))
@@ -10718,7 +10715,7 @@ public void testConsumerGroupHeartbeatToClassicGroupFromExistingStaticMember() {
1071810715
.build();
1071910716

1072010717
ConsumerGroupMember expectedFinalConsumerMember = new ConsumerGroupMember.Builder(expectedReplacingConsumerMember)
10721-
.setMemberEpoch(1)
10718+
.setMemberEpoch(group.generationId())
1072210719
.setServerAssignorName(NoOpPartitionAssignor.NAME)
1072310720
.setRebalanceTimeoutMs(5000)
1072410721
.setClassicMemberMetadata(null)
@@ -10730,9 +10727,10 @@ public void testConsumerGroupHeartbeatToClassicGroupFromExistingStaticMember() {
1073010727

1073110728
// Create the new consumer group with the static member.
1073210729
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedClassicMember),
10733-
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0, 0),
10730+
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1))),
10731+
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, group.generationId(), 0),
1073410732
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, expectedClassicMember.assignedPartitions()),
10735-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 0),
10733+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, group.generationId()),
1073610734
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedClassicMember),
1073710735

1073810736
// Remove the static member because the rejoining member replaces it.
@@ -10745,17 +10743,10 @@ public void testConsumerGroupHeartbeatToClassicGroupFromExistingStaticMember() {
1074510743
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, newMemberId, mkAssignment(mkTopicAssignment(fooTopicId, 0))),
1074610744
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedReplacingConsumerMember),
1074710745

10748-
// The static member rejoins the new consumer group.
10746+
// The static member rejoins the new consumer group with the same instance id and
10747+
// takes the assignment of the previous member. No new target assignment is computed.
1074910748
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedFinalConsumerMember),
1075010749

10751-
// The subscription metadata hasn't been updated during the conversion, so a new one is computed.
10752-
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1))),
10753-
10754-
// Newly joining static member bumps the group epoch. A new target assignment is computed.
10755-
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0),
10756-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, newMemberId, mkAssignment(mkTopicAssignment(fooTopicId, 0))),
10757-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1),
10758-
1075910750
// The newly created static member takes the assignment from the existing member.
1076010751
// Bump its member epoch and transition to STABLE.
1076110752
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedFinalConsumerMember)
@@ -10856,6 +10847,10 @@ public void testConsumerGroupHeartbeatToClassicGroupWithEmptyAssignmentMember()
1085610847

1085710848
// Create the new consumer group with member 1.
1085810849
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
10850+
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(
10851+
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1),
10852+
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
10853+
)),
1085910854
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0),
1086010855
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()),
1086110856
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1),
@@ -10864,12 +10859,6 @@ public void testConsumerGroupHeartbeatToClassicGroupWithEmptyAssignmentMember()
1086410859
// Member 2 joins the new consumer group.
1086510860
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2),
1086610861

10867-
// The subscription metadata hasn't been updated during the conversion, so a new one is computed.
10868-
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(
10869-
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1),
10870-
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
10871-
)),
10872-
1087310862
// Newly joining member 2 bumps the group epoch. A new target assignment is computed.
1087410863
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0),
1087510864
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, Map.of()),
@@ -11241,6 +11230,11 @@ memberId3, new MemberAssignmentImpl(mkAssignment(
1124111230
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
1124211231
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2),
1124311232

11233+
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(
11234+
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2),
11235+
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
11236+
)),
11237+
1124411238
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0),
1124511239
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()),
1124611240
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, expectedMember2.assignedPartitions()),
@@ -11253,12 +11247,6 @@ memberId3, new MemberAssignmentImpl(mkAssignment(
1125311247
// Member 3 joins the new consumer group.
1125411248
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember3),
1125511249

11256-
// The subscription metadata hasn't been updated during the conversion, so a new one is computed.
11257-
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(
11258-
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2),
11259-
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
11260-
)),
11261-
1126211250
// Newly joining member 3 bumps the group epoch. A new target assignment is computed.
1126311251
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0),
1126411252
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, assignor.targetPartitions(memberId1)),

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1532,7 +1532,8 @@ public void testFromClassicGroup() {
15321532
new SnapshotRegistry(logContext),
15331533
mock(GroupCoordinatorMetricsShard.class),
15341534
classicGroup,
1535-
metadataImage.topics()
1535+
metadataImage.topics(),
1536+
metadataImage.cluster()
15361537
);
15371538

15381539
ConsumerGroup expectedConsumerGroup = new ConsumerGroup(
@@ -1545,6 +1546,10 @@ public void testFromClassicGroup() {
15451546
expectedConsumerGroup.updateTargetAssignment(memberId, new Assignment(mkAssignment(
15461547
mkTopicAssignment(fooTopicId, 0)
15471548
)));
1549+
expectedConsumerGroup.setSubscriptionMetadata(Map.of(
1550+
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1),
1551+
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
1552+
));
15481553
expectedConsumerGroup.updateMember(new ConsumerGroupMember.Builder(memberId)
15491554
.setMemberEpoch(classicGroup.generationId())
15501555
.setState(MemberState.STABLE)
@@ -1576,6 +1581,7 @@ public void testFromClassicGroup() {
15761581
assertEquals(expectedConsumerGroup.groupEpoch(), consumerGroup.groupEpoch());
15771582
assertEquals(expectedConsumerGroup.state(), consumerGroup.state());
15781583
assertEquals(expectedConsumerGroup.preferredServerAssignor(), consumerGroup.preferredServerAssignor());
1584+
assertEquals(Map.copyOf(expectedConsumerGroup.subscriptionMetadata()), Map.copyOf(consumerGroup.subscriptionMetadata()));
15791585
assertEquals(expectedConsumerGroup.members(), consumerGroup.members());
15801586
}
15811587

0 commit comments

Comments
 (0)