
Commit d50525e

Add the possibility to configure Confluent Stream catalog limit (#402)
* Set stream catalog API request limit to 500 & make the limit a parameter of the application
* Fix syntax
* Simplify properties ref
* Remove extra line

---------

Co-authored-by: thcai <[email protected]>
1 parent 64bf07f commit d50525e

3 files changed: +58 -13 lines changed

src/main/java/com/michelin/ns4kafka/property/ConfluentCloudProperties.java

Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
+package com.michelin.ns4kafka.property;
+
+import io.micronaut.context.annotation.ConfigurationProperties;
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * Confluent Cloud properties.
+ */
+@Getter
+@Setter
+@ConfigurationProperties("ns4kafka.confluent-cloud")
+public class ConfluentCloudProperties {
+    private StreamCatalogProperties streamCatalog;
+
+    /**
+     * Stream Catalog properties.
+     */
+    @Getter
+    @Setter
+    @ConfigurationProperties("stream-catalog")
+    public static class StreamCatalogProperties {
+        int pageSize = 500;
+    }
+}

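The nested @ConfigurationProperties classes above bind the new page size under the ns4kafka.confluent-cloud.stream-catalog prefix, with 500 as the default. A minimal sketch of how an operator could override it in application.yml, assuming Micronaut's usual relaxed binding of the kebab-case key to the camelCase pageSize field (the value 1000 is only an example):

# application.yml (hypothetical excerpt)
ns4kafka:
  confluent-cloud:
    stream-catalog:
      page-size: 1000   # maps to StreamCatalogProperties.pageSize; defaults to 500 when omitted
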
src/main/java/com/michelin/ns4kafka/service/executor/TopicAsyncExecutor.java

Lines changed: 11 additions & 9 deletions
@@ -2,6 +2,7 @@
 
 import com.michelin.ns4kafka.model.Metadata;
 import com.michelin.ns4kafka.model.Topic;
+import com.michelin.ns4kafka.property.ConfluentCloudProperties;
 import com.michelin.ns4kafka.property.ManagedClusterProperties;
 import com.michelin.ns4kafka.repository.TopicRepository;
 import com.michelin.ns4kafka.repository.kafka.KafkaStoreException;
@@ -62,6 +63,8 @@ public class TopicAsyncExecutor {
 
     private SchemaRegistryClient schemaRegistryClient;
 
+    private ConfluentCloudProperties confluentCloudProperties;
+
     private Admin getAdminClient() {
         return managedClusterProperties.getAdminClient();
     }
@@ -111,18 +114,17 @@ public void synchronizeTopics() {
                 .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
 
         if (!createTopics.isEmpty()) {
-            log.debug("Topic(s) to create: "
-                + String.join(", ", createTopics.stream().map(topic -> topic.getMetadata().getName()).toList()));
+            log.debug("Topic(s) to create: {}", String.join(", ",
+                createTopics.stream().map(topic -> topic.getMetadata().getName()).toList()));
         }
 
         if (!updateTopics.isEmpty()) {
-            log.debug("Topic(s) to update: "
-                + String.join(", ", updateTopics.keySet().stream().map(ConfigResource::name).toList()));
+            log.debug("Topic(s) to update: {}", String.join(", ",
+                updateTopics.keySet().stream().map(ConfigResource::name).toList()));
             for (Map.Entry<ConfigResource, Collection<AlterConfigOp>> e : updateTopics.entrySet()) {
                 for (AlterConfigOp op : e.getValue()) {
-                    log.debug(
-                        e.getKey().name() + " " + op.opType().toString() + " " + op.configEntry().name() + "("
-                            + op.configEntry().value() + ")");
+                    log.debug("{} {} {}({})", e.getKey().name(), op.opType().toString(),
+                        op.configEntry().name(), op.configEntry().value());
                 }
             }
         }
@@ -271,7 +273,7 @@ public void enrichWithCatalogInfo(Map<String, Topic> topics) {
 
        // getting list of topics by managing offset & limit
        int offset = 0;
-       int limit = 5000;
+       int limit = confluentCloudProperties.getStreamCatalog().getPageSize();
        do {
            topicListResponse = schemaRegistryClient.getTopicWithCatalogInfo(
                managedClusterProperties.getName(), limit, offset).block();
@@ -332,7 +334,7 @@ public Map<String, Topic> collectBrokerTopicsFromNames(List<String> topicNames)
                    .build())
                .spec(Topic.TopicSpec.builder()
                    .replicationFactor(
-                        topicDescriptions.get(stringMapEntry.getKey()).partitions().get(0).replicas().size())
+                        topicDescriptions.get(stringMapEntry.getKey()).partitions().getFirst().replicas().size())
                    .partitions(topicDescriptions.get(stringMapEntry.getKey()).partitions().size())
                    .configs(stringMapEntry.getValue())
                    .build())

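In the enrichWithCatalogInfo hunk above, the hard-coded limit of 5000 becomes the configured page size, and the method keeps paging the catalog by offset inside a do/while loop. Below is a minimal, self-contained sketch of that paging pattern; the Catalog interface, Page record, and fetchAll helper are hypothetical stand-ins for SchemaRegistryClient.getTopicWithCatalogInfo and TopicListResponse, and the empty-page exit condition is an assumption, since the tail of the loop is not shown in this diff.

// Standalone sketch of offset/limit paging with a configurable page size.
import java.util.ArrayList;
import java.util.List;

public class CatalogPagingSketch {
    // Hypothetical stand-in for one page of catalog entries (TopicListResponse in ns4kafka).
    record Page(List<String> entities) { }

    // Hypothetical stand-in for SchemaRegistryClient.getTopicWithCatalogInfo(cluster, limit, offset).
    interface Catalog {
        Page fetch(String cluster, int limit, int offset);
    }

    static List<String> fetchAll(Catalog catalog, String cluster, int pageSize) {
        List<String> all = new ArrayList<>();
        int offset = 0;
        Page page;
        do {
            // Request one page of catalog entries, then advance the offset by the page size.
            page = catalog.fetch(cluster, pageSize, offset);
            all.addAll(page.entities());
            offset += pageSize;
            // Stop once a page comes back empty (assumed exit condition, not shown in the diff).
        } while (!page.entities().isEmpty());
        return all;
    }

    public static void main(String[] args) {
        // Fake catalog with 3 entries, paged 2 at a time.
        List<String> data = List.of("topic-a", "topic-b", "topic-c");
        Catalog fake = (cluster, limit, offset) ->
            new Page(data.subList(Math.min(offset, data.size()),
                Math.min(offset + limit, data.size())));
        System.out.println(fetchAll(fake, "local", 2)); // [topic-a, topic-b, topic-c]
    }
}

Only the source of the page size changes in this commit; the paging flow itself stays as it was.
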
src/test/java/com/michelin/ns4kafka/service/executor/TopicAsyncExecutorTest.java

Lines changed: 22 additions & 4 deletions
@@ -14,6 +14,8 @@
 
 import com.michelin.ns4kafka.model.Metadata;
 import com.michelin.ns4kafka.model.Topic;
+import com.michelin.ns4kafka.property.ConfluentCloudProperties;
+import com.michelin.ns4kafka.property.ConfluentCloudProperties.StreamCatalogProperties;
 import com.michelin.ns4kafka.property.ManagedClusterProperties;
 import com.michelin.ns4kafka.repository.TopicRepository;
 import com.michelin.ns4kafka.service.client.schema.SchemaRegistryClient;
@@ -59,6 +61,12 @@ class TopicAsyncExecutorTest {
     @Mock
     ManagedClusterProperties managedClusterProperties;
 
+    @Mock
+    ConfluentCloudProperties confluentCloudProperties;
+
+    @Mock
+    StreamCatalogProperties streamCatalogProperties;
+
     @Mock
     TopicRepository topicRepository;
 
@@ -428,6 +436,10 @@ void shouldNotEnrichWithCatalogInfoWhenNotConfluentCloud() {
     void shouldEnrichWithCatalogInfoWhenConfluentCloud() {
         when(managedClusterProperties.isConfluentCloud()).thenReturn(true);
         when(managedClusterProperties.getName()).thenReturn(LOCAL_CLUSTER);
+        when(confluentCloudProperties.getStreamCatalog()).thenReturn(streamCatalogProperties);
+        when(streamCatalogProperties.getPageSize()).thenReturn(500);
+
+        int limit = 500;
 
         TopicEntity entity = TopicEntity.builder()
             .classificationNames(List.of(TAG1))
@@ -439,9 +451,9 @@ void shouldEnrichWithCatalogInfoWhenConfluentCloud() {
         TopicListResponse response1 = TopicListResponse.builder().entities(List.of(entity)).build();
         TopicListResponse response2 = TopicListResponse.builder().entities(List.of()).build();
 
-        when(schemaRegistryClient.getTopicWithCatalogInfo(LOCAL_CLUSTER, 5000, 0))
+        when(schemaRegistryClient.getTopicWithCatalogInfo(LOCAL_CLUSTER, limit, 0))
            .thenReturn(Mono.just(response1));
-        when(schemaRegistryClient.getTopicWithCatalogInfo(LOCAL_CLUSTER, 5000, 5000))
+        when(schemaRegistryClient.getTopicWithCatalogInfo(LOCAL_CLUSTER, limit, limit))
            .thenReturn(Mono.just(response2));
 
         Map<String, Topic> brokerTopics = Map.of(
@@ -463,6 +475,10 @@ void shouldEnrichWithCatalogInfoWhenConfluentCloud() {
     void shouldEnrichWithCatalogInfoForMultipleTopics() {
         when(managedClusterProperties.isConfluentCloud()).thenReturn(true);
         when(managedClusterProperties.getName()).thenReturn(LOCAL_CLUSTER);
+        when(confluentCloudProperties.getStreamCatalog()).thenReturn(streamCatalogProperties);
+        when(streamCatalogProperties.getPageSize()).thenReturn(500);
+
+        int limit = 500;
 
         TopicEntity entity1 = TopicEntity.builder()
             .classificationNames(List.of())
@@ -500,9 +516,9 @@ void shouldEnrichWithCatalogInfoForMultipleTopics() {
             .entities(List.of(entity1, entity2, entity3, entity4)).build();
         TopicListResponse response2 = TopicListResponse.builder().entities(List.of()).build();
 
-        when(schemaRegistryClient.getTopicWithCatalogInfo(LOCAL_CLUSTER, 5000, 0))
+        when(schemaRegistryClient.getTopicWithCatalogInfo(LOCAL_CLUSTER, limit, 0))
            .thenReturn(Mono.just(response1));
-        when(schemaRegistryClient.getTopicWithCatalogInfo(LOCAL_CLUSTER, 5000, 5000))
+        when(schemaRegistryClient.getTopicWithCatalogInfo(LOCAL_CLUSTER, limit, limit))
            .thenReturn(Mono.just(response2));
 
         Map<String, Topic> brokerTopics = Map.of(
@@ -553,6 +569,8 @@ void shouldEnrichWithCatalogInfoForMultipleTopics() {
     void shouldEnrichWithCatalogInfoWhenConfluentCloudAndResponseIsNull() {
         when(managedClusterProperties.isConfluentCloud()).thenReturn(true);
         when(managedClusterProperties.getName()).thenReturn(LOCAL_CLUSTER);
+        when(confluentCloudProperties.getStreamCatalog()).thenReturn(streamCatalogProperties);
+        when(streamCatalogProperties.getPageSize()).thenReturn(500);
        when(schemaRegistryClient.getTopicWithCatalogInfo(anyString(), any(Integer.class), any(Integer.class)))
            .thenReturn(Mono.empty());

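Because the executor now reads the page size through confluentCloudProperties.getStreamCatalog().getPageSize(), each test stubs both the outer and the nested getter, as the hunks above show. A compact, self-contained sketch of that two-level stubbing pattern; the test class name and the assertion are illustrative only and not part of the commit:

// Sketch: stubbing nested configuration properties with Mockito.
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.michelin.ns4kafka.property.ConfluentCloudProperties;
import com.michelin.ns4kafka.property.ConfluentCloudProperties.StreamCatalogProperties;
import org.junit.jupiter.api.Test;

class StreamCatalogStubbingSketchTest {
    @Test
    void shouldReadConfiguredPageSizeThroughNestedProperties() {
        ConfluentCloudProperties confluentCloudProperties = mock(ConfluentCloudProperties.class);
        StreamCatalogProperties streamCatalogProperties = mock(StreamCatalogProperties.class);

        // Stub the outer getter to return the nested mock, then the nested getter to return the value.
        when(confluentCloudProperties.getStreamCatalog()).thenReturn(streamCatalogProperties);
        when(streamCatalogProperties.getPageSize()).thenReturn(500);

        int limit = confluentCloudProperties.getStreamCatalog().getPageSize();
        assertEquals(500, limit);
    }
}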