Skip to content

Commit f1d064a

Browse files
committed
feat: add multi-cluster replication and remote cleanup support
1 parent d4cc9bf commit f1d064a

File tree

12 files changed

+773
-69
lines changed

12 files changed

+773
-69
lines changed

cmd/flagger/main.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@ var (
8888
kubeconfigServiceMesh string
8989
clusterName string
9090
noCrossNamespaceRefs bool
91+
istioMultiClusterEnabled bool
92+
istioMultiClusterLabel string
93+
istioMultiClusterNs string
9194
)
9295

9396
func init() {
@@ -123,6 +126,9 @@ func init() {
123126
flag.StringVar(&kubeconfigServiceMesh, "kubeconfig-service-mesh", "", "Path to a kubeconfig for the service mesh control plane cluster.")
124127
flag.StringVar(&clusterName, "cluster-name", "", "Cluster name to be included in alert msgs.")
125128
flag.BoolVar(&noCrossNamespaceRefs, "no-cross-namespace-refs", false, "When set to true, Flagger can only refer to resources in the same namespace.")
129+
flag.BoolVar(&istioMultiClusterEnabled, "istio-multicluster-enabled", false, "Enable Istio multi-cluster support.")
130+
flag.StringVar(&istioMultiClusterLabel, "istio-multicluster-secret-label", "istio/multiCluster=true", "Label on secrets for Istio multi-cluster discovery.")
131+
flag.StringVar(&istioMultiClusterNs, "istio-multicluster-secret-namespace", "istio-system", "Namespace where Istio multi-cluster secrets are located.")
126132
}
127133

128134
func main() {
@@ -228,7 +234,13 @@ func main() {
228234
setOwnerRefs = false
229235
}
230236

231-
routerFactory := router.NewFactory(cfg, kubeClient, flaggerClient, knativeClient, ingressAnnotationsPrefix, ingressClass, logger, meshClient, setOwnerRefs)
237+
var clusterManager *router.ClusterManager
238+
if istioMultiClusterEnabled {
239+
clusterManager = router.NewClusterManager(kubeClient, meshClient, logger, istioMultiClusterLabel, istioMultiClusterNs)
240+
clusterManager.Start(stopCh)
241+
}
242+
243+
routerFactory := router.NewFactory(cfg, kubeClient, flaggerClient, knativeClient, ingressAnnotationsPrefix, ingressClass, logger, meshClient, setOwnerRefs, clusterManager)
232244

233245
var configTracker canary.Tracker
234246
if enableConfigTracking {

pkg/controller/controller.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -159,9 +159,9 @@ func NewController(
159159

160160
ctrl.enqueue(new)
161161
} else if !newCanary.DeletionTimestamp.IsZero() && hasFinalizer(&newCanary) ||
162-
!hasFinalizer(&newCanary) && newCanary.Spec.RevertOnDeletion {
162+
!hasFinalizer(&newCanary) && (newCanary.Spec.RevertOnDeletion || ctrl.routerFactory.IsMultiClusterEnabled()) {
163163
// If this was marked for deletion and has finalizers enqueue for finalizing or
164-
// if this canary doesn't have finalizers and RevertOnDeletion is true updated speck enqueue
164+
// if this canary doesn't have finalizers and RevertOnDeletion is true (or multi-cluster is enabled) and the spec was updated, enqueue
165165
ctrl.enqueue(new)
166166
}
167167

@@ -266,8 +266,8 @@ func (c *Controller) syncHandler(key string) error {
266266
return fmt.Errorf("invalid canary spec: %s", err)
267267
}
268268

269-
// Finalize if canary has been marked for deletion and revert is desired
270-
if cd.Spec.RevertOnDeletion && cd.ObjectMeta.DeletionTimestamp != nil {
269+
// Finalize if canary has been marked for deletion and revert is desired (or multi-cluster is enabled)
270+
if (cd.Spec.RevertOnDeletion || c.routerFactory.IsMultiClusterEnabled()) && cd.ObjectMeta.DeletionTimestamp != nil {
271271
// If finalizers have been previously removed proceed
272272
if !hasFinalizer(cd) {
273273
c.logger.Infof("Canary %s.%s has been finalized", cd.Name, cd.Namespace)
@@ -306,8 +306,8 @@ func (c *Controller) syncHandler(key string) error {
306306

307307
c.canaries.Store(fmt.Sprintf("%s.%s", cd.Name, cd.Namespace), cd)
308308

309-
// If opt in for revertOnDeletion add finalizer if not present
310-
if cd.Spec.RevertOnDeletion && !hasFinalizer(cd) {
309+
// If opt in for revertOnDeletion (or multi-cluster is enabled) add finalizer if not present
310+
if (cd.Spec.RevertOnDeletion || c.routerFactory.IsMultiClusterEnabled()) && !hasFinalizer(cd) {
311311
if err := c.addFinalizer(cd); err != nil {
312312
return fmt.Errorf("unable to add finalizer to canary %s.%s: %w", cd.Name, cd.Namespace, err)
313313
}

pkg/controller/finalizer.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ package controller
1818

1919
import (
2020
"context"
21-
"errors"
2221
"fmt"
22+
"strings"
2323
"time"
2424

2525
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -80,12 +80,12 @@ func (c *Controller) finalize(old interface{}) error {
8080
Cap: canary.GetAnalysisInterval(),
8181
Steps: 4,
8282
}
83-
retry.OnError(backoff, func(err error) bool {
84-
return err.Error() == "retriable error"
83+
err = retry.OnError(backoff, func(err error) bool {
84+
return strings.Contains(err.Error(), "retriable: ")
8585
}, func() error {
8686
retriable, err := canaryController.IsCanaryReady(canary)
8787
if err != nil && retriable {
88-
return errors.New("retriable error")
88+
return fmt.Errorf("retriable: %w", err)
8989
}
9090
return err
9191
})
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package controller
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"testing"
7+
8+
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/require"
10+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11+
12+
"github.com/fluxcd/flagger/pkg/router"
13+
)
14+
15+
func TestController_MultiClusterFinalizer(t *testing.T) {
16+
// 1. Setup Canary WITHOUT RevertOnDeletion
17+
canary := newDeploymentTestCanary()
18+
canary.Spec.RevertOnDeletion = false
19+
20+
// 2. Setup Fixture with Multi-Cluster ENABLED
21+
// We'll mock the ClusterManager directly in the factory
22+
mocks := newDeploymentFixture(canary)
23+
24+
// Inject ClusterManager to simulate multi-cluster enabled
25+
cm := router.NewClusterManager(mocks.kubeClient, mocks.flaggerClient, mocks.logger, "istio/multiCluster=true", "istio-system")
26+
rf := router.NewFactory(nil, mocks.kubeClient, mocks.flaggerClient, nil, "", "", mocks.logger, mocks.flaggerClient, true, cm)
27+
28+
mocks.ctrl.routerFactory = rf // Override with our multi-cluster factory
29+
30+
// 3. Sync the canary - this should ADD the finalizer because multi-cluster is enabled
31+
key := fmt.Sprintf("%s/%s", canary.Namespace, canary.Name)
32+
err := mocks.ctrl.syncHandler(key)
33+
require.NoError(t, err)
34+
35+
// 4. Verify Finalizer was added
36+
c, err := mocks.flaggerClient.FlaggerV1beta1().Canaries(canary.Namespace).Get(context.Background(), canary.Name, metav1.GetOptions{})
37+
require.NoError(t, err)
38+
assert.True(t, hasFinalizer(c), "Finalizer should be added in multi-cluster mode even if RevertOnDeletion is false")
39+
40+
// Initialize canary (creates primary, set status etc)
41+
mocks.ctrl.advanceCanary(canary.Name, canary.Namespace)
42+
mocks.makePrimaryReady(t)
43+
mocks.makeCanaryReady(t)
44+
mocks.ctrl.advanceCanary(canary.Name, canary.Namespace)
45+
46+
// Update informer indexer
47+
c, _ = mocks.flaggerClient.FlaggerV1beta1().Canaries(canary.Namespace).Get(context.Background(), canary.Name, metav1.GetOptions{})
48+
err = mocks.ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Update(c)
49+
require.NoError(t, err)
50+
51+
// 5. Mark for Deletion
52+
now := metav1.Now()
53+
c.DeletionTimestamp = &now
54+
c, err = mocks.flaggerClient.FlaggerV1beta1().Canaries(canary.Namespace).Update(context.Background(), c, metav1.UpdateOptions{})
55+
require.NoError(t, err)
56+
// Update informer indexer
57+
err = mocks.ctrl.flaggerInformers.CanaryInformer.Informer().GetIndexer().Update(c)
58+
require.NoError(t, err)
59+
60+
// 6. Sync again - this should TRIGGER finalization
61+
err = mocks.ctrl.syncHandler(key)
62+
require.NoError(t, err)
63+
64+
// 7. Verify Finalizer was removed (after successful finalization)
65+
c, err = mocks.flaggerClient.FlaggerV1beta1().Canaries(canary.Namespace).Get(context.Background(), canary.Name, metav1.GetOptions{})
66+
require.NoError(t, err)
67+
assert.False(t, hasFinalizer(c), "Finalizer should be removed after successful finalization")
68+
}

pkg/controller/scheduler.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -550,9 +550,6 @@ func (c *Controller) runPromotionTrafficShift(canary *flaggerv1.Canary, canaryCo
550550
}
551551
}
552552
}
553-
554-
return
555-
556553
}
557554

558555
func (c *Controller) runCanary(canary *flaggerv1.Canary, canaryController canary.Controller,

pkg/controller/scheduler_daemonset_fixture_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func newDaemonSetFixture(c *flaggerv1.Canary) daemonSetFixture {
9292
}
9393

9494
// init router
95-
rf := router.NewFactory(nil, kubeClient, flaggerClient, nil, "annotationsPrefix", "", logger, flaggerClient, true)
95+
rf := router.NewFactory(nil, kubeClient, flaggerClient, nil, "annotationsPrefix", "", logger, flaggerClient, true, nil)
9696

9797
// init observer
9898
observerFactory, _ := observers.NewFactory(testMetricsServerURL)

pkg/controller/scheduler_deployment_fixture_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func newDeploymentFixture(c *flaggerv1.Canary) fixture {
121121
}
122122

123123
// init router
124-
rf := router.NewFactory(nil, kubeClient, flaggerClient, nil, "annotationsPrefix", "", logger, flaggerClient, true)
124+
rf := router.NewFactory(nil, kubeClient, flaggerClient, nil, "annotationsPrefix", "", logger, flaggerClient, true, nil)
125125

126126
// init observer
127127
observerFactory, _ := observers.NewFactory(testMetricsServerURL)

pkg/router/factory.go

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type Factory struct {
3838
ingressClass string
3939
logger *zap.SugaredLogger
4040
setOwnerRefs bool
41+
clusterManager *ClusterManager
4142
}
4243

4344
func NewFactory(kubeConfig *restclient.Config, kubeClient kubernetes.Interface,
@@ -47,7 +48,8 @@ func NewFactory(kubeConfig *restclient.Config, kubeClient kubernetes.Interface,
4748
ingressClass string,
4849
logger *zap.SugaredLogger,
4950
meshClient clientset.Interface,
50-
setOwnerRefs bool) *Factory {
51+
setOwnerRefs bool,
52+
clusterManager *ClusterManager) *Factory {
5153
return &Factory{
5254
kubeConfig: kubeConfig,
5355
meshClient: meshClient,
@@ -58,6 +60,7 @@ func NewFactory(kubeConfig *restclient.Config, kubeClient kubernetes.Interface,
5860
ingressClass: ingressClass,
5961
logger: logger,
6062
setOwnerRefs: setOwnerRefs,
63+
clusterManager: clusterManager,
6164
}
6265
}
6366

@@ -109,11 +112,12 @@ func (factory *Factory) MeshRouter(provider string, labelSelector string) Interf
109112
}
110113
case provider == flaggerv1.IstioProvider:
111114
return &IstioRouter{
112-
logger: factory.logger,
113-
flaggerClient: factory.flaggerClient,
114-
kubeClient: factory.kubeClient,
115-
istioClient: factory.meshClient,
116-
setOwnerRefs: factory.setOwnerRefs,
115+
logger: factory.logger,
116+
flaggerClient: factory.flaggerClient,
117+
kubeClient: factory.kubeClient,
118+
istioClients: factory.getMeshClients(),
119+
setOwnerRefs: factory.setOwnerRefs,
120+
clusterManager: factory.clusterManager,
117121
}
118122
case strings.HasPrefix(provider, flaggerv1.SMIProvider+":v1alpha1"):
119123
mesh := strings.TrimPrefix(provider, flaggerv1.SMIProvider+":v1alpha1:")
@@ -225,11 +229,24 @@ func (factory *Factory) MeshRouter(provider string, labelSelector string) Interf
225229
return &NopRouter{}
226230
default:
227231
return &IstioRouter{
228-
logger: factory.logger,
229-
flaggerClient: factory.flaggerClient,
230-
kubeClient: factory.kubeClient,
231-
istioClient: factory.meshClient,
232-
setOwnerRefs: factory.setOwnerRefs,
232+
logger: factory.logger,
233+
flaggerClient: factory.flaggerClient,
234+
kubeClient: factory.kubeClient,
235+
istioClients: factory.getMeshClients(),
236+
setOwnerRefs: factory.setOwnerRefs,
237+
clusterManager: factory.clusterManager,
233238
}
234239
}
235240
}
241+
242+
func (factory *Factory) getMeshClients() []clientset.Interface {
243+
if factory.clusterManager != nil {
244+
return factory.clusterManager.GetClients()
245+
}
246+
return []clientset.Interface{factory.meshClient}
247+
}
248+
249+
// IsMultiClusterEnabled returns true if multi-cluster support is enabled
250+
func (factory *Factory) IsMultiClusterEnabled() bool {
251+
return factory.clusterManager != nil
252+
}

0 commit comments

Comments
 (0)