From 563f3472c54cd6a5020627fa3692750f7504fd60 Mon Sep 17 00:00:00 2001
From: Jhen-Yung Hsu <jhenyunghsu@gmail.com>
Date: Mon, 7 Apr 2025 12:17:55 +0800
Subject: [PATCH 01/10] KAFKA-18894: Add KIP-877 support for ConfigProvider

---
 .../kafka/common/config/AbstractConfig.java   | 22 +++--
 .../common/config/ConfigTransformer.java      | 12 ++-
 .../config/provider/ConfigProvider.java       |  4 +
 .../apache/kafka/common/internals/Plugin.java |  6 ++
 .../common/config/ConfigTransformerTest.java  | 17 ++--
 .../connect/mirror/MirrorMakerConfig.java     | 16 +--
 .../apache/kafka/connect/runtime/Worker.java  | 15 +--
 .../runtime/WorkerConfigTransformer.java      | 11 ++-
 .../connect/runtime/isolation/Plugins.java    |  7 +-
 .../runtime/WorkerConfigTransformerTest.java  | 20 ++--
 .../kafka/connect/runtime/WorkerTest.java     | 98 ++++++++++++++++++-
 .../runtime/isolation/PluginsTest.java        | 17 ++--
 12 files changed, 181 insertions(+), 64 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 9188b1687a808..bb6d51e82c9ef 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -20,6 +20,7 @@
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.provider.ConfigProvider;
 import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.internals.Plugin;
 import org.apache.kafka.common.utils.Utils;
 
 import org.slf4j.Logger;
@@ -546,16 +547,16 @@ private Map<String, String> extractPotentialVariables(Map<?, ?> configMap) {
             configProperties = configProviderProps;
             classNameFilter = ignored -> true;
         }
-        Map<String, ConfigProvider> providers = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
+        Map<String, Plugin<ConfigProvider>> providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
 
-        if (!providers.isEmpty()) {
-            ConfigTransformer configTransformer = new ConfigTransformer(providers);
+        if (!providerPlugins.isEmpty()) {
+            ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
             ConfigTransformerResult result = configTransformer.transform(indirectVariables);
             if (!result.data().isEmpty()) {
                 resolvedOriginals.putAll(result.data());
             }
         }
-        providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
+        providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
 
         return new ResolvingMap<>(resolvedOriginals, originals);
     }
@@ -587,14 +588,14 @@ private Map<String, Object> configProviderProperties(String configProviderPrefix
      * config.providers : A comma-separated list of names for providers.
      * config.providers.{name}.class : The Java class name for a provider.
      * config.providers.{name}.param.{param-name} : A parameter to be passed to the above Java class on initialization.
-     * returns a map of config provider name and its instance.
+     * returns a map of config provider name and its instance wrapped in a {@link org.apache.kafka.common.internals.Plugin}.
      *
      * @param indirectConfigs          The map of potential variable configs
      * @param providerConfigProperties The map of config provider configs
      * @param classNameFilter          Filter for config provider class names
-     * @return map of config provider name and its instance.
+     * @return map of config provider name and its instance wrapped in a {@link org.apache.kafka.common.internals.Plugin}.
      */
-    private Map<String, ConfigProvider> instantiateConfigProviders(
+    private Map<String, Plugin<ConfigProvider>> instantiateConfigProviders(
             Map<String, String> indirectConfigs,
             Map<String, ?> providerConfigProperties,
             Predicate<String> classNameFilter
@@ -620,21 +621,22 @@ private Map<String, ConfigProvider> instantiateConfigProviders(
             }
         }
         // Instantiate Config Providers
-        Map<String, ConfigProvider> configProviderInstances = new HashMap<>();
+        Map<String, Plugin<ConfigProvider>> configProviderPluginInstances = new HashMap<>();
         for (Map.Entry<String, String> entry : providerMap.entrySet()) {
             try {
                 String prefix = CONFIG_PROVIDERS_CONFIG + "." + entry.getKey() + CONFIG_PROVIDERS_PARAM;
                 Map<String, ?> configProperties = configProviderProperties(prefix, providerConfigProperties);
                 ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class);
                 provider.configure(configProperties);
-                configProviderInstances.put(entry.getKey(), provider);
+                Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);
+                configProviderPluginInstances.put(entry.getKey(), providerPlugin);
             } catch (ClassNotFoundException e) {
                 log.error("Could not load config provider class {}", entry.getValue(), e);
                 throw new ConfigException(providerClassProperty(entry.getKey()), entry.getValue(), "Could not load config provider class or one of its dependencies");
             }
         }
 
