Skip to content

KAFKA-19285: Added more tests in SharePartitionManagerTest #19778

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 28, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 191 additions & 0 deletions core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.ShareSessionLimitReachedException;
import org.apache.kafka.common.errors.ShareSessionNotFoundException;
import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
import org.apache.kafka.common.message.ShareFetchResponseData;
Expand Down Expand Up @@ -118,6 +119,7 @@
import static kafka.server.share.DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes;
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createShareAcquiredRecords;
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.validateRotatedListEquals;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
Expand Down Expand Up @@ -273,6 +275,91 @@ public void testNewContextReturnsFinalContextWhenTopicPartitionsArePresentInRequ
sharePartitionManager.newContext(groupId, reqData2, List.of(), reqMetadata2, true, CONNECTION_ID));
}

@Test
public void testNewContextThrowsErrorWhenShareSessionNotFoundOnFinalEpoch() {
ShareSessionCache cache = new ShareSessionCache(10);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.build();
assertThrows(ShareSessionNotFoundException.class, () -> sharePartitionManager.newContext("grp", List.of(), List.of(),
new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.FINAL_EPOCH), false, CONNECTION_ID));
}

@Test
public void testNewContextThrowsErrorWhenAcknowledgeDataPresentOnInitialEpoch() {
ShareSessionCache cache = new ShareSessionCache(10);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.build();
Uuid tpId0 = Uuid.randomUuid();
TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1));

assertThrows(InvalidRequestException.class, () -> sharePartitionManager.newContext("grp", List.of(tp0, tp1), List.of(),
new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH), true, CONNECTION_ID));
}

@Test
public void testNewContextThrowsErrorWhenShareSessionCacheIsFullOnInitialEpoch() {
// Define a cache with max size 1
ShareSessionCache cache = new ShareSessionCache(1);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.build();

Uuid tpId0 = Uuid.randomUuid();
TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1));

String groupId = "grp";
Uuid memberId1 = Uuid.randomUuid();
Uuid memberId2 = Uuid.randomUuid();

// Create a new share session with an initial share fetch request
List<TopicIdPartition> reqData = List.of(tp0, tp1);

ShareRequestMetadata reqMetadata1 = new ShareRequestMetadata(memberId1, ShareRequestMetadata.INITIAL_EPOCH);
ShareFetchContext context1 = sharePartitionManager.newContext(groupId, reqData, EMPTY_PART_LIST, reqMetadata1, false, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context1);
assertFalse(((ShareSessionContext) context1).isSubsequent());

// Trying to create a new share session, but since cache is already full, it should throw an exception
ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(memberId2, ShareRequestMetadata.INITIAL_EPOCH);
assertThrows(ShareSessionLimitReachedException.class, () -> sharePartitionManager.newContext("grp", reqData, EMPTY_PART_LIST,
reqMetadata2, false, "id-2"));
}

@Test
public void testNewContextExistingSessionNewRequestWithInitialEpoch() {
ShareSessionCache cache = new ShareSessionCache(10);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.build();

Uuid tpId0 = Uuid.randomUuid();
TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1));

String groupId = "grp";
Uuid memberId = Uuid.randomUuid();
List<TopicIdPartition> reqData = List.of(tp0, tp1);

ShareRequestMetadata reqMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH);

// Create a new share session with an initial share fetch request
ShareFetchContext context1 = sharePartitionManager.newContext(groupId, reqData, EMPTY_PART_LIST, reqMetadata, false, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context1);
assertFalse(((ShareSessionContext) context1).isSubsequent());
assertEquals(1, cache.size());

// Sending another request with INITIAL_EPOCH and same share session key. This should return a new ShareSessionContext
// and delete the older one.
ShareFetchContext context2 = sharePartitionManager.newContext(groupId, reqData, EMPTY_PART_LIST, reqMetadata, false, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context2);
assertFalse(((ShareSessionContext) context1).isSubsequent());
assertEquals(1, cache.size());
}

@Test
public void testNewContext() {
ShareSessionCache cache = new ShareSessionCache(10);
Expand Down Expand Up @@ -372,6 +459,110 @@ public void testNewContext() {
assertEquals(0, cache.size());
}

@Test
public void testAcknowledgeSessionUpdateThrowsOnInitialEpoch() {
ShareSessionCache cache = new ShareSessionCache(10);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.build();

assertThrows(InvalidShareSessionEpochException.class,
() -> sharePartitionManager.acknowledgeSessionUpdate("grp",
new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH)));
}

@Test
public void testAcknowledgeSessionUpdateThrowsWhenShareSessionNotFound() {
ShareSessionCache cache = new ShareSessionCache(10);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.build();

// The share session corresponding to this memberId has not been created yet. This should throw an exception.
assertThrows(ShareSessionNotFoundException.class,
() -> sharePartitionManager.acknowledgeSessionUpdate("grp",
new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))));
}

@Test
public void testAcknowledgeSessionUpdateThrowsInvalidShareSessionEpochException() {
ShareSessionCache cache = new ShareSessionCache(10);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.build();

Uuid tpId0 = Uuid.randomUuid();
TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1));

String groupId = "grp";
Uuid memberId = Uuid.randomUuid();

// Create a new share session with an initial share fetch request
ShareFetchContext context1 = sharePartitionManager.newContext(groupId, List.of(tp0, tp1), EMPTY_PART_LIST,
new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH), false, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context1);
assertFalse(((ShareSessionContext) context1).isSubsequent());

// The expected epoch from the share session should be 1, but we are passing 2. This should throw an exception.
assertThrows(InvalidShareSessionEpochException.class,
() -> sharePartitionManager.acknowledgeSessionUpdate("grp",
new ShareRequestMetadata(memberId,
ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)))));
}

@Test
public void testAcknowledgeSessionUpdateSuccessOnSubsequentEpoch() {
ShareSessionCache cache = new ShareSessionCache(10);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.build();

Uuid tpId0 = Uuid.randomUuid();
TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1));

String groupId = "grp";
Uuid memberId = Uuid.randomUuid();

// Create a new share session with an initial share fetch request
ShareFetchContext context1 = sharePartitionManager.newContext(groupId, List.of(tp0, tp1), EMPTY_PART_LIST,
new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH), false, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context1);
assertFalse(((ShareSessionContext) context1).isSubsequent());

// The expected epoch from the share session should be 1, but we are passing 2. This should throw an exception.
assertDoesNotThrow(
() -> sharePartitionManager.acknowledgeSessionUpdate("grp",
new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH))));
}

@Test
public void testAcknowledgeSessionUpdateSuccessOnFinalEpoch() {
ShareSessionCache cache = new ShareSessionCache(10);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache)
.build();

Uuid tpId0 = Uuid.randomUuid();
TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1));

String groupId = "grp";
Uuid memberId = Uuid.randomUuid();

// Create a new share session with an initial share fetch request
ShareFetchContext context1 = sharePartitionManager.newContext(groupId, List.of(tp0, tp1), EMPTY_PART_LIST,
new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH), false, CONNECTION_ID);
assertInstanceOf(ShareSessionContext.class, context1);
assertFalse(((ShareSessionContext) context1).isSubsequent());

// The expected epoch from the share session should be 1, but we are passing 2. This should throw an exception.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't we passing FINAL_EPOCH?

assertDoesNotThrow(
() -> sharePartitionManager.acknowledgeSessionUpdate("grp",
new ShareRequestMetadata(memberId, ShareRequestMetadata.FINAL_EPOCH)));
}

@Test
public void testSubsequentShareSession() {
sharePartitionManager = SharePartitionManagerBuilder.builder().build();
Expand Down