Skip to content

KAFKA-19042: [10/N] Move PlaintextConsumerAssignTest to clients-integration-tests module #19773

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -84,6 +85,15 @@ public static void consumeAndVerifyRecords(
);
}

public static void pollUntilTrue(Consumer<byte[], byte[]> consumer,
Supplier<Boolean> testCondition,
long waitTimeMs, String msg) throws InterruptedException {
TestUtils.waitForCondition(() -> {
consumer.poll(Duration.ofMillis(100));
return testCondition.get();
}, waitTimeMs, msg);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please follow the code style used in this file.

Suggested change
public static void pollUntilTrue(Consumer<byte[], byte[]> consumer,
Supplier<Boolean> testCondition,
long waitTimeMs, String msg) throws InterruptedException {
TestUtils.waitForCondition(() -> {
consumer.poll(Duration.ofMillis(100));
return testCondition.get();
}, waitTimeMs, msg);
}
public static void pollUntilTrue(
Consumer<byte[], byte[]> consumer,
Supplier<Boolean> testCondition,
long waitTimeMs,
String msg
) throws InterruptedException {
TestUtils.waitForCondition(() -> {
consumer.poll(Duration.ofMillis(100));
return testCondition.get();
}, waitTimeMs, msg);
}


public static void consumeAndVerifyRecords(
Consumer<byte[], byte[]> consumer,
TopicPartition tp,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,314 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.consumer;

import org.apache.kafka.clients.ClientsTestUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.Type;

import org.junit.jupiter.api.BeforeEach;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Integration tests for the consumer that covers logic related to manual assignment.
*/
@ClusterTestDefaults(
types = {Type.KRAFT},
brokers = PlaintextConsumerAssignTest.BROKER_COUNT,
serverProperties = {
@ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "3"),
@ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
@ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, value = "100"),
@ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "10"),
}
)
public class PlaintextConsumerAssignTest {

public static final int BROKER_COUNT = 3;

private final ClusterInstance clusterInstance;
private final String topic = "topic";
private final int partition = 0;
TopicPartition tp = new TopicPartition(topic, partition);

PlaintextConsumerAssignTest(ClusterInstance clusterInstance) {
this.clusterInstance = clusterInstance;
}

@BeforeEach
public void setup() throws InterruptedException {
clusterInstance.createTopic(topic, BROKER_COUNT, (short) 2);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the BROKER_COUNT should be replica count, right?

Suggested change
clusterInstance.createTopic(topic, BROKER_COUNT, (short) 2);
clusterInstance.createTopic(topic, 2, (short) BROKER_COUNT);

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix it, thanks.

}

@ClusterTest
public void testClassicAssignAndCommitAsyncNotCommitted() throws Exception {
testAssignAndCommitAsyncNotCommitted(GroupProtocol.CLASSIC.name);
}

@ClusterTest
public void testAsyncAssignAndCommitAsyncNotCommitted() throws Exception {
testAssignAndCommitAsyncNotCommitted(GroupProtocol.CONSUMER.name);
}

private void testAssignAndCommitAsyncNotCommitted(String groupProtocol) throws InterruptedException {
int numRecords = 10000;
long startingTimestamp = System.currentTimeMillis();
CountConsumerCommitCallback cb = new CountConsumerCommitCallback();

try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp);
consumer.assign(List.of(tp));
consumer.commitAsync(cb);
ClientsTestUtils.pollUntilTrue(consumer, () -> cb.successCount >= 1 || cb.lastError.isPresent(),
10000, "Failed to observe commit callback before timeout");
Map<TopicPartition, OffsetAndMetadata> committedOffset = consumer.committed(Set.of(tp));
assertNotNull(committedOffset);
// No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to
// tp. The committed offset should be null. This is intentional.
assertNull(committedOffset.get(tp));
assertTrue(consumer.assignment().contains(tp));
}
}

@ClusterTest
public void testClassicAssignAndCommitSyncNotCommitted() throws Exception {
testAssignAndCommitSyncNotCommitted(GroupProtocol.CLASSIC.name);
}

@ClusterTest
public void testAsyncAssignAndCommitSyncNotCommitted() {
testAssignAndCommitSyncNotCommitted(GroupProtocol.CONSUMER.name);
}

private void testAssignAndCommitSyncNotCommitted(String groupProtocol) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use GroupProtocol instead of String

