diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java index f74a231e98d4a..9ce33cf0fdbb1 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java @@ -35,6 +35,7 @@ import org.apache.kafka.common.errors.KafkaStorageException; import org.apache.kafka.common.errors.LeaderNotAvailableException; import org.apache.kafka.common.errors.NotLeaderOrFollowerException; +import org.apache.kafka.common.errors.ShareSessionLimitReachedException; import org.apache.kafka.common.errors.ShareSessionNotFoundException; import org.apache.kafka.common.message.ShareAcknowledgeResponseData; import org.apache.kafka.common.message.ShareFetchResponseData; @@ -120,6 +121,7 @@ import static kafka.server.share.DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes; import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createShareAcquiredRecords; import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.validateRotatedListEquals; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; @@ -168,6 +170,7 @@ public class SharePartitionManagerTest { private SharePartitionManager sharePartitionManager; private static final List EMPTY_PART_LIST = List.of(); + private static final List EMPTY_ACQUIRED_RECORDS = List.of(); @BeforeEach public void setUp() { @@ -211,7 +214,7 @@ public void testNewContextReturnsFinalContextWithoutRequestData() { assertFalse(((ShareSessionContext) context1).isSubsequent()); ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(memberId, ShareRequestMetadata.FINAL_EPOCH); - ShareFetchContext context2 = sharePartitionManager.newContext(groupId, List.of(), List.of(), reqMetadata2, true, CONNECTION_ID); + ShareFetchContext context2 = sharePartitionManager.newContext(groupId, EMPTY_PART_LIST, EMPTY_PART_LIST, reqMetadata2, true, CONNECTION_ID); assertEquals(FinalContext.class, context2.getClass()); } @@ -273,7 +276,92 @@ public void testNewContextReturnsFinalContextWhenTopicPartitionsArePresentInRequ // shareFetch is not empty, and it contains tpId1, which should return FinalContext instance since it is FINAL_EPOCH List reqData2 = List.of(new TopicIdPartition(tpId1, new TopicPartition("foo", 0))); assertInstanceOf(FinalContext.class, - sharePartitionManager.newContext(groupId, reqData2, List.of(), reqMetadata2, true, CONNECTION_ID)); + sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, reqMetadata2, true, CONNECTION_ID)); + } + + @Test + public void testNewContextThrowsErrorWhenShareSessionNotFoundOnFinalEpoch() { + ShareSessionCache cache = new ShareSessionCache(10); + sharePartitionManager = SharePartitionManagerBuilder.builder() + .withCache(cache) + .build(); + assertThrows(ShareSessionNotFoundException.class, () -> sharePartitionManager.newContext("grp", EMPTY_PART_LIST, EMPTY_PART_LIST, + new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.FINAL_EPOCH), false, CONNECTION_ID)); + } + + @Test + public void testNewContextThrowsErrorWhenAcknowledgeDataPresentOnInitialEpoch() { + ShareSessionCache cache = new ShareSessionCache(10); + sharePartitionManager = SharePartitionManagerBuilder.builder() + .withCache(cache) + .build(); + Uuid tpId0 = Uuid.randomUuid(); + TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1)); + + assertThrows(InvalidRequestException.class, () -> sharePartitionManager.newContext("grp", List.of(tp0, tp1), EMPTY_PART_LIST, + new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH), true, CONNECTION_ID)); + } + + @Test + public void testNewContextThrowsErrorWhenShareSessionCacheIsFullOnInitialEpoch() { + // Define a cache with max size 1 + ShareSessionCache cache = new ShareSessionCache(1); + sharePartitionManager = SharePartitionManagerBuilder.builder() + .withCache(cache) + .build(); + + Uuid tpId0 = Uuid.randomUuid(); + TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1)); + + String groupId = "grp"; + Uuid memberId1 = Uuid.randomUuid(); + Uuid memberId2 = Uuid.randomUuid(); + + // Create a new share session with an initial share fetch request + List reqData = List.of(tp0, tp1); + + ShareRequestMetadata reqMetadata1 = new ShareRequestMetadata(memberId1, ShareRequestMetadata.INITIAL_EPOCH); + ShareFetchContext context1 = sharePartitionManager.newContext(groupId, reqData, EMPTY_PART_LIST, reqMetadata1, false, CONNECTION_ID); + assertInstanceOf(ShareSessionContext.class, context1); + assertFalse(((ShareSessionContext) context1).isSubsequent()); + + // Trying to create a new share session, but since cache is already full, it should throw an exception + ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(memberId2, ShareRequestMetadata.INITIAL_EPOCH); + assertThrows(ShareSessionLimitReachedException.class, () -> sharePartitionManager.newContext("grp", reqData, EMPTY_PART_LIST, + reqMetadata2, false, "id-2")); + } + + @Test + public void testNewContextExistingSessionNewRequestWithInitialEpoch() { + ShareSessionCache cache = new ShareSessionCache(10); + sharePartitionManager = SharePartitionManagerBuilder.builder() + .withCache(cache) + .build(); + + Uuid tpId0 = Uuid.randomUuid(); + TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1)); + + String groupId = "grp"; + Uuid memberId = Uuid.randomUuid(); + List reqData = List.of(tp0, tp1); + + ShareRequestMetadata reqMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH); + + // Create a new share session with an initial share fetch request + ShareFetchContext context1 = sharePartitionManager.newContext(groupId, reqData, EMPTY_PART_LIST, reqMetadata, false, CONNECTION_ID); + assertInstanceOf(ShareSessionContext.class, context1); + assertFalse(((ShareSessionContext) context1).isSubsequent()); + assertEquals(1, cache.size()); + + // Sending another request with INITIAL_EPOCH and same share session key. This should return a new ShareSessionContext + // and delete the older one. + ShareFetchContext context2 = sharePartitionManager.newContext(groupId, reqData, EMPTY_PART_LIST, reqMetadata, false, CONNECTION_ID); + assertInstanceOf(ShareSessionContext.class, context2); + assertFalse(((ShareSessionContext) context1).isSubsequent()); + assertEquals(1, cache.size()); } @Test @@ -326,7 +414,7 @@ public void testNewContext() { new ShareRequestMetadata(memberId4, 1), true, "id-3")); // Continue the first share session we created. - ShareFetchContext context5 = sharePartitionManager.newContext(groupId, List.of(), EMPTY_PART_LIST, + ShareFetchContext context5 = sharePartitionManager.newContext(groupId, EMPTY_PART_LIST, EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey2.memberId(), 1), true, CONNECTION_ID); assertInstanceOf(ShareSessionContext.class, context5); assertTrue(((ShareSessionContext) context5).isSubsequent()); @@ -348,14 +436,14 @@ public void testNewContext() { new ShareRequestMetadata(shareSessionKey2.memberId(), 5), true, CONNECTION_ID)); // Test generating a throttled response for a subsequent share session - ShareFetchContext context7 = sharePartitionManager.newContext(groupId, List.of(), EMPTY_PART_LIST, + ShareFetchContext context7 = sharePartitionManager.newContext(groupId, EMPTY_PART_LIST, EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey2.memberId(), 2), true, CONNECTION_ID); ShareFetchResponse resp7 = context7.throttleResponse(100); assertEquals(Errors.NONE, resp7.error()); assertEquals(100, resp7.throttleTimeMs()); // Get the final share session. - ShareFetchContext context8 = sharePartitionManager.newContext(groupId, List.of(), EMPTY_PART_LIST, + ShareFetchContext context8 = sharePartitionManager.newContext(groupId, EMPTY_PART_LIST, EMPTY_PART_LIST, new ShareRequestMetadata(reqMetadata2.memberId(), ShareRequestMetadata.FINAL_EPOCH), true, CONNECTION_ID); assertEquals(FinalContext.class, context8.getClass()); assertEquals(1, cache.size()); @@ -375,6 +463,110 @@ public void testNewContext() { assertEquals(0, cache.size()); } + @Test + public void testAcknowledgeSessionUpdateThrowsOnInitialEpoch() { + ShareSessionCache cache = new ShareSessionCache(10); + sharePartitionManager = SharePartitionManagerBuilder.builder() + .withCache(cache) + .build(); + + assertThrows(InvalidShareSessionEpochException.class, + () -> sharePartitionManager.acknowledgeSessionUpdate("grp", + new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH))); + } + + @Test + public void testAcknowledgeSessionUpdateThrowsWhenShareSessionNotFound() { + ShareSessionCache cache = new ShareSessionCache(10); + sharePartitionManager = SharePartitionManagerBuilder.builder() + .withCache(cache) + .build(); + + // The share session corresponding to this memberId has not been created yet. This should throw an exception. + assertThrows(ShareSessionNotFoundException.class, + () -> sharePartitionManager.acknowledgeSessionUpdate("grp", + new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)))); + } + + @Test + public void testAcknowledgeSessionUpdateThrowsInvalidShareSessionEpochException() { + ShareSessionCache cache = new ShareSessionCache(10); + sharePartitionManager = SharePartitionManagerBuilder.builder() + .withCache(cache) + .build(); + + Uuid tpId0 = Uuid.randomUuid(); + TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1)); + + String groupId = "grp"; + Uuid memberId = Uuid.randomUuid(); + + // Create a new share session with an initial share fetch request + ShareFetchContext context1 = sharePartitionManager.newContext(groupId, List.of(tp0, tp1), EMPTY_PART_LIST, + new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH), false, CONNECTION_ID); + assertInstanceOf(ShareSessionContext.class, context1); + assertFalse(((ShareSessionContext) context1).isSubsequent()); + + // The expected epoch from the share session should be 1, but we are passing 2. This should throw an exception. + assertThrows(InvalidShareSessionEpochException.class, + () -> sharePartitionManager.acknowledgeSessionUpdate("grp", + new ShareRequestMetadata(memberId, + ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))))); + } + + @Test + public void testAcknowledgeSessionUpdateSuccessOnSubsequentEpoch() { + ShareSessionCache cache = new ShareSessionCache(10); + sharePartitionManager = SharePartitionManagerBuilder.builder() + .withCache(cache) + .build(); + + Uuid tpId0 = Uuid.randomUuid(); + TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1)); + + String groupId = "grp"; + Uuid memberId = Uuid.randomUuid(); + + // Create a new share session with an initial share fetch request + ShareFetchContext context1 = sharePartitionManager.newContext(groupId, List.of(tp0, tp1), EMPTY_PART_LIST, + new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH), false, CONNECTION_ID); + assertInstanceOf(ShareSessionContext.class, context1); + assertFalse(((ShareSessionContext) context1).isSubsequent()); + + // The expected epoch from the share session should be 1, and we are passing the same. So, execution should be successful. + assertDoesNotThrow( + () -> sharePartitionManager.acknowledgeSessionUpdate("grp", + new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)))); + } + + @Test + public void testAcknowledgeSessionUpdateSuccessOnFinalEpoch() { + ShareSessionCache cache = new ShareSessionCache(10); + sharePartitionManager = SharePartitionManagerBuilder.builder() + .withCache(cache) + .build(); + + Uuid tpId0 = Uuid.randomUuid(); + TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1)); + + String groupId = "grp"; + Uuid memberId = Uuid.randomUuid(); + + // Create a new share session with an initial share fetch request + ShareFetchContext context1 = sharePartitionManager.newContext(groupId, List.of(tp0, tp1), EMPTY_PART_LIST, + new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH), false, CONNECTION_ID); + assertInstanceOf(ShareSessionContext.class, context1); + assertFalse(((ShareSessionContext) context1).isSubsequent()); + + // The expected epoch from the share session should be 1, but we are passing the Final Epoch (-1). This should throw an exception. + assertDoesNotThrow( + () -> sharePartitionManager.acknowledgeSessionUpdate("grp", + new ShareRequestMetadata(memberId, ShareRequestMetadata.FINAL_EPOCH))); + } + @Test public void testSubsequentShareSession() { sharePartitionManager = SharePartitionManagerBuilder.builder().build(); @@ -472,7 +664,7 @@ public void testZeroSizeShareSession() { List removed2 = new ArrayList<>(); removed2.add(foo0); removed2.add(foo1); - ShareFetchContext context2 = sharePartitionManager.newContext(groupId, List.of(), removed2, + ShareFetchContext context2 = sharePartitionManager.newContext(groupId, EMPTY_PART_LIST, removed2, new ShareRequestMetadata(reqMetadata1.memberId(), 1), true, CONNECTION_ID); assertInstanceOf(ShareSessionContext.class, context2); @@ -505,7 +697,7 @@ public void testToForgetPartitions() { mockUpdateAndGenerateResponseData(context1, groupId, reqMetadata1.memberId()); - ShareFetchContext context2 = sharePartitionManager.newContext(groupId, List.of(), List.of(foo), + ShareFetchContext context2 = sharePartitionManager.newContext(groupId, EMPTY_PART_LIST, List.of(foo), new ShareRequestMetadata(reqMetadata1.memberId(), 1), true, CONNECTION_ID); // So foo is removed but not the others. @@ -513,9 +705,9 @@ public void testToForgetPartitions() { mockUpdateAndGenerateResponseData(context2, groupId, reqMetadata1.memberId()); - ShareFetchContext context3 = sharePartitionManager.newContext(groupId, List.of(), List.of(bar), + ShareFetchContext context3 = sharePartitionManager.newContext(groupId, EMPTY_PART_LIST, List.of(bar), new ShareRequestMetadata(reqMetadata1.memberId(), 2), true, CONNECTION_ID); - assertPartitionsPresent((ShareSessionContext) context3, List.of()); + assertPartitionsPresent((ShareSessionContext) context3, EMPTY_PART_LIST); } // This test simulates a share session where the topic ID changes broker side (the one handling the request) in both the metadata cache and the log @@ -556,7 +748,7 @@ public void testShareSessionUpdateTopicIdsBrokerSide() { assertEquals(2, resp1.responseData(topicNames).size()); // Create a subsequent share fetch request as though no topics changed. - ShareFetchContext context2 = sharePartitionManager.newContext(groupId, List.of(), EMPTY_PART_LIST, + ShareFetchContext context2 = sharePartitionManager.newContext(groupId, EMPTY_PART_LIST, EMPTY_PART_LIST, new ShareRequestMetadata(reqMetadata1.memberId(), 1), true, CONNECTION_ID); assertInstanceOf(ShareSessionContext.class, context2); @@ -620,7 +812,7 @@ public void testGetErroneousAndValidTopicIdPartitions() { new ShareRequestMetadata(Uuid.randomUuid(), 1), true, CONNECTION_ID)); // Continue the first share session we created. - ShareFetchContext context5 = sharePartitionManager.newContext(groupId, List.of(), EMPTY_PART_LIST, + ShareFetchContext context5 = sharePartitionManager.newContext(groupId, EMPTY_PART_LIST, EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey2.memberId(), 1), true, CONNECTION_ID); assertInstanceOf(ShareSessionContext.class, context5); assertTrue(((ShareSessionContext) context5).isSubsequent()); @@ -646,12 +838,12 @@ public void testGetErroneousAndValidTopicIdPartitions() { assertErroneousAndValidTopicIdPartitions(context7.getErroneousAndValidTopicIdPartitions(), List.of(tpNull1, tpNull2), List.of(tp0, tp1)); // Get the final share session. - ShareFetchContext context8 = sharePartitionManager.newContext(groupId, List.of(), EMPTY_PART_LIST, + ShareFetchContext context8 = sharePartitionManager.newContext(groupId, EMPTY_PART_LIST, EMPTY_PART_LIST, new ShareRequestMetadata(reqMetadata2.memberId(), ShareRequestMetadata.FINAL_EPOCH), true, CONNECTION_ID); assertEquals(FinalContext.class, context8.getClass()); assertEquals(1, cache.size()); - assertErroneousAndValidTopicIdPartitions(context8.getErroneousAndValidTopicIdPartitions(), List.of(), List.of()); + assertErroneousAndValidTopicIdPartitions(context8.getErroneousAndValidTopicIdPartitions(), EMPTY_PART_LIST, EMPTY_PART_LIST); // Check for throttled response ShareFetchResponse resp8 = context8.throttleResponse(100); assertEquals(Errors.NONE, resp8.error()); @@ -740,7 +932,7 @@ public void testShareFetchContextResponseSize() { new ShareRequestMetadata(shareSessionKey2.memberId(), 5), true, CONNECTION_ID)); // Test generating a throttled response for a subsequent share session - ShareFetchContext context7 = sharePartitionManager.newContext(groupId, List.of(), EMPTY_PART_LIST, + ShareFetchContext context7 = sharePartitionManager.newContext(groupId, EMPTY_PART_LIST, EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey2.memberId(), 2), true, CONNECTION_ID); int respSize7 = context7.responseSize(respData2, version); @@ -751,7 +943,7 @@ public void testShareFetchContextResponseSize() { assertEquals(4 + new ShareFetchResponseData().size(objectSerializationCache, version), respSize7); // Get the final share session. - ShareFetchContext context8 = sharePartitionManager.newContext(groupId, List.of(), EMPTY_PART_LIST, + ShareFetchContext context8 = sharePartitionManager.newContext(groupId, EMPTY_PART_LIST, EMPTY_PART_LIST, new ShareRequestMetadata(reqMetadata2.memberId(), ShareRequestMetadata.FINAL_EPOCH), true, CONNECTION_ID); assertEquals(FinalContext.class, context8.getClass()); assertEquals(1, cache.size()); @@ -879,7 +1071,7 @@ public void testCachedTopicPartitionsForValidShareSessions() { assertEquals(List.of(tp3), sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, memberId2)); // Get the final share session. - ShareFetchContext context5 = sharePartitionManager.newContext(groupId, List.of(), EMPTY_PART_LIST, + ShareFetchContext context5 = sharePartitionManager.newContext(groupId, EMPTY_PART_LIST, EMPTY_PART_LIST, new ShareRequestMetadata(reqMetadata1.memberId(), ShareRequestMetadata.FINAL_EPOCH), true, CONNECTION_ID); assertEquals(FinalContext.class, context5.getClass()); @@ -894,7 +1086,7 @@ public void testCachedTopicPartitionsForValidShareSessions() { assertTrue(sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, memberId1).isEmpty()); // Continue the second share session . - ShareFetchContext context6 = sharePartitionManager.newContext(groupId, List.of(), List.of(tp3), + ShareFetchContext context6 = sharePartitionManager.newContext(groupId, EMPTY_PART_LIST, List.of(tp3), new ShareRequestMetadata(shareSessionKey2.memberId(), 2), true, CONNECTION_ID); assertInstanceOf(ShareSessionContext.class, context6); assertTrue(((ShareSessionContext) context6).isSubsequent()); @@ -903,7 +1095,7 @@ public void testCachedTopicPartitionsForValidShareSessions() { ShareFetchResponse resp6 = context6.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData6); assertEquals(Errors.NONE, resp6.error()); - assertEquals(List.of(), sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, memberId2)); + assertEquals(EMPTY_PART_LIST, sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, memberId2)); } @Test @@ -1961,7 +2153,7 @@ public void testReleaseSessionCompletesDelayedShareFetchRequest() { when(sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, Uuid.fromString(memberId))).thenReturn(List.of(tp1, tp3)); doAnswer(invocation -> buildLogReadResult(List.of(tp1))).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); - when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(new ShareAcquiredRecords(List.of(), 0)); + when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(new ShareAcquiredRecords(EMPTY_ACQUIRED_RECORDS, 0)); // Release acquired records on session close request for tp1 and tp3. sharePartitionManager.releaseSession(groupId, memberId); @@ -2531,7 +2723,7 @@ public void testSharePartitionPartialInitializationFailure() throws Exception { when(sp1.maybeAcquireFetchLock(any())).thenReturn(true); when(sp1.canAcquireRecords()).thenReturn(true); when(sp1.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null)); - when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(new ShareAcquiredRecords(List.of(), 0)); + when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(new ShareAcquiredRecords(EMPTY_ACQUIRED_RECORDS, 0)); // Fail initialization for tp2. SharePartition sp2 = mock(SharePartition.class);