-
Notifications
You must be signed in to change notification settings - Fork 14.4k
KAFKA-19268 Missing mocks for SharePartitionManagerTest tests #19786
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. Overall looks fine. A few comments.
For testReleaseSessionCompletesDelayedShareFetchRequest
-> you can just mock behaviour for releaseAcquiredRecords
for tp3. Example - when(sp3.releaseAcquiredRecords(ArgumentMatchers.eq(memberId))).thenReturn(CompletableFuture.completedFuture(null));
that should be enough
For testReleaseSessionSuccess
, I agree that getting the exception is intentional. So, we don't need a change there
static Seq<Tuple2<TopicIdPartition, LogReadResult>> buildLogReadResultWithFakeRecords(List<TopicIdPartition> topicIdPartitions) { | ||
List<Tuple2<TopicIdPartition, LogReadResult>> logReadResults = new ArrayList<>(); | ||
for (TopicIdPartition topicIdPartition : topicIdPartitions) { | ||
MemoryRecords records = MemoryRecords.withRecords( | ||
Compression.NONE, | ||
new SimpleRecord("test-key".getBytes(), "test-value".getBytes()) | ||
); | ||
|
||
LogReadResult logReadResult = new LogReadResult( | ||
new FetchDataInfo(new LogOffsetMetadata(0, 0, 0), records), | ||
Option.empty(), | ||
-1L, -1L, -1L, -1L, -1L, | ||
Option.empty(), Option.empty(), Option.empty() | ||
); | ||
|
||
logReadResults.add(new Tuple2<>(topicIdPartition, logReadResult)); | ||
} | ||
|
||
return CollectionConverters.asScala(logReadResults).toSeq(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can make the change from MemoryRecords.EMPTY
to MemoryRecords.withRecords(Compression.NONE, new SimpleRecord("test-key".getBytes(), "test-value".getBytes()))
in the function buildLogReadResult
itself.
.when(mockReplicaManager) | ||
.readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Extra tabs on both lines
partitionCache.put(new SharePartitionKey(groupId, tp0), sp0); | ||
partitionCache.put(new SharePartitionKey(groupId, tp1), sp1); | ||
partitionCache.put(new SharePartitionKey(groupId, tp2), sp2); | ||
partitionCache.put(new SharePartitionKey(groupId, tp3), sp3); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think you need sp3
. releaseSession is only called for tp0, tp1, and tp2. So, just mocking these 3 should work.
jira:
https://issues.apache.org/jira/browse/KAFKA-19268
In jira, there are 5 unit
tests(testAcknowledgeCompletesDelayedShareFetchRequest,
testMultipleConcurrentShareFetches
testCachedTopicPartitionsForValidShareSessions,
testReleaseSessionCompletesDelayedShareFetchRequest,
testReleaseSessionSuccess) that print exceptions.
testAcknowledgeCompletesDelayedShareFetchRequest
[problem]
acknowleged, otherwise error occurs.
[solution]
testMultipleConcurrentShareFetches
[problem]
maybeSliceFetchRecords.
at
org.apache.kafka.common.utils.AbstractIterator.next(AbstractIterator.java:52)
~[kafka-clients-4.1.0-SNAPSHOT.jar:?]
at
kafka.server.share.ShareFetchUtils.maybeSliceFetchRecords(ShareFetchUtils.java:219)
[main/:?]
at
kafka.server.share.ShareFetchUtils.processFetchResponse(ShareFetchUtils.java:132)
[main/:?]
[solution]
doAnswer chaning (line 1075 ~ 1105) works well in multi threading test.
Even though I changed values of assertEquals in doAnswer chaining to
random value, test passed as well. It needs to be checked that chaining
works well even in multi thread test.
testCachedTopicPartitionsForValidShareSessions
[problem]
[solution]
testReleaseSessionCompletesDelayedShareFetchRequest
[problem]
'sharePartitionManager.releaseSession(groupId, memberId);' which leads
to exception.
[solution]
testReleaseSessionSuccess
[problem]
'sharePartitionManager.releaseSession(groupId, memberId.toString());'
which leads to exception.
[solution]