-
Notifications
You must be signed in to change notification settings - Fork 14.4k
KAFKA-19042: [10/N] Move PlaintextConsumerAssignTest to clients-integration-tests module #19773
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
KAFKA-19042: [10/N] Move PlaintextConsumerAssignTest to clients-integration-tests module #19773
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @TaiJuWu, please resolve the conflict
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @TaiJuWu, just curious — is there a specific reason why PlaintextConsumerAssignTest.scala
is still included in this PR?
Sorry, that is an accident when I merge trunk. |
@@ -0,0 +1,4 @@ | |||
package org.apache.kafka.clients.consumer; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's remove this file :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Thanks for checking.
String topic = "topic"; | ||
int partition = 0; | ||
int numPartitions = 3; | ||
short numReplica = 3; | ||
TopicPartition tp = new TopicPartition(topic, partition); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit
String topic = "topic"; | |
int partition = 0; | |
int numPartitions = 3; | |
short numReplica = 3; | |
TopicPartition tp = new TopicPartition(topic, partition); | |
private final String topic = "topic"; | |
private final int partition = 0; | |
private final int numPartitions = 3; | |
private final short numReplica = 3; | |
private final TopicPartition tp = new TopicPartition(topic, partition); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TopicPartition tp = new TopicPartition(topic, partition);
this line is missed
} | ||
|
||
@ClusterTest | ||
void testClassicAssignAndCommitAsyncNotCommitted() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know it might not be critical in test files, but could we add public
to all test methods just to keep things consistent?
} | ||
|
||
|
||
void testAssignAndCommitAsyncNotCommitted(String groupProtocol) throws InterruptedException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you make this private
?
CountConsumerCommitCallback cb = new CountConsumerCommitCallback(); | ||
|
||
try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) { | ||
ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp, -1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp, -1); | |
ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords); |
|
||
try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol))) { | ||
long startingTimestamp = System.currentTimeMillis(); | ||
ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp, -1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp, -1); | |
ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp); |
|
||
@BeforeEach | ||
public void setup() throws InterruptedException { | ||
clusterInstance.createTopic(topic, numPartitions, numReplica); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The original test has replica count is 2, let's keep it.
clusterInstance.createTopic(topic, numPartitions, numReplica); | |
clusterInstance.createTopic(topic, 2, BROKER_COUNT); |
types = {Type.KRAFT}, | ||
brokers = PlaintextConsumerAssignTest.BROKER_COUNT, | ||
serverProperties = { | ||
@ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The original replica factor is 3
@ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), | |
@ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "3"), |
void testAsyncAssignAndCommitSyncNotCommitted() throws Exception { | ||
testAssignAndCommitSyncNotCommitted(GroupProtocol.CONSUMER.name); | ||
} | ||
void testAssignAndCommitSyncNotCommitted(String groupProtocol) throws InterruptedException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
void testAssignAndCommitSyncNotCommitted(String groupProtocol) throws InterruptedException { | |
private void testAssignAndCommitSyncNotCommitted(String groupProtocol) { |
} | ||
|
||
@ClusterTest | ||
void testClassicAssignAndCommitSyncNotCommitted() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
void testClassicAssignAndCommitSyncNotCommitted() throws Exception { | |
public void testClassicAssignAndCommitSyncNotCommitted() { |
} | ||
|
||
@ClusterTest | ||
void testAsyncAssignAndCommitSyncNotCommitted() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
void testAsyncAssignAndCommitSyncNotCommitted() throws Exception { | |
public void testAsyncAssignAndCommitSyncNotCommitted() { |
Hi @brandboat , thanks for your detailed review and point out my mistakes, all comments are addressed, PTAL. |
|
||
@BeforeEach | ||
public void setup() throws InterruptedException { | ||
clusterInstance.createTopic(topic, BROKER_COUNT, (short) 2); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the BROKER_COUNT should be replica count, right?
clusterInstance.createTopic(topic, BROKER_COUNT, (short) 2); | |
clusterInstance.createTopic(topic, 2, (short) BROKER_COUNT); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix it, thanks.
…ka/clients/consumer/PlaintextConsumerAssignTest.java Co-authored-by: Kuan-Po Tseng <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @TaiJuWu for this patch, left some comments
public static void pollUntilTrue(Consumer<byte[], byte[]> consumer, | ||
Supplier<Boolean> testCondition, | ||
long waitTimeMs, String msg) throws InterruptedException { | ||
TestUtils.waitForCondition(() -> { | ||
consumer.poll(Duration.ofMillis(100)); | ||
return testCondition.get(); | ||
}, waitTimeMs, msg); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please follow the code style used in this file.
public static void pollUntilTrue(Consumer<byte[], byte[]> consumer, | |
Supplier<Boolean> testCondition, | |
long waitTimeMs, String msg) throws InterruptedException { | |
TestUtils.waitForCondition(() -> { | |
consumer.poll(Duration.ofMillis(100)); | |
return testCondition.get(); | |
}, waitTimeMs, msg); | |
} | |
public static void pollUntilTrue( | |
Consumer<byte[], byte[]> consumer, | |
Supplier<Boolean> testCondition, | |
long waitTimeMs, | |
String msg | |
) throws InterruptedException { | |
TestUtils.waitForCondition(() -> { | |
consumer.poll(Duration.ofMillis(100)); | |
return testCondition.get(); | |
}, waitTimeMs, msg); | |
} |
testAssignAndCommitSyncNotCommitted(GroupProtocol.CONSUMER.name); | ||
} | ||
|
||
private void testAssignAndCommitSyncNotCommitted(String groupProtocol) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use GroupProtocol
instead of String
private void testAssignAndCommitSyncNotCommitted(String groupProtocol) { | |
private void testAssignAndCommitSyncNotCommitted(GroupProtocol groupProtocol) { |
testAssignAndCommitSyncAllConsumed(GroupProtocol.CONSUMER.name); | ||
} | ||
|
||
private void testAssignAndCommitSyncAllConsumed(String groupProtocol) throws InterruptedException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private void testAssignAndCommitSyncAllConsumed(String groupProtocol) throws InterruptedException { | |
private void testAssignAndCommitSyncAllConsumed(GroupProtocol groupProtocol) throws InterruptedException { |
testAssignAndConsume(GroupProtocol.CONSUMER.name); | ||
} | ||
|
||
private void testAssignAndConsume(String groupProtocol) throws InterruptedException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private void testAssignAndConsume(String groupProtocol) throws InterruptedException { | |
private void testAssignAndConsume(GroupProtocol groupProtocol) throws InterruptedException { |
testAssignAndConsumeSkippingPosition(GroupProtocol.CONSUMER.name); | ||
} | ||
|
||
private void testAssignAndConsumeSkippingPosition(String groupProtocol) throws InterruptedException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private void testAssignAndConsumeSkippingPosition(String groupProtocol) throws InterruptedException { | |
private void testAssignAndConsumeSkippingPosition(GroupProtocol groupProtocol) throws InterruptedException { |
testAssignAndRetrievingCommittedOffsetsMultipleTimes(GroupProtocol.CONSUMER.name); | ||
} | ||
|
||
private void testAssignAndRetrievingCommittedOffsetsMultipleTimes(String groupProtocol) throws InterruptedException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private void testAssignAndRetrievingCommittedOffsetsMultipleTimes(String groupProtocol) throws InterruptedException { | |
private void testAssignAndRetrievingCommittedOffsetsMultipleTimes(GroupProtocol groupProtocol) throws InterruptedException { |
int numRecords = 100; | ||
long startingTimestamp = System.currentTimeMillis(); | ||
|
||
try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1"))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this test require manually setting the GROUP_ID_CONFIG
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove it, thanks!``
// Check committed offsets twice with same consumer | ||
assertEquals(numRecords, consumer.committed(Set.of(tp)).get(tp).offset()); | ||
assertEquals(numRecords, consumer.committed(Set.of(tp)).get(tp).offset()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just curious, Why do we need to check the same consumer committed twice?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From the test name testAssignAndRetrievingCommittedOffsetsMultipleTimes
we can get this test would like to test committed offset multiple time so we need to keep the behavior .
int offset = 10; | ||
long startingTimestamp = System.currentTimeMillis(); | ||
|
||
try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1"))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about extract the same config as a variable.
val config = Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1");
int numRecords = 100; | ||
long startingTimestamp = System.currentTimeMillis(); | ||
|
||
try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1"))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about extract the same config as a variable.
val config = Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol, GROUP_ID_CONFIG, "group1");
Hi @m1a2st, sorry for delay. All comments are addressed, PTAL. |
The PR do following: