KAFKA-19317 : Refactor ShareConsumerTest::waitedPoll to work with multiple records. #19789
Conversation
Thanks for the PR, minor comments.
long pollMs,
int recordCount) {
nit: needs indentation correction.
@@ -2977,6 +2984,28 @@ private ConsumerRecords<byte[], byte[]> waitedPoll(
        return waitedPoll(shareConsumer, pollMs, recordCount, false, "", List.of());
    }

    private List<ConsumerRecord<byte[], byte[]>> waitedPollMultipleRecords(ShareConsumer<byte[], byte[]> shareConsumer,
Suggested change:
- private List<ConsumerRecord<byte[], byte[]>> waitedPollMultipleRecords(ShareConsumer<byte[], byte[]> shareConsumer,
+ private List<ConsumerRecord<byte[], byte[]>> waitedPollForMultipleRecords(ShareConsumer<byte[], byte[]> shareConsumer,
},
DEFAULT_MAX_WAIT_MS,
500L,
() -> "failed to get records"
Shall we log how many records were received vs. how many were needed? It would make this easier to debug in the future.
Makes sense, I have changed the log line now. Thanks.
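For illustration, a sketch of what the updated message supplier might look like, as a fragment of the `TestUtils.waitForCondition` call quoted above (the name `accumulatedRecords` is an assumption, not necessarily what the patch uses):

```java
// Hypothetical final argument to TestUtils.waitForCondition: on timeout,
// report how many records were received versus how many were expected.
() -> "failed to get records: received " + accumulatedRecords.size() + " of " + recordCount + " expected"
```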
@ShivsundarR Please avoid force pushing once the PR is under review and has previous comments.
Hi @apoorvmittal10, yes, apologies. I was trying to fix the build failure by rebasing with master, and then realised I should have added a merge commit instead.
Are we not doing a `producer.flush()` prior to reading? Then why do we still get a subset of the records in a poll?
Why not change the implementation of the existing `waitedPoll` instead?
We are doing a `producer.flush()` prior to reading. As we still got this exception for the test, it means the first `poll()` returned only a subset of the records even after the flush.
We could modify the existing function; I thought to keep `waitedPollMultipleRecords` separate for the multi-record cases.
There is also this problem where there are a few other tests which expect more than 1 record in a poll using `waitedPoll()`.
The existing multi-record path in `waitedPoll()` should not be needed then?
Yes, it should not be needed now, I will update the code. Thanks.
I noticed @adixitconfluent 's PR here which adds a related fix. Initially I thought this could be a race condition in the client, but it seems unlikely now. We can observe the builds on AK to see if the flakiness persists, and close this PR if it is resolved. We can keep the PR open till then.
What

https://issues.apache.org/jira/browse/KAFKA-19317

One of the tests failed due to a possible race condition in `waitedPoll()`, where we expect 2 records but get only 1 record on the first `poll()`. This record wasn't acknowledged before the next `poll()`, which is not allowed when `share.acknowledgement.mode` is set to "explicit". Hence the `IllegalStateException` was thrown.
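For context, here is a minimal sketch of the explicit-acknowledgement flow involved. This is not taken from the test: the bootstrap address, group id, and topic name are placeholders, and only the `share.acknowledgement.mode` key comes from the description above.

```java
import java.time.Duration;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.clients.consumer.ShareConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ExplicitAckSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-share-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // In explicit mode, every delivered record must be acknowledged before the next poll().
        props.put("share.acknowledgement.mode", "explicit");

        try (ShareConsumer<byte[], byte[]> consumer = new KafkaShareConsumer<>(props)) {
            consumer.subscribe(Set.of("test-topic"));
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(2500));
            for (ConsumerRecord<byte[], byte[]> record : records) {
                consumer.acknowledge(record, AcknowledgeType.ACCEPT);
            }
            // If any record from the previous poll were left unacknowledged here,
            // this poll() would throw IllegalStateException.
            consumer.poll(Duration.ofMillis(2500));
        }
    }
}
```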
To fix this, I have refactored the test to produce and consume 1 record at a time, in succession, as that is more deterministic; see the sketch below.
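A sketch of that pattern, reusing the test's `waitedPoll` helper from the diff above (the topic name, record count, and 2500 ms poll timeout are illustrative):

```java
// Produce one record, wait for exactly that record, and acknowledge it
// before the next poll(), so explicit mode is never violated.
for (int i = 0; i < totalRecords; i++) {
    producer.send(new ProducerRecord<>("test-topic", ("record-" + i).getBytes()));
    producer.flush(); // ensure the record is sent before we poll
    ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
    records.forEach(record -> shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT));
}
```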
On digging into the reason for this flakiness, I noticed there might be a race condition in `waitedPoll()` where we might get some records on the first `poll()` and some on later calls. `waitedPoll()` does not cumulatively add up the records received across the different polls; it retries until one `poll()` gives the exact number of records that it expects. So when we expect more than 1 record in `waitedPoll()`, there is a chance of records getting split across polls if the `ShareFetchBuffer` does not yet have all the records.
I have added a separate function which accumulates the records across the polls and returns the combined result, and I have used it for the tests that were calling `waitedPoll()` expecting multiple records; a sketch follows below.

Some of these modified tests use "explicit" mode, so we should ideally refactor them too to expect only 1 record at a time. These tests have clean runs in Develocity though, so I have not modified them in this PR. We can modify them if we observe flakiness for them in the future.
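For reference, a hedged sketch of what such an accumulating helper could look like, consistent with the fragments quoted in the review above (`DEFAULT_MAX_WAIT_MS`, the 500 ms retry interval, and the `waitedPollForMultipleRecords` name come from that thread; the body is illustrative, not the verbatim patch, and assumes the test class's existing imports such as `TestUtils`, `ArrayList`, and `Duration`):

```java
// Illustrative sketch: accumulate records across poll() calls instead of
// requiring a single poll() to return the exact expected count.
private List<ConsumerRecord<byte[], byte[]>> waitedPollForMultipleRecords(ShareConsumer<byte[], byte[]> shareConsumer,
                                                                          long pollMs,
                                                                          int recordCount) throws InterruptedException {
    List<ConsumerRecord<byte[], byte[]>> accumulatedRecords = new ArrayList<>();
    TestUtils.waitForCondition(() -> {
        shareConsumer.poll(Duration.ofMillis(pollMs)).forEach(accumulatedRecords::add);
        // Succeed once the records received across all polls reach the expected count.
        return accumulatedRecords.size() >= recordCount;
    },
    DEFAULT_MAX_WAIT_MS,
    500L,
    () -> "failed to get records: received " + accumulatedRecords.size() + " of " + recordCount + " expected");
    return accumulatedRecords;
}
```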