diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 9188b1687a808..19b5421d8ce59 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -20,6 +20,7 @@
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.provider.ConfigProvider;
 import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.internals.Plugin;
 import org.apache.kafka.common.utils.Utils;
 
 import org.slf4j.Logger;
@@ -546,16 +547,16 @@ private Map<String, String> extractPotentialVariables(Map<?, ?> configMap) {
             configProperties = configProviderProps;
             classNameFilter = ignored -> true;
         }
-        Map<String, ConfigProvider> providers = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
+        Map<String, Plugin<ConfigProvider>> providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
 
-        if (!providers.isEmpty()) {
-            ConfigTransformer configTransformer = new ConfigTransformer(providers);
+        if (!providerPlugins.isEmpty()) {
+            ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
             ConfigTransformerResult result = configTransformer.transform(indirectVariables);
             if (!result.data().isEmpty()) {
                 resolvedOriginals.putAll(result.data());
             }
         }
-        providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
+        providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
 
         return new ResolvingMap<>(resolvedOriginals, originals);
     }
@@ -594,7 +595,7 @@ private Map<String, Object> configProviderProperties(String configProviderPrefix
      * @param classNameFilter          Filter for config provider class names
      * @return map of config provider name and its instance.
      */
-    private Map<String, ConfigProvider> instantiateConfigProviders(
+    private Map<String, Plugin<ConfigProvider>> instantiateConfigProviders(
             Map<String, String> indirectConfigs,
             Map<String, ?> providerConfigProperties,
             Predicate<String> classNameFilter
@@ -620,21 +621,22 @@ private Map<String, ConfigProvider> instantiateConfigProviders(
             }
         }
         // Instantiate Config Providers
-        Map<String, ConfigProvider> configProviderInstances = new HashMap<>();
+        Map<String, Plugin<ConfigProvider>> configProviderPluginInstances = new HashMap<>();
         for (Map.Entry<String, String> entry : providerMap.entrySet()) {
             try {
                 String prefix = CONFIG_PROVIDERS_CONFIG + "." + entry.getKey() + CONFIG_PROVIDERS_PARAM;
                 Map<String, ?> configProperties = configProviderProperties(prefix, providerConfigProperties);
                 ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class);
                 provider.configure(configProperties);
-                configProviderInstances.put(entry.getKey(), provider);
+                Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG);
+                configProviderPluginInstances.put(entry.getKey(), providerPlugin);
             } catch (ClassNotFoundException e) {
                 log.error("Could not load config provider class {}", entry.getValue(), e);
                 throw new ConfigException(providerClassProperty(entry.getKey()), entry.getValue(), "Could not load config provider class or one of its dependencies");
             }
         }
 
