KAFKA-18894: Add KIP-877 support for ConfigProvider #14

Open · wants to merge 11 commits into trunk

@@ -20,6 +20,7 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.utils.Utils;

import org.slf4j.Logger;
@@ -546,16 +547,16 @@ private Map<String, String> extractPotentialVariables(Map<?, ?> configMap) {
configProperties = configProviderProps;
classNameFilter = ignored -> true;
}
Map<String, ConfigProvider> providers = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
Map<String, Plugin<ConfigProvider>> providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);

if (!providers.isEmpty()) {
ConfigTransformer configTransformer = new ConfigTransformer(providers);
if (!providerPlugins.isEmpty()) {
ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
ConfigTransformerResult result = configTransformer.transform(indirectVariables);
if (!result.data().isEmpty()) {
resolvedOriginals.putAll(result.data());
}
}
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));

return new ResolvingMap<>(resolvedOriginals, originals);
}
@@ -594,7 +595,7 @@ private Map<String, Object> configProviderProperties(String configProviderPrefix
* @param classNameFilter Filter for config provider class names
* @return map of config provider name and its instance.
*/
private Map<String, ConfigProvider> instantiateConfigProviders(
private Map<String, Plugin<ConfigProvider>> instantiateConfigProviders(
Map<String, String> indirectConfigs,
Map<String, ?> providerConfigProperties,
Predicate<String> classNameFilter
@@ -620,21 +621,22 @@ private Map<String, ConfigProvider> instantiateConfigProviders(
}
}
// Instantiate Config Providers
Map<String, ConfigProvider> configProviderInstances = new HashMap<>();
Map<String, Plugin<ConfigProvider>> configProviderPluginInstances = new HashMap<>();
for (Map.Entry<String, String> entry : providerMap.entrySet()) {
try {
String prefix = CONFIG_PROVIDERS_CONFIG + "." + entry.getKey() + CONFIG_PROVIDERS_PARAM;
Map<String, ?> configProperties = configProviderProperties(prefix, providerConfigProperties);
ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class);
provider.configure(configProperties);
configProviderInstances.put(entry.getKey(), provider);
Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);
configProviderPluginInstances.put(entry.getKey(), providerPlugin);
} catch (ClassNotFoundException e) {
log.error("Could not load config provider class {}", entry.getValue(), e);
throw new ConfigException(providerClassProperty(entry.getKey()), entry.getValue(), "Could not load config provider class or one of its dependencies");
}
}

return configProviderInstances;
return configProviderPluginInstances;
}

private static String providerClassProperty(String providerName) {
@@ -18,6 +18,7 @@

import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.config.provider.FileConfigProvider;
import org.apache.kafka.common.internals.Plugin;

import java.util.ArrayList;
import java.util.HashMap;
@@ -56,15 +57,15 @@ public class ConfigTransformer {
public static final Pattern DEFAULT_PATTERN = Pattern.compile("\\$\\{([^}]*?):(([^}]*?):)?([^}]*?)\\}");
private static final String EMPTY_PATH = "";

private final Map<String, ConfigProvider> configProviders;
private final Map<String, Plugin<ConfigProvider>> configProviderPlugins;

/**
* Creates a ConfigTransformer with the default pattern, of the form <code>${provider:[path:]key}</code>.
*
* @param configProviders a Map of provider names and {@link ConfigProvider} instances.
* @param configProviderPlugins a Map of provider names and {@link ConfigProvider} instances.
*/
public ConfigTransformer(Map<String, ConfigProvider> configProviders) {
this.configProviders = configProviders;
public ConfigTransformer(Map<String, Plugin<ConfigProvider>> configProviderPlugins) {
this.configProviderPlugins = configProviderPlugins;
}

/**
@@ -94,13 +95,13 @@ public ConfigTransformerResult transform(Map<String, String> configs) {
Map<String, Long> ttls = new HashMap<>();
for (Map.Entry<String, Map<String, Set<String>>> entry : keysByProvider.entrySet()) {
String providerName = entry.getKey();
ConfigProvider provider = configProviders.get(providerName);
Plugin<ConfigProvider> providerPlugin = configProviderPlugins.get(providerName);
Map<String, Set<String>> keysByPath = entry.getValue();
if (provider != null && keysByPath != null) {
if (providerPlugin != null && keysByPath != null) {
for (Map.Entry<String, Set<String>> pathWithKeys : keysByPath.entrySet()) {
String path = pathWithKeys.getKey();
Set<String> keys = new HashSet<>(pathWithKeys.getValue());
ConfigData configData = provider.get(path, keys);
ConfigData configData = providerPlugin.get().get(path, keys);
Map<String, String> data = configData.data();
Long ttl = configData.ttl();
if (ttl != null && ttl >= 0) {
@@ -29,6 +29,10 @@
* <p>Kafka Connect discovers implementations of this interface using the Java {@link java.util.ServiceLoader} mechanism.
* To support this, implementations of this interface should also contain a service provider configuration file in
* {@code META-INF/services/org.apache.kafka.common.config.provider.ConfigProvider}.
* <p>Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the config provider to register metrics.
* The following tags are automatically added to all metrics registered: <code>config</code> set to
* <code>config.providers</code>, <code>class</code> set to the ConfigProvider class name,
* and <code>provider</code> set to the provider name.
*/
public interface ConfigProvider extends Configurable, Closeable {
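
As a hedged illustration of the Monitorable hook documented above: the class and metric names below are made up, the pattern mirrors the MonitorableConfigProvider test fixture added later in this PR, and the config/class/provider tags are attached automatically by the runtime rather than by the provider itself.

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Monitorable;
import org.apache.kafka.common.metrics.PluginMetrics;

import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical provider: resolves nothing, but counts lookups and exposes the count as a metric.
// For Connect's ServiceLoader discovery it would also need an entry in
// META-INF/services/org.apache.kafka.common.config.provider.ConfigProvider.
public class CountingConfigProvider implements ConfigProvider, Monitorable {

    private final AtomicLong lookups = new AtomicLong();

    @Override
    public void withPluginMetrics(PluginMetrics metrics) {
        // Invoked after configure(); the runtime adds the config, class and provider tags itself.
        MetricName name = metrics.metricName("lookup-count", "Number of get() calls served", Map.of());
        metrics.addMetric(name, (Measurable) (config, now) -> lookups.get());
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public ConfigData get(String path) {
        lookups.incrementAndGet();
        return new ConfigData(Map.of());
    }

    @Override
    public ConfigData get(String path, Set<String> keys) {
        lookups.incrementAndGet();
        return new ConfigData(Map.of());
    }

    @Override
    public void close() throws IOException { }
}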

@@ -17,6 +17,7 @@
package org.apache.kafka.common.config;

import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.internals.Plugin;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -45,12 +46,12 @@ public class ConfigTransformerTest {

@BeforeEach
public void setup() {
configTransformer = new ConfigTransformer(Collections.singletonMap("test", new TestConfigProvider()));
configTransformer = new ConfigTransformer(Map.of("test", Plugin.wrapInstance(new TestConfigProvider(), null, "config.providers")));
}

@Test
public void testReplaceVariable() {
ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testKey}"));
ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:testKey}"));
Map<String, String> data = result.data();
Map<String, Long> ttls = result.ttls();
assertEquals(TEST_RESULT, data.get(MY_KEY));
@@ -59,7 +60,7 @@ public void testReplaceVariable() {

@Test
public void testReplaceVariableWithTTL() {
ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}"));
ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:testKeyWithTTL}"));
Map<String, String> data = result.data();
Map<String, Long> ttls = result.ttls();
assertEquals(TEST_RESULT_WITH_TTL, data.get(MY_KEY));
@@ -68,28 +69,28 @@ public void testReplaceVariableWithTTL() {

@Test
public void testReplaceMultipleVariablesInValue() {
ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "hello, ${test:testPath:testKey}; goodbye, ${test:testPath:testKeyWithTTL}!!!"));
ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "hello, ${test:testPath:testKey}; goodbye, ${test:testPath:testKeyWithTTL}!!!"));
Map<String, String> data = result.data();
assertEquals("hello, testResult; goodbye, testResultWithTTL!!!", data.get(MY_KEY));
}

@Test
public void testNoReplacement() {
ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:missingKey}"));
ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:missingKey}"));
Map<String, String> data = result.data();
assertEquals("${test:testPath:missingKey}", data.get(MY_KEY));
}

@Test
public void testSingleLevelOfIndirection() {
ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testIndirection}"));
ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:testIndirection}"));
Map<String, String> data = result.data();
assertEquals("${test:testPath:testResult}", data.get(MY_KEY));
}

@Test
public void testReplaceVariableNoPath() {
ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testKey}"));
ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testKey}"));
Map<String, String> data = result.data();
Map<String, Long> ttls = result.ttls();
assertEquals(TEST_RESULT_NO_PATH, data.get(MY_KEY));
@@ -98,7 +99,7 @@ public void testReplaceVariableNoPath() {

@Test
public void testReplaceMultipleVariablesWithoutPathInValue() {
ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "first ${test:testKey}; second ${test:testKey}"));
ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "first ${test:testKey}; second ${test:testKey}"));
Map<String, String> data = result.data();
assertEquals("first testResultNoPath; second testResultNoPath", data.get(MY_KEY));
}
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.config.provider;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Monitorable;
import org.apache.kafka.common.metrics.PluginMetrics;

import java.io.IOException;
import java.util.Map;
import java.util.Set;

public class MonitorableConfigProvider implements ConfigProvider, Monitorable {
public static final String NAME = "name";
public static final String DESCRIPTION = "description";
protected boolean configured = false;

@Override
public void withPluginMetrics(PluginMetrics metrics) {
MetricName metricName = metrics.metricName(NAME, DESCRIPTION, Map.of());
metrics.addMetric(metricName, (Measurable) (config, now) -> 123);
}

@Override
public ConfigData get(String path) {
return null;
}

@Override
public ConfigData get(String path, Set<String> keys) {
return null;
}

@Override
public void close() throws IOException {
}

@Override
public void configure(Map<String, ?> configs) {
configured = true;
}
}
@@ -23,6 +23,7 @@
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigTransformer;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.WorkerConfig;
@@ -269,18 +270,19 @@ List<String> configProviders() {
Map<String, String> transform(Map<String, String> props) {
// transform worker config according to config.providers
List<String> providerNames = configProviders();
Map<String, ConfigProvider> providers = new HashMap<>();
Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>();
for (String name : providerNames) {
ConfigProvider configProvider = plugins.newConfigProvider(
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
this,
CONFIG_PROVIDERS_CONFIG + "." + name,
Plugins.ClassLoaderUsage.PLUGINS
name,
Plugins.ClassLoaderUsage.PLUGINS,
null
);
providers.put(name, configProvider);
providerPlugins.put(name, configProviderPlugin);
}
ConfigTransformer transformer = new ConfigTransformer(providers);
ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
Map<String, String> transformed = transformer.transform(props).data();
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
return transformed;
}

@@ -209,16 +209,17 @@ public Worker(

private WorkerConfigTransformer initConfigTransformer() {
final List<String> providerNames = config.getList(WorkerConfig.CONFIG_PROVIDERS_CONFIG);
Map<String, ConfigProvider> providerMap = new HashMap<>();
Map<String, Plugin<ConfigProvider>> providerPluginMap = new HashMap<>();
for (String providerName : providerNames) {
ConfigProvider configProvider = plugins.newConfigProvider(
config,
WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName,
ClassLoaderUsage.PLUGINS
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
config,
providerName,
ClassLoaderUsage.PLUGINS,
metrics.metrics()
);
providerMap.put(providerName, configProvider);
providerPluginMap.put(providerName, configProviderPlugin);
}
return new WorkerConfigTransformer(this, providerMap);
return new WorkerConfigTransformer(this, providerPluginMap);
}
Comment on lines 210 to 223
🛠️ Refactor suggestion

Guard against resource leaks when provider instantiation fails

plugins.newConfigProvider(...) may throw (e.g. a bad class name or a constructor error).
If that happens after some providers have already been created, those plugin instances stay open: initConfigTransformer() aborts before the WorkerConfigTransformer is constructed, so the close() call that normally releases them is never reached.

Consider wrapping the loop in a try/catch that closes any already‑created plugins before re‑throwing, e.g.:

Map<String, Plugin<ConfigProvider>> providerPluginMap = new HashMap<>();
try {
    for (String providerName : providerNames) {
        Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
            config, providerName, ClassLoaderUsage.PLUGINS, metrics.metrics());
        providerPluginMap.put(providerName, configProviderPlugin);
    }
} catch (Throwable t) {
+   providerPluginMap.values().forEach(p -> Utils.closeQuietly(p, "config provider plugin"));
    throw t;
}
return new WorkerConfigTransformer(this, Collections.unmodifiableMap(providerPluginMap));

This keeps the constructor exception‑safe and prevents dangling threads/metrics.

Suggested change
private WorkerConfigTransformer initConfigTransformer() {
final List<String> providerNames = config.getList(WorkerConfig.CONFIG_PROVIDERS_CONFIG);
Map<String, Plugin<ConfigProvider>> providerPluginMap = new HashMap<>();
try {
for (String providerName : providerNames) {
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
config,
providerName,
ClassLoaderUsage.PLUGINS,
metrics.metrics()
);
providerPluginMap.put(providerName, configProviderPlugin);
}
} catch (Throwable t) {
providerPluginMap.values().forEach(p ->
Utils.closeQuietly(p, "config provider plugin"));
throw t;
}
return new WorkerConfigTransformer(
this,
Collections.unmodifiableMap(providerPluginMap)
);
}


public WorkerConfigTransformer configTransformer() {
@@ -20,6 +20,7 @@
import org.apache.kafka.common.config.ConfigTransformer;
import org.apache.kafka.common.config.ConfigTransformerResult;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.Herder.ConfigReloadAction;
import org.apache.kafka.connect.util.Callback;
@@ -42,12 +43,12 @@ public class WorkerConfigTransformer implements AutoCloseable {
private final Worker worker;
private final ConfigTransformer configTransformer;
private final ConcurrentMap<String, Map<String, HerderRequest>> requests = new ConcurrentHashMap<>();
private final Map<String, ConfigProvider> configProviders;
private final Map<String, Plugin<ConfigProvider>> configProviderPlugins;

public WorkerConfigTransformer(Worker worker, Map<String, ConfigProvider> configProviders) {
public WorkerConfigTransformer(Worker worker, Map<String, Plugin<ConfigProvider>> configProviderPlugins) {
this.worker = worker;
this.configProviders = configProviders;
this.configTransformer = new ConfigTransformer(configProviders);
this.configProviderPlugins = configProviderPlugins;
this.configTransformer = new ConfigTransformer(configProviderPlugins);
}

public Map<String, String> transform(Map<String, String> configs) {
@@ -97,6 +98,6 @@ private void scheduleReload(String connectorName, String path, long ttl) {

@Override
public void close() {
configProviders.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
configProviderPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
}
}
@@ -19,6 +19,8 @@
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.Connector;
@@ -627,7 +629,8 @@ private <U> U newVersionedPlugin(
return plugin;
}

public ConfigProvider newConfigProvider(AbstractConfig config, String providerPrefix, ClassLoaderUsage classLoaderUsage) {
public Plugin<ConfigProvider> newConfigProvider(AbstractConfig config, String providerName, ClassLoaderUsage classLoaderUsage, Metrics metrics) {
String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName;
String classPropertyName = providerPrefix + ".class";
Map<String, String> originalConfig = config.originalsStrings();
if (!originalConfig.containsKey(classPropertyName)) {
@@ -643,7 +646,7 @@ public ConfigProvider newConfigProvider(AbstractConfig config, String providerPr
try (LoaderSwap loaderSwap = safeLoaderSwapper().apply(plugin.getClass().getClassLoader())) {
plugin.configure(configProviderConfig);
}
return plugin;
return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName));
}

/**
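
A minimal sketch of how a caller consumes the new Plugin-wrapped return type, based only on the call sites changed in this PR; the helper class, provider name, path, and key below are hypothetical. plugin.get() reaches the wrapped ConfigProvider, and closing the Plugin releases the provider along with the metrics it registered.

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.isolation.Plugins;

import java.util.Set;

public class ConfigProviderPluginUsage {

    // Hypothetical helper: resolve one key through a named provider, then release the plugin.
    static String lookup(Plugins plugins, AbstractConfig config, Metrics metrics, String providerName) {
        // config is expected to contain config.providers.<providerName>.class (and optional .param. settings).
        Plugin<ConfigProvider> providerPlugin = plugins.newConfigProvider(
                config, providerName, Plugins.ClassLoaderUsage.PLUGINS, metrics);
        try {
            // get() returns the underlying, already configured ConfigProvider.
            ConfigData data = providerPlugin.get().get("secrets/path", Set.of("db.password"));
            return data.data().get("db.password");
        } finally {
            // Closing the Plugin wrapper releases the provider and its plugin metrics.
            Utils.closeQuietly(providerPlugin, "config provider plugin");
        }
    }
}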
@@ -18,6 +18,7 @@

import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.internals.Plugin;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -27,7 +28,6 @@
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -66,13 +66,13 @@ public class WorkerConfigTransformerTest {

@BeforeEach
public void setup() {
configTransformer = new WorkerConfigTransformer(worker, Collections.singletonMap("test", new TestConfigProvider()));
configTransformer = new WorkerConfigTransformer(worker, Map.of("test", Plugin.wrapInstance(new TestConfigProvider(), null, "config.providers")));
}

@Test
public void testReplaceVariable() {
// Execution
Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKey}"));
Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Map.of(MY_KEY, "${test:testPath:testKey}"));

// Assertions
assertEquals(TEST_RESULT, result.get(MY_KEY));
@@ -97,7 +97,7 @@ public void testReplaceVariableWithTTLAndScheduleRestart() {
when(herder.restartConnector(eq(1L), eq(MY_CONNECTOR), notNull())).thenReturn(requestId);

// Execution
Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}"));
Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Map.of(MY_KEY, "${test:testPath:testKeyWithTTL}"));

// Assertions
assertEquals(TEST_RESULT_WITH_TTL, result.get(MY_KEY));
@@ -112,14 +112,14 @@ public void testReplaceVariableWithTTLFirstCancelThenScheduleRestart() {
when(herder.restartConnector(eq(10L), eq(MY_CONNECTOR), notNull())).thenReturn(requestId);

// Execution
Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}"));
Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Map.of(MY_KEY, "${test:testPath:testKeyWithTTL}"));

// Assertions
assertEquals(TEST_RESULT_WITH_TTL, result.get(MY_KEY));
verify(herder).restartConnector(eq(1L), eq(MY_CONNECTOR), notNull());

// Execution
result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithLongerTTL}"));
result = configTransformer.transform(MY_CONNECTOR, Map.of(MY_KEY, "${test:testPath:testKeyWithLongerTTL}"));

// Assertions
assertEquals(TEST_RESULT_WITH_LONGER_TTL, result.get(MY_KEY));
@@ -147,14 +147,14 @@ public ConfigData get(String path) {
public ConfigData get(String path, Set<String> keys) {
if (path.equals(TEST_PATH)) {
if (keys.contains(TEST_KEY)) {
return new ConfigData(Collections.singletonMap(TEST_KEY, TEST_RESULT));
return new ConfigData(Map.of(TEST_KEY, TEST_RESULT));
} else if (keys.contains(TEST_KEY_WITH_TTL)) {
return new ConfigData(Collections.singletonMap(TEST_KEY_WITH_TTL, TEST_RESULT_WITH_TTL), 1L);
return new ConfigData(Map.of(TEST_KEY_WITH_TTL, TEST_RESULT_WITH_TTL), 1L);
} else if (keys.contains(TEST_KEY_WITH_LONGER_TTL)) {
return new ConfigData(Collections.singletonMap(TEST_KEY_WITH_LONGER_TTL, TEST_RESULT_WITH_LONGER_TTL), 10L);
return new ConfigData(Map.of(TEST_KEY_WITH_LONGER_TTL, TEST_RESULT_WITH_LONGER_TTL), 10L);
}
}
return new ConfigData(Collections.emptyMap());
return new ConfigData(Map.of());
}

@Override
@@ -37,10 +37,14 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.config.provider.MockFileConfigProvider;
import org.apache.kafka.common.config.provider.MonitorableConfigProvider;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.utils.LogCaptureAppender;
@@ -91,6 +95,7 @@

import org.apache.maven.artifact.versioning.VersionRange;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.AdditionalAnswers;
@@ -109,6 +114,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -373,10 +379,14 @@ public void testStartAndStopConnector(boolean enableTopicCreation) throws Throwa
private void mockFileConfigProvider() {
MockFileConfigProvider mockFileConfigProvider = new MockFileConfigProvider();
mockFileConfigProvider.configure(Collections.singletonMap("testId", mockFileProviderTestId));
Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(mockFileConfigProvider,
null,
WorkerConfig.CONFIG_PROVIDERS_CONFIG,
Map.of("provider", "file"));
when(plugins.newConfigProvider(any(AbstractConfig.class),
eq("config.providers.file"),
any(ClassLoaderUsage.class)))
.thenReturn(mockFileConfigProvider);
eq("file"),
any(ClassLoaderUsage.class),
any(Metrics.class))).thenReturn(providerPlugin);
}

@ParameterizedTest
@@ -2891,6 +2901,54 @@ private void testStartTaskWithTooManyTaskConfigs(boolean enforced) {
}
}

@Test
public void testMonitorableConfigProvider() {
setup(false);
Map<String, String> props = new HashMap<>(this.workerProps);
props.put("config.providers", "monitorable,monitorable2");
config = new StandaloneConfig(props);
mockKafkaClusterId();
when(plugins.newConfigProvider(any(AbstractConfig.class), any(String.class), any(ClassLoaderUsage.class), any(Metrics.class)))
.thenAnswer(invocation -> {
String providerName = invocation.getArgument(1);
Metrics metrics = invocation.getArgument(3);
return Plugin.wrapInstance(new MonitorableConfigProvider(), metrics,
WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName));
});

worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
Metrics metrics = worker.metrics().metrics();
assertMetrics(metrics,
1,
expectedTags(WorkerConfig.CONFIG_PROVIDERS_CONFIG, MonitorableConfigProvider.class.getSimpleName(), Map.of("provider", "monitorable")));
assertMetrics(metrics,
1,
expectedTags(WorkerConfig.CONFIG_PROVIDERS_CONFIG, MonitorableConfigProvider.class.getSimpleName(), Map.of("provider", "monitorable2")));
}

private static Map<String, String> expectedTags(String config, String clazz, Map<String, String> extraTags) {
Map<String, String> tags = new LinkedHashMap<>();
tags.put("config", config);
tags.put("class", clazz);
tags.putAll(extraTags);
return tags;
}

private void assertMetrics(Metrics metrics, int expected, Map<String, String> expectedTags) {
int found = 0;
for (MetricName metricName : metrics.metrics().keySet()) {
if (metricName.group().equals("plugins")) {
Map<String, String> tags = metricName.tags();
if (expectedTags.equals(tags)) {
assertEquals(MonitorableConfigProvider.NAME, metricName.name());
assertEquals(MonitorableConfigProvider.DESCRIPTION, metricName.description());
found++;
}
}
}
assertEquals(expected, found);
}

private void assertTasksMaxExceededMessage(String connector, int numTasks, int maxTasks, String message) {
String expectedPrefix = "The connector " + connector
+ " has generated "
@@ -3216,5 +3274,4 @@ public void stop() {
}

}

}
@@ -22,6 +22,10 @@
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.config.provider.MonitorableConfigProvider;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.PluginMetrics;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.components.Versioned;
@@ -374,7 +378,8 @@ public void newConverterShouldConfigureWithPluginClassLoader() {

@Test
public void newConfigProviderShouldConfigureWithPluginClassLoader() {
String providerPrefix = "some.provider";
String providerName = "customProvider";
String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName;
props.put(providerPrefix + ".class", TestPlugin.SAMPLING_CONFIG_PROVIDER.className());

PluginClassLoader classLoader = plugins.delegatingLoader().pluginClassLoader(TestPlugin.SAMPLING_CONFIG_PROVIDER.className());
@@ -383,16 +388,27 @@ public void newConfigProviderShouldConfigureWithPluginClassLoader() {
createConfig();
}

ConfigProvider plugin = plugins.newConfigProvider(
Plugin<ConfigProvider> plugin = plugins.newConfigProvider(
config,
providerPrefix,
ClassLoaderUsage.PLUGINS
providerName,
ClassLoaderUsage.PLUGINS,
null
);

assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples");
Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin).flatten();
assertInstanceOf(SamplingTestPlugin.class, plugin.get(), "Cannot collect samples");
Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin.get()).flatten();
assertTrue(samples.containsKey("configure"));
assertPluginClassLoaderAlwaysActive(plugin);
assertPluginClassLoaderAlwaysActive(plugin.get());
}

@Test
public void newConfigProviderShouldCallWithPluginMetricsAfterConfigure() {
String providerName = "monitorable";
String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName;
props.put(providerPrefix + ".class", CustomMonitorableConfigProvider.class.getName());
createConfig();
Plugin<ConfigProvider> plugin = plugins.newConfigProvider(config, providerName, ClassLoaderUsage.PLUGINS, new Metrics());
assertInstanceOf(CustomMonitorableConfigProvider.class, plugin.get());
}

@Test
@@ -790,4 +806,13 @@ public void configure(Map<String, ?> configs) {
super.configure(configs);
}
}

public static class CustomMonitorableConfigProvider extends MonitorableConfigProvider {

@Override
public void withPluginMetrics(PluginMetrics metrics) {
assertTrue(configured);
}
}

}