diff --git a/controllers/clustercache/cluster_accessor.go b/controllers/clustercache/cluster_accessor.go
index 710bda38cdce..4f0fa56d42c3 100644
--- a/controllers/clustercache/cluster_accessor.go
+++ b/controllers/clustercache/cluster_accessor.go
@@ -154,6 +154,11 @@ type clusterAccessorHealthProbeConfig struct {
 // and health checking information (e.g. lastProbeSuccessTimestamp, consecutiveFailures).
 // lockedStateLock must be *always* held (via lock or rLock) before accessing this field.
 type clusterAccessorLockedState struct {
+	// kubeconfigResourceVersion is the resource version of the kubeconfig secret.
+	// This is used to detect if the kubeconfig secret has changed and we need to re-create the connection.
+	// It is set when the connection is created.
+	kubeconfigResourceVersion string
+
 	// lastConnectionCreationErrorTimestamp is the timestamp when connection creation failed the last time.
 	lastConnectionCreationErrorTimestamp time.Time
 
@@ -273,6 +278,12 @@ func (ca *clusterAccessor) Connect(ctx context.Context) (retErr error) {
 
 	log.Info("Connected")
 
+	kubeconfigSecret, err := ca.getKubeConfigSecret(ctx)
+	if err != nil {
+		return err
+	}
+	ca.lockedState.kubeconfigResourceVersion = kubeconfigSecret.ResourceVersion
+
 	// Only generate the clientCertificatePrivateKey once as there is no need to regenerate it after disconnect/connect.
 	// Note: This has to be done before setting connection, because otherwise this code wouldn't be re-entrant if the
 	// private key generation fails because we check Connected above.
@@ -414,6 +425,18 @@ func (ca *clusterAccessor) GetRESTConfig(ctx context.Context) (*rest.Config, err
 	return ca.lockedState.connection.restConfig, nil
 }
 
+func (ca *clusterAccessor) KubeConfigUpdated(ctx context.Context) (bool, error) {
+	ca.rLock(ctx)
+	defer ca.rUnlock(ctx)
+
+	kubeconfigSecret, err := ca.getKubeConfigSecret(ctx)
+	if err != nil {
+		return false, err
+	}
+
+	return kubeconfigSecret.ResourceVersion != ca.lockedState.kubeconfigResourceVersion, nil
+}
+
 func (ca *clusterAccessor) GetClientCertificatePrivateKey(ctx context.Context) *rsa.PrivateKey {
 	ca.rLock(ctx)
 	defer ca.rUnlock(ctx)
diff --git a/controllers/clustercache/cluster_accessor_client.go b/controllers/clustercache/cluster_accessor_client.go
index 7e5ae2484626..d2ac788a3c0a 100644
--- a/controllers/clustercache/cluster_accessor_client.go
+++ b/controllers/clustercache/cluster_accessor_client.go
@@ -39,6 +39,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
 
 	kcfg "sigs.k8s.io/cluster-api/util/kubeconfig"
+	"sigs.k8s.io/cluster-api/util/secret"
 )
 
 type createConnectionResult struct {
@@ -48,6 +49,14 @@ type createConnectionResult struct {
 	Cache *stoppableCache
 }
 
+func (ca *clusterAccessor) getKubeConfigSecret(ctx context.Context) (*corev1.Secret, error) {
+	kubeconfigSecret, err := secret.Get(ctx, ca.config.SecretClient, ca.cluster, secret.Kubeconfig)
+	if err != nil {
+		return nil, errors.Wrapf(err, "error getting kubeconfig secret")
+	}
+	return kubeconfigSecret, nil
+}
+
 func (ca *clusterAccessor) createConnection(ctx context.Context) (*createConnectionResult, error) {
 	log := ctrl.LoggerFrom(ctx)
 	log.V(6).Info("Creating connection")
diff --git a/controllers/clustercache/cluster_cache.go b/controllers/clustercache/cluster_cache.go
index 50869683aec2..fbfafc695dfd 100644
--- a/controllers/clustercache/cluster_cache.go
+++ b/controllers/clustercache/cluster_cache.go
@@ -450,6 +450,7 @@ func (cc *clusterCache) Reconcile(ctx context.Context, req reconcile.Request) (r
 
 	// Try to connect, if not connected.
 	connected := accessor.Connected(ctx)
+
 	if !connected {
 		lastConnectionCreationErrorTimestamp := accessor.GetLastConnectionCreationErrorTimestamp(ctx)
 
@@ -511,6 +512,16 @@ func (cc *clusterCache) Reconcile(ctx context.Context, req reconcile.Request) (r
 				requeueAfterDurations = append(requeueAfterDurations, accessor.config.HealthProbe.Interval)
 			}
 		}
+
+		// Check if kubeconfig was updated
+		kubeconfigUpdated, err := accessor.KubeConfigUpdated(ctx)
+		if err != nil {
+			log.Error(err, "error checking if kubeconfig was updated")
+		} else if kubeconfigUpdated {
+			log.Info("Kubeconfig was updated, disconnecting to re-connect with the new kubeconfig")
+			accessor.Disconnect(ctx)
+			didDisconnect = true
+		}
 	}
 
 	// Send events to cluster sources.
diff --git a/controllers/clustercache/cluster_cache_test.go b/controllers/clustercache/cluster_cache_test.go
index 7294fdf0bbe5..657c2623e052 100644
--- a/controllers/clustercache/cluster_cache_test.go
+++ b/controllers/clustercache/cluster_cache_test.go
@@ -781,3 +781,62 @@ func getCounterMetric(metricFamilyName, controllerName string) (float64, error)
 
 	return 0, fmt.Errorf("failed to find %q metric", metricFamilyName)
 }
+
+func TestReconcile_KubeconfigSecretResourceVersionChange(t *testing.T) {
+	g := NewWithT(t)
+	ctx := context.Background()
+
+	testCluster := &clusterv1.Cluster{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "test-kubeconfig-rv",
+			Namespace: metav1.NamespaceDefault,
+		},
+	}
+	clusterKey := client.ObjectKeyFromObject(testCluster)
+	g.Expect(env.CreateAndWait(ctx, testCluster)).To(Succeed())
+	defer func() { g.Expect(client.IgnoreNotFound(env.CleanupAndWait(ctx, testCluster))).To(Succeed()) }()
+
+	opts := Options{
+		SecretClient: env.GetClient(),
+		Client: ClientOptions{
+			UserAgent: remote.DefaultClusterAPIUserAgent("test-controller-manager"),
+			Timeout:   10 * time.Second,
+		},
+		Cache: CacheOptions{
+			Indexes: []CacheOptionsIndex{NodeProviderIDIndex},
+		},
+	}
+	accessorConfig := buildClusterAccessorConfig(env.GetScheme(), opts, nil)
+	cc := &clusterCache{
+		client:                env.GetAPIReader(),
+		clusterAccessorConfig: accessorConfig,
+		clusterAccessors:      make(map[client.ObjectKey]*clusterAccessor),
+		cacheCtx:              context.Background(),
+	}
+
+	// Set Cluster.Status.InfrastructureReady == true
+	patch := client.MergeFrom(testCluster.DeepCopy())
+	testCluster.Status.Initialization = &clusterv1.ClusterInitializationStatus{InfrastructureProvisioned: true}
+	g.Expect(env.Status().Patch(ctx, testCluster, patch)).To(Succeed())
+
+	// Create kubeconfig Secret
+	kubeconfigSecret := kubeconfig.GenerateSecret(testCluster, kubeconfig.FromEnvTestConfig(env.Config, testCluster))
+	g.Expect(env.CreateAndWait(ctx, kubeconfigSecret)).To(Succeed())
+	defer func() { g.Expect(env.CleanupAndWait(ctx, kubeconfigSecret)).To(Succeed()) }()
+
+	// Initial reconcile to connect
+	res, err := cc.Reconcile(ctx, reconcile.Request{NamespacedName: clusterKey})
+	g.Expect(err).ToNot(HaveOccurred())
+	g.Expect(res.RequeueAfter).To(BeNumerically(">=", accessorConfig.HealthProbe.Interval-2*time.Second))
+	g.Expect(cc.getClusterAccessor(clusterKey).Connected(ctx)).To(BeTrue())
+
+	// Simulate kubeconfig Secret update (resourceVersion change)
+	kubeconfigSecret.Data["dummy"] = []byte("changed")
+	g.Expect(env.Update(ctx, kubeconfigSecret)).To(Succeed())
+
+	// Reconcile again, should detect update and disconnect
+	_ = cc.getClusterAccessor(clusterKey) // ensure accessor is present
+	res, err = cc.Reconcile(ctx, reconcile.Request{NamespacedName: clusterKey})
+	g.Expect(err).ToNot(HaveOccurred())
+	g.Expect(cc.getClusterAccessor(clusterKey).Connected(ctx)).To(BeFalse())
+}