-        return configProviderInstances;
+        return configProviderPluginInstances;
     }
 
     private static String providerClassProperty(String providerName) {
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
index dbf6c7bbfcec1..cfb63a65f5394 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
@@ -18,6 +18,7 @@
 
 import org.apache.kafka.common.config.provider.ConfigProvider;
 import org.apache.kafka.common.config.provider.FileConfigProvider;
+import org.apache.kafka.common.internals.Plugin;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -56,15 +57,15 @@ public class ConfigTransformer {
     public static final Pattern DEFAULT_PATTERN = Pattern.compile("\\$\\{([^}]*?):(([^}]*?):)?([^}]*?)\\}");
     private static final String EMPTY_PATH = "";
 
-    private final Map<String, ConfigProvider> configProviders;
+    private final Map<String, Plugin<ConfigProvider>> configProviderPlugins;
 
     /**
      * Creates a ConfigTransformer with the default pattern, of the form <code>${provider:[path:]key}</code>.
      *
-     * @param configProviders a Map of provider names and {@link ConfigProvider} instances.
+     * @param configProviderPlugins a Map of provider names and {@link ConfigProvider} instances.
      */
-    public ConfigTransformer(Map<String, ConfigProvider> configProviders) {
-        this.configProviders = configProviders;
+    public ConfigTransformer(Map<String, Plugin<ConfigProvider>> configProviderPlugins) {
+        this.configProviderPlugins = configProviderPlugins;
     }
 
     /**
@@ -94,13 +95,13 @@ public ConfigTransformerResult transform(Map<String, String> configs) {
         Map<String, Long> ttls = new HashMap<>();
         for (Map.Entry<String, Map<String, Set<String>>> entry : keysByProvider.entrySet()) {
             String providerName = entry.getKey();
-            ConfigProvider provider = configProviders.get(providerName);
+            Plugin<ConfigProvider> providerPlugin = configProviderPlugins.get(providerName);
             Map<String, Set<String>> keysByPath = entry.getValue();
-            if (provider != null && keysByPath != null) {
+            if (providerPlugin != null && keysByPath != null) {
                 for (Map.Entry<String, Set<String>> pathWithKeys : keysByPath.entrySet()) {
                     String path = pathWithKeys.getKey();
                     Set<String> keys = new HashSet<>(pathWithKeys.getValue());
-                    ConfigData configData = provider.get(path, keys);
+                    ConfigData configData = providerPlugin.get().get(path, keys);
                     Map<String, String> data = configData.data();
                     Long ttl = configData.ttl();
                     if (ttl != null && ttl >= 0) {
diff --git a/clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java b/clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java
index 81f0aac0d72d1..abe12237ca3e3 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java
@@ -29,6 +29,10 @@
  * <p>Kafka Connect discovers implementations of this interface using the Java {@link java.util.ServiceLoader} mechanism.
  * To support this, implementations of this interface should also contain a service provider configuration file in
  * {@code META-INF/services/org.apache.kafka.common.config.provider.ConfigProvider}.
+ * <p>Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the config provider to register metrics.
+ * The following tags are automatically added to all metrics registered: <code>config</code> set to
+ * <code>config.providers</code>, <code>class</code> set to the ConfigProvider class name,
+ * and <code>provider</code> set to the provider name.
  */
 public interface ConfigProvider extends Configurable, Closeable {
 
diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java
index 3a5ec650fab42..b9ced57b99a40 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.config;
 
 import org.apache.kafka.common.config.provider.ConfigProvider;
+import org.apache.kafka.common.internals.Plugin;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -45,12 +46,12 @@ public class ConfigTransformerTest {
 
     @BeforeEach
     public void setup() {
-        configTransformer = new ConfigTransformer(Collections.singletonMap("test", new TestConfigProvider()));
+        configTransformer = new ConfigTransformer(Map.of("test", Plugin.wrapInstance(new TestConfigProvider(), null, "config.providers")));
     }
 
     @Test
     public void testReplaceVariable() {
-        ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testKey}"));
+        ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:testKey}"));
         Map<String, String> data = result.data();
         Map<String, Long> ttls = result.ttls();
         assertEquals(TEST_RESULT, data.get(MY_KEY));
@@ -59,7 +60,7 @@ public void testReplaceVariable() {
 
     @Test
     public void testReplaceVariableWithTTL() {
-        ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}"));
+        ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:testKeyWithTTL}"));
         Map<String, String> data = result.data();
         Map<String, Long> ttls = result.ttls();
         assertEquals(TEST_RESULT_WITH_TTL, data.get(MY_KEY));
@@ -68,28 +69,28 @@ public void testReplaceVariableWithTTL() {
 
     @Test
     public void testReplaceMultipleVariablesInValue() {
-        ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "hello, ${test:testPath:testKey}; goodbye, ${test:testPath:testKeyWithTTL}!!!"));
+        ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "hello, ${test:testPath:testKey}; goodbye, ${test:testPath:testKeyWithTTL}!!!"));
         Map<String, String> data = result.data();
         assertEquals("hello, testResult; goodbye, testResultWithTTL!!!", data.get(MY_KEY));
     }
 
     @Test
     public void testNoReplacement() {
-        ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:missingKey}"));
+        ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:missingKey}"));
         Map<String, String> data = result.data();
         assertEquals("${test:testPath:missingKey}", data.get(MY_KEY));
     }
 
     @Test
     public void testSingleLevelOfIndirection() {
-        ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testIndirection}"));
+        ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:testIndirection}"));
         Map<String, String> data = result.data();
         assertEquals("${test:testPath:testResult}", data.get(MY_KEY));
     }
 
     @Test
     public void testReplaceVariableNoPath() {
-        ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testKey}"));
+        ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testKey}"));
         Map<String, String> data = result.data();
         Map<String, Long> ttls = result.ttls();
         assertEquals(TEST_RESULT_NO_PATH, data.get(MY_KEY));
