Skip to content

Expose all properties in KafkaStreamsExecutionContext class #362

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ protected KafkaStreamsStarterTest() {}
void generalSetUp() {
Properties properties = getProperties();

KafkaStreamsExecutionContext.registerProperties(properties);
KafkaStreamsExecutionContext.registerProperties(properties, null);

String schemaRegistryUrl = properties.getProperty(SCHEMA_REGISTRY_URL_CONFIG);
KafkaStreamsExecutionContext.setSerdesConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static com.michelin.kstreamplify.serde.TopicWithSerde.SELF;
import static com.michelin.kstreamplify.topic.TopicUtils.PREFIX_PROPERTY_NAME;

import com.michelin.kstreamplify.property.PropertiesUtils;
import java.util.Map;
import java.util.Properties;
import lombok.Getter;
Expand All @@ -45,6 +46,10 @@ public class KafkaStreamsExecutionContext {
@Setter
private static Properties properties;

@Getter
@Setter
private static Properties kafkaProperties;

@Getter
private static String prefix;

Expand All @@ -55,18 +60,20 @@ private KafkaStreamsExecutionContext() {}
*
* @param properties The Kafka properties
*/
public static void registerProperties(Properties properties) {
public static void registerProperties(Properties properties, Properties kafkaProperties) {
if (properties == null) {
return;
}

prefix = properties.getProperty(PREFIX_PROPERTY_NAME + PROPERTY_SEPARATOR + SELF, "");
KafkaStreamsExecutionContext.properties = PropertiesUtils.removeKafkaPrefix(properties);
KafkaStreamsExecutionContext.kafkaProperties = kafkaProperties;

KafkaStreamsExecutionContext.prefix =
properties.getProperty(PREFIX_PROPERTY_NAME + PROPERTY_SEPARATOR + SELF, "");
if (StringUtils.isNotBlank(prefix) && properties.containsKey(StreamsConfig.APPLICATION_ID_CONFIG)) {
properties.setProperty(
KafkaStreamsExecutionContext.properties.setProperty(
StreamsConfig.APPLICATION_ID_CONFIG,
prefix.concat(properties.getProperty(StreamsConfig.APPLICATION_ID_CONFIG)));
}

KafkaStreamsExecutionContext.properties = properties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ protected void initProperties() {
properties = PropertiesUtils.loadProperties();
serverPort = (Integer) properties.get(SERVER_PORT_PROPERTY_NAME);
kafkaProperties = PropertiesUtils.loadKafkaProperties(properties);
KafkaStreamsExecutionContext.registerProperties(kafkaProperties);
KafkaStreamsExecutionContext.registerProperties(properties, kafkaProperties);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,35 @@ public static Properties loadProperties() {
}

/**
* Get the Kafka properties only from the given properties.
* Get the Kafka properties from the given properties, without the kafka properties prefix.
*
* @param props The properties
* @return The Kafka properties
*/
public static Properties loadKafkaProperties(Properties props) {
Properties resultProperties = new Properties();
for (var prop : props.entrySet()) {
if (StringUtils.contains(prop.getKey().toString(), KAFKA_PROPERTIES_PREFIX)) {
resultProperties.put(
StringUtils.remove(prop.getKey().toString(), KAFKA_PROPERTIES_PREFIX + PROPERTY_SEPARATOR),
prop.getValue());
}
}
props.keySet().stream()
.filter(key -> key.toString().startsWith(KAFKA_PROPERTIES_PREFIX))
.forEach(key -> resultProperties.put(
key.toString().replaceAll("^" + KAFKA_PROPERTIES_PREFIX + PROPERTY_SEPARATOR, ""),
props.get(key).toString()));

return resultProperties;
}

/**
* Remove the kafka properties prefix from the given properties keys.
*
* @param props The properties
* @return The updated properties
*/
public static Properties removeKafkaPrefix(Properties props) {
Properties resultProperties = new Properties();
props.keySet()
.forEach(key -> resultProperties.put(
key.toString().replaceAll("^" + KAFKA_PROPERTIES_PREFIX + PROPERTY_SEPARATOR, ""),
props.get(key).toString()));

return resultProperties;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ void setUp() {

@Test
void shouldNotRegisterPropertiesWhenNull() {
KafkaStreamsExecutionContext.registerProperties(null);
KafkaStreamsExecutionContext.registerProperties(null, null);
assertNull(KafkaStreamsExecutionContext.getProperties());
assertNull(KafkaStreamsExecutionContext.getKafkaProperties());
}

@Test
Expand All @@ -45,7 +46,7 @@ void shouldAddPrefixToAppId() {
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
properties.put("prefix.self", "abc.");

KafkaStreamsExecutionContext.registerProperties(properties);
KafkaStreamsExecutionContext.registerProperties(properties, null);

assertEquals("abc.", KafkaStreamsExecutionContext.getPrefix());
assertEquals(
Expand All @@ -57,7 +58,7 @@ void shouldNotAddPrefixToAppIdIfNoPrefix() {
Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");

KafkaStreamsExecutionContext.registerProperties(properties);
KafkaStreamsExecutionContext.registerProperties(properties, null);

assertEquals("", KafkaStreamsExecutionContext.getPrefix());
assertEquals("appId", KafkaStreamsExecutionContext.getProperties().get(StreamsConfig.APPLICATION_ID_CONFIG));
Expand All @@ -68,9 +69,43 @@ void shouldNotAddPrefixToAppIdIfNotAppId() {
Properties properties = new Properties();
properties.put("prefix.self", "abc.");

KafkaStreamsExecutionContext.registerProperties(properties);
KafkaStreamsExecutionContext.registerProperties(properties, null);

assertEquals("abc.", KafkaStreamsExecutionContext.getPrefix());
assertNull(KafkaStreamsExecutionContext.getProperties().get(StreamsConfig.APPLICATION_ID_CONFIG));
}

@Test
void shouldRegisterNonKafkaPropertiesInProperties() {
Properties properties = new Properties();
properties.put("notKafka.properties.prop", "propValue");

KafkaStreamsExecutionContext.registerProperties(properties, null);

assertEquals("propValue", KafkaStreamsExecutionContext.getProperties().get("notKafka.properties.prop"));
assertNull(KafkaStreamsExecutionContext.getKafkaProperties());
}

@Test
void shouldRegisterKafkaPropertiesInProperties() {
Properties properties = new Properties();
properties.put("kafka.properties.kafkaProp", "kafkaPropValue");

KafkaStreamsExecutionContext.registerProperties(properties, null);

assertEquals(
"kafkaPropValue", KafkaStreamsExecutionContext.getProperties().get("kafkaProp"));
assertNull(KafkaStreamsExecutionContext.getProperties().get("kafka.properties.kafkaProp"));
}

@Test
void shouldRegisterKafkaPropertiesInKafkaProperties() {
Properties kafkaProperties = new Properties();
kafkaProperties.put("kafkaProp", "propValue");

KafkaStreamsExecutionContext.registerProperties(new Properties(), kafkaProperties);

assertEquals(
"propValue", KafkaStreamsExecutionContext.getKafkaProperties().get("kafkaProp"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mockStatic;

import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
import com.michelin.kstreamplify.property.PropertiesUtils;
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
Expand Down Expand Up @@ -61,9 +60,6 @@ void shouldInitProperties() {
assertNotNull(initializer.getProperties());
assertEquals(8080, initializer.getServerPort());
assertTrue(initializer.getKafkaProperties().containsKey(StreamsConfig.APPLICATION_ID_CONFIG));
assertEquals("abc.", KafkaStreamsExecutionContext.getPrefix());
assertEquals(
"abc.appId", KafkaStreamsExecutionContext.getProperties().get(StreamsConfig.APPLICATION_ID_CONFIG));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
class KafkaStreamsStarterTest {
@Test
void shouldInstantiateKafkaStreamsStarter() {
KafkaStreamsExecutionContext.registerProperties(new Properties());
KafkaStreamsExecutionContext.registerProperties(new Properties(), null);
KafkaStreamsExecutionContext.setSerdesConfig(
Map.of(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://"));

Expand All @@ -64,7 +64,7 @@ void shouldInstantiateKafkaStreamsStarter() {

@Test
void shouldStartWithCustomUncaughtExceptionHandler() {
KafkaStreamsExecutionContext.registerProperties(new Properties());
KafkaStreamsExecutionContext.registerProperties(new Properties(), null);
KafkaStreamsExecutionContext.setSerdesConfig(
Map.of(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ protected void initProperties() {
Properties convertedAdditionalProperties = new Properties();
convertedAdditionalProperties.putAll(additionalProperties);
kafkaProperties.putAll(PropertiesUtils.loadKafkaProperties(convertedAdditionalProperties));
KafkaStreamsExecutionContext.registerProperties(kafkaProperties);
KafkaStreamsExecutionContext.registerProperties(properties, kafkaProperties);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
*/
package com.michelin.kstreamplify.property;

import static com.michelin.kstreamplify.property.PropertiesUtils.KAFKA_PROPERTIES_PREFIX;
import static com.michelin.kstreamplify.property.PropertiesUtils.PROPERTY_SEPARATOR;
import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.Properties;
Expand All @@ -33,7 +36,7 @@ void shouldLoadProperties() {
assertTrue(properties.containsKey("server.port"));
assertTrue(properties.containsValue(8080));

assertTrue(properties.containsKey("kafka.properties." + APPLICATION_ID_CONFIG));
assertTrue(properties.containsKey(KAFKA_PROPERTIES_PREFIX + PROPERTY_SEPARATOR + APPLICATION_ID_CONFIG));
assertTrue(properties.containsValue("appId"));
}

Expand All @@ -44,4 +47,38 @@ void shouldLoadKafkaProperties() {
assertTrue(properties.containsKey(APPLICATION_ID_CONFIG));
assertTrue(properties.containsValue("appId"));
}

@Test
void shouldRemoveKafkaPrefix() {
Properties prop = new Properties();
prop.put(KAFKA_PROPERTIES_PREFIX + PROPERTY_SEPARATOR + "kafkaProp", "propValue");
Properties resultProperties = PropertiesUtils.removeKafkaPrefix(prop);

assertTrue(resultProperties.containsKey("kafkaProp"));
assertTrue(resultProperties.containsValue("propValue"));
assertFalse(resultProperties.containsKey(KAFKA_PROPERTIES_PREFIX + PROPERTY_SEPARATOR + "kafkaProp"));
}

@Test
void shouldNotRemoveKafkaPrefix() {
Properties prop = new Properties();
prop.put("another.properties.prop", "propValue");
Properties resultProperties = PropertiesUtils.removeKafkaPrefix(prop);

assertTrue(resultProperties.containsKey("another.properties.prop"));
assertTrue(resultProperties.containsValue("propValue"));
assertFalse(resultProperties.containsKey("prop"));
}

@Test
void shouldNotRemoveKafkaPropertiesStringWhenNotPrefix() {
Properties prop = new Properties();
prop.put("prefix." + KAFKA_PROPERTIES_PREFIX + PROPERTY_SEPARATOR + "kafkaProp", "propValue");
Properties resultProperties = PropertiesUtils.removeKafkaPrefix(prop);

assertTrue(
resultProperties.containsKey("prefix." + KAFKA_PROPERTIES_PREFIX + PROPERTY_SEPARATOR + "kafkaProp"));
assertTrue(resultProperties.containsValue("propValue"));
assertFalse(resultProperties.containsKey("prefix.kafkaProp"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ void setUp() {
void testSetConfigWithDefaultValues() {
Map<String, Object> configs = new HashMap<>();
RocksDbConfig rocksDbConfig = new RocksDbConfig();
KafkaStreamsExecutionContext.registerProperties(new Properties());
KafkaStreamsExecutionContext.registerProperties(new Properties(), null);

rocksDbConfig.setConfig("storeName", options, configs);

Expand Down Expand Up @@ -79,7 +79,7 @@ void testSetConfigWithCustomValues() {
configs.put(RocksDbConfig.ROCKSDB_COMPRESSION_TYPE_CONFIG, compressionType);
Properties properties = new Properties();
properties.putAll(configs);
KafkaStreamsExecutionContext.registerProperties(properties);
KafkaStreamsExecutionContext.registerProperties(properties, null);

RocksDbConfig rocksDbConfig = new RocksDbConfig();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class TopicWithSerdeTest {

@Test
void shouldCreateTopicWithSerde() {
KafkaStreamsExecutionContext.registerProperties(new Properties());
KafkaStreamsExecutionContext.registerProperties(new Properties(), null);

TopicWithSerde<String, String> topicWithSerde =
new TopicWithSerde<>("INPUT_TOPIC", Serdes.String(), Serdes.String());
Expand All @@ -44,7 +44,7 @@ void shouldCreateTopicWithSerdeWithPrefix() {
Properties properties = new Properties();
properties.put("prefix.self", "abc.");

KafkaStreamsExecutionContext.registerProperties(properties);
KafkaStreamsExecutionContext.registerProperties(properties, null);

TopicWithSerde<String, String> topicWithSerde =
new TopicWithSerde<>("INPUT_TOPIC", Serdes.String(), Serdes.String());
Expand All @@ -55,7 +55,7 @@ void shouldCreateTopicWithSerdeWithPrefix() {

@Test
void shouldCreateStream() {
KafkaStreamsExecutionContext.registerProperties(new Properties());
KafkaStreamsExecutionContext.registerProperties(new Properties(), null);

TopicWithSerde<String, String> topicWithSerde =
new TopicWithSerde<>("INPUT_TOPIC", Serdes.String(), Serdes.String());
Expand All @@ -76,7 +76,7 @@ void shouldCreateStream() {

@Test
void shouldCreateTable() {
KafkaStreamsExecutionContext.registerProperties(new Properties());
KafkaStreamsExecutionContext.registerProperties(new Properties(), null);

TopicWithSerde<String, String> topicWithSerde =
new TopicWithSerde<>("INPUT_TOPIC", Serdes.String(), Serdes.String());
Expand All @@ -100,7 +100,7 @@ void shouldCreateTable() {

@Test
void shouldCreateGlobalKtable() {
KafkaStreamsExecutionContext.registerProperties(new Properties());
KafkaStreamsExecutionContext.registerProperties(new Properties(), null);

TopicWithSerde<String, String> topicWithSerde =
new TopicWithSerde<>("INPUT_TOPIC", Serdes.String(), Serdes.String());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class KubernetesServiceTest {

@Test
void shouldGetReadinessProbeWhenRunning() {
KafkaStreamsExecutionContext.registerProperties(new Properties());
KafkaStreamsExecutionContext.registerProperties(new Properties(), null);

when(kafkaStreamsInitializer.getKafkaStreams()).thenReturn(kafkaStreams);
when(kafkaStreams.state()).thenReturn(KafkaStreams.State.RUNNING);
Expand All @@ -60,7 +60,7 @@ void shouldGetReadinessProbeWhenRunning() {

@Test
void shouldGetReadinessProbeWhenNotRunning() {
KafkaStreamsExecutionContext.registerProperties(new Properties());
KafkaStreamsExecutionContext.registerProperties(new Properties(), null);

when(kafkaStreamsInitializer.getKafkaStreams()).thenReturn(kafkaStreams);
when(kafkaStreams.state()).thenReturn(KafkaStreams.State.NOT_RUNNING);
Expand All @@ -81,7 +81,7 @@ void shouldGetReadinessProbeWhenNull() {

@Test
void shouldGetReadinessProbeWhenRebalancingAndAllThreadsCreated() {
KafkaStreamsExecutionContext.registerProperties(new Properties());
KafkaStreamsExecutionContext.registerProperties(new Properties(), null);

when(kafkaStreamsInitializer.getKafkaStreams()).thenReturn(kafkaStreams);
when(kafkaStreams.state()).thenReturn(KafkaStreams.State.REBALANCING);
Expand All @@ -103,7 +103,7 @@ void shouldGetReadinessProbeWhenRebalancingAndAllThreadsCreated() {

@Test
void shouldGetReadinessProbeWhenRebalancingAndAllThreadsNotStartingOrCreated() {
KafkaStreamsExecutionContext.registerProperties(new Properties());
KafkaStreamsExecutionContext.registerProperties(new Properties(), null);

when(kafkaStreamsInitializer.getKafkaStreams()).thenReturn(kafkaStreams);
when(kafkaStreams.state()).thenReturn(KafkaStreams.State.REBALANCING);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ protected void startHttpServer() {
protected void initProperties() {
serverPort = springBootServerPort;
kafkaProperties = springBootKafkaProperties.asProperties();
KafkaStreamsExecutionContext.registerProperties(kafkaProperties);
KafkaStreamsExecutionContext.registerProperties(kafkaProperties, kafkaProperties);
}

/** {@inheritDoc} */
Expand Down