Skip to content

KAFKA-19042: [10/N] Move PlaintextConsumerAssignTest to clients-integration-tests module #19773

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: trunk
Choose a base branch
from

Conversation

TaiJuWu
Copy link
Collaborator

@TaiJuWu TaiJuWu commented May 20, 2025

The PR do following:

  1. rewrite to new test infra
  2. rewrite to java
  3. move to clients-integration-tests

@github-actions github-actions bot added triage PRs from the community tests Test fixes (including flaky tests) clients labels May 20, 2025
@github-actions github-actions bot added the core Kafka Broker label May 20, 2025
Copy link
Collaborator

@m1a2st m1a2st left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @TaiJuWu, please resolve the conflict

@github-actions github-actions bot removed the triage PRs from the community label May 23, 2025
Copy link
Member

@brandboat brandboat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @TaiJuWu, just curious — is there a specific reason why PlaintextConsumerAssignTest.scala is still included in this PR?

@TaiJuWu
Copy link
Collaborator Author

TaiJuWu commented May 26, 2025

Hi @TaiJuWu, just curious — is there a specific reason why PlaintextConsumerAssignTest.scala is still included in this PR?

Sorry, that is an accident when I merge trunk.

@@ -0,0 +1,4 @@
package org.apache.kafka.clients.consumer;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove this file :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Thanks for checking.

Comment on lines 64 to 68
String topic = "topic";
int partition = 0;
int numPartitions = 3;
short numReplica = 3;
TopicPartition tp = new TopicPartition(topic, partition);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change
String topic = "topic";
int partition = 0;
int numPartitions = 3;
short numReplica = 3;
TopicPartition tp = new TopicPartition(topic, partition);
private final String topic = "topic";
private final int partition = 0;
private final int numPartitions = 3;
private final short numReplica = 3;
private final TopicPartition tp = new TopicPartition(topic, partition);

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TopicPartition tp = new TopicPartition(topic, partition); this line is missed

}

@ClusterTest
void testClassicAssignAndCommitAsyncNotCommitted() throws Exception {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know it might not be critical in test files, but could we add public to all test methods just to keep things consistent?

}


void testAssignAndCommitAsyncNotCommitted(String groupProtocol) throws InterruptedException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you make this private?

CountConsumerCommitCallback cb = new CountConsumerCommitCallback();

try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp, -1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp, -1);
ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords);


try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) {
long startingTimestamp = System.currentTimeMillis();
ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp, -1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp, -1);
ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp);