@@ -98,7 +99,7 @@ public void testReplaceVariableNoPath() {
 
     @Test
     public void testReplaceMultipleVariablesWithoutPathInValue() {
-        ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "first ${test:testKey}; second ${test:testKey}"));
+        ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "first ${test:testKey}; second ${test:testKey}"));
         Map<String, String> data = result.data();
         assertEquals("first testResultNoPath; second testResultNoPath", data.get(MY_KEY));
     }
diff --git a/clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java b/clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java
new file mode 100644
index 0000000000000..ddef7eda59599
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.provider;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.config.ConfigData;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.Monitorable;
+import org.apache.kafka.common.metrics.PluginMetrics;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+public class MonitorableConfigProvider implements ConfigProvider, Monitorable {
+    public static final String NAME = "name";
+    public static final String DESCRIPTION = "description";
+    protected boolean configured = false;
+
+    @Override
+    public void withPluginMetrics(PluginMetrics metrics) {
+        MetricName metricName = metrics.metricName(NAME, DESCRIPTION, Map.of());
+        metrics.addMetric(metricName, (Measurable) (config, now) -> 123);
+    }
+
+    @Override
+    public ConfigData get(String path) {
+        return null;
+    }
+
+    @Override
+    public ConfigData get(String path, Set<String> keys) {
+        return null;
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        configured = true;
+    }
+}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
index aba62cf8464ff..7afcee06bd9e5 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
@@ -23,6 +23,7 @@
 import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.config.ConfigTransformer;
 import org.apache.kafka.common.config.provider.ConfigProvider;