Suggested change
private void testAssignAndCommitSyncNotCommitted(String groupProtocol) {
private void testAssignAndCommitSyncNotCommitted(GroupProtocol groupProtocol) {

int numRecords = 10000;
long startingTimestamp = System.currentTimeMillis();

try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp);
consumer.assign(List.of(tp));
consumer.commitSync();
Map<TopicPartition, OffsetAndMetadata> committedOffset = consumer.committed(Set.of(tp));
assertNotNull(committedOffset);
// No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to
// tp. The committed offset should be null. This is intentional.
assertNull(committedOffset.get(tp));
assertTrue(consumer.assignment().contains(tp));
}
}

@ClusterTest
public void testClassicAssignAndCommitSyncAllConsumed() throws Exception {
testAssignAndCommitSyncAllConsumed(GroupProtocol.CLASSIC.name);
}

@ClusterTest
public void testAsyncAssignAndCommitSyncAllConsumed() throws Exception {
testAssignAndCommitSyncAllConsumed(GroupProtocol.CONSUMER.name);
}

private void testAssignAndCommitSyncAllConsumed(String groupProtocol) throws InterruptedException {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private void testAssignAndCommitSyncAllConsumed(String groupProtocol) throws InterruptedException {
private void testAssignAndCommitSyncAllConsumed(GroupProtocol groupProtocol) throws InterruptedException {

int numRecords = 10000;

try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
long startingTimestamp = System.currentTimeMillis();
ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp);
consumer.assign(List.of(tp));
consumer.seek(tp, 0);
ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 0, 0, startingTimestamp);

consumer.commitSync();
Map<TopicPartition, OffsetAndMetadata> committedOffset = consumer.committed(Set.of(tp));
assertNotNull(committedOffset);
assertNotNull(committedOffset.get(tp));
assertEquals(numRecords, committedOffset.get(tp).offset());
}
}

@ClusterTest
public void testClassicAssignAndConsume() throws InterruptedException {
testAssignAndConsume(GroupProtocol.CLASSIC.name);
}

@ClusterTest
public void testAsyncAssignAndConsume() throws InterruptedException {
testAssignAndConsume(GroupProtocol.CONSUMER.name);
}

private void testAssignAndConsume(String groupProtocol) throws InterruptedException {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private void testAssignAndConsume(String groupProtocol) throws InterruptedException {
private void testAssignAndConsume(GroupProtocol groupProtocol) throws InterruptedException {

int numRecords = 10;

try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
long startingTimestamp = System.currentTimeMillis();
ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp);
consumer.assign(List.of(tp));
ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 0, 0, startingTimestamp);

assertEquals(numRecords, consumer.position(tp));
}
}

@ClusterTest
public void testClassicAssignAndConsumeSkippingPosition() throws InterruptedException {
testAssignAndConsumeSkippingPosition(GroupProtocol.CLASSIC.name);
}

@ClusterTest
public void testAsyncAssignAndConsumeSkippingPosition() throws InterruptedException {
testAssignAndConsumeSkippingPosition(GroupProtocol.CONSUMER.name);
}

private void testAssignAndConsumeSkippingPosition(String groupProtocol) throws InterruptedException {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private void testAssignAndConsumeSkippingPosition(String groupProtocol) throws InterruptedException {
private void testAssignAndConsumeSkippingPosition(GroupProtocol groupProtocol) throws InterruptedException {

int numRecords = 10;

try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
long startingTimestamp = System.currentTimeMillis();
ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp);
consumer.assign(List.of(tp));
int offset = 1;
consumer.seek(tp, offset);
ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords - offset, offset, offset, startingTimestamp + offset);

assertEquals(numRecords, consumer.position(tp));
}
}

@ClusterTest
public void testClassicAssignAndFetchCommittedOffsets() throws InterruptedException {
testAssignAndFetchCommittedOffsets(GroupProtocol.CLASSIC.name);
}

@ClusterTest
public void testAsyncAssignAndFetchCommittedOffsets() throws InterruptedException {
testAssignAndFetchCommittedOffsets(GroupProtocol.CONSUMER.name);
}

