diff --git a/kstreamplify-core-test/src/main/java/com/michelin/kstreamplify/KafkaStreamsStarterTest.java b/kstreamplify-core-test/src/main/java/com/michelin/kstreamplify/KafkaStreamsStarterTest.java index 0ecba188..b8b45ca9 100644 --- a/kstreamplify-core-test/src/main/java/com/michelin/kstreamplify/KafkaStreamsStarterTest.java +++ b/kstreamplify-core-test/src/main/java/com/michelin/kstreamplify/KafkaStreamsStarterTest.java @@ -64,7 +64,7 @@ protected KafkaStreamsStarterTest() {} void generalSetUp() { Properties properties = getProperties(); - KafkaStreamsExecutionContext.registerProperties(properties); + KafkaStreamsExecutionContext.registerProperties(properties, null); String schemaRegistryUrl = properties.getProperty(SCHEMA_REGISTRY_URL_CONFIG); KafkaStreamsExecutionContext.setSerdesConfig( diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContext.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContext.java index fb9ab836..a9e29f30 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContext.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContext.java @@ -22,6 +22,7 @@ import static com.michelin.kstreamplify.serde.TopicWithSerde.SELF; import static com.michelin.kstreamplify.topic.TopicUtils.PREFIX_PROPERTY_NAME; +import com.michelin.kstreamplify.property.PropertiesUtils; import java.util.Map; import java.util.Properties; import lombok.Getter; @@ -45,6 +46,10 @@ public class KafkaStreamsExecutionContext { @Setter private static Properties properties; + @Getter + @Setter + private static Properties kafkaProperties; + @Getter private static String prefix; @@ -55,18 +60,20 @@ private KafkaStreamsExecutionContext() {} * * @param properties The Kafka properties */ - public static void registerProperties(Properties properties) { + public static void registerProperties(Properties properties, Properties kafkaProperties) { if (properties == null) { return; } - prefix = properties.getProperty(PREFIX_PROPERTY_NAME + PROPERTY_SEPARATOR + SELF, ""); + KafkaStreamsExecutionContext.properties = PropertiesUtils.removeKafkaPrefix(properties); + KafkaStreamsExecutionContext.kafkaProperties = kafkaProperties; + + KafkaStreamsExecutionContext.prefix = + properties.getProperty(PREFIX_PROPERTY_NAME + PROPERTY_SEPARATOR + SELF, ""); if (StringUtils.isNotBlank(prefix) && properties.containsKey(StreamsConfig.APPLICATION_ID_CONFIG)) { - properties.setProperty( + KafkaStreamsExecutionContext.properties.setProperty( StreamsConfig.APPLICATION_ID_CONFIG, prefix.concat(properties.getProperty(StreamsConfig.APPLICATION_ID_CONFIG))); } - - KafkaStreamsExecutionContext.properties = properties; } } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializer.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializer.java index b9b2cf19..e9a38bbd 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializer.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializer.java @@ -165,7 +165,7 @@ protected void initProperties() { properties = PropertiesUtils.loadProperties(); serverPort = (Integer) properties.get(SERVER_PORT_PROPERTY_NAME); kafkaProperties = PropertiesUtils.loadKafkaProperties(properties); - KafkaStreamsExecutionContext.registerProperties(kafkaProperties); + KafkaStreamsExecutionContext.registerProperties(properties, kafkaProperties); } /** diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/property/PropertiesUtils.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/property/PropertiesUtils.java index 8dd0c974..6ebd3f8b 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/property/PropertiesUtils.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/property/PropertiesUtils.java @@ -57,20 +57,35 @@ public static Properties loadProperties() { } /** - * Get the Kafka properties only from the given properties. + * Get the Kafka properties from the given properties, without the kafka properties prefix. * * @param props The properties * @return The Kafka properties */ public static Properties loadKafkaProperties(Properties props) { Properties resultProperties = new Properties(); - for (var prop : props.entrySet()) { - if (StringUtils.contains(prop.getKey().toString(), KAFKA_PROPERTIES_PREFIX)) { - resultProperties.put( - StringUtils.remove(prop.getKey().toString(), KAFKA_PROPERTIES_PREFIX + PROPERTY_SEPARATOR), - prop.getValue()); - } - } + props.keySet().stream() + .filter(key -> key.toString().startsWith(KAFKA_PROPERTIES_PREFIX)) + .forEach(key -> resultProperties.put( + key.toString().replaceAll("^" + KAFKA_PROPERTIES_PREFIX + PROPERTY_SEPARATOR, ""), + props.get(key).toString())); + + return resultProperties; + } + + /** + * Remove the kafka properties prefix from the given properties keys. + * + * @param props The properties + * @return The updated properties + */ + public static Properties removeKafkaPrefix(Properties props) { + Properties resultProperties = new Properties(); + props.keySet() + .forEach(key -> resultProperties.put( + key.toString().replaceAll("^" + KAFKA_PROPERTIES_PREFIX + PROPERTY_SEPARATOR, ""), + props.get(key).toString())); + return resultProperties; } diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContextTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContextTest.java index 92943ddb..5df0369d 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContextTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContextTest.java @@ -35,8 +35,9 @@ void setUp() { @Test void shouldNotRegisterPropertiesWhenNull() { - KafkaStreamsExecutionContext.registerProperties(null); + KafkaStreamsExecutionContext.registerProperties(null, null); assertNull(KafkaStreamsExecutionContext.getProperties()); + assertNull(KafkaStreamsExecutionContext.getKafkaProperties()); } @Test @@ -45,7 +46,7 @@ void shouldAddPrefixToAppId() { properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); properties.put("prefix.self", "abc."); - KafkaStreamsExecutionContext.registerProperties(properties); + KafkaStreamsExecutionContext.registerProperties(properties, null); assertEquals("abc.", KafkaStreamsExecutionContext.getPrefix()); assertEquals( @@ -57,7 +58,7 @@ void shouldNotAddPrefixToAppIdIfNoPrefix() { Properties properties = new Properties(); properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); - KafkaStreamsExecutionContext.registerProperties(properties); + KafkaStreamsExecutionContext.registerProperties(properties, null); assertEquals("", KafkaStreamsExecutionContext.getPrefix()); assertEquals("appId", KafkaStreamsExecutionContext.getProperties().get(StreamsConfig.APPLICATION_ID_CONFIG)); @@ -68,9 +69,43 @@ void shouldNotAddPrefixToAppIdIfNotAppId() { Properties properties = new Properties(); properties.put("prefix.self", "abc."); - KafkaStreamsExecutionContext.registerProperties(properties); + KafkaStreamsExecutionContext.registerProperties(properties, null); assertEquals("abc.", KafkaStreamsExecutionContext.getPrefix()); assertNull(KafkaStreamsExecutionContext.getProperties().get(StreamsConfig.APPLICATION_ID_CONFIG)); } + + @Test + void shouldRegisterNonKafkaPropertiesInProperties() { + Properties properties = new Properties(); + properties.put("notKafka.properties.prop", "propValue"); + + KafkaStreamsExecutionContext.registerProperties(properties, null); + + assertEquals("propValue", KafkaStreamsExecutionContext.getProperties().get("notKafka.properties.prop")); + assertNull(KafkaStreamsExecutionContext.getKafkaProperties()); + } + + @Test + void shouldRegisterKafkaPropertiesInProperties() { + Properties properties = new Properties(); + properties.put("kafka.properties.kafkaProp", "kafkaPropValue"); + + KafkaStreamsExecutionContext.registerProperties(properties, null); + + assertEquals( + "kafkaPropValue", KafkaStreamsExecutionContext.getProperties().get("kafkaProp")); + assertNull(KafkaStreamsExecutionContext.getProperties().get("kafka.properties.kafkaProp")); + } + + @Test + void shouldRegisterKafkaPropertiesInKafkaProperties() { + Properties kafkaProperties = new Properties(); + kafkaProperties.put("kafkaProp", "propValue"); + + KafkaStreamsExecutionContext.registerProperties(new Properties(), kafkaProperties); + + assertEquals( + "propValue", KafkaStreamsExecutionContext.getKafkaProperties().get("kafkaProp")); + } } diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializerTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializerTest.java index f8112530..ca3df257 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializerTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializerTest.java @@ -27,7 +27,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mockStatic; -import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext; import com.michelin.kstreamplify.property.PropertiesUtils; import java.util.Properties; import org.apache.kafka.streams.StreamsConfig; @@ -61,9 +60,6 @@ void shouldInitProperties() { assertNotNull(initializer.getProperties()); assertEquals(8080, initializer.getServerPort()); assertTrue(initializer.getKafkaProperties().containsKey(StreamsConfig.APPLICATION_ID_CONFIG)); - assertEquals("abc.", KafkaStreamsExecutionContext.getPrefix()); - assertEquals( - "abc.appId", KafkaStreamsExecutionContext.getProperties().get(StreamsConfig.APPLICATION_ID_CONFIG)); } } diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarterTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarterTest.java index 9720dfa8..84ccb546 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarterTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarterTest.java @@ -47,7 +47,7 @@ class KafkaStreamsStarterTest { @Test void shouldInstantiateKafkaStreamsStarter() { - KafkaStreamsExecutionContext.registerProperties(new Properties()); + KafkaStreamsExecutionContext.registerProperties(new Properties(), null); KafkaStreamsExecutionContext.setSerdesConfig( Map.of(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://")); @@ -64,7 +64,7 @@ void shouldInstantiateKafkaStreamsStarter() { @Test void shouldStartWithCustomUncaughtExceptionHandler() { - KafkaStreamsExecutionContext.registerProperties(new Properties()); + KafkaStreamsExecutionContext.registerProperties(new Properties(), null); KafkaStreamsExecutionContext.setSerdesConfig( Map.of(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://")); diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/integration/container/KafkaIntegrationTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/integration/container/KafkaIntegrationTest.java index 2bcae486..85018939 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/integration/container/KafkaIntegrationTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/integration/container/KafkaIntegrationTest.java @@ -142,7 +142,7 @@ protected void initProperties() { Properties convertedAdditionalProperties = new Properties(); convertedAdditionalProperties.putAll(additionalProperties); kafkaProperties.putAll(PropertiesUtils.loadKafkaProperties(convertedAdditionalProperties)); - KafkaStreamsExecutionContext.registerProperties(kafkaProperties); + KafkaStreamsExecutionContext.registerProperties(properties, kafkaProperties); } } } diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/property/PropertiesUtilsTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/property/PropertiesUtilsTest.java index b24dbe30..7b487ab7 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/property/PropertiesUtilsTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/property/PropertiesUtilsTest.java @@ -18,7 +18,10 @@ */ package com.michelin.kstreamplify.property; +import static com.michelin.kstreamplify.property.PropertiesUtils.KAFKA_PROPERTIES_PREFIX; +import static com.michelin.kstreamplify.property.PropertiesUtils.PROPERTY_SEPARATOR; import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.Properties; @@ -33,7 +36,7 @@ void shouldLoadProperties() { assertTrue(properties.containsKey("server.port")); assertTrue(properties.containsValue(8080)); - assertTrue(properties.containsKey("kafka.properties." + APPLICATION_ID_CONFIG)); + assertTrue(properties.containsKey(KAFKA_PROPERTIES_PREFIX + PROPERTY_SEPARATOR + APPLICATION_ID_CONFIG)); assertTrue(properties.containsValue("appId")); } @@ -44,4 +47,38 @@ void shouldLoadKafkaProperties() { assertTrue(properties.containsKey(APPLICATION_ID_CONFIG)); assertTrue(properties.containsValue("appId")); } + + @Test + void shouldRemoveKafkaPrefix() { + Properties prop = new Properties(); + prop.put(KAFKA_PROPERTIES_PREFIX + PROPERTY_SEPARATOR + "kafkaProp", "propValue"); + Properties resultProperties = PropertiesUtils.removeKafkaPrefix(prop); + + assertTrue(resultProperties.containsKey("kafkaProp")); + assertTrue(resultProperties.containsValue("propValue")); + assertFalse(resultProperties.containsKey(KAFKA_PROPERTIES_PREFIX + PROPERTY_SEPARATOR + "kafkaProp")); + } + + @Test + void shouldNotRemoveKafkaPrefix() { + Properties prop = new Properties(); + prop.put("another.properties.prop", "propValue"); + Properties resultProperties = PropertiesUtils.removeKafkaPrefix(prop); + + assertTrue(resultProperties.containsKey("another.properties.prop")); + assertTrue(resultProperties.containsValue("propValue")); + assertFalse(resultProperties.containsKey("prop")); + } + + @Test + void shouldNotRemoveKafkaPropertiesStringWhenNotPrefix() { + Properties prop = new Properties(); + prop.put("prefix." + KAFKA_PROPERTIES_PREFIX + PROPERTY_SEPARATOR + "kafkaProp", "propValue"); + Properties resultProperties = PropertiesUtils.removeKafkaPrefix(prop); + + assertTrue( + resultProperties.containsKey("prefix." + KAFKA_PROPERTIES_PREFIX + PROPERTY_SEPARATOR + "kafkaProp")); + assertTrue(resultProperties.containsValue("propValue")); + assertFalse(resultProperties.containsKey("prefix.kafkaProp")); + } } diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/property/RocksDbConfigTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/property/RocksDbConfigTest.java index 529b62ab..80fad207 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/property/RocksDbConfigTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/property/RocksDbConfigTest.java @@ -50,7 +50,7 @@ void setUp() { void testSetConfigWithDefaultValues() { Map configs = new HashMap<>(); RocksDbConfig rocksDbConfig = new RocksDbConfig(); - KafkaStreamsExecutionContext.registerProperties(new Properties()); + KafkaStreamsExecutionContext.registerProperties(new Properties(), null); rocksDbConfig.setConfig("storeName", options, configs); @@ -79,7 +79,7 @@ void testSetConfigWithCustomValues() { configs.put(RocksDbConfig.ROCKSDB_COMPRESSION_TYPE_CONFIG, compressionType); Properties properties = new Properties(); properties.putAll(configs); - KafkaStreamsExecutionContext.registerProperties(properties); + KafkaStreamsExecutionContext.registerProperties(properties, null); RocksDbConfig rocksDbConfig = new RocksDbConfig(); diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/serde/TopicWithSerdeTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/serde/TopicWithSerdeTest.java index c5514a43..554d33e9 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/serde/TopicWithSerdeTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/serde/TopicWithSerdeTest.java @@ -30,7 +30,7 @@ class TopicWithSerdeTest { @Test void shouldCreateTopicWithSerde() { - KafkaStreamsExecutionContext.registerProperties(new Properties()); + KafkaStreamsExecutionContext.registerProperties(new Properties(), null); TopicWithSerde topicWithSerde = new TopicWithSerde<>("INPUT_TOPIC", Serdes.String(), Serdes.String()); @@ -44,7 +44,7 @@ void shouldCreateTopicWithSerdeWithPrefix() { Properties properties = new Properties(); properties.put("prefix.self", "abc."); - KafkaStreamsExecutionContext.registerProperties(properties); + KafkaStreamsExecutionContext.registerProperties(properties, null); TopicWithSerde topicWithSerde = new TopicWithSerde<>("INPUT_TOPIC", Serdes.String(), Serdes.String()); @@ -55,7 +55,7 @@ void shouldCreateTopicWithSerdeWithPrefix() { @Test void shouldCreateStream() { - KafkaStreamsExecutionContext.registerProperties(new Properties()); + KafkaStreamsExecutionContext.registerProperties(new Properties(), null); TopicWithSerde topicWithSerde = new TopicWithSerde<>("INPUT_TOPIC", Serdes.String(), Serdes.String()); @@ -76,7 +76,7 @@ void shouldCreateStream() { @Test void shouldCreateTable() { - KafkaStreamsExecutionContext.registerProperties(new Properties()); + KafkaStreamsExecutionContext.registerProperties(new Properties(), null); TopicWithSerde topicWithSerde = new TopicWithSerde<>("INPUT_TOPIC", Serdes.String(), Serdes.String()); @@ -100,7 +100,7 @@ void shouldCreateTable() { @Test void shouldCreateGlobalKtable() { - KafkaStreamsExecutionContext.registerProperties(new Properties()); + KafkaStreamsExecutionContext.registerProperties(new Properties(), null); TopicWithSerde topicWithSerde = new TopicWithSerde<>("INPUT_TOPIC", Serdes.String(), Serdes.String()); diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/service/KubernetesServiceTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/service/KubernetesServiceTest.java index c5aa9a30..ed567e74 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/service/KubernetesServiceTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/service/KubernetesServiceTest.java @@ -48,7 +48,7 @@ class KubernetesServiceTest { @Test void shouldGetReadinessProbeWhenRunning() { - KafkaStreamsExecutionContext.registerProperties(new Properties()); + KafkaStreamsExecutionContext.registerProperties(new Properties(), null); when(kafkaStreamsInitializer.getKafkaStreams()).thenReturn(kafkaStreams); when(kafkaStreams.state()).thenReturn(KafkaStreams.State.RUNNING); @@ -60,7 +60,7 @@ void shouldGetReadinessProbeWhenRunning() { @Test void shouldGetReadinessProbeWhenNotRunning() { - KafkaStreamsExecutionContext.registerProperties(new Properties()); + KafkaStreamsExecutionContext.registerProperties(new Properties(), null); when(kafkaStreamsInitializer.getKafkaStreams()).thenReturn(kafkaStreams); when(kafkaStreams.state()).thenReturn(KafkaStreams.State.NOT_RUNNING); @@ -81,7 +81,7 @@ void shouldGetReadinessProbeWhenNull() { @Test void shouldGetReadinessProbeWhenRebalancingAndAllThreadsCreated() { - KafkaStreamsExecutionContext.registerProperties(new Properties()); + KafkaStreamsExecutionContext.registerProperties(new Properties(), null); when(kafkaStreamsInitializer.getKafkaStreams()).thenReturn(kafkaStreams); when(kafkaStreams.state()).thenReturn(KafkaStreams.State.REBALANCING); @@ -103,7 +103,7 @@ void shouldGetReadinessProbeWhenRebalancingAndAllThreadsCreated() { @Test void shouldGetReadinessProbeWhenRebalancingAndAllThreadsNotStartingOrCreated() { - KafkaStreamsExecutionContext.registerProperties(new Properties()); + KafkaStreamsExecutionContext.registerProperties(new Properties(), null); when(kafkaStreamsInitializer.getKafkaStreams()).thenReturn(kafkaStreams); when(kafkaStreams.state()).thenReturn(KafkaStreams.State.REBALANCING); diff --git a/kstreamplify-spring-boot/src/main/java/com/michelin/kstreamplify/initializer/SpringBootKafkaStreamsInitializer.java b/kstreamplify-spring-boot/src/main/java/com/michelin/kstreamplify/initializer/SpringBootKafkaStreamsInitializer.java index 22ff01fc..79ed79da 100644 --- a/kstreamplify-spring-boot/src/main/java/com/michelin/kstreamplify/initializer/SpringBootKafkaStreamsInitializer.java +++ b/kstreamplify-spring-boot/src/main/java/com/michelin/kstreamplify/initializer/SpringBootKafkaStreamsInitializer.java @@ -84,7 +84,7 @@ protected void startHttpServer() { protected void initProperties() { serverPort = springBootServerPort; kafkaProperties = springBootKafkaProperties.asProperties(); - KafkaStreamsExecutionContext.registerProperties(kafkaProperties); + KafkaStreamsExecutionContext.registerProperties(kafkaProperties, kafkaProperties); } /** {@inheritDoc} */