-        return configProviderInstances;
+        return configProviderPluginInstances;
     }
 
     private static String providerClassProperty(String providerName) {
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
index dbf6c7bbfcec1..b4659a07da274 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
@@ -18,6 +18,7 @@
 
 import org.apache.kafka.common.config.provider.ConfigProvider;
 import org.apache.kafka.common.config.provider.FileConfigProvider;
+import org.apache.kafka.common.internals.Plugin;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -56,15 +57,16 @@ public class ConfigTransformer {
     public static final Pattern DEFAULT_PATTERN = Pattern.compile("\\$\\{([^}]*?):(([^}]*?):)?([^}]*?)\\}");
     private static final String EMPTY_PATH = "";
 
-    private final Map<String, ConfigProvider> configProviders;
+    private final Map<String, Plugin<ConfigProvider>> configProviderPlugins;
 
     /**
      * Creates a ConfigTransformer with the default pattern, of the form <code>${provider:[path:]key}</code>.
      *
-     * @param configProviders a Map of provider names and {@link ConfigProvider} instances.
+     * @param configProviderPlugins a Map of provider names and {@link ConfigProvider} instances, where each instance
+     *                              is wrapped in a {@link org.apache.kafka.common.internals.Plugin}.
      */
-    public ConfigTransformer(Map<String, ConfigProvider> configProviders) {
-        this.configProviders = configProviders;
+    public ConfigTransformer(Map<String, Plugin<ConfigProvider>> configProviderPlugins) {
+        this.configProviderPlugins = configProviderPlugins;
     }
 
     /**
@@ -94,7 +96,7 @@ public ConfigTransformerResult transform(Map<String, String> configs) {
         Map<String, Long> ttls = new HashMap<>();
         for (Map.Entry<String, Map<String, Set<String>>> entry : keysByProvider.entrySet()) {
             String providerName = entry.getKey();
-            ConfigProvider provider = configProviders.get(providerName);
+            ConfigProvider provider = configProviderPlugins.get(providerName).get();
             Map<String, Set<String>> keysByPath = entry.getValue();
             if (provider != null && keysByPath != null) {
                 for (Map.Entry<String, Set<String>> pathWithKeys : keysByPath.entrySet()) {
diff --git a/clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java b/clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java
index 81f0aac0d72d1..abe12237ca3e3 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java
@@ -29,6 +29,10 @@
  * <p>Kafka Connect discovers implementations of this interface using the Java {@link java.util.ServiceLoader} mechanism.
  * To support this, implementations of this interface should also contain a service provider configuration file in
  * {@code META-INF/services/org.apache.kafka.common.config.provider.ConfigProvider}.
+ * <p>Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the config provider to register metrics.
+ * The following tags are automatically added to all metrics registered: <code>config</code> set to
+ * <code>config.providers</code>, <code>class</code> set to the ConfigProvider class name,
+ * and <code>provider</code> set to the provider name.
  */
 public interface ConfigProvider extends Configurable, Closeable {
 
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/Plugin.java b/clients/src/main/java/org/apache/kafka/common/internals/Plugin.java
index 620cd0c07ec0f..587f9b74a8d91 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/Plugin.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/Plugin.java
@@ -44,6 +44,12 @@ public static <T> Plugin<T> wrapInstance(T instance, Metrics metrics, String key
         return wrapInstance(instance, metrics, () -> tags(key, instance));
     }
 
+    public static <T> Plugin<T> wrapInstance(T instance, Metrics metrics, String key, Map<String, String> extraTags) {
+        Map<String, String> tags = tags(key, instance);
+        tags.putAll(extraTags);
+        return wrapInstance(instance, metrics, () -> tags);
+    }
+
     private static <T> Map<String, String> tags(String key, T instance) {
         Map<String, String> tags = new LinkedHashMap<>();
         tags.put("config", key);
diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java
index 3a5ec650fab42..b9ced57b99a40 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.config;
 
 import org.apache.kafka.common.config.provider.ConfigProvider;
+import org.apache.kafka.common.internals.Plugin;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -45,12 +46,12 @@ public class ConfigTransformerTest {
 
     @BeforeEach
     public void setup() {
-        configTransformer = new ConfigTransformer(Collections.singletonMap("test", new TestConfigProvider()));
+        configTransformer = new ConfigTransformer(Map.of("test", Plugin.wrapInstance(new TestConfigProvider(), null, "config.providers")));
     }
 
     @Test
     public void testReplaceVariable() {
-        ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testKey}"));
+        ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:testKey}"));
         Map<String, String> data = result.data();
         Map<String, Long> ttls = result.ttls();
         assertEquals(TEST_RESULT, data.get(MY_KEY));
@@ -59,7 +60,7 @@ public void testReplaceVariable() {
 
     @Test
     public void testReplaceVariableWithTTL() {
-        ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}"));
+        ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:testKeyWithTTL}"));
         Map<String, String> data = result.data();
         Map<String, Long> ttls = result.ttls();
         assertEquals(TEST_RESULT_WITH_TTL, data.get(MY_KEY));
@@ -68,28 +69,28 @@ public void testReplaceVariableWithTTL() {
 
     @Test
     public void testReplaceMultipleVariablesInValue() {
-        ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "hello, ${test:testPath:testKey}; goodbye, ${test:testPath:testKeyWithTTL}!!!"));
+        ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "hello, ${test:testPath:testKey}; goodbye, ${test:testPath:testKeyWithTTL}!!!"));
         Map<String, String> data = result.data();
         assertEquals("hello, testResult; goodbye, testResultWithTTL!!!", data.get(MY_KEY));
     }
 
     @Test
     public void testNoReplacement() {
-        ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:missingKey}"));
+        ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:missingKey}"));
         Map<String, String> data = result.data();
         assertEquals("${test:testPath:missingKey}", data.get(MY_KEY));
     }
 
     @Test
     public void testSingleLevelOfIndirection() {
-        ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testIndirection}"));
+        ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:testIndirection}"));
         Map<String, String> data = result.data();
         assertEquals("${test:testPath:testResult}", data.get(MY_KEY));
     }
 
     @Test
     public void testReplaceVariableNoPath() {
-        ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testKey}"));
+        ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testKey}"));
         Map<String, String> data = result.data();
         Map<String, Long> ttls = result.ttls();
         assertEquals(TEST_RESULT_NO_PATH, data.get(MY_KEY));