private void testAssignAndFetchCommittedOffsets(String groupProtocol) throws InterruptedException {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private void testAssignAndFetchCommittedOffsets(String groupProtocol) throws InterruptedException {
private void testAssignAndFetchCommittedOffsets(GroupProtocol groupProtocol) throws InterruptedException {

int numRecords = 100;
long startingTimestamp = System.currentTimeMillis();

try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1"))) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about extract the same config as a variable.

val config = Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1");

ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp);
consumer.assign(List.of(tp));
// First consumer consumes and commits offsets
consumer.seek(tp, 0);
ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 0, 0, startingTimestamp);
consumer.commitSync();
assertEquals(numRecords, consumer.committed(Set.of(tp)).get(tp).offset());
}

// We should see the committed offsets from another consumer
try (Consumer<byte[], byte[]> anotherConsumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1"))) {
anotherConsumer.assign(List.of(tp));
assertEquals(numRecords, anotherConsumer.committed(Set.of(tp)).get(tp).offset());
}
}

@ClusterTest
public void testClassicAssignAndConsumeFromCommittedOffsets() throws InterruptedException {
testAssignAndConsumeFromCommittedOffsets(GroupProtocol.CLASSIC.name);
}

@ClusterTest
public void testAsyncAssignAndConsumeFromCommittedOffsets() throws InterruptedException {
testAssignAndConsumeFromCommittedOffsets(GroupProtocol.CONSUMER.name);
}

private void testAssignAndConsumeFromCommittedOffsets(String groupProtocol) throws InterruptedException {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private void testAssignAndConsumeFromCommittedOffsets(String groupProtocol) throws InterruptedException {
private void testAssignAndConsumeFromCommittedOffsets(GroupProtocol groupProtocol) throws InterruptedException {

int numRecords = 100;
int offset = 10;
long startingTimestamp = System.currentTimeMillis();

try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1"))) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about extract the same config as a variable.

val config = Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1");

ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp);
consumer.assign(List.of(tp));
consumer.commitSync(Map.of(tp, new OffsetAndMetadata(offset)));
assertEquals(offset, consumer.committed(Set.of(tp)).get(tp).offset());
}

// We should see the committed offsets from another consumer
try (Consumer<byte[], byte[]> anotherConsumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1"))) {
assertEquals(offset, anotherConsumer.committed(Set.of(tp)).get(tp).offset());
anotherConsumer.assign(List.of(tp));
ClientsTestUtils.consumeAndVerifyRecords(anotherConsumer, tp, numRecords - offset, offset, offset, startingTimestamp + offset);
}
}

@ClusterTest
public void testClassicAssignAndRetrievingCommittedOffsetsMultipleTimes() throws InterruptedException {
testAssignAndRetrievingCommittedOffsetsMultipleTimes(GroupProtocol.CLASSIC.name);
}

@ClusterTest
public void testAsyncAssignAndRetrievingCommittedOffsetsMultipleTimes() throws InterruptedException {
testAssignAndRetrievingCommittedOffsetsMultipleTimes(GroupProtocol.CONSUMER.name);
}

private void testAssignAndRetrievingCommittedOffsetsMultipleTimes(String groupProtocol) throws InterruptedException {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private void testAssignAndRetrievingCommittedOffsetsMultipleTimes(String groupProtocol) throws InterruptedException {
private void testAssignAndRetrievingCommittedOffsetsMultipleTimes(GroupProtocol groupProtocol) throws InterruptedException {

int numRecords = 100;
long startingTimestamp = System.currentTimeMillis();

try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1"))) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this test require manually setting the GROUP_ID_CONFIG?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove it, thanks!``

ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp);
consumer.assign(List.of(tp));

// Consume and commit offsets
consumer.seek(tp, 0);
ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 0, 0, startingTimestamp);
consumer.commitSync();

// Check committed offsets twice with same consumer
assertEquals(numRecords, consumer.committed(Set.of(tp)).get(tp).offset());
assertEquals(numRecords, consumer.committed(Set.of(tp)).get(tp).offset());
Comment on lines +295 to +297
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, Why do we need to check the same consumer committed twice?

Copy link
Collaborator Author

@TaiJuWu TaiJuWu May 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the test name testAssignAndRetrievingCommittedOffsetsMultipleTimes we can get this test would like to test committed offset multiple time so we need to keep the behavior .

}
}

private static class CountConsumerCommitCallback implements OffsetCommitCallback {
int successCount = 0;
int failCount = 0;
Optional<Exception> lastError = Optional.empty();

public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception == null) {
successCount += 1;
} else {
failCount += 1;
lastError = Optional.of(exception);
}
}
}
}
Loading
Loading