
KAFKA-19317: Refactor ShareConsumerTest::waitedPoll to work with multiple records. #19789


Open
wants to merge 6 commits into base: trunk

Conversation

ShivsundarR
Collaborator

@ShivsundarR ShivsundarR commented May 22, 2025

What
https://issues.apache.org/jira/browse/KAFKA-19317

  • One of the tests failed due to a possible race condition in
    waitedPoll(), where we expect 2 records but get only 1 record on the
    first poll(). This record wasn't acknowledged before the next poll(),
    which is not allowed when share.acknowledgement.mode is set to
    "explicit", hence the IllegalStateException was thrown.
    To fix this, I have refactored the test to produce and consume 1 record
    at a time, as that is more deterministic.

  • On digging into the reason for this flakiness, I noticed there might be
    a race condition in waitedPoll() where we might get some records on the
    first poll() and some on later calls. waitedPoll() does not cumulatively
    add up the records received across the different polls; it retries until
    a single poll() returns the exact number of records it expects. So when
    we expect more than 1 record in waitedPoll(), there is a chance of
    records getting split across polls if the ShareFetchBuffer does not have
    all the records yet. I have added a separate function which accumulates
    the records across polls and returns the result (see the sketch after
    this list), and used it for the tests that were calling waitedPoll()
    expecting multiple records.

  • Some of these modified tests use "explicit" mode, so we should ideally
    refactor them too to expect only 1 record at a time. These tests have
    clean runs in Develocity though, so I have not modified them in this PR.
    We can modify them if we observe flakiness in the future.
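For context, a rough sketch of what such an accumulating helper could look like, using the name suggested in the review below, the DEFAULT_MAX_WAIT_MS constant, and the TestUtils.waitForCondition overload already used by the existing helper; the exact signature and logic in the PR may differ:

private List<ConsumerRecord<byte[], byte[]>> waitedPollForMultipleRecords(ShareConsumer<byte[], byte[]> shareConsumer,
                                                                          long pollMs,
                                                                          int recordCount) throws InterruptedException {
    // Keep polling and collect records until the expected count is reached,
    // instead of requiring a single poll() to return all of them.
    List<ConsumerRecord<byte[], byte[]>> accumulated = new ArrayList<>();
    TestUtils.waitForCondition(() -> {
            ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(pollMs));
            records.forEach(accumulated::add);
            return accumulated.size() == recordCount;
        },
        DEFAULT_MAX_WAIT_MS,
        500L,
        () -> "failed to get records");
    return accumulated;
}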

@github-actions github-actions bot added triage PRs from the community tests Test fixes (including flaky tests) clients small Small PRs labels May 22, 2025
@ShivsundarR ShivsundarR added ci-approved tests Test fixes (including flaky tests) KIP-932 Queues for Kafka clients and removed tests Test fixes (including flaky tests) clients small Small PRs triage PRs from the community labels May 22, 2025
Contributor

@apoorvmittal10 apoorvmittal10 left a comment


Thanks for the PR, minor comments.

Comment on lines 2988 to 2989
long pollMs,
int recordCount) {
Contributor

nit: needs indentation correction.

@@ -2977,6 +2984,28 @@ private ConsumerRecords<byte[], byte[]> waitedPoll(
return waitedPoll(shareConsumer, pollMs, recordCount, false, "", List.of());
}

private List<ConsumerRecord<byte[], byte[]>> waitedPollMultipleRecords(ShareConsumer<byte[], byte[]> shareConsumer,
Contributor

Suggested change
private List<ConsumerRecord<byte[], byte[]>> waitedPollMultipleRecords(ShareConsumer<byte[], byte[]> shareConsumer,
private List<ConsumerRecord<byte[], byte[]>> waitedPollForMultipleRecords(ShareConsumer<byte[], byte[]> shareConsumer,

},
DEFAULT_MAX_WAIT_MS,
500L,
() -> "failed to get records"
Contributor

Shall we log how many records were received vs needed? It will be easier to debug in the future.

Collaborator Author

Makes sense, I have changed the log line now. Thanks.
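For illustration, the final waitForCondition argument could become something like this (the wording is illustrative, not necessarily the exact message in the PR):

// Report received vs expected counts so a timeout failure is easy to diagnose.
() -> "Failed to get records: expected " + recordCount + ", received " + accumulated.size()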

@github-actions github-actions bot added the small Small PRs label May 23, 2025
@apoorvmittal10
Contributor

@ShivsundarR Please avoid force pushing once the PR is under review and has previous comments.

@ShivsundarR
Collaborator Author

ShivsundarR commented May 23, 2025

Hi @apoorvmittal10, yes, apologies. I was trying to fix the build failure by rebasing with master, and then realised I should have added a merge commit instead.
It seems the build failure is addressed in PR #19792.
I will add a merge commit once that PR is merged, which should fix the build failure.

@apoorvmittal10
Contributor

  • One of the tests failed due to a possible race condition in
    waitedPoll(), where we expect 2 records but get only 1 record on the
    first poll(). This record wasn't acknowledged before the next poll()

Are we not doing producer.flush() prior to reading? Then why do we still get a subset of the records in poll()?

I have added a separate function which accumulates the records across

Why not change the implementation of the existing waitedPoll method?

@ShivsundarR
Collaborator Author

We are doing a producer.flush(). I suspect it's a race condition around the ShareFetchBuffer, where both the background thread and the application thread are writing to and reading from it once the response is received.

java.lang.IllegalStateException: All records must be acknowledged in explicit acknowledgement mode. 2025-05-17T16:09:18.3314782Z at 

As we got this exception for the test, it means waitedPoll() is receiving partial records across multiple polls, and those records have not been acknowledged in explicit mode, hence the exception is thrown. So I deduced that for multiple records, waitedPoll() might not be deterministic, as it expects all records in a single poll().

Why not change the implementation of the existing waitedPoll method?

We could modify the existing function; I thought to keep waitedPoll() only for checking 1 record, and have a separate implementation for checking multiple records. As most of the tests in the suite only check for 1 record, they can use a straightforward implementation in waitedPoll(). Does that sound good?

@ShivsundarR
Collaborator Author

ShivsundarR commented May 23, 2025

Some of these modified tests use "explicit" mode, so we should ideally
refactor them too to expect only 1 record at a time. These tests have
clean runs in Develocity though, so I have not modified them in this PR.
We can modify them if we observe flakiness in the future.

There is also the problem that a few other tests expect more than 1 record in a poll using explicit mode. These are not flaky as of now, but we might need to modify them to test 1 record at a time to be deterministic.
If we introduce AcknowledgeType.RETAIN or change the default behaviour to not throw an exception in the future, then these tests are fine, but for now we can probably monitor and change them if required.
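For reference, the deterministic pattern for "explicit" mode looks roughly like this (a sketch: the producer/consumer variables, the topic partition tp, the 2500L poll timeout and the payloads are placeholders, not the PR's exact code):

for (int i = 0; i < recordCount; i++) {
    // Produce exactly one record and make sure it is written before polling.
    producer.send(new ProducerRecord<>(tp.topic(), tp.partition(), null, ("record-" + i).getBytes()));
    producer.flush();

    // In "explicit" mode every record returned by poll() must be acknowledged
    // before the next poll(), so consume and acknowledge one record at a time.
    ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
    for (ConsumerRecord<byte[], byte[]> record : records) {
        shareConsumer.acknowledge(record, AcknowledgeType.ACCEPT);
    }
    shareConsumer.commitSync();
}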

@apoorvmittal10
Contributor

I thought to keep waitedPoll() only for checking 1 record

The waitedPoll method also takes recordCount; shouldn't that parameter no longer be needed now?

@ShivsundarR
Collaborator Author

ShivsundarR commented May 23, 2025

Yes, it should not be needed now. I will update the code. Thanks.
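For illustration, the single-record variant could then drop the parameter and delegate to the existing overload with a fixed count of 1 (an assumption about the eventual shape, not the merged code):

private ConsumerRecords<byte[], byte[]> waitedPoll(ShareConsumer<byte[], byte[]> shareConsumer, long pollMs) {
    // Single-record wait: recordCount is always 1 for these callers.
    return waitedPoll(shareConsumer, pollMs, 1, false, "", List.of());
}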

@github-actions github-actions bot removed the small Small PRs label May 23, 2025
@ShivsundarR
Collaborator Author

I noticed @adixitconfluent's PR here, which adds a future.get() to produceAbortedTransaction(). This could explain the flakiness in testAlterReadUncommittedToReadCommittedIsolationLevel as well: if the abort transaction does not complete in time, we could possibly see these records in a different poll().
So that PR, which is now merged, should fix the problem, and we would get both records in a single poll().
This also explains why the other tests which use explicit mode and expect multiple records are not flaky.

Initially I thought this could be a race condition in the client, but that seems unlikely now. We can observe the builds on AK to see if the flakiness persists, and then close this PR if it is resolved. We can keep the PR open till then.
The refactors in waitedPoll() should also not be required, as we would ideally get all the produced records in a single poll().
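For context, the pattern described above, waiting on the send future so the transactional records are fully written before the transaction is aborted and the consumer polls, looks roughly like this (variable names and payload are illustrative, not the actual helper code):

producer.initTransactions();
producer.beginTransaction();
// Block until the record is written, so the aborted data is in place before the consumer polls.
Future<RecordMetadata> sendFuture = producer.send(new ProducerRecord<>(tp.topic(), tp.partition(), null, "aborted-record".getBytes()));
sendFuture.get();
producer.abortTransaction();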