@BeforeEach
public void setup() throws InterruptedException {
clusterInstance.createTopic(topic, numPartitions, numReplica);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original test has replica count is 2, let's keep it.

Suggested change
clusterInstance.createTopic(topic, numPartitions, numReplica);
clusterInstance.createTopic(topic, 2, BROKER_COUNT);

types = {Type.KRAFT},
brokers = PlaintextConsumerAssignTest.BROKER_COUNT,
serverProperties = {
@ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original replica factor is 3

Suggested change
@ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
@ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "3"),

void testAsyncAssignAndCommitSyncNotCommitted() throws Exception {
testAssignAndCommitSyncNotCommitted(GroupProtocol.CONSUMER.name);
}
void testAssignAndCommitSyncNotCommitted(String groupProtocol) throws InterruptedException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
void testAssignAndCommitSyncNotCommitted(String groupProtocol) throws InterruptedException {
private void testAssignAndCommitSyncNotCommitted(String groupProtocol) {

}

@ClusterTest
void testClassicAssignAndCommitSyncNotCommitted() throws Exception {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
void testClassicAssignAndCommitSyncNotCommitted() throws Exception {
public void testClassicAssignAndCommitSyncNotCommitted() {

}

@ClusterTest
void testAsyncAssignAndCommitSyncNotCommitted() throws Exception {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
void testAsyncAssignAndCommitSyncNotCommitted() throws Exception {
public void testAsyncAssignAndCommitSyncNotCommitted() {

@TaiJuWu
Copy link
Collaborator Author

TaiJuWu commented May 27, 2025

Hi @brandboat , thanks for your detailed review and point out my mistakes, all comments are addressed, PTAL.


@BeforeEach
public void setup() throws InterruptedException {
clusterInstance.createTopic(topic, BROKER_COUNT, (short) 2);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the BROKER_COUNT should be replica count, right?

Suggested change
clusterInstance.createTopic(topic, BROKER_COUNT, (short) 2);
clusterInstance.createTopic(topic, 2, (short) BROKER_COUNT);

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix it, thanks.

…ka/clients/consumer/PlaintextConsumerAssignTest.java

Co-authored-by: Kuan-Po Tseng <[email protected]>
Copy link
Collaborator

@m1a2st m1a2st left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @TaiJuWu for this patch, left some comments

Comment on lines 88 to 95
public static void pollUntilTrue(Consumer<byte[], byte[]> consumer,
Supplier<Boolean> testCondition,
long waitTimeMs, String msg) throws InterruptedException {
TestUtils.waitForCondition(() -> {
consumer.poll(Duration.ofMillis(100));
return testCondition.get();
}, waitTimeMs, msg);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please follow the code style used in this file.

Suggested change
public static void pollUntilTrue(Consumer<byte[], byte[]> consumer,
Supplier<Boolean> testCondition,
long waitTimeMs, String msg) throws InterruptedException {
TestUtils.waitForCondition(() -> {
consumer.poll(Duration.ofMillis(100));
return testCondition.get();
}, waitTimeMs, msg);
}
public static void pollUntilTrue(
Consumer<byte[], byte[]> consumer,
Supplier<Boolean> testCondition,
long waitTimeMs,
String msg
) throws InterruptedException {
TestUtils.waitForCondition(() -> {
consumer.poll(Duration.ofMillis(100));
return testCondition.get();
}, waitTimeMs, msg);
}

testAssignAndCommitSyncNotCommitted(GroupProtocol.CONSUMER.name);
}

private void testAssignAndCommitSyncNotCommitted(String groupProtocol) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use GroupProtocol instead of String

Suggested change
private void testAssignAndCommitSyncNotCommitted(String groupProtocol) {
private void testAssignAndCommitSyncNotCommitted(GroupProtocol groupProtocol) {

testAssignAndCommitSyncAllConsumed(GroupProtocol.CONSUMER.name);
}

private void testAssignAndCommitSyncAllConsumed(String groupProtocol) throws InterruptedException {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private void testAssignAndCommitSyncAllConsumed(String groupProtocol) throws InterruptedException {
private void testAssignAndCommitSyncAllConsumed(GroupProtocol groupProtocol) throws InterruptedException {

testAssignAndConsume(GroupProtocol.CONSUMER.name);
}

private void testAssignAndConsume(String groupProtocol) throws InterruptedException {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private void testAssignAndConsume(String groupProtocol) throws InterruptedException {
private void testAssignAndConsume(GroupProtocol groupProtocol) throws InterruptedException {

testAssignAndConsumeSkippingPosition(GroupProtocol.CONSUMER.name);
}

private void testAssignAndConsumeSkippingPosition(String groupProtocol) throws InterruptedException {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private void testAssignAndConsumeSkippingPosition(String groupProtocol) throws InterruptedException {
private void testAssignAndConsumeSkippingPosition(GroupProtocol groupProtocol) throws InterruptedException {

testAssignAndRetrievingCommittedOffsetsMultipleTimes(GroupProtocol.CONSUMER.name);
}

private void testAssignAndRetrievingCommittedOffsetsMultipleTimes(String groupProtocol) throws InterruptedException {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private void testAssignAndRetrievingCommittedOffsetsMultipleTimes(String groupProtocol) throws InterruptedException {
private void testAssignAndRetrievingCommittedOffsetsMultipleTimes(GroupProtocol groupProtocol) throws InterruptedException {

int numRecords = 100;
long startingTimestamp = System.currentTimeMillis();

try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1"))) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this test require manually setting the GROUP_ID_CONFIG?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove it, thanks!``

Comment on lines +294 to +296
// Check committed offsets twice with same consumer
assertEquals(numRecords, consumer.committed(Set.of(tp)).get(tp).offset());
assertEquals(numRecords, consumer.committed(Set.of(tp)).get(tp).offset());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, Why do we need to check the same consumer committed twice?

Copy link
Collaborator Author

@TaiJuWu TaiJuWu May 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the test name testAssignAndRetrievingCommittedOffsetsMultipleTimes we can get this test would like to test committed offset multiple time so we need to keep the behavior .

int offset = 10;
long startingTimestamp = System.currentTimeMillis();

try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1"))) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about extract the same config as a variable.

val config = Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1");

int numRecords = 100;
long startingTimestamp = System.currentTimeMillis();

try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1"))) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about extract the same config as a variable.

val config = Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1");

@TaiJuWu
Copy link
Collaborator Author

TaiJuWu commented May 29, 2025

Hi @m1a2st, sorry for delay. All comments are addressed, PTAL.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
clients core Kafka Broker tests Test fixes (including flaky tests)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants