Skip to content

Commit f3d8e1f

Browse files
authored
feat(backend): Selectively block v1 recurring runs (#13302)
Signed-off-by: Monica Zhang <zhangmonica1@gmail.com>
1 parent cd33c19 commit f3d8e1f

2 files changed

Lines changed: 193 additions & 0 deletions

File tree

backend/src/crd/controller/scheduledworkflow/controller.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ package main
1717
import (
1818
"context"
1919
"fmt"
20+
"strconv"
21+
"strings"
2022
"time"
2123

2224
workflowapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
@@ -33,6 +35,7 @@ import (
3335
"github.com/prometheus/client_golang/prometheus"
3436
"github.com/prometheus/client_golang/prometheus/promauto"
3537
log "github.com/sirupsen/logrus"
38+
"github.com/spf13/viper"
3639
"google.golang.org/grpc/metadata"
3740
"google.golang.org/protobuf/types/known/structpb"
3841
corev1 "k8s.io/api/core/v1"
@@ -577,6 +580,13 @@ func (c *Controller) submitNewWorkflowIfNotAlreadySubmitted(
577580

578581
// If the workflow is not found, we need to create it.
579582
if swf.Spec.Workflow != nil && swf.Spec.Workflow.Spec != nil {
583+
// V1 recurring runs bypass the API server by embedding the workflow spec directly in the ScheduledWorkflow CRD,
584+
// so the V1 pipeline block needs to be enforced at the controller level as well.
585+
586+
if shouldEnforceV1Block(swf) {
587+
return false, "", fmt.Errorf(
588+
"namespace %s is not allowed to run v1 pipelines; please migrate to KFP v2 pipelines", swf.Namespace)
589+
}
580590
newWorkflow, err := swf.NewWorkflow(nextScheduledEpoch, nowEpoch)
581591
if err != nil {
582592
return false, "", err
@@ -689,3 +699,36 @@ func (c *Controller) updateStatus(
689699
}
690700
return nil
691701
}
702+
703+
// isV1PipelineBlocked checks if the given namespace is blocked from running V1 pipelines
704+
// based on the BLOCK_V1_PIPELINES and V1_ALLOWED_NAMESPACES environment variables.
705+
func isV1PipelineBlocked(namespace string) bool {
706+
blockV1Value := viper.GetString("BLOCK_V1_PIPELINES")
707+
blockV1, err := strconv.ParseBool(blockV1Value)
708+
if err != nil {
709+
log.WithError(err).Warnf("Invalid BLOCK_V1_PIPELINES value %q; V1 pipelines are not blocked", blockV1Value)
710+
blockV1 = false
711+
}
712+
if !blockV1 {
713+
return false
714+
}
715+
716+
allowedNamespaces := viper.GetString("V1_ALLOWED_NAMESPACES")
717+
if allowedNamespaces == "" {
718+
return true
719+
}
720+
721+
targetNamespace := strings.ToLower(strings.TrimSpace(namespace))
722+
for _, n := range strings.Split(allowedNamespaces, ",") {
723+
if strings.ToLower(strings.TrimSpace(n)) == targetNamespace {
724+
return false
725+
}
726+
}
727+
return true
728+
}
729+
730+
// shouldEnforceV1Block checks if the V1 pipeline block should be enforced for the given ScheduledWorkflow.
731+
// v2 ScheduledWorkflows can also have embedded workflow spec, but should not be blocked with this feature flag.
732+
func shouldEnforceV1Block(swf *util.ScheduledWorkflow) bool {
733+
return strings.HasPrefix(swf.APIVersion, commonutil.ApiVersionV1) && isV1PipelineBlocked(swf.Namespace)
734+
}
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
// Copyright 2018 The Kubeflow Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package main
16+
17+
import (
18+
"testing"
19+
20+
"github.com/kubeflow/pipelines/backend/src/apiserver/common"
21+
commonutil "github.com/kubeflow/pipelines/backend/src/common/util"
22+
util "github.com/kubeflow/pipelines/backend/src/crd/controller/scheduledworkflow/util"
23+
swfapi "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1"
24+
"github.com/spf13/viper"
25+
"github.com/stretchr/testify/assert"
26+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27+
)
28+
29+
func TestIsV1PipelineBlocked(t *testing.T) {
30+
tests := []struct {
31+
name string
32+
blockV1 string
33+
allowedNamespaces string
34+
namespace string
35+
expected bool
36+
}{
37+
{
38+
name: "Blocking disabled - not blocked",
39+
blockV1: "false",
40+
namespace: "ns1",
41+
expected: false,
42+
},
43+
{
44+
name: "Blocking not set - not blocked",
45+
blockV1: "",
46+
namespace: "ns1",
47+
expected: false,
48+
},
49+
{
50+
name: "Blocking enabled, no allowed namespaces - blocked",
51+
blockV1: "true",
52+
namespace: "ns1",
53+
expected: true,
54+
},
55+
{
56+
name: "Blocking enabled, namespace allowed - not blocked",
57+
blockV1: "true",
58+
allowedNamespaces: "ns1",
59+
namespace: "ns1",
60+
expected: false,
61+
},
62+
{
63+
name: "Blocking enabled, namespace not in allowed list - blocked",
64+
blockV1: "true",
65+
allowedNamespaces: "ns2,ns3",
66+
namespace: "ns1",
67+
expected: true,
68+
},
69+
{
70+
name: "Blocking enabled, namespace in allowed list - not blocked",
71+
blockV1: "true",
72+
allowedNamespaces: "ns1,ns2,ns3",
73+
namespace: "ns2",
74+
expected: false,
75+
},
76+
{
77+
name: "Blocking enabled, case insensitive namespace match - not blocked",
78+
blockV1: "true",
79+
allowedNamespaces: "NS1",
80+
namespace: "ns1",
81+
expected: false,
82+
},
83+
}
84+
85+
for _, tt := range tests {
86+
t.Run(tt.name, func(t *testing.T) {
87+
viper.Set(common.BlockV1Pipelines, tt.blockV1)
88+
viper.Set(common.V1NamespaceWhitelist, tt.allowedNamespaces)
89+
defer func() {
90+
viper.Set(common.BlockV1Pipelines, "")
91+
viper.Set(common.V1NamespaceWhitelist, "")
92+
}()
93+
94+
result := isV1PipelineBlocked(tt.namespace)
95+
assert.Equal(t, tt.expected, result)
96+
})
97+
}
98+
}
99+
100+
func TestShouldEnforceV1Block(t *testing.T) {
101+
tests := []struct {
102+
name string
103+
apiVersion string
104+
blockV1 string
105+
namespace string
106+
expected bool
107+
}{
108+
{
109+
name: "V1 SWF, blocking enabled - blocked",
110+
apiVersion: commonutil.ApiVersionV1,
111+
blockV1: "true",
112+
namespace: "ns1",
113+
expected: true,
114+
},
115+
{
116+
name: "V1 SWF, blocking disabled - not blocked",
117+
apiVersion: commonutil.ApiVersionV1,
118+
blockV1: "false",
119+
namespace: "ns1",
120+
expected: false,
121+
},
122+
{
123+
name: "V2 SWF, blocking enabled - not blocked",
124+
apiVersion: commonutil.ApiVersionV2,
125+
blockV1: "true",
126+
namespace: "ns1",
127+
expected: false,
128+
},
129+
{
130+
name: "V2 SWF, blocking disabled - not blocked",
131+
apiVersion: commonutil.ApiVersionV2,
132+
blockV1: "false",
133+
namespace: "ns1",
134+
expected: false,
135+
},
136+
}
137+
138+
for _, tt := range tests {
139+
t.Run(tt.name, func(t *testing.T) {
140+
viper.Set(common.BlockV1Pipelines, tt.blockV1)
141+
defer viper.Set(common.BlockV1Pipelines, "")
142+
143+
swf := util.NewScheduledWorkflow(&swfapi.ScheduledWorkflow{
144+
TypeMeta: metav1.TypeMeta{APIVersion: tt.apiVersion},
145+
ObjectMeta: metav1.ObjectMeta{Namespace: tt.namespace},
146+
})
147+
assert.Equal(t, tt.expected, shouldEnforceV1Block(swf))
148+
})
149+
}
150+
}

0 commit comments

Comments
 (0)