+import org.apache.kafka.common.internals.Plugin;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.WorkerConfig;
@@ -269,18 +270,19 @@ List<String> configProviders() {
     Map<String, String> transform(Map<String, String> props) {
         // transform worker config according to config.providers
         List<String> providerNames = configProviders();
-        Map<String, ConfigProvider> providers = new HashMap<>();
+        Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>();
         for (String name : providerNames) {
-            ConfigProvider configProvider = plugins.newConfigProvider(
+            Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
                     this,
-                    CONFIG_PROVIDERS_CONFIG + "." + name,
-                    Plugins.ClassLoaderUsage.PLUGINS
+                    name,
+                    Plugins.ClassLoaderUsage.PLUGINS,
+                    null
             );
-            providers.put(name, configProvider);
+            providerPlugins.put(name, configProviderPlugin);
         }
-        ConfigTransformer transformer = new ConfigTransformer(providers);
+        ConfigTransformer transformer = new ConfigTransformer(providerPlugins);
         Map<String, String> transformed = transformer.transform(props).data();
-        providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
+        providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
         return transformed;
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 4d5b23b63a826..fc0d38521aa35 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -209,16 +209,17 @@ public Worker(
 
     private WorkerConfigTransformer initConfigTransformer() {
         final List<String> providerNames = config.getList(WorkerConfig.CONFIG_PROVIDERS_CONFIG);
-        Map<String, ConfigProvider> providerMap = new HashMap<>();
+        Map<String, Plugin<ConfigProvider>> providerPluginMap = new HashMap<>();
         for (String providerName : providerNames) {
-            ConfigProvider configProvider = plugins.newConfigProvider(
-                    config,
-                    WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName,
-                    ClassLoaderUsage.PLUGINS
+            Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
+                config,
+                providerName,
+                ClassLoaderUsage.PLUGINS,
+                metrics.metrics()
             );
-            providerMap.put(providerName, configProvider);
+            providerPluginMap.put(providerName, configProviderPlugin);
         }
-        return new WorkerConfigTransformer(this, providerMap);
+        return new WorkerConfigTransformer(this, providerPluginMap);
     }
 
     public WorkerConfigTransformer configTransformer() {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
index df16955c941a2..596c0fb55e8ae 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
@@ -20,6 +20,7 @@
 import org.apache.kafka.common.config.ConfigTransformer;
 import org.apache.kafka.common.config.ConfigTransformerResult;
 import org.apache.kafka.common.config.provider.ConfigProvider;
+import org.apache.kafka.common.internals.Plugin;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.Herder.ConfigReloadAction;
 import org.apache.kafka.connect.util.Callback;
@@ -42,12 +43,12 @@ public class WorkerConfigTransformer implements AutoCloseable {
     private final Worker worker;
     private final ConfigTransformer configTransformer;
     private final ConcurrentMap<String, Map<String, HerderRequest>> requests = new ConcurrentHashMap<>();
-    private final Map<String, ConfigProvider> configProviders;
+    private final Map<String, Plugin<ConfigProvider>> configProviderPlugins;
 
-    public WorkerConfigTransformer(Worker worker, Map<String, ConfigProvider> configProviders) {
+    public WorkerConfigTransformer(Worker worker, Map<String, Plugin<ConfigProvider>> configProviderPlugins) {
         this.worker = worker;
-        this.configProviders = configProviders;
-        this.configTransformer = new ConfigTransformer(configProviders);
+        this.configProviderPlugins = configProviderPlugins;
+        this.configTransformer = new ConfigTransformer(configProviderPlugins);
     }
 
     public Map<String, String> transform(Map<String, String> configs) {
@@ -97,6 +98,6 @@ private void scheduleReload(String connectorName, String path, long ttl) {
 
     @Override
     public void close() {
-        configProviders.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
+        configProviderPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
     }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 98f33ea582bf0..2574732d24e57 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -19,6 +19,8 @@
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.provider.ConfigProvider;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.Connector;
@@ -627,7 +629,8 @@ private <U> U newVersionedPlugin(
         return plugin;
     }
 
-    public ConfigProvider newConfigProvider(AbstractConfig config, String providerPrefix, ClassLoaderUsage classLoaderUsage) {
+    public Plugin<ConfigProvider> newConfigProvider(AbstractConfig config, String providerName, ClassLoaderUsage classLoaderUsage, Metrics metrics) {
+        String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName;
         String classPropertyName = providerPrefix + ".class";
         Map<String, String> originalConfig = config.originalsStrings();
         if (!originalConfig.containsKey(classPropertyName)) {
@@ -643,7 +646,7 @@ public ConfigProvider newConfigProvider(AbstractConfig config, String providerPr
         try (LoaderSwap loaderSwap = safeLoaderSwapper().apply(plugin.getClass().getClassLoader())) {
             plugin.configure(configProviderConfig);
         }
-        return plugin;
+        return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName));
     }
 
     /**
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
index c3a8f151750ec..e099e2cabce53 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
@@ -18,6 +18,7 @@
 
 import org.apache.kafka.common.config.ConfigData;
 import org.apache.kafka.common.config.provider.ConfigProvider;
+import org.apache.kafka.common.internals.Plugin;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -27,7 +28,6 @@
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -66,13 +66,13 @@ public class WorkerConfigTransformerTest {
 
     @BeforeEach
     public void setup() {
-        configTransformer = new WorkerConfigTransformer(worker, Collections.singletonMap("test", new TestConfigProvider()));
+        configTransformer = new WorkerConfigTransformer(worker, Map.of("test", Plugin.wrapInstance(new TestConfigProvider(), null, "config.providers")));
     }
 
     @Test
     public void testReplaceVariable() {
         // Execution
-        Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKey}"));
+        Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Map.of(MY_KEY, "${test:testPath:testKey}"));
 
         // Assertions
         assertEquals(TEST_RESULT, result.get(MY_KEY));
@@ -97,7 +97,7 @@ public void testReplaceVariableWithTTLAndScheduleRestart() {
         when(herder.restartConnector(eq(1L), eq(MY_CONNECTOR), notNull())).thenReturn(requestId);
 
         // Execution
-        Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}"));
+        Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Map.of(MY_KEY, "${test:testPath:testKeyWithTTL}"));
 
         // Assertions
         assertEquals(TEST_RESULT_WITH_TTL, result.get(MY_KEY));
@@ -112,14 +112,14 @@ public void testReplaceVariableWithTTLFirstCancelThenScheduleRestart() {
         when(herder.restartConnector(eq(10L), eq(MY_CONNECTOR), notNull())).thenReturn(requestId);
 
         // Execution
-        Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}"));
+        Map<String, String> result = configTransformer.transform(MY_CONNECTOR, Map.of(MY_KEY, "${test:testPath:testKeyWithTTL}"));
 
         // Assertions
         assertEquals(TEST_RESULT_WITH_TTL, result.get(MY_KEY));
         verify(herder).restartConnector(eq(1L), eq(MY_CONNECTOR), notNull());
 
         // Execution
-        result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithLongerTTL}"));
+        result = configTransformer.transform(MY_CONNECTOR, Map.of(MY_KEY, "${test:testPath:testKeyWithLongerTTL}"));
 
         // Assertions
         assertEquals(TEST_RESULT_WITH_LONGER_TTL, result.get(MY_KEY));
@@ -147,14 +147,14 @@ public ConfigData get(String path) {
         public ConfigData get(String path, Set<String> keys) {
             if (path.equals(TEST_PATH)) {
                 if (keys.contains(TEST_KEY)) {
-                    return new ConfigData(Collections.singletonMap(TEST_KEY, TEST_RESULT));
+                    return new ConfigData(Map.of(TEST_KEY, TEST_RESULT));
                 } else if (keys.contains(TEST_KEY_WITH_TTL)) {
-                    return new ConfigData(Collections.singletonMap(TEST_KEY_WITH_TTL, TEST_RESULT_WITH_TTL), 1L);
+                    return new ConfigData(Map.of(TEST_KEY_WITH_TTL, TEST_RESULT_WITH_TTL), 1L);
                 } else if (keys.contains(TEST_KEY_WITH_LONGER_TTL)) {
-                    return new ConfigData(Collections.singletonMap(TEST_KEY_WITH_LONGER_TTL, TEST_RESULT_WITH_LONGER_TTL), 10L);
+                    return new ConfigData(Map.of(TEST_KEY_WITH_LONGER_TTL, TEST_RESULT_WITH_LONGER_TTL), 10L);
                 }
             }
-            return new ConfigData(Collections.emptyMap());
+            return new ConfigData(Map.of());
         }
 
         @Override
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 2f7af629f0665..4af8c0df96ae3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -37,10 +37,14 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.provider.ConfigProvider;
 import org.apache.kafka.common.config.provider.MockFileConfigProvider;
+import org.apache.kafka.common.config.provider.MonitorableConfigProvider;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.internals.Plugin;
 import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.utils.LogCaptureAppender;
@@ -91,6 +95,7 @@
 
 import org.apache.maven.artifact.versioning.VersionRange;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.AdditionalAnswers;
@@ -109,6 +114,7 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -373,10 +379,14 @@ public void testStartAndStopConnector(boolean enableTopicCreation) throws Throwa
     private void mockFileConfigProvider() {
         MockFileConfigProvider mockFileConfigProvider = new MockFileConfigProvider();
         mockFileConfigProvider.configure(Collections.singletonMap("testId", mockFileProviderTestId));
+        Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(mockFileConfigProvider,
+            null,
+            WorkerConfig.CONFIG_PROVIDERS_CONFIG,
+            Map.of("provider", "file"));
         when(plugins.newConfigProvider(any(AbstractConfig.class),
-                                       eq("config.providers.file"),
-                                       any(ClassLoaderUsage.class)))
-               .thenReturn(mockFileConfigProvider);
+            eq("file"),
+            any(ClassLoaderUsage.class),
+            any(Metrics.class))).thenReturn(providerPlugin);
     }
 
     @ParameterizedTest
@@ -2891,6 +2901,54 @@ private void testStartTaskWithTooManyTaskConfigs(boolean enforced) {
         }
     }
 
+    @Test
+    public void testMonitorableConfigProvider() {
+        setup(false);
+        Map<String, String> props = new HashMap<>(this.workerProps);
+        props.put("config.providers", "monitorable,monitorable2");
+        config = new StandaloneConfig(props);
+        mockKafkaClusterId();
+        when(plugins.newConfigProvider(any(AbstractConfig.class), any(String.class), any(ClassLoaderUsage.class), any(Metrics.class)))
+            .thenAnswer(invocation -> {
+                String providerName = invocation.getArgument(1);
+                Metrics metrics = invocation.getArgument(3);
+                return Plugin.wrapInstance(new MonitorableConfigProvider(), metrics,
+                    WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName));
+            });
+
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
+        Metrics metrics = worker.metrics().metrics();
+        assertMetrics(metrics,
+            1,
+            expectedTags(WorkerConfig.CONFIG_PROVIDERS_CONFIG, MonitorableConfigProvider.class.getSimpleName(), Map.of("provider", "monitorable")));
+        assertMetrics(metrics,
+            1,
+            expectedTags(WorkerConfig.CONFIG_PROVIDERS_CONFIG, MonitorableConfigProvider.class.getSimpleName(), Map.of("provider", "monitorable2")));
+    }
+
+    private static Map<String, String> expectedTags(String config, String clazz, Map<String, String> extraTags) {
+        Map<String, String> tags = new LinkedHashMap<>();
+        tags.put("config", config);
+        tags.put("class", clazz);
+        tags.putAll(extraTags);
+        return tags;
+    }
+
+    private void assertMetrics(Metrics metrics, int expected, Map<String, String> expectedTags) {
+        int found = 0;
+        for (MetricName metricName : metrics.metrics().keySet()) {
+            if (metricName.group().equals("plugins")) {
+                Map<String, String> tags = metricName.tags();
+                if (expectedTags.equals(tags)) {
+                    assertEquals(MonitorableConfigProvider.NAME, metricName.name());
+                    assertEquals(MonitorableConfigProvider.DESCRIPTION, metricName.description());
+                    found++;
+                }
+            }
+        }
+        assertEquals(expected, found);
+    }
+
     private void assertTasksMaxExceededMessage(String connector, int numTasks, int maxTasks, String message) {
         String expectedPrefix = "The connector " + connector
                 + " has generated "
@@ -3216,5 +3274,4 @@ public void stop() {
         }
 
     }
-
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
index ca4c29931d088..9c722402cc582 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -22,6 +22,10 @@
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.provider.ConfigProvider;
+import org.apache.kafka.common.config.provider.MonitorableConfigProvider;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.PluginMetrics;
 import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.components.Versioned;
@@ -374,7 +378,8 @@ public void newConverterShouldConfigureWithPluginClassLoader() {
 
     @Test
     public void newConfigProviderShouldConfigureWithPluginClassLoader() {
-        String providerPrefix = "some.provider";
+        String providerName = "customProvider";
+        String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName;
         props.put(providerPrefix + ".class", TestPlugin.SAMPLING_CONFIG_PROVIDER.className());
 
         PluginClassLoader classLoader = plugins.delegatingLoader().pluginClassLoader(TestPlugin.SAMPLING_CONFIG_PROVIDER.className());
@@ -383,16 +388,27 @@ public void newConfigProviderShouldConfigureWithPluginClassLoader() {
             createConfig();
         }
 
-        ConfigProvider plugin = plugins.newConfigProvider(
+        Plugin<ConfigProvider> plugin = plugins.newConfigProvider(
             config,
-            providerPrefix,
-            ClassLoaderUsage.PLUGINS
+            providerName,
+            ClassLoaderUsage.PLUGINS,
+            null
         );
 
-        assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples");
-        Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin).flatten();
+        assertInstanceOf(SamplingTestPlugin.class, plugin.get(), "Cannot collect samples");
+        Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin.get()).flatten();
         assertTrue(samples.containsKey("configure"));
-        assertPluginClassLoaderAlwaysActive(plugin);
+        assertPluginClassLoaderAlwaysActive(plugin.get());
+    }
+
+    @Test
+    public void newConfigProviderShouldCallWithPluginMetricsAfterConfigure() {
+        String providerName = "monitorable";
+        String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName;
+        props.put(providerPrefix + ".class", CustomMonitorableConfigProvider.class.getName());
+        createConfig();
+        Plugin<ConfigProvider> plugin = plugins.newConfigProvider(config, providerName, ClassLoaderUsage.PLUGINS, new Metrics());
+        assertInstanceOf(CustomMonitorableConfigProvider.class, plugin.get());
     }
 
     @Test
@@ -790,4 +806,13 @@ public void configure(Map<String, ?> configs) {
             super.configure(configs);
         }
     }
+
+    public static class CustomMonitorableConfigProvider extends MonitorableConfigProvider {
+
+        @Override
+        public void withPluginMetrics(PluginMetrics metrics) {
+            assertTrue(configured);
+        }
+    }
+
 }