@@ -98,7 +99,7 @@ public void testReplaceVariableNoPath() {
 
     @Test
     public void testReplaceMultipleVariablesWithoutPathInValue() {
-        ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "first ${test:testKey}; second ${test:testKey}"));
+        ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "first ${test:testKey}; second ${test:testKey}"));
         Map<String, String> data = result.data();
         assertEquals("first testResultNoPath; second testResultNoPath", data.get(MY_KEY));
     }
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
index aba62cf8464ff..9bc197af7d8a9 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
@@ -23,6 +23,7 @@
 import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.config.ConfigTransformer;
 import org.apache.kafka.common.config.provider.ConfigProvider;
+import org.apache.kafka.common.internals.Plugin;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.WorkerConfig;
@@ -269,18 +270,19 @@ List<String> configProviders() {
     Map<String, String> transform(Map<String, String> props) {
         // transform worker config according to config.providers
         List<String> providerNames = configProviders();
-        Map<String, ConfigProvider> providers = new HashMap<>();
+        Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>();
         for (String name : providerNames) {
-            ConfigProvider configProvider = plugins.newConfigProvider(
+            Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
                     this,
-                    CONFIG_PROVIDERS_CONFIG + "." + name,
-                    Plugins.ClassLoaderUsage.PLUGINS
+                    name,
+                    Plugins.ClassLoaderUsage.PLUGINS,
+                    null
             );
-            providers.put(name, configProvider);
+            providerPlugins.put(name, configProviderPlugin);
         }
-        ConfigTransformer transformer = new ConfigTransformer(providers);
+        ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
         Map<String, String> transformed = transformer.transform(props).data();
-        providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
+        providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
         return transformed;
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 4d5b23b63a826..fc0d38521aa35 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -209,16 +209,17 @@ public Worker(
 
     private WorkerConfigTransformer initConfigTransformer() {
         final List<String> providerNames = config.getList(WorkerConfig.CONFIG_PROVIDERS_CONFIG);
-        Map<String, ConfigProvider> providerMap = new HashMap<>();
+        Map<String, Plugin<ConfigProvider>> providerPluginMap = new HashMap<>();
         for (String providerName : providerNames) {
-            ConfigProvider configProvider = plugins.newConfigProvider(
-                    config,
-                    WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName,
-                    ClassLoaderUsage.PLUGINS
+            Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
+                config,
+                providerName,
+                ClassLoaderUsage.PLUGINS,
+                metrics.metrics()
             );
-            providerMap.put(providerName, configProvider);
+            providerPluginMap.put(providerName, configProviderPlugin);
         }
-        return new WorkerConfigTransformer(this, providerMap);
+        return new WorkerConfigTransformer(this, providerPluginMap);
     }
 
     public WorkerConfigTransformer configTransformer() {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
index df16955c941a2..35dfb0a6ad36a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
@@ -20,6 +20,7 @@
 import org.apache.kafka.common.config.ConfigTransformer;
 import org.apache.kafka.common.config.ConfigTransformerResult;
 import org.apache.kafka.common.config.provider.ConfigProvider;
+import org.apache.kafka.common.internals.Plugin;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.Herder.ConfigReloadAction;
 import org.apache.kafka.connect.util.Callback;
@@ -42,12 +43,12 @@ public class WorkerConfigTransformer implements AutoCloseable {
     private final Worker worker;
     private final ConfigTransformer configTransformer;
     private final ConcurrentMap<String, Map<String, HerderRequest>> requests = new ConcurrentHashMap<>();
-    private final Map<String, ConfigProvider> configProviders;
+    private final Map<String, Plugin<ConfigProvider>> configProviderPlugins;
 
-    public WorkerConfigTransformer(Worker worker, Map<String, ConfigProvider> configProviders) {
+    public WorkerConfigTransformer(Worker worker, Map<String, Plugin<ConfigProvider>> configProviderPlugins) {
         this.worker = worker;
-        this.configProviders = configProviders;
-        this.configTransformer = new ConfigTransformer(configProviders);
+        this.configProviderPlugins = configProviderPlugins;
+        this.configTransformer = new ConfigTransformer(configProviderPlugins);
     }
 
     public Map<String, String> transform(Map<String, String> configs) {
@@ -97,6 +98,6 @@ private void scheduleReload(String connectorName, String path, long ttl) {
 
     @Override
     public void close() {
-        configProviders.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
+        configProviderPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
     }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 98f33ea582bf0..2574732d24e57 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -19,6 +19,8 @@
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.provider.ConfigProvider;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.Connector;
@@ -627,7 +629,8 @@ private <U> U newVersionedPlugin(
         return plugin;
     }
 
-    public ConfigProvider newConfigProvider(AbstractConfig config, String providerPrefix, ClassLoaderUsage classLoaderUsage) {
+    public Plugin<ConfigProvider> newConfigProvider(AbstractConfig config, String providerName, ClassLoaderUsage classLoaderUsage, Metrics metrics) {
+        String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName;
         String classPropertyName = providerPrefix + ".class";
         Map<String, String> originalConfig = config.originalsStrings();
         if (!originalConfig.containsKey(classPropertyName)) {
@@ -643,7 +646,7 @@ public ConfigProvider newConfigProvider(AbstractConfig config, String providerPr
         try (LoaderSwap loaderSwap = safeLoaderSwapper().apply(plugin.getClass().getClassLoader())) {
             plugin.configure(configProviderConfig);
         }
-        return plugin;
+        return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName));
     }
 
     /**
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
index c3a8f151750ec..e099e2cabce53 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
@@ -18,6 +18,7 @@
 
 import org.apache.kafka.common.config.ConfigData;
 import org.apache.kafka.common.config.provider.ConfigProvider;
+import org.apache.kafka.common.internals.Plugin;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -27,7 +28,6 @@
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -66,13 +66,13 @@ public class WorkerConfigTransformerTest {
 
     @BeforeEach
     public void setup() {
-        configTransformer = new WorkerConfigTransformer(worker, Collections.singletonMap("test", new TestConfigProvider()));
+        configTransformer = new WorkerConfigTransformer(worker, Map.of("test", Plugin.wrapInstance(new TestConfigProvider(), null, "config.providers")));
     }
 
     @Test
     public void testReplaceVariable() {
         // Execution
-        Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKey}"));
+        Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Map.of(MY_KEY, "${test:testPath:testKey}"));
 
         // Assertions
         assertEquals(TEST_RESULT, result.get(MY_KEY));
@@ -97,7 +97,7 @@ public void testReplaceVariableWithTTLAndScheduleRestart() {
         when(herder.restartConnector(eq(1L), eq(MY_CONNECTOR), notNull())).thenReturn(requestId);
 
         // Execution
-        Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}"));
+        Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Map.of(MY_KEY, "${test:testPath:testKeyWithTTL}"));
 
         // Assertions
         assertEquals(TEST_RESULT_WITH_TTL, result.get(MY_KEY));
@@ -112,14 +112,14 @@ public void testReplaceVariableWithTTLFirstCancelThenScheduleRestart() {
         when(herder.restartConnector(eq(10L), eq(MY_CONNECTOR), notNull())).thenReturn(requestId);
 
         // Execution
-        Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}"));
+        Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Map.of(MY_KEY, "${test:testPath:testKeyWithTTL}"));
 
         // Assertions
         assertEquals(TEST_RESULT_WITH_TTL, result.get(MY_KEY));
         verify(herder).restartConnector(eq(1L), eq(MY_CONNECTOR), notNull());
 
         // Execution
-        result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithLongerTTL}"));
+        result = configTransformer.transform(MY_CONNECTOR, Map.of(MY_KEY, "${test:testPath:testKeyWithLongerTTL}"));
 
         // Assertions
         assertEquals(TEST_RESULT_WITH_LONGER_TTL, result.get(MY_KEY));
@@ -147,14 +147,14 @@ public ConfigData get(String path) {
         public ConfigData get(String path, Set<String> keys) {
             if (path.equals(TEST_PATH)) {
                 if (keys.contains(TEST_KEY)) {
-                    return new ConfigData(Collections.singletonMap(TEST_KEY, TEST_RESULT));
+                    return new ConfigData(Map.of(TEST_KEY, TEST_RESULT));
                 } else if (keys.contains(TEST_KEY_WITH_TTL)) {
-                    return new ConfigData(Collections.singletonMap(TEST_KEY_WITH_TTL, TEST_RESULT_WITH_TTL), 1L);
+                    return new ConfigData(Map.of(TEST_KEY_WITH_TTL, TEST_RESULT_WITH_TTL), 1L);
                 } else if (keys.contains(TEST_KEY_WITH_LONGER_TTL)) {
-                    return new ConfigData(Collections.singletonMap(TEST_KEY_WITH_LONGER_TTL, TEST_RESULT_WITH_LONGER_TTL), 10L);
+                    return new ConfigData(Map.of(TEST_KEY_WITH_LONGER_TTL, TEST_RESULT_WITH_LONGER_TTL), 10L);
                 }
             }
-            return new ConfigData(Collections.emptyMap());
+            return new ConfigData(Map.of());
         }
 
         @Override
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 2f7af629f0665..b95a34c39cf09 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -36,12 +36,19 @@
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigData;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.provider.ConfigProvider;
 import org.apache.kafka.common.config.provider.MockFileConfigProvider;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.internals.Plugin;
 import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.Monitorable;
+import org.apache.kafka.common.metrics.PluginMetrics;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.MockTime;
@@ -91,6 +98,7 @@
 
 import org.apache.maven.artifact.versioning.VersionRange;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.AdditionalAnswers;
@@ -103,6 +111,7 @@
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.quality.Strictness;
 
+import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.Arrays;
 import java.util.Collection;
@@ -110,6 +119,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -373,10 +383,14 @@ public void testStartAndStopConnector(boolean enableTopicCreation) throws Throwa
     private void mockFileConfigProvider() {
         MockFileConfigProvider mockFileConfigProvider = new MockFileConfigProvider();
         mockFileConfigProvider.configure(Collections.singletonMap("testId", mockFileProviderTestId));
+        Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(mockFileConfigProvider,
+            null,
+            WorkerConfig.CONFIG_PROVIDERS_CONFIG,
+            Map.of("provider", "file"));
         when(plugins.newConfigProvider(any(AbstractConfig.class),
-                                       eq("config.providers.file"),
-                                       any(ClassLoaderUsage.class)))
-               .thenReturn(mockFileConfigProvider);
+            eq("file"),
+            any(ClassLoaderUsage.class),
+            any(Metrics.class))).thenReturn(providerPlugin);
     }
 
     @ParameterizedTest
@@ -2891,6 +2905,56 @@ private void testStartTaskWithTooManyTaskConfigs(boolean enforced) {
         }
     }
 
+    @Test
+    public void testMonitorableConfigProvider() {
+        setup(false);
+        Map<String, String> props = new HashMap<>(this.workerProps);
+        props.put("config.providers", "monitorable,monitorable2");
+        props.put("config.providers.monitorable.class", MonitorableConfigProvider.class.getName());
+        props.put("config.providers.monitorable2.class", MonitorableConfigProvider.class.getName());
+        config = new StandaloneConfig(props);
+        mockKafkaClusterId();
+        when(plugins.newConfigProvider(any(AbstractConfig.class), any(String.class), any(ClassLoaderUsage.class), any(Metrics.class)))
+            .thenAnswer(invocation -> {
+                String providerName = invocation.getArgument(1);
+                Metrics metrics = invocation.getArgument(3);
+                return Plugin.wrapInstance(new MonitorableConfigProvider(), metrics,
+                    WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName));
+            });
+
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
+        Metrics metrics = worker.metrics().metrics();
+        assertMetrics(metrics,
+            1,
+            expectedTags(WorkerConfig.CONFIG_PROVIDERS_CONFIG, MonitorableConfigProvider.class.getSimpleName(), Map.of("provider", "monitorable")));
+        assertMetrics(metrics,
+            1,
+            expectedTags(WorkerConfig.CONFIG_PROVIDERS_CONFIG, MonitorableConfigProvider.class.getSimpleName(), Map.of("provider", "monitorable2")));
+    }
+
+    private static Map<String, String> expectedTags(String config, String clazz, Map<String, String> extraTags) {
+        Map<String, String> tags = new LinkedHashMap<>();
+        tags.put("config", config);
+        tags.put("class", clazz);
+        tags.putAll(extraTags);
+        return tags;
+    }
+
+    private void assertMetrics(Metrics metrics, int expected, Map<String, String> expectedTags) {
+        int found = 0;
+        for (MetricName metricName : metrics.metrics().keySet()) {
+            if (metricName.group().equals("plugins")) {
+                Map<String, String> tags = metricName.tags();
+                if (expectedTags.equals(tags)) {
+                    assertEquals(MonitorableConfigProvider.NAME, metricName.name());
+                    assertEquals(MonitorableConfigProvider.DESCRIPTION, metricName.description());
+                    found++;
+                }
+            }
+        }
+        assertEquals(expected, found);
+    }
+
     private void assertTasksMaxExceededMessage(String connector, int numTasks, int maxTasks, String message) {
         String expectedPrefix = "The connector " + connector
                 + " has generated "
@@ -3217,4 +3281,32 @@ public void stop() {
 
     }
 
+    public static class MonitorableConfigProvider implements ConfigProvider, Monitorable {
+        private static final String NAME = "name";
+        private static final String DESCRIPTION = "description";
+
+        @Override
+        public void withPluginMetrics(PluginMetrics metrics) {
+            MetricName metricName = metrics.metricName(NAME, DESCRIPTION, Map.of());
+            metrics.addMetric(metricName, (Measurable) (config, now) -> 123);
+        }
+
+        @Override
+        public ConfigData get(String path) {
+            return null;
+        }
+
+        @Override
+        public ConfigData get(String path, Set<String> keys) {
+            return null;
+        }
+
+        @Override
+        public void close() throws IOException {
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+        }
+    }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
index ca4c29931d088..6ec2ce9677dc3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -22,6 +22,7 @@
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.provider.ConfigProvider;
+import org.apache.kafka.common.internals.Plugin;
 import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.components.Versioned;
@@ -374,7 +375,8 @@ public void newConverterShouldConfigureWithPluginClassLoader() {
 
     @Test
     public void newConfigProviderShouldConfigureWithPluginClassLoader() {
-        String providerPrefix = "some.provider";
+        String providerName = "customProvider";
+        String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName;
         props.put(providerPrefix + ".class", TestPlugin.SAMPLING_CONFIG_PROVIDER.className());
 
         PluginClassLoader classLoader = plugins.delegatingLoader().pluginClassLoader(TestPlugin.SAMPLING_CONFIG_PROVIDER.className());
@@ -383,16 +385,17 @@ public void newConfigProviderShouldConfigureWithPluginClassLoader() {
             createConfig();
         }
 
-        ConfigProvider plugin = plugins.newConfigProvider(
+        Plugin<ConfigProvider> plugin = plugins.newConfigProvider(
             config,
-            providerPrefix,
-            ClassLoaderUsage.PLUGINS
+            providerName,
+            ClassLoaderUsage.PLUGINS,
+            null
         );
 
-        assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples");
-        Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin).flatten();
+        assertInstanceOf(SamplingTestPlugin.class, plugin.get(), "Cannot collect samples");
+        Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin.get()).flatten();
         assertTrue(samples.containsKey("configure"));
-        assertPluginClassLoaderAlwaysActive(plugin);
+        assertPluginClassLoaderAlwaysActive(plugin.get());
     }
 
     @Test

From 08dff599d976d17c89a23c9c32b134561cb7cf9f Mon Sep 17 00:00:00 2001
From: Jhen-Yung Hsu <jhenyunghsu@gmail.com>
Date: Tue, 8 Apr 2025 01:52:45 +0800
Subject: [PATCH 02/10] spotlessApply

---
 .../test/java/org/apache/kafka/connect/runtime/WorkerTest.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index b95a34c39cf09..50081409f4a84 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -118,8 +118,8 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;

From 9e221e30c29690b2359e4d4fe3cb28952910e01f Mon Sep 17 00:00:00 2001
From: Jhen-Yung Hsu <jhenyunghsu@gmail.com>
Date: Tue, 8 Apr 2025 02:52:17 +0800
Subject: [PATCH 03/10] Update msg in Utils.closeQuietly

---
 .../java/org/apache/kafka/common/config/AbstractConfig.java     | 2 +-
 .../java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java | 2 +-
 .../apache/kafka/connect/runtime/WorkerConfigTransformer.java   | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index bb6d51e82c9ef..77921d7545a7e 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -556,7 +556,7 @@ private Map<String, String> extractPotentialVariables(Map<?, ?> configMap) {
                 resolvedOriginals.putAll(result.data());
             }
         }
-        providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
+        providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
 
         return new ResolvingMap<>(resolvedOriginals, originals);
     }
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
index 9bc197af7d8a9..7afcee06bd9e5 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
@@ -282,7 +282,7 @@ Map<String, String> transform(Map<String, String> props) {
         }
         ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
         Map<String, String> transformed = transformer.transform(props).data();
-        providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
+        providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
         return transformed;
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
index 35dfb0a6ad36a..596c0fb55e8ae 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
@@ -98,6 +98,6 @@ private void scheduleReload(String connectorName, String path, long ttl) {
 
     @Override
     public void close() {
-        configProviderPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
+        configProviderPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
     }
 }

From 1dd8f41f953bb6dbca8aaf3b6519817ff311f868 Mon Sep 17 00:00:00 2001
From: Jhen-Yung Hsu <jhenyunghsu@gmail.com>
Date: Tue, 8 Apr 2025 03:26:43 +0800
Subject: [PATCH 04/10] Add a test to verify withPluginMetrics is called after
 configure

---
 .../runtime/isolation/PluginsTest.java        | 44 +++++++++++++++++++
 1 file changed, 44 insertions(+)

diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
index 6ec2ce9677dc3..06d7844d071c8 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -19,10 +19,14 @@
 
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigData;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.provider.ConfigProvider;
 import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Monitorable;
+import org.apache.kafka.common.metrics.PluginMetrics;
 import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.components.Versioned;
@@ -52,6 +56,7 @@
 import org.junit.jupiter.api.Test;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -398,6 +403,16 @@ public void newConfigProviderShouldConfigureWithPluginClassLoader() {
         assertPluginClassLoaderAlwaysActive(plugin.get());
     }
 
+    @Test
+    public void newConfigProviderShouldCallWithPluginMetricsAfterConfigure() {
+        String providerName = "monitorable";
+        String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName;
+        props.put(providerPrefix + ".class", MonitorableConfigProvider .class.getName());
+        createConfig();
+        Plugin<ConfigProvider> plugin = plugins.newConfigProvider(config, providerName, ClassLoaderUsage.PLUGINS, new Metrics());
+        assertInstanceOf(MonitorableConfigProvider.class, plugin.get());
+    }
+
     @Test
     public void newHeaderConverterShouldConfigureWithPluginClassLoader() {
         props.put(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, TestPlugin.SAMPLING_HEADER_CONVERTER.className());
@@ -793,4 +808,33 @@ public void configure(Map<String, ?> configs) {
             super.configure(configs);
         }
     }
+
+    public static class MonitorableConfigProvider implements ConfigProvider, Monitorable {
+        private boolean configured = false;
+
+        @Override
+        public void withPluginMetrics(PluginMetrics metrics) {
+            assertTrue(configured);
+        }
+
+        @Override
+        public ConfigData get(String path) {
+            return null;
+        }
+
+        @Override
+        public ConfigData get(String path, Set<String> keys) {
+            return null;
+        }
+
+        @Override
+        public void close() throws IOException {
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+            configured = true;
+        }
+    }
+
 }

From 204c8a95d0bd67483afbd2732befdbabe8b450f2 Mon Sep 17 00:00:00 2001
From: Jhen-Yung Hsu <jhenyunghsu@gmail.com>
Date: Tue, 8 Apr 2025 03:57:03 +0800
Subject: [PATCH 05/10] Refactor the test

---
 .../provider/MonitorableConfigProvider.java   | 58 +++++++++++++++++++
 .../kafka/connect/runtime/WorkerTest.java     | 35 +----------
 .../runtime/isolation/PluginsTest.java        | 30 ++--------
 3 files changed, 63 insertions(+), 60 deletions(-)
 create mode 100644 clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java

diff --git a/clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java b/clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java
new file mode 100644
index 0000000000000..e1759acc4e9fc
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.provider;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.config.ConfigData;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.Monitorable;
+import org.apache.kafka.common.metrics.PluginMetrics;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+public class MonitorableConfigProvider implements ConfigProvider, Monitorable {
+    public static final String NAME = "name";
+    public static final String DESCRIPTION = "description";
+    protected boolean configured = false;
+
+    @Override
+    public void withPluginMetrics(PluginMetrics metrics) {
+        MetricName metricName = metrics.metricName(NAME, DESCRIPTION, Map.of());
+        metrics.addMetric(metricName, (Measurable) (config, now) -> 123);
+    }
+
+    @Override
+    public ConfigData get(String path) {
+        return null;
+    }
+
+    @Override
+    public ConfigData get(String path, Set<String> keys) {
+        return null;
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        configured = true;
+    }
+}
\ No newline at end of file
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 50081409f4a84..d065f0ffa9f70 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -36,19 +36,16 @@
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ConfigData;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.provider.ConfigProvider;
 import org.apache.kafka.common.config.provider.MockFileConfigProvider;
+import org.apache.kafka.common.config.provider.MonitorableConfigProvider;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.internals.Plugin;
 import org.apache.kafka.common.metrics.JmxReporter;
-import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
-import org.apache.kafka.common.metrics.Monitorable;
-import org.apache.kafka.common.metrics.PluginMetrics;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.MockTime;
@@ -111,7 +108,6 @@
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.quality.Strictness;
 
-import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.Arrays;
 import java.util.Collection;
@@ -3280,33 +3276,4 @@ public void stop() {
         }
 
     }
-
-    public static class MonitorableConfigProvider implements ConfigProvider, Monitorable {
-        private static final String NAME = "name";
-        private static final String DESCRIPTION = "description";
-
-        @Override
-        public void withPluginMetrics(PluginMetrics metrics) {
-            MetricName metricName = metrics.metricName(NAME, DESCRIPTION, Map.of());
-            metrics.addMetric(metricName, (Measurable) (config, now) -> 123);
-        }
-
-        @Override
-        public ConfigData get(String path) {
-            return null;
-        }
-
-        @Override
-        public ConfigData get(String path, Set<String> keys) {
-            return null;
-        }
-
-        @Override
-        public void close() throws IOException {
-        }
-
-        @Override
-        public void configure(Map<String, ?> configs) {
-        }
-    }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
index 06d7844d071c8..946467240c3de 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -19,13 +19,12 @@
 
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ConfigData;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.provider.ConfigProvider;
+import org.apache.kafka.common.config.provider.MonitorableConfigProvider;
 import org.apache.kafka.common.internals.Plugin;
 import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.Monitorable;
 import org.apache.kafka.common.metrics.PluginMetrics;
 import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.Utils;
@@ -56,7 +55,6 @@
 import org.junit.jupiter.api.Test;
 
 import java.io.File;
-import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -407,10 +405,10 @@ public void newConfigProviderShouldConfigureWithPluginClassLoader() {
     public void newConfigProviderShouldCallWithPluginMetricsAfterConfigure() {
         String providerName = "monitorable";
         String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName;
-        props.put(providerPrefix + ".class", MonitorableConfigProvider .class.getName());
+        props.put(providerPrefix + ".class", CustomMonitorableConfigProvider .class.getName());
         createConfig();
         Plugin<ConfigProvider> plugin = plugins.newConfigProvider(config, providerName, ClassLoaderUsage.PLUGINS, new Metrics());
-        assertInstanceOf(MonitorableConfigProvider.class, plugin.get());
+        assertInstanceOf(CustomMonitorableConfigProvider.class, plugin.get());
     }
 
     @Test
@@ -809,32 +807,12 @@ public void configure(Map<String, ?> configs) {
         }
     }
 
-    public static class MonitorableConfigProvider implements ConfigProvider, Monitorable {
-        private boolean configured = false;
+    public static class CustomMonitorableConfigProvider extends MonitorableConfigProvider {
 
         @Override
         public void withPluginMetrics(PluginMetrics metrics) {
             assertTrue(configured);
         }
-
-        @Override
-        public ConfigData get(String path) {
-            return null;
-        }
-
-        @Override
-        public ConfigData get(String path, Set<String> keys) {
-            return null;
-        }
-
-        @Override
-        public void close() throws IOException {
-        }
-
-        @Override
-        public void configure(Map<String, ?> configs) {
-            configured = true;
-        }
     }
 
 }

From 96fd5a67e588eaf5ba56e6dc98d4b99ef8300117 Mon Sep 17 00:00:00 2001
From: Jhen-Yung Hsu <jhenyunghsu@gmail.com>
Date: Tue, 8 Apr 2025 05:37:58 +0800
Subject: [PATCH 06/10] Refactor

---
 .../test/java/org/apache/kafka/connect/runtime/WorkerTest.java  | 2 --
 .../org/apache/kafka/connect/runtime/isolation/PluginsTest.java | 2 +-
 2 files changed, 1 insertion(+), 3 deletions(-)

diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index d065f0ffa9f70..4af8c0df96ae3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -2906,8 +2906,6 @@ public void testMonitorableConfigProvider() {
         setup(false);
         Map<String, String> props = new HashMap<>(this.workerProps);
         props.put("config.providers", "monitorable,monitorable2");
-        props.put("config.providers.monitorable.class", MonitorableConfigProvider.class.getName());
-        props.put("config.providers.monitorable2.class", MonitorableConfigProvider.class.getName());
         config = new StandaloneConfig(props);
         mockKafkaClusterId();
         when(plugins.newConfigProvider(any(AbstractConfig.class), any(String.class), any(ClassLoaderUsage.class), any(Metrics.class)))
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
index 946467240c3de..9c722402cc582 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -405,7 +405,7 @@ public void newConfigProviderShouldConfigureWithPluginClassLoader() {
     public void newConfigProviderShouldCallWithPluginMetricsAfterConfigure() {
         String providerName = "monitorable";
         String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName;
-        props.put(providerPrefix + ".class", CustomMonitorableConfigProvider .class.getName());
+        props.put(providerPrefix + ".class", CustomMonitorableConfigProvider.class.getName());
         createConfig();
         Plugin<ConfigProvider> plugin = plugins.newConfigProvider(config, providerName, ClassLoaderUsage.PLUGINS, new Metrics());
         assertInstanceOf(CustomMonitorableConfigProvider.class, plugin.get());

From 6ef4001d0346d750b0a51052c5c852d20aec85e3 Mon Sep 17 00:00:00 2001
From: Jhen-Yung Hsu <jhenyunghsu@gmail.com>
Date: Tue, 8 Apr 2025 05:40:11 +0800
Subject: [PATCH 07/10] Add a newline at the end of the file

---
 .../kafka/common/config/provider/MonitorableConfigProvider.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java b/clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java
index e1759acc4e9fc..ddef7eda59599 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java
@@ -55,4 +55,4 @@ public void close() throws IOException {
     public void configure(Map<String, ?> configs) {
         configured = true;
     }
-}
\ No newline at end of file
+}

From 44ecfdb5c63f4f63d9e35d4ade9f7a787f400ee2 Mon Sep 17 00:00:00 2001
From: Jhen-Yung Hsu <jhenyunghsu@gmail.com>
Date: Tue, 8 Apr 2025 11:11:09 +0800
Subject: [PATCH 08/10] Fix NPE in transform method

---
 .../java/org/apache/kafka/common/config/AbstractConfig.java | 1 +
 .../org/apache/kafka/common/config/ConfigTransformer.java   | 6 +++---
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 77921d7545a7e..369c546af9919 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -624,6 +624,7 @@ private Map<String, Plugin<ConfigProvider>> instantiateConfigProviders(
         Map<String, Plugin<ConfigProvider>> configProviderPluginInstances = new HashMap<>();
         for (Map.Entry<String, String> entry : providerMap.entrySet()) {
             try {
+                System.out.println("Checking:" + entry);
                 String prefix = CONFIG_PROVIDERS_CONFIG + "." + entry.getKey() + CONFIG_PROVIDERS_PARAM;
                 Map<String, ?> configProperties = configProviderProperties(prefix, providerConfigProperties);
                 ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class);
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
index b4659a07da274..adf97c5aaad3b 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
@@ -96,13 +96,13 @@ public ConfigTransformerResult transform(Map<String, String> configs) {
         Map<String, Long> ttls = new HashMap<>();
         for (Map.Entry<String, Map<String, Set<String>>> entry : keysByProvider.entrySet()) {
             String providerName = entry.getKey();
-            ConfigProvider provider = configProviderPlugins.get(providerName).get();
+            Plugin<ConfigProvider> providerPlugin = configProviderPlugins.get(providerName);
             Map<String, Set<String>> keysByPath = entry.getValue();
-            if (provider != null && keysByPath != null) {
+            if (providerPlugin != null && keysByPath != null) {
                 for (Map.Entry<String, Set<String>> pathWithKeys : keysByPath.entrySet()) {
                     String path = pathWithKeys.getKey();
                     Set<String> keys = new HashSet<>(pathWithKeys.getValue());
-                    ConfigData configData = provider.get(path, keys);
+                    ConfigData configData = providerPlugin.get().get(path, keys);
                     Map<String, String> data = configData.data();
                     Long ttl = configData.ttl();
                     if (ttl != null && ttl >= 0) {

From e26a06f634d0eae0f57a2d47df2c12e91e362e2d Mon Sep 17 00:00:00 2001
From: Jhen-Yung Hsu <jhenyunghsu@gmail.com>
Date: Tue, 8 Apr 2025 11:13:58 +0800
Subject: [PATCH 09/10] Remove debug print

---
 .../main/java/org/apache/kafka/common/config/AbstractConfig.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 369c546af9919..77921d7545a7e 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -624,7 +624,6 @@ private Map<String, Plugin<ConfigProvider>> instantiateConfigProviders(
         Map<String, Plugin<ConfigProvider>> configProviderPluginInstances = new HashMap<>();
         for (Map.Entry<String, String> entry : providerMap.entrySet()) {
             try {
-                System.out.println("Checking:" + entry);
                 String prefix = CONFIG_PROVIDERS_CONFIG + "." + entry.getKey() + CONFIG_PROVIDERS_PARAM;
                 Map<String, ?> configProperties = configProviderProperties(prefix, providerConfigProperties);
                 ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class);

From b92a2bb2ddf809bfa0455bfb237ea55bb70016f0 Mon Sep 17 00:00:00 2001
From: Jhen-Yung Hsu <jhenyunghsu@gmail.com>
Date: Sat, 12 Apr 2025 10:02:08 +0800
Subject: [PATCH 10/10] Address m1a2st comments

---
 .../java/org/apache/kafka/common/config/AbstractConfig.java   | 4 ++--
 .../org/apache/kafka/common/config/ConfigTransformer.java     | 3 +--
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 77921d7545a7e..19b5421d8ce59 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -588,12 +588,12 @@ private Map<String, Object> configProviderProperties(String configProviderPrefix
      * config.providers : A comma-separated list of names for providers.
      * config.providers.{name}.class : The Java class name for a provider.
      * config.providers.{name}.param.{param-name} : A parameter to be passed to the above Java class on initialization.
-     * returns a map of config provider name and its instance wrapped in a {@link org.apache.kafka.common.internals.Plugin}.
+     * returns a map of config provider name and its instance.
      *
      * @param indirectConfigs          The map of potential variable configs
      * @param providerConfigProperties The map of config provider configs
      * @param classNameFilter          Filter for config provider class names
-     * @return map of config provider name and its instance wrapped in a {@link org.apache.kafka.common.internals.Plugin}.
+     * @return map of config provider name and its instance.
      */
     private Map<String, Plugin<ConfigProvider>> instantiateConfigProviders(
             Map<String, String> indirectConfigs,
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
index adf97c5aaad3b..cfb63a65f5394 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
@@ -62,8 +62,7 @@ public class ConfigTransformer {
     /**
      * Creates a ConfigTransformer with the default pattern, of the form <code>${provider:[path:]key}</code>.
      *
-     * @param configProviderPlugins a Map of provider names and {@link ConfigProvider} instances, where each instance
-     *                              is wrapped in a {@link org.apache.kafka.common.internals.Plugin}.
+     * @param configProviderPlugins a Map of provider names and {@link ConfigProvider} instances.
      */
     public ConfigTransformer(Map<String, Plugin<ConfigProvider>> configProviderPlugins) {
         this.configProviderPlugins = configProviderPlugins;