From a7fca7874c0a0d7249ce4dcb3adb68594f211e36 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Tue, 20 May 2025 14:42:36 +0800 Subject: [PATCH 01/12] migrate --- .../kafka/clients/ClientsTestUtils.java | 10 + .../consumer/PlaintextConsumerAssignTest.java | 316 ++++++++++++++++++ 2 files changed, 326 insertions(+) create mode 100644 clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java index 9203197ad3aec..09e3dc8a452f7 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java @@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; +import java.util.function.Supplier; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -52,6 +53,15 @@ public static List> consumeRecords( return records; } + public static void pollUntilTrue(Consumer consumer, + Supplier testCondition, + long waitTimeMs, String msg) throws InterruptedException { + TestUtils.waitForCondition(() -> { + consumer.poll(Duration.ofMillis(100)); + return testCondition.get(); + }, waitTimeMs, msg); + } + public static void consumeAndVerifyRecords( Consumer consumer, TopicPartition tp, diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java new file mode 100644 index 0000000000000..66b1908cec5c1 --- /dev/null +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.clients.ClientsTestUtils; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; + +import org.junit.jupiter.api.BeforeEach; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.METADATA_MAX_AGE_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Integration tests for the consumer that covers logic related to manual assignment. + */ +@ClusterTestDefaults( + types = {Type.KRAFT}, + brokers = PlaintextConsumerAssignTest.BROKER_COUNT, + serverProperties = { + @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), + @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + @ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, value = "100"), + @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "10"), + } +) +public class PlaintextConsumerAssignTest { + + public static final int BROKER_COUNT = 3; + + private final ClusterInstance clusterInstance; + String topic = "topic"; + int partition = 0; + int numPartitions = 3; + short numReplica = 3; + TopicPartition tp = new TopicPartition(topic, partition); + + PlaintextConsumerAssignTest(ClusterInstance clusterInstance) { + this.clusterInstance = clusterInstance; + } + + @BeforeEach + public void setup() throws InterruptedException { + clusterInstance.createTopic(topic, numPartitions, numReplica); + } + + @ClusterTest + void testClassicAssignAndCommitAsyncNotCommitted() throws Exception { + testAssignAndCommitAsyncNotCommitted(GroupProtocol.CLASSIC.name); + } + + @ClusterTest + void testAsyncAssignAndCommitAsyncNotCommitted() throws Exception { + testAssignAndCommitAsyncNotCommitted(GroupProtocol.CONSUMER.name); + } + + + void testAssignAndCommitAsyncNotCommitted(String groupProtocol) throws InterruptedException { + int numRecords = 10000; + long startingTimestamp = System.currentTimeMillis(); + CountConsumerCommitCallback cb = new CountConsumerCommitCallback(); + + try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) { + ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp, -1); + consumer.assign(List.of(tp)); + consumer.commitAsync(cb); + ClientsTestUtils.pollUntilTrue(consumer, () -> cb.successCount >= 1 || cb.lastError.isPresent(), + 10000, "Failed to observe commit callback before timeout"); + Map committedOffset = consumer.committed(Set.of(tp)); + assertNotNull(committedOffset); + // No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to + // tp. The committed offset should be null. This is intentional. + assertNull(committedOffset.get(tp)); + assertTrue(consumer.assignment().contains(tp)); + } + } + + @ClusterTest + void testClassicAssignAndCommitSyncNotCommitted() throws Exception { + testAssignAndCommitSyncNotCommitted(GroupProtocol.CLASSIC.name); + } + + @ClusterTest + void testAsyncAssignAndCommitSyncNotCommitted() throws Exception { + testAssignAndCommitSyncNotCommitted(GroupProtocol.CONSUMER.name); + } + void testAssignAndCommitSyncNotCommitted(String groupProtocol) throws InterruptedException { + int numRecords = 10000; + long startingTimestamp = System.currentTimeMillis(); + + try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) { + ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp, -1); + consumer.assign(List.of(tp)); + consumer.commitSync(); + Map committedOffset = consumer.committed(Set.of(tp)); + assertNotNull(committedOffset); + // No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to + // tp. The committed offset should be null. This is intentional. + assertNull(committedOffset.get(tp)); + assertTrue(consumer.assignment().contains(tp)); + } + } + + @ClusterTest + void testClassicAssignAndCommitSyncAllConsumed() throws Exception { + testAssignAndCommitSyncAllConsumed(GroupProtocol.CLASSIC.name); + } + + @ClusterTest + void testAsyncAssignAndCommitSyncAllConsumed() throws Exception { + testAssignAndCommitSyncAllConsumed(GroupProtocol.CONSUMER.name); + } + void testAssignAndCommitSyncAllConsumed(String groupProtocol) throws InterruptedException { + int numRecords = 10000; + + try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) { + long startingTimestamp = System.currentTimeMillis(); + ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp, -1); + consumer.assign(List.of(tp)); + consumer.seek(tp, 0); + ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 0, 0, startingTimestamp, -1); + + consumer.commitSync(); + Map committedOffset = consumer.committed(Set.of(tp)); + assertNotNull(committedOffset); + assertNotNull(committedOffset.get(tp)); + assertEquals(numRecords, committedOffset.get(tp).offset()); + } + } + + @ClusterTest + public void testClassicAssignAndConsume() throws InterruptedException { + testAssignAndConsume(GroupProtocol.CLASSIC.name); + } + + @ClusterTest + public void testAsyncAssignAndConsume() throws InterruptedException { + testAssignAndConsume(GroupProtocol.CONSUMER.name); + } + + private void testAssignAndConsume(String groupProtocol) throws InterruptedException { + int numRecords = 10; + + try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) { + long startingTimestamp = System.currentTimeMillis(); + ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp, -1); + consumer.assign(List.of(tp)); + ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 0, 0, startingTimestamp, -1); + + assertEquals(numRecords, consumer.position(tp)); + } + } + + @ClusterTest + public void testClassicAssignAndConsumeSkippingPosition() throws InterruptedException { + testAssignAndConsumeSkippingPosition(GroupProtocol.CLASSIC.name); + } + + @ClusterTest + public void testAsyncAssignAndConsumeSkippingPosition() throws InterruptedException { + testAssignAndConsumeSkippingPosition(GroupProtocol.CONSUMER.name); + } + + private void testAssignAndConsumeSkippingPosition(String groupProtocol) throws InterruptedException { + int numRecords = 10; + + try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) { + long startingTimestamp = System.currentTimeMillis(); + ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp, -1); + consumer.assign(List.of(tp)); + int offset = 1; + consumer.seek(tp, offset); + ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords - offset, offset, offset, startingTimestamp + offset, -1); + + assertEquals(numRecords, consumer.position(tp)); + } + } + + @ClusterTest + public void testClassicAssignAndFetchCommittedOffsets() throws InterruptedException { + testAssignAndFetchCommittedOffsets(GroupProtocol.CLASSIC.name); + } + + @ClusterTest + public void testAsyncAssignAndFetchCommittedOffsets() throws InterruptedException { + testAssignAndFetchCommittedOffsets(GroupProtocol.CONSUMER.name); + } + + private void testAssignAndFetchCommittedOffsets(String groupProtocol) throws InterruptedException { + int numRecords = 100; + long startingTimestamp = System.currentTimeMillis(); + + try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) { + ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp, -1); + consumer.assign(List.of(tp)); + // First consumer consumes and commits offsets + consumer.seek(tp, 0); + ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 0, 0, startingTimestamp, -1); + consumer.commitSync(); + assertEquals(numRecords, consumer.committed(Set.of(tp)).get(tp).offset()); + } + + // We should see the committed offsets from another consumer + try (Consumer anotherConsumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) { + anotherConsumer.assign(List.of(tp)); + assertEquals(numRecords, anotherConsumer.committed(Set.of(tp)).get(tp).offset()); + } + } + + @ClusterTest + public void testClassicAssignAndConsumeFromCommittedOffsets() throws InterruptedException { + testAssignAndConsumeFromCommittedOffsets(GroupProtocol.CLASSIC.name); + } + + @ClusterTest + public void testAsyncAssignAndConsumeFromCommittedOffsets() throws InterruptedException { + testAssignAndConsumeFromCommittedOffsets(GroupProtocol.CONSUMER.name); + } + + private void testAssignAndConsumeFromCommittedOffsets(String groupProtocol) throws InterruptedException { + int numRecords = 100; + int offset = 10; + long startingTimestamp = System.currentTimeMillis(); + + try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1", METADATA_MAX_AGE_CONFIG, "10"))) { + ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp, -1); + consumer.assign(List.of(tp)); + consumer.commitSync(Map.of(tp, new OffsetAndMetadata(offset))); + assertEquals(offset, consumer.committed(Set.of(tp)).get(tp).offset()); + } + + // We should see the committed offsets from another consumer + try (Consumer anotherConsumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1", METADATA_MAX_AGE_CONFIG, "10"))) { + assertEquals(offset, anotherConsumer.committed(Set.of(tp)).get(tp).offset()); + anotherConsumer.assign(List.of(tp)); + ClientsTestUtils.consumeAndVerifyRecords(anotherConsumer, tp, numRecords - offset, offset, offset, startingTimestamp + offset); + } + } + + @ClusterTest + public void testClassicAssignAndRetrievingCommittedOffsetsMultipleTimes() throws InterruptedException { + testAssignAndRetrievingCommittedOffsetsMultipleTimes(GroupProtocol.CLASSIC.name); + } + + @ClusterTest + public void testAsyncAssignAndRetrievingCommittedOffsetsMultipleTimes() throws InterruptedException { + testAssignAndRetrievingCommittedOffsetsMultipleTimes(GroupProtocol.CONSUMER.name); + } + + private void testAssignAndRetrievingCommittedOffsetsMultipleTimes(String groupProtocol) throws InterruptedException { + int numRecords = 100; + long startingTimestamp = System.currentTimeMillis(); + + try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1", METADATA_MAX_AGE_CONFIG, "10"))) { + ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp, -1); + consumer.assign(List.of(tp)); + + // Consume and commit offsets + consumer.seek(tp, 0); + ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 0, 0, startingTimestamp); + consumer.commitSync(); + + // Check committed offsets twice with same consumer + assertEquals(numRecords, consumer.committed(Set.of(tp)).get(tp).offset()); + assertEquals(numRecords, consumer.committed(Set.of(tp)).get(tp).offset()); + } + } + + private static class CountConsumerCommitCallback implements OffsetCommitCallback { + int successCount = 0; + int failCount = 0; + Optional lastError = Optional.empty(); + + public void onComplete(Map offsets, Exception exception) { + if (exception == null) { + successCount += 1; + } else { + failCount += 1; + lastError = Optional.of(exception); + } + } + } +} From f2f18b86ff893f461c005d9499311deee569103d Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Tue, 20 May 2025 21:39:34 +0800 Subject: [PATCH 02/12] remove Max_metadata_age --- .../clients/consumer/PlaintextConsumerAssignTest.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java index 66b1908cec5c1..06e6109391a5c 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java @@ -34,7 +34,6 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; -import static org.apache.kafka.clients.consumer.ConsumerConfig.METADATA_MAX_AGE_CONFIG; import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG; import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG; import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; @@ -255,7 +254,7 @@ private void testAssignAndConsumeFromCommittedOffsets(String groupProtocol) thro int offset = 10; long startingTimestamp = System.currentTimeMillis(); - try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1", METADATA_MAX_AGE_CONFIG, "10"))) { + try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1"))) { ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp, -1); consumer.assign(List.of(tp)); consumer.commitSync(Map.of(tp, new OffsetAndMetadata(offset))); @@ -263,7 +262,7 @@ private void testAssignAndConsumeFromCommittedOffsets(String groupProtocol) thro } // We should see the committed offsets from another consumer - try (Consumer anotherConsumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1", METADATA_MAX_AGE_CONFIG, "10"))) { + try (Consumer anotherConsumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1"))) { assertEquals(offset, anotherConsumer.committed(Set.of(tp)).get(tp).offset()); anotherConsumer.assign(List.of(tp)); ClientsTestUtils.consumeAndVerifyRecords(anotherConsumer, tp, numRecords - offset, offset, offset, startingTimestamp + offset); @@ -284,7 +283,7 @@ private void testAssignAndRetrievingCommittedOffsetsMultipleTimes(String groupPr int numRecords = 100; long startingTimestamp = System.currentTimeMillis(); - try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1", METADATA_MAX_AGE_CONFIG, "10"))) { + try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1"))) { ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp, -1); consumer.assign(List.of(tp)); From 14c5457a2f7da316ca6b289930270e9e4a1ddb00 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Tue, 20 May 2025 21:56:32 +0800 Subject: [PATCH 03/12] fix issue --- .../kafka/clients/consumer/PlaintextConsumerAssignTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java index 06e6109391a5c..a438462d37add 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java @@ -222,7 +222,7 @@ private void testAssignAndFetchCommittedOffsets(String groupProtocol) throws Int int numRecords = 100; long startingTimestamp = System.currentTimeMillis(); - try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) { + try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1"))) { ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp, -1); consumer.assign(List.of(tp)); // First consumer consumes and commits offsets @@ -233,7 +233,7 @@ private void testAssignAndFetchCommittedOffsets(String groupProtocol) throws Int } // We should see the committed offsets from another consumer - try (Consumer anotherConsumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) { + try (Consumer anotherConsumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1"))) { anotherConsumer.assign(List.of(tp)); assertEquals(numRecords, anotherConsumer.committed(Set.of(tp)).get(tp).offset()); } From 4191793adf4cbe50614a528b6d9530f5e9553770 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Tue, 20 May 2025 21:59:53 +0800 Subject: [PATCH 04/12] remove scala version --- .../api/PlaintextConsumerAssignTest.scala | 207 ------------------ 1 file changed, 207 deletions(-) delete mode 100644 core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignTest.scala diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignTest.scala deleted file mode 100644 index 474e10100d877..0000000000000 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignTest.scala +++ /dev/null @@ -1,207 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package kafka.api - -import kafka.utils.{TestInfoUtils, TestUtils} -import java.util.Properties -import org.apache.kafka.clients.consumer._ -import org.apache.kafka.common.TopicPartition -import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.Timeout -import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.MethodSource - -import scala.jdk.CollectionConverters._ - -/** - * Integration tests for the consumer that covers logic related to manual assignment. - */ -@Timeout(600) -class PlaintextConsumerAssignTest extends AbstractConsumerTest { - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testAssignAndCommitAsyncNotCommitted(groupProtocol: String): Unit = { - val props = new Properties() - val consumer = createConsumer(configOverrides = props) - val producer = createProducer() - val numRecords = 10000 - val startingTimestamp = System.currentTimeMillis() - val cb = new CountConsumerCommitCallback - sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) - consumer.assign(List(tp).asJava) - consumer.commitAsync(cb) - TestUtils.pollUntilTrue(consumer, () => cb.successCount >= 1 || cb.lastError.isDefined, - "Failed to observe commit callback before timeout", waitTimeMs = 10000) - val committedOffset = consumer.committed(Set(tp).asJava) - assertNotNull(committedOffset) - // No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to - // tp. The committed offset should be null. This is intentional. - assertNull(committedOffset.get(tp)) - assertTrue(consumer.assignment.contains(tp)) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testAssignAndCommitSyncNotCommitted(groupProtocol: String): Unit = { - val props = new Properties() - val consumer = createConsumer(configOverrides = props) - val producer = createProducer() - val numRecords = 10000 - val startingTimestamp = System.currentTimeMillis() - sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) - consumer.assign(List(tp).asJava) - consumer.commitSync() - val committedOffset = consumer.committed(Set(tp).asJava) - assertNotNull(committedOffset) - // No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to - // tp. The committed offset should be null. This is intentional. - assertNull(committedOffset.get(tp)) - assertTrue(consumer.assignment.contains(tp)) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testAssignAndCommitSyncAllConsumed(groupProtocol: String): Unit = { - val numRecords = 10000 - - val producer = createProducer() - val startingTimestamp = System.currentTimeMillis() - sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) - - val props = new Properties() - val consumer = createConsumer(configOverrides = props) - consumer.assign(List(tp).asJava) - consumer.seek(tp, 0) - consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 0, startingTimestamp = startingTimestamp) - - consumer.commitSync() - val committedOffset = consumer.committed(Set(tp).asJava) - assertNotNull(committedOffset) - assertNotNull(committedOffset.get(tp)) - assertEquals(numRecords, committedOffset.get(tp).offset()) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testAssignAndConsume(groupProtocol: String): Unit = { - val numRecords = 10 - - val producer = createProducer() - val startingTimestamp = System.currentTimeMillis() - sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) - - val props = new Properties() - val consumer = createConsumer(configOverrides = props, - configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG)) - consumer.assign(List(tp).asJava) - consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 0, startingTimestamp = startingTimestamp) - - assertEquals(numRecords, consumer.position(tp)) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testAssignAndConsumeSkippingPosition(groupProtocol: String): Unit = { - val numRecords = 10 - - val producer = createProducer() - val startingTimestamp = System.currentTimeMillis() - sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) - - val props = new Properties() - val consumer = createConsumer(configOverrides = props, - configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG)) - consumer.assign(List(tp).asJava) - val offset = 1 - consumer.seek(tp, offset) - consumeAndVerifyRecords(consumer = consumer, numRecords - offset, startingOffset = offset, - startingKeyAndValueIndex = offset, startingTimestamp = startingTimestamp + offset) - - assertEquals(numRecords, consumer.position(tp)) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testAssignAndFetchCommittedOffsets(groupProtocol: String): Unit = { - val numRecords = 100 - val startingTimestamp = System.currentTimeMillis() - val producer = createProducer() - sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) - val props = new Properties() - val consumer = createConsumer(configOverrides = props) - consumer.assign(List(tp).asJava) - // First consumer consumes and commits offsets - consumer.seek(tp, 0) - consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 0, - startingTimestamp = startingTimestamp) - consumer.commitSync() - assertEquals(numRecords, consumer.committed(Set(tp).asJava).get(tp).offset) - // We should see the committed offsets from another consumer - val anotherConsumer = createConsumer(configOverrides = props) - anotherConsumer.assign(List(tp).asJava) - assertEquals(numRecords, anotherConsumer.committed(Set(tp).asJava).get(tp).offset) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testAssignAndConsumeFromCommittedOffsets(groupProtocol: String): Unit = { - val producer = createProducer() - val numRecords = 100 - val startingTimestamp = System.currentTimeMillis() - sendRecords(producer, numRecords = numRecords, tp, startingTimestamp = startingTimestamp) - - // Commit offset with first consumer - val props = new Properties() - props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group1") - val consumer = createConsumer(configOverrides = props) - consumer.assign(List(tp).asJava) - val offset = 10 - consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(offset))) - .asJava) - assertEquals(offset, consumer.committed(Set(tp).asJava).get(tp).offset) - consumer.close() - - // Consume from committed offsets with another consumer in same group - val anotherConsumer = createConsumer(configOverrides = props) - assertEquals(offset, anotherConsumer.committed(Set(tp).asJava).get(tp).offset) - anotherConsumer.assign(List(tp).asJava) - consumeAndVerifyRecords(consumer = anotherConsumer, numRecords - offset, - startingOffset = offset, startingKeyAndValueIndex = offset, - startingTimestamp = startingTimestamp + offset) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testAssignAndRetrievingCommittedOffsetsMultipleTimes(groupProtocol: String): Unit = { - val numRecords = 100 - val startingTimestamp = System.currentTimeMillis() - val producer = createProducer() - sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) - - val props = new Properties() - val consumer = createConsumer(configOverrides = props) - consumer.assign(List(tp).asJava) - - // Consume and commit offsets - consumer.seek(tp, 0) - consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 0, - startingTimestamp = startingTimestamp) - consumer.commitSync() - - // Check committed offsets twice with same consumer - assertEquals(numRecords, consumer.committed(Set(tp).asJava).get(tp).offset) - assertEquals(numRecords, consumer.committed(Set(tp).asJava).get(tp).offset) - } - -} From 4bec23e5e8afdb95898b7da24eeceaa8198efd67 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Mon, 26 May 2025 10:44:43 +0800 Subject: [PATCH 05/12] remove scala version --- .../PlaintextConsumerAssignorsTest.java | 4 + .../api/PlaintextConsumerAssignTest.scala | 203 ------------------ 2 files changed, 4 insertions(+), 203 deletions(-) create mode 100644 clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignorsTest.java delete mode 100644 core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignTest.scala diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignorsTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignorsTest.java new file mode 100644 index 0000000000000..85dcf9c722acc --- /dev/null +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignorsTest.java @@ -0,0 +1,4 @@ +package org.apache.kafka.clients.consumer; + +public class PlaintextConsumerAssignorsTest { +} diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignTest.scala deleted file mode 100644 index 6673a1e0334c0..0000000000000 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignTest.scala +++ /dev/null @@ -1,203 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package kafka.api - -import kafka.utils.{TestInfoUtils, TestUtils} -import java.util.Properties -import org.apache.kafka.clients.consumer._ -import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.Timeout -import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.MethodSource - -/** - * Integration tests for the consumer that covers logic related to manual assignment. - */ -@Timeout(600) -class PlaintextConsumerAssignTest extends AbstractConsumerTest { - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testAssignAndCommitAsyncNotCommitted(groupProtocol: String): Unit = { - val props = new Properties() - val consumer = createConsumer(configOverrides = props) - val producer = createProducer() - val numRecords = 10000 - val startingTimestamp = System.currentTimeMillis() - val cb = new CountConsumerCommitCallback - sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) - consumer.assign(java.util.List.of(tp)) - consumer.commitAsync(cb) - TestUtils.pollUntilTrue(consumer, () => cb.successCount >= 1 || cb.lastError.isDefined, - "Failed to observe commit callback before timeout", waitTimeMs = 10000) - val committedOffset = consumer.committed(java.util.Set.of(tp)) - assertNotNull(committedOffset) - // No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to - // tp. The committed offset should be null. This is intentional. - assertNull(committedOffset.get(tp)) - assertTrue(consumer.assignment.contains(tp)) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testAssignAndCommitSyncNotCommitted(groupProtocol: String): Unit = { - val props = new Properties() - val consumer = createConsumer(configOverrides = props) - val producer = createProducer() - val numRecords = 10000 - val startingTimestamp = System.currentTimeMillis() - sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) - consumer.assign(java.util.List.of(tp)) - consumer.commitSync() - val committedOffset = consumer.committed(java.util.Set.of(tp)) - assertNotNull(committedOffset) - // No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to - // tp. The committed offset should be null. This is intentional. - assertNull(committedOffset.get(tp)) - assertTrue(consumer.assignment.contains(tp)) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testAssignAndCommitSyncAllConsumed(groupProtocol: String): Unit = { - val numRecords = 10000 - - val producer = createProducer() - val startingTimestamp = System.currentTimeMillis() - sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) - - val props = new Properties() - val consumer = createConsumer(configOverrides = props) - consumer.assign(java.util.List.of(tp)) - consumer.seek(tp, 0) - consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 0, startingTimestamp = startingTimestamp) - - consumer.commitSync() - val committedOffset = consumer.committed(java.util.Set.of(tp)) - assertNotNull(committedOffset) - assertNotNull(committedOffset.get(tp)) - assertEquals(numRecords, committedOffset.get(tp).offset()) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testAssignAndConsume(groupProtocol: String): Unit = { - val numRecords = 10 - - val producer = createProducer() - val startingTimestamp = System.currentTimeMillis() - sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) - - val props = new Properties() - val consumer = createConsumer(configOverrides = props, - configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG)) - consumer.assign(java.util.List.of(tp)) - consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 0, startingTimestamp = startingTimestamp) - - assertEquals(numRecords, consumer.position(tp)) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testAssignAndConsumeSkippingPosition(groupProtocol: String): Unit = { - val numRecords = 10 - - val producer = createProducer() - val startingTimestamp = System.currentTimeMillis() - sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) - - val props = new Properties() - val consumer = createConsumer(configOverrides = props, - configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG)) - consumer.assign(java.util.List.of(tp)) - val offset = 1 - consumer.seek(tp, offset) - consumeAndVerifyRecords(consumer = consumer, numRecords - offset, startingOffset = offset, - startingKeyAndValueIndex = offset, startingTimestamp = startingTimestamp + offset) - - assertEquals(numRecords, consumer.position(tp)) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testAssignAndFetchCommittedOffsets(groupProtocol: String): Unit = { - val numRecords = 100 - val startingTimestamp = System.currentTimeMillis() - val producer = createProducer() - sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) - val props = new Properties() - val consumer = createConsumer(configOverrides = props) - consumer.assign(java.util.List.of(tp)) - // First consumer consumes and commits offsets - consumer.seek(tp, 0) - consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 0, - startingTimestamp = startingTimestamp) - consumer.commitSync() - assertEquals(numRecords, consumer.committed(java.util.Set.of(tp)).get(tp).offset) - // We should see the committed offsets from another consumer - val anotherConsumer = createConsumer(configOverrides = props) - anotherConsumer.assign(java.util.List.of(tp)) - assertEquals(numRecords, anotherConsumer.committed(java.util.Set.of(tp)).get(tp).offset) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testAssignAndConsumeFromCommittedOffsets(groupProtocol: String): Unit = { - val producer = createProducer() - val numRecords = 100 - val startingTimestamp = System.currentTimeMillis() - sendRecords(producer, numRecords = numRecords, tp, startingTimestamp = startingTimestamp) - - // Commit offset with first consumer - val props = new Properties() - props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group1") - val consumer = createConsumer(configOverrides = props) - consumer.assign(java.util.List.of(tp)) - val offset = 10 - consumer.commitSync(java.util.Map.of(tp, new OffsetAndMetadata(offset))) - assertEquals(offset, consumer.committed(java.util.Set.of(tp)).get(tp).offset) - consumer.close() - - // Consume from committed offsets with another consumer in same group - val anotherConsumer = createConsumer(configOverrides = props) - assertEquals(offset, anotherConsumer.committed(java.util.Set.of(tp)).get(tp).offset) - anotherConsumer.assign(java.util.List.of(tp)) - consumeAndVerifyRecords(consumer = anotherConsumer, numRecords - offset, - startingOffset = offset, startingKeyAndValueIndex = offset, - startingTimestamp = startingTimestamp + offset) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testAssignAndRetrievingCommittedOffsetsMultipleTimes(groupProtocol: String): Unit = { - val numRecords = 100 - val startingTimestamp = System.currentTimeMillis() - val producer = createProducer() - sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) - - val props = new Properties() - val consumer = createConsumer(configOverrides = props) - consumer.assign(java.util.List.of(tp)) - - // Consume and commit offsets - consumer.seek(tp, 0) - consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 0, - startingTimestamp = startingTimestamp) - consumer.commitSync() - - // Check committed offsets twice with same consumer - assertEquals(numRecords, consumer.committed(java.util.Set.of(tp)).get(tp).offset) - assertEquals(numRecords, consumer.committed(java.util.Set.of(tp)).get(tp).offset) - } - -} From a577c098f663b992a4ff1b04802af8c009b33fa3 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Mon, 26 May 2025 11:07:45 +0800 Subject: [PATCH 06/12] tmp --- .../PlaintextConsumerAssignorsTest.java | 104 ++++++++++++++++++ 1 file changed, 104 insertions(+) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignorsTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignorsTest.java index 85dcf9c722acc..10ddb06e9d2d0 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignorsTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignorsTest.java @@ -1,4 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.kafka.clients.consumer; + +import org.apache.kafka.clients.ClientsTestUtils; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; +import org.junit.jupiter.api.BeforeEach; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@ClusterTestDefaults( + types = {Type.KRAFT}, + brokers = PlaintextConsumerAssignorsTest.BROKER_COUNT, + serverProperties = { + @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), + @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + @ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, value = "100"), + @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "10"), + } +) + public class PlaintextConsumerAssignorsTest { + public static final int BROKER_COUNT = 3; + + int numPartitions = 3; + short numReplica = 3; + + private final ClusterInstance clusterInstance; + String topic0 = "topic0"; + int partition0 = 0; + TopicPartition tp0 = new TopicPartition(topic0, numPartitions); + String topic1 = "topic1"; + TopicPartition tp1 = new TopicPartition(topic1, numPartitions); + String topic2= "topic2"; + TopicPartition tp2 = new TopicPartition(topic2, numPartitions); + + + PlaintextConsumerAssignorsTest(ClusterInstance clusterInstance) { + this.clusterInstance = clusterInstance; + } + + @BeforeEach + public void setup() throws InterruptedException { + clusterInstance.createTopic(topic0, numPartitions, numReplica); + clusterInstance.createTopic(topic1, numPartitions, numReplica); + } + + + @ClusterTest + void testClassicMultiConsumerRoundRobinAssignor() throws Exception { + testMultiConsumerRoundRobinAssignor(GroupProtocol.CLASSIC.name); + } + + void testMultiConsumerRoundRobinAssignor(String groupProtocol) throws InterruptedException { + int numRecords = 10000; + + try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, "roundrobin-group", + PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName()))) { + ClientsTestUtils.sendRecords(clusterInstance, tp0, numRecords); + ClientsTestUtils.sendRecords(clusterInstance, tp1, numRecords); + assertEquals(0, consumer.assignment().size()); + + consumer.subscribe(List.of(topic0, topic1)); + ClientsTestUtils.awaitAssignment(consumer, Set.of(tp0, tp1)); + + ClientsTestUtils.pollUntilTrue(consumer, () -> cb.successCount >= 1 || cb.lastError.isPresent(), + 10000, "Failed to observe commit callback before timeout"); + Map committedOffset = consumer.committed(Set.of(tp)); + assertNotNull(committedOffset); + // No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to + // tp. The committed offset should be null. This is intentional. + assertNull(committedOffset.get(tp)); + assertTrue(consumer.assignment().contains(tp)); + } + } + } From 7712313f90e043cd9d144b10088a50512fece547 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Mon, 26 May 2025 12:51:42 +0800 Subject: [PATCH 07/12] remove reundant --- .../PlaintextConsumerAssignorsTest.java | 108 ------------------ 1 file changed, 108 deletions(-) delete mode 100644 clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignorsTest.java diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignorsTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignorsTest.java deleted file mode 100644 index 10ddb06e9d2d0..0000000000000 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignorsTest.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.clients.consumer; - - -import org.apache.kafka.clients.ClientsTestUtils; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.test.ClusterInstance; -import org.apache.kafka.common.test.api.ClusterConfigProperty; -import org.apache.kafka.common.test.api.ClusterTest; -import org.apache.kafka.common.test.api.ClusterTestDefaults; -import org.apache.kafka.common.test.api.Type; -import org.junit.jupiter.api.BeforeEach; - -import java.util.List; -import java.util.Map; -import java.util.Set; - -import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; -import static org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG; -import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG; -import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG; -import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; -import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG; -import static org.junit.jupiter.api.Assertions.assertEquals; - -@ClusterTestDefaults( - types = {Type.KRAFT}, - brokers = PlaintextConsumerAssignorsTest.BROKER_COUNT, - serverProperties = { - @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), - @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), - @ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, value = "100"), - @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "10"), - } -) - -public class PlaintextConsumerAssignorsTest { - public static final int BROKER_COUNT = 3; - - int numPartitions = 3; - short numReplica = 3; - - private final ClusterInstance clusterInstance; - String topic0 = "topic0"; - int partition0 = 0; - TopicPartition tp0 = new TopicPartition(topic0, numPartitions); - String topic1 = "topic1"; - TopicPartition tp1 = new TopicPartition(topic1, numPartitions); - String topic2= "topic2"; - TopicPartition tp2 = new TopicPartition(topic2, numPartitions); - - - PlaintextConsumerAssignorsTest(ClusterInstance clusterInstance) { - this.clusterInstance = clusterInstance; - } - - @BeforeEach - public void setup() throws InterruptedException { - clusterInstance.createTopic(topic0, numPartitions, numReplica); - clusterInstance.createTopic(topic1, numPartitions, numReplica); - } - - - @ClusterTest - void testClassicMultiConsumerRoundRobinAssignor() throws Exception { - testMultiConsumerRoundRobinAssignor(GroupProtocol.CLASSIC.name); - } - - void testMultiConsumerRoundRobinAssignor(String groupProtocol) throws InterruptedException { - int numRecords = 10000; - - try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, "roundrobin-group", - PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName()))) { - ClientsTestUtils.sendRecords(clusterInstance, tp0, numRecords); - ClientsTestUtils.sendRecords(clusterInstance, tp1, numRecords); - assertEquals(0, consumer.assignment().size()); - - consumer.subscribe(List.of(topic0, topic1)); - ClientsTestUtils.awaitAssignment(consumer, Set.of(tp0, tp1)); - - ClientsTestUtils.pollUntilTrue(consumer, () -> cb.successCount >= 1 || cb.lastError.isPresent(), - 10000, "Failed to observe commit callback before timeout"); - Map committedOffset = consumer.committed(Set.of(tp)); - assertNotNull(committedOffset); - // No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to - // tp. The committed offset should be null. This is intentional. - assertNull(committedOffset.get(tp)); - assertTrue(consumer.assignment().contains(tp)); - } - } - -} From 262621e3ef628d6bc94af41537ba41fd98c67daa Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Tue, 27 May 2025 10:47:49 +0800 Subject: [PATCH 08/12] address brandboat comments --- .../consumer/PlaintextConsumerAssignTest.java | 55 +++++++++---------- 1 file changed, 27 insertions(+), 28 deletions(-) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java index a438462d37add..50c195aeb3c92 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java @@ -50,7 +50,7 @@ types = {Type.KRAFT}, brokers = PlaintextConsumerAssignTest.BROKER_COUNT, serverProperties = { - @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), + @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "3"), @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), @ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, value = "100"), @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "10"), @@ -61,10 +61,8 @@ public class PlaintextConsumerAssignTest { public static final int BROKER_COUNT = 3; private final ClusterInstance clusterInstance; - String topic = "topic"; - int partition = 0; - int numPartitions = 3; - short numReplica = 3; + private final String topic = "topic"; + private final int partition = 0; TopicPartition tp = new TopicPartition(topic, partition); PlaintextConsumerAssignTest(ClusterInstance clusterInstance) { @@ -73,27 +71,26 @@ public class PlaintextConsumerAssignTest { @BeforeEach public void setup() throws InterruptedException { - clusterInstance.createTopic(topic, numPartitions, numReplica); + clusterInstance.createTopic(topic, BROKER_COUNT, (short) 2); } @ClusterTest - void testClassicAssignAndCommitAsyncNotCommitted() throws Exception { + public void testClassicAssignAndCommitAsyncNotCommitted() throws Exception { testAssignAndCommitAsyncNotCommitted(GroupProtocol.CLASSIC.name); } @ClusterTest - void testAsyncAssignAndCommitAsyncNotCommitted() throws Exception { + public void testAsyncAssignAndCommitAsyncNotCommitted() throws Exception { testAssignAndCommitAsyncNotCommitted(GroupProtocol.CONSUMER.name); } - - void testAssignAndCommitAsyncNotCommitted(String groupProtocol) throws InterruptedException { + private void testAssignAndCommitAsyncNotCommitted(String groupProtocol) throws InterruptedException { int numRecords = 10000; long startingTimestamp = System.currentTimeMillis(); CountConsumerCommitCallback cb = new CountConsumerCommitCallback(); try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) { - ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp, -1); + ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp); consumer.assign(List.of(tp)); consumer.commitAsync(cb); ClientsTestUtils.pollUntilTrue(consumer, () -> cb.successCount >= 1 || cb.lastError.isPresent(), @@ -108,20 +105,21 @@ void testAssignAndCommitAsyncNotCommitted(String groupProtocol) throws Interrupt } @ClusterTest - void testClassicAssignAndCommitSyncNotCommitted() throws Exception { + public void testClassicAssignAndCommitSyncNotCommitted() throws Exception { testAssignAndCommitSyncNotCommitted(GroupProtocol.CLASSIC.name); } @ClusterTest - void testAsyncAssignAndCommitSyncNotCommitted() throws Exception { + public void testAsyncAssignAndCommitSyncNotCommitted() { testAssignAndCommitSyncNotCommitted(GroupProtocol.CONSUMER.name); } - void testAssignAndCommitSyncNotCommitted(String groupProtocol) throws InterruptedException { + + private void testAssignAndCommitSyncNotCommitted(String groupProtocol) { int numRecords = 10000; long startingTimestamp = System.currentTimeMillis(); try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) { - ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp, -1); + ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp); consumer.assign(List.of(tp)); consumer.commitSync(); Map committedOffset = consumer.committed(Set.of(tp)); @@ -134,23 +132,24 @@ void testAssignAndCommitSyncNotCommitted(String groupProtocol) throws Interrupte } @ClusterTest - void testClassicAssignAndCommitSyncAllConsumed() throws Exception { + public void testClassicAssignAndCommitSyncAllConsumed() throws Exception { testAssignAndCommitSyncAllConsumed(GroupProtocol.CLASSIC.name); } @ClusterTest - void testAsyncAssignAndCommitSyncAllConsumed() throws Exception { + public void testAsyncAssignAndCommitSyncAllConsumed() throws Exception { testAssignAndCommitSyncAllConsumed(GroupProtocol.CONSUMER.name); } - void testAssignAndCommitSyncAllConsumed(String groupProtocol) throws InterruptedException { + + private void testAssignAndCommitSyncAllConsumed(String groupProtocol) throws InterruptedException { int numRecords = 10000; try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) { long startingTimestamp = System.currentTimeMillis(); - ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp, -1); + ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp); consumer.assign(List.of(tp)); consumer.seek(tp, 0); - ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 0, 0, startingTimestamp, -1); + ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 0, 0, startingTimestamp); consumer.commitSync(); Map committedOffset = consumer.committed(Set.of(tp)); @@ -175,9 +174,9 @@ private void testAssignAndConsume(String groupProtocol) throws InterruptedExcept try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) { long startingTimestamp = System.currentTimeMillis(); - ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp, -1); + ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp); consumer.assign(List.of(tp)); - ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 0, 0, startingTimestamp, -1); + ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 0, 0, startingTimestamp); assertEquals(numRecords, consumer.position(tp)); } @@ -198,11 +197,11 @@ private void testAssignAndConsumeSkippingPosition(String groupProtocol) throws I try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) { long startingTimestamp = System.currentTimeMillis(); - ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp, -1); + ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp); consumer.assign(List.of(tp)); int offset = 1; consumer.seek(tp, offset); - ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords - offset, offset, offset, startingTimestamp + offset, -1); + ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords - offset, offset, offset, startingTimestamp + offset); assertEquals(numRecords, consumer.position(tp)); } @@ -223,11 +222,11 @@ private void testAssignAndFetchCommittedOffsets(String groupProtocol) throws Int long startingTimestamp = System.currentTimeMillis(); try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1"))) { - ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp, -1); + ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp); consumer.assign(List.of(tp)); // First consumer consumes and commits offsets consumer.seek(tp, 0); - ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 0, 0, startingTimestamp, -1); + ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 0, 0, startingTimestamp); consumer.commitSync(); assertEquals(numRecords, consumer.committed(Set.of(tp)).get(tp).offset()); } @@ -255,7 +254,7 @@ private void testAssignAndConsumeFromCommittedOffsets(String groupProtocol) thro long startingTimestamp = System.currentTimeMillis(); try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1"))) { - ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp, -1); + ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp); consumer.assign(List.of(tp)); consumer.commitSync(Map.of(tp, new OffsetAndMetadata(offset))); assertEquals(offset, consumer.committed(Set.of(tp)).get(tp).offset()); @@ -284,7 +283,7 @@ private void testAssignAndRetrievingCommittedOffsetsMultipleTimes(String groupPr long startingTimestamp = System.currentTimeMillis(); try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1"))) { - ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp, -1); + ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp); consumer.assign(List.of(tp)); // Consume and commit offsets From 344adb815207e02b95ac298b663d700a560b7fa6 Mon Sep 17 00:00:00 2001 From: TaiJuWu Date: Tue, 27 May 2025 10:56:50 +0800 Subject: [PATCH 09/12] Update clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java Co-authored-by: Kuan-Po Tseng --- .../kafka/clients/consumer/PlaintextConsumerAssignTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java index 50c195aeb3c92..cd6fc26106d29 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java @@ -71,7 +71,7 @@ public class PlaintextConsumerAssignTest { @BeforeEach public void setup() throws InterruptedException { - clusterInstance.createTopic(topic, BROKER_COUNT, (short) 2); + clusterInstance.createTopic(topic, 2, (short) BROKER_COUNT); } @ClusterTest From f7510d9e17982dba5f08825599f1af8212281f83 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Thu, 29 May 2025 11:37:51 +0800 Subject: [PATCH 10/12] address comments --- .../kafka/clients/ClientsTestUtils.java | 8 ++- .../consumer/PlaintextConsumerAssignTest.java | 70 ++++++++++--------- 2 files changed, 41 insertions(+), 37 deletions(-) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java index 56531cd8559b4..07b8797d286f3 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java @@ -85,9 +85,11 @@ public static void consumeAndVerifyRecords( ); } - public static void pollUntilTrue(Consumer consumer, - Supplier testCondition, - long waitTimeMs, String msg) throws InterruptedException { + public static void pollUntilTrue( + Consumer consumer, + Supplier testCondition, + long waitTimeMs, String msg + ) throws InterruptedException { TestUtils.waitForCondition(() -> { consumer.poll(Duration.ofMillis(100)); return testCondition.get(); diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java index cd6fc26106d29..940c3a1075d86 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; @@ -76,20 +77,20 @@ public void setup() throws InterruptedException { @ClusterTest public void testClassicAssignAndCommitAsyncNotCommitted() throws Exception { - testAssignAndCommitAsyncNotCommitted(GroupProtocol.CLASSIC.name); + testAssignAndCommitAsyncNotCommitted(GroupProtocol.CLASSIC); } @ClusterTest public void testAsyncAssignAndCommitAsyncNotCommitted() throws Exception { - testAssignAndCommitAsyncNotCommitted(GroupProtocol.CONSUMER.name); + testAssignAndCommitAsyncNotCommitted(GroupProtocol.CONSUMER); } - private void testAssignAndCommitAsyncNotCommitted(String groupProtocol) throws InterruptedException { + private void testAssignAndCommitAsyncNotCommitted(GroupProtocol groupProtocol) throws InterruptedException { int numRecords = 10000; long startingTimestamp = System.currentTimeMillis(); CountConsumerCommitCallback cb = new CountConsumerCommitCallback(); - try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) { + try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol.name))) { ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp); consumer.assign(List.of(tp)); consumer.commitAsync(cb); @@ -106,19 +107,19 @@ private void testAssignAndCommitAsyncNotCommitted(String groupProtocol) throws I @ClusterTest public void testClassicAssignAndCommitSyncNotCommitted() throws Exception { - testAssignAndCommitSyncNotCommitted(GroupProtocol.CLASSIC.name); + testAssignAndCommitSyncNotCommitted(GroupProtocol.CLASSIC); } @ClusterTest public void testAsyncAssignAndCommitSyncNotCommitted() { - testAssignAndCommitSyncNotCommitted(GroupProtocol.CONSUMER.name); + testAssignAndCommitSyncNotCommitted(GroupProtocol.CONSUMER); } - private void testAssignAndCommitSyncNotCommitted(String groupProtocol) { + private void testAssignAndCommitSyncNotCommitted(GroupProtocol groupProtocol) { int numRecords = 10000; long startingTimestamp = System.currentTimeMillis(); - try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) { + try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol.name))) { ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp); consumer.assign(List.of(tp)); consumer.commitSync(); @@ -133,18 +134,18 @@ private void testAssignAndCommitSyncNotCommitted(String groupProtocol) { @ClusterTest public void testClassicAssignAndCommitSyncAllConsumed() throws Exception { - testAssignAndCommitSyncAllConsumed(GroupProtocol.CLASSIC.name); + testAssignAndCommitSyncAllConsumed(GroupProtocol.CLASSIC); } @ClusterTest public void testAsyncAssignAndCommitSyncAllConsumed() throws Exception { - testAssignAndCommitSyncAllConsumed(GroupProtocol.CONSUMER.name); + testAssignAndCommitSyncAllConsumed(GroupProtocol.CONSUMER); } - private void testAssignAndCommitSyncAllConsumed(String groupProtocol) throws InterruptedException { + private void testAssignAndCommitSyncAllConsumed(GroupProtocol groupProtocol) throws InterruptedException { int numRecords = 10000; - try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) { + try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol.name))) { long startingTimestamp = System.currentTimeMillis(); ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp); consumer.assign(List.of(tp)); @@ -161,18 +162,18 @@ private void testAssignAndCommitSyncAllConsumed(String groupProtocol) throws Int @ClusterTest public void testClassicAssignAndConsume() throws InterruptedException { - testAssignAndConsume(GroupProtocol.CLASSIC.name); + testAssignAndConsume(GroupProtocol.CLASSIC); } @ClusterTest public void testAsyncAssignAndConsume() throws InterruptedException { - testAssignAndConsume(GroupProtocol.CONSUMER.name); + testAssignAndConsume(GroupProtocol.CONSUMER); } - private void testAssignAndConsume(String groupProtocol) throws InterruptedException { + private void testAssignAndConsume(GroupProtocol groupProtocol) throws InterruptedException { int numRecords = 10; - try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) { + try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol.name))) { long startingTimestamp = System.currentTimeMillis(); ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp); consumer.assign(List.of(tp)); @@ -184,18 +185,18 @@ private void testAssignAndConsume(String groupProtocol) throws InterruptedExcept @ClusterTest public void testClassicAssignAndConsumeSkippingPosition() throws InterruptedException { - testAssignAndConsumeSkippingPosition(GroupProtocol.CLASSIC.name); + testAssignAndConsumeSkippingPosition(GroupProtocol.CLASSIC); } @ClusterTest public void testAsyncAssignAndConsumeSkippingPosition() throws InterruptedException { - testAssignAndConsumeSkippingPosition(GroupProtocol.CONSUMER.name); + testAssignAndConsumeSkippingPosition(GroupProtocol.CONSUMER); } - private void testAssignAndConsumeSkippingPosition(String groupProtocol) throws InterruptedException { + private void testAssignAndConsumeSkippingPosition(GroupProtocol groupProtocol) throws InterruptedException { int numRecords = 10; - try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) { + try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol.name))) { long startingTimestamp = System.currentTimeMillis(); ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp); consumer.assign(List.of(tp)); @@ -209,19 +210,19 @@ private void testAssignAndConsumeSkippingPosition(String groupProtocol) throws I @ClusterTest public void testClassicAssignAndFetchCommittedOffsets() throws InterruptedException { - testAssignAndFetchCommittedOffsets(GroupProtocol.CLASSIC.name); + testAssignAndFetchCommittedOffsets(GroupProtocol.CLASSIC); } @ClusterTest public void testAsyncAssignAndFetchCommittedOffsets() throws InterruptedException { - testAssignAndFetchCommittedOffsets(GroupProtocol.CONSUMER.name); + testAssignAndFetchCommittedOffsets(GroupProtocol.CONSUMER); } - private void testAssignAndFetchCommittedOffsets(String groupProtocol) throws InterruptedException { + private void testAssignAndFetchCommittedOffsets(GroupProtocol groupProtocol) throws InterruptedException { int numRecords = 100; long startingTimestamp = System.currentTimeMillis(); - try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1"))) { + try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol.name, GROUP_ID_CONFIG, "group1"))) { ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp); consumer.assign(List.of(tp)); // First consumer consumes and commits offsets @@ -232,7 +233,7 @@ private void testAssignAndFetchCommittedOffsets(String groupProtocol) throws Int } // We should see the committed offsets from another consumer - try (Consumer anotherConsumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1"))) { + try (Consumer anotherConsumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol.name, GROUP_ID_CONFIG, "group1"))) { anotherConsumer.assign(List.of(tp)); assertEquals(numRecords, anotherConsumer.committed(Set.of(tp)).get(tp).offset()); } @@ -240,20 +241,21 @@ private void testAssignAndFetchCommittedOffsets(String groupProtocol) throws Int @ClusterTest public void testClassicAssignAndConsumeFromCommittedOffsets() throws InterruptedException { - testAssignAndConsumeFromCommittedOffsets(GroupProtocol.CLASSIC.name); + testAssignAndConsumeFromCommittedOffsets(GroupProtocol.CLASSIC); } @ClusterTest public void testAsyncAssignAndConsumeFromCommittedOffsets() throws InterruptedException { - testAssignAndConsumeFromCommittedOffsets(GroupProtocol.CONSUMER.name); + testAssignAndConsumeFromCommittedOffsets(GroupProtocol.CONSUMER); } - private void testAssignAndConsumeFromCommittedOffsets(String groupProtocol) throws InterruptedException { + private void testAssignAndConsumeFromCommittedOffsets(GroupProtocol groupProtocol) throws InterruptedException { int numRecords = 100; int offset = 10; long startingTimestamp = System.currentTimeMillis(); + Map config = Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol.name, GROUP_ID_CONFIG, "group1"); - try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1"))) { + try (Consumer consumer = clusterInstance.consumer(config)) { ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp); consumer.assign(List.of(tp)); consumer.commitSync(Map.of(tp, new OffsetAndMetadata(offset))); @@ -261,7 +263,7 @@ private void testAssignAndConsumeFromCommittedOffsets(String groupProtocol) thro } // We should see the committed offsets from another consumer - try (Consumer anotherConsumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1"))) { + try (Consumer anotherConsumer = clusterInstance.consumer(config)) { assertEquals(offset, anotherConsumer.committed(Set.of(tp)).get(tp).offset()); anotherConsumer.assign(List.of(tp)); ClientsTestUtils.consumeAndVerifyRecords(anotherConsumer, tp, numRecords - offset, offset, offset, startingTimestamp + offset); @@ -270,19 +272,19 @@ private void testAssignAndConsumeFromCommittedOffsets(String groupProtocol) thro @ClusterTest public void testClassicAssignAndRetrievingCommittedOffsetsMultipleTimes() throws InterruptedException { - testAssignAndRetrievingCommittedOffsetsMultipleTimes(GroupProtocol.CLASSIC.name); + testAssignAndRetrievingCommittedOffsetsMultipleTimes(GroupProtocol.CLASSIC); } @ClusterTest public void testAsyncAssignAndRetrievingCommittedOffsetsMultipleTimes() throws InterruptedException { - testAssignAndRetrievingCommittedOffsetsMultipleTimes(GroupProtocol.CONSUMER.name); + testAssignAndRetrievingCommittedOffsetsMultipleTimes(GroupProtocol.CONSUMER); } - private void testAssignAndRetrievingCommittedOffsetsMultipleTimes(String groupProtocol) throws InterruptedException { + private void testAssignAndRetrievingCommittedOffsetsMultipleTimes(GroupProtocol groupProtocol) throws InterruptedException { int numRecords = 100; long startingTimestamp = System.currentTimeMillis(); - try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1"))) { + try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol.name))) { ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp); consumer.assign(List.of(tp)); From 28383f509aee267452363ee79affbbaa42f1c2ef Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Thu, 29 May 2025 11:44:46 +0800 Subject: [PATCH 11/12] fix build --- .../kafka/clients/consumer/PlaintextConsumerAssignTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java index 940c3a1075d86..b12b6dacb4d8b 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java @@ -29,7 +29,6 @@ import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.Set; From 5a04c5fa42315d87362a98d9d5ed5904a41e90e1 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Sun, 1 Jun 2025 08:49:25 +0800 Subject: [PATCH 12/12] fix build --- .../test/java/org/apache/kafka/clients/ClientsTestUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java index 524c7c0e8b153..651c3033f291f 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java @@ -43,8 +43,8 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.function.Supplier; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import static org.apache.kafka.clients.ClientsTestUtils.TestClusterResourceListenerDeserializer.UPDATE_CONSUMER_COUNT; import static org.apache.kafka.clients.ClientsTestUtils.TestClusterResourceListenerSerializer.UPDATE_PRODUCER_COUNT;