Skip to content

Commit a31aeac

Browse files
authored
[FLINK-38577] Added BlueGreen ingress that switches between active Svc + resolve path conflict on Blue and Green deployment ingresses
1 parent 2621c9a commit a31aeac

File tree

17 files changed

+576
-32
lines changed

17 files changed

+576
-32
lines changed

docs/content/docs/custom-resource/reference.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
8989
| Parameter | Type | Docs |
9090
| ----------| ---- | ---- |
9191
| configuration | java.util.Map<java.lang.String,java.lang.String> | |
92+
| ingress | org.apache.flink.kubernetes.operator.api.spec.IngressSpec | |
9293
| template | org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentTemplateSpec | |
9394

9495
### FlinkDeploymentSpec
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
################################################################################
18+
19+
apiVersion: flink.apache.org/v1beta1
20+
kind: FlinkBlueGreenDeployment
21+
metadata:
22+
name: bg-ingress-test
23+
spec:
24+
configuration:
25+
kubernetes.operator.bluegreen.deployment-deletion.delay: "2s"
26+
# Parent-level ingress configuration
27+
ingress:
28+
template: "{{name}}.{{namespace}}.example.com"
29+
className: "nginx"
30+
annotations:
31+
nginx.ingress.kubernetes.io/rewrite-target: "/"
32+
template:
33+
spec:
34+
image: flink:1.20
35+
flinkVersion: v1_20
36+
flinkConfiguration:
37+
rest.port: "8081"
38+
taskmanager.numberOfTaskSlots: "1"
39+
serviceAccount: flink
40+
jobManager:
41+
resource:
42+
memory: 1G
43+
cpu: 1
44+
taskManager:
45+
resource:
46+
memory: 2G
47+
cpu: 1
48+
job:
49+
jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
50+
parallelism: 1
51+
entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
52+
args:
53+
- "--error-rate"
54+
- "0.15"
55+
- "--sleep"
56+
- "30"
57+
upgradeMode: stateless
58+
mode: native
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
#!/usr/bin/env bash
2+
################################################################################
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing, software
14+
# distributed under the License is distributed on an "AS IS" BASIS,
15+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
# See the License for the specific language governing permissions and
17+
# limitations under the License.
18+
################################################################################
19+
20+
# This script tests the Blue/Green ingress rotation functionality:
21+
# - Create a FlinkBlueGreenDeployment with parent-level ingress spec
22+
# - Verify ingress is created and points to Blue deployment
23+
# - Trigger a transition to Green
24+
# - Verify ingress switches to point to Green deployment
25+
# - Verify Blue deployment is deleted but ingress remains
26+
# - Verify ingress configuration is preserved across transitions
27+
28+
SCRIPT_DIR=$(dirname "$(readlink -f "$0")")
29+
source "${SCRIPT_DIR}/utils.sh"
30+
31+
CLUSTER_ID="bg-ingress-test"
32+
BG_CLUSTER_ID=$CLUSTER_ID
33+
BLUE_CLUSTER_ID=$CLUSTER_ID"-blue"
34+
GREEN_CLUSTER_ID=$CLUSTER_ID"-green"
35+
36+
APPLICATION_YAML="${SCRIPT_DIR}/data/bluegreen-ingress.yaml"
37+
APPLICATION_IDENTIFIER="flinkbgdep/$CLUSTER_ID"
38+
BLUE_APPLICATION_IDENTIFIER="flinkdep/$BLUE_CLUSTER_ID"
39+
GREEN_APPLICATION_IDENTIFIER="flinkdep/$GREEN_CLUSTER_ID"
40+
TIMEOUT=300
41+
42+
echo "Deploying BlueGreen deployment with ingress..."
43+
retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1
44+
45+
sleep 1
46+
wait_for_jobmanager_running $BLUE_CLUSTER_ID $TIMEOUT
47+
wait_for_status $BLUE_APPLICATION_IDENTIFIER '.status.lifecycleState' STABLE ${TIMEOUT} || exit 1
48+
wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
49+
wait_for_status $APPLICATION_IDENTIFIER '.status.blueGreenState' ACTIVE_BLUE ${TIMEOUT} || exit 1
50+
51+
echo "Verifying ingress created and points to Blue..."
52+
kubectl get ingress $BG_CLUSTER_ID -n default || exit 1
53+
54+
# Check ingress backend points to Blue's REST service
55+
BLUE_BACKEND=$(kubectl get ingress $BG_CLUSTER_ID -n default -o jsonpath='{.spec.rules[0].http.paths[0].backend.service.name}')
56+
EXPECTED_BLUE_BACKEND="${BLUE_CLUSTER_ID}-rest"
57+
if [ "$BLUE_BACKEND" != "$EXPECTED_BLUE_BACKEND" ]; then
58+
echo "ERROR: Ingress backend should be '$EXPECTED_BLUE_BACKEND' but got '$BLUE_BACKEND'"
59+
exit 1
60+
fi
61+
echo " Ingress correctly points to Blue deployment"
62+
63+
# Verify ingress annotations
64+
REWRITE_ANNOTATION=$(kubectl get ingress $BG_CLUSTER_ID -n default -o jsonpath='{.metadata.annotations.nginx\.ingress\.kubernetes\.io/rewrite-target}')
65+
if [ "$REWRITE_ANNOTATION" != "/" ]; then
66+
echo "ERROR: Expected rewrite annotation '/' but got '$REWRITE_ANNOTATION'"
67+
exit 1
68+
fi
69+
echo " Ingress annotations preserved"
70+
71+
echo "Triggering Blue’Green transition..."
72+
kubectl patch flinkbgdep ${BG_CLUSTER_ID} --type merge --patch '{"spec":{"template":{"spec":{"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"}}}}}'
73+
74+
# Wait for Green to be ready
75+
wait_for_jobmanager_running $GREEN_CLUSTER_ID $TIMEOUT
76+
wait_for_status $GREEN_APPLICATION_IDENTIFIER '.status.lifecycleState' STABLE ${TIMEOUT} || exit 1
77+
78+
echo "Waiting for Blue deletion..."
79+
kubectl wait --for=delete deployment --timeout=${TIMEOUT}s --selector="app=${BLUE_CLUSTER_ID}" || exit 1
80+
81+
wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
82+
wait_for_status $APPLICATION_IDENTIFIER '.status.blueGreenState' ACTIVE_GREEN ${TIMEOUT} || exit 1
83+
84+
echo "Verifying ingress switched to Green..."
85+
# Ingress should still exist
86+
kubectl get ingress $BG_CLUSTER_ID -n default || exit 1
87+
88+
# Check ingress backend now points to Green's REST service
89+
GREEN_BACKEND=$(kubectl get ingress $BG_CLUSTER_ID -n default -o jsonpath='{.spec.rules[0].http.paths[0].backend.service.name}')
90+
EXPECTED_GREEN_BACKEND="${GREEN_CLUSTER_ID}-rest"
91+
if [ "$GREEN_BACKEND" != "$EXPECTED_GREEN_BACKEND" ]; then
92+
echo "ERROR: Ingress backend should be '$EXPECTED_GREEN_BACKEND' but got '$GREEN_BACKEND'"
93+
exit 1
94+
fi
95+
echo " Ingress correctly switched to Green deployment"
96+
97+
# Verify annotations still present after transition
98+
REWRITE_ANNOTATION=$(kubectl get ingress $BG_CLUSTER_ID -n default -o jsonpath='{.metadata.annotations.nginx\.ingress\.kubernetes\.io/rewrite-target}')
99+
if [ "$REWRITE_ANNOTATION" != "/" ]; then
100+
echo "ERROR: Annotations lost during transition"
101+
exit 1
102+
fi
103+
echo " Ingress configuration preserved across transition"
104+
105+
echo "Triggering Green’Blue transition to verify bidirectional switching..."
106+
kubectl patch flinkbgdep ${BG_CLUSTER_ID} --type merge --patch '{"spec":{"template":{"spec":{"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"1"}}}}}'
107+
108+
wait_for_jobmanager_running $BLUE_CLUSTER_ID $TIMEOUT
109+
wait_for_status $BLUE_APPLICATION_IDENTIFIER '.status.lifecycleState' STABLE ${TIMEOUT} || exit 1
110+
kubectl wait --for=delete deployment --timeout=${TIMEOUT}s --selector="app=${GREEN_CLUSTER_ID}" || exit 1
111+
wait_for_status $APPLICATION_IDENTIFIER '.status.blueGreenState' ACTIVE_BLUE ${TIMEOUT} || exit 1
112+
113+
echo "Verifying ingress switched back to Blue..."
114+
BLUE_BACKEND=$(kubectl get ingress $BG_CLUSTER_ID -n default -o jsonpath='{.spec.rules[0].http.paths[0].backend.service.name}')
115+
if [ "$BLUE_BACKEND" != "$EXPECTED_BLUE_BACKEND" ]; then
116+
echo "ERROR: Ingress backend should be '$EXPECTED_BLUE_BACKEND' on return but got '$BLUE_BACKEND'"
117+
exit 1
118+
fi
119+
echo " Ingress correctly switched back to Blue"
120+
121+
echo "Cleaning up..."
122+
kubectl delete flinkbluegreendeployments/$BG_CLUSTER_ID &
123+
kubectl wait --for=delete flinkbluegreendeployments/$BG_CLUSTER_ID --timeout=${TIMEOUT}s
124+
125+
# Verify ingress is deleted with the deployment
126+
INGRESS_DELETED=$(kubectl get ingress $BG_CLUSTER_ID -n default 2>&1 || echo "NotFound")
127+
if [[ ! "$INGRESS_DELETED" =~ "NotFound" ]]; then
128+
echo "ERROR: Ingress should be deleted with BlueGreen deployment"
129+
exit 1
130+
fi
131+
echo " Ingress cleaned up correctly"
132+
133+
echo "Successfully run the Blue/Green ingress rotation test"

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentSpec.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import lombok.Data;
2626
import lombok.NoArgsConstructor;
2727

28+
import javax.annotation.Nullable;
29+
2830
import java.util.Map;
2931

3032
/** Spec that describes a Flink application with blue/green deployment capabilities. */
@@ -38,5 +40,7 @@ public class FlinkBlueGreenDeploymentSpec {
3840
@JsonProperty("configuration")
3941
private Map<String, String> configuration;
4042

43+
@Nullable private IngressSpec ingress;
44+
4145
private FlinkDeploymentTemplateSpec template;
4246
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,8 +261,8 @@ void registerBlueGreenController() {
261261
MetricManager.createFlinkBlueGreenDeploymentMetricManager(baseConfig, metricGroup);
262262
var statusRecorder =
263263
StatusRecorder.createForFlinkBlueGreenDeployment(client, metricManager, listeners);
264-
var controller = new FlinkBlueGreenDeploymentController(ctxFactory, statusRecorder);
265-
264+
var controller =
265+
new FlinkBlueGreenDeploymentController(ctxFactory, configManager, statusRecorder);
266266
registeredControllers.add(operator.register(controller, this::overrideControllerConfigs));
267267
}
268268

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,13 @@
2121
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
2222
import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
2323
import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus;
24+
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
2425
import org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenContext;
2526
import org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenDeploymentService;
2627
import org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenStateHandlerRegistry;
2728
import org.apache.flink.kubernetes.operator.controller.bluegreen.handlers.BlueGreenStateHandler;
2829
import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
30+
import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
2931
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
3032

3133
import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
@@ -71,15 +73,18 @@ public class FlinkBlueGreenDeploymentController implements Reconciler<FlinkBlueG
7173

7274
private final FlinkResourceContextFactory ctxFactory;
7375
private final BlueGreenStateHandlerRegistry handlerRegistry;
76+
private final FlinkConfigManager flinkConfigManager;
7477
private final StatusRecorder<FlinkBlueGreenDeployment, FlinkBlueGreenDeploymentStatus>
7578
statusRecorder;
7679

7780
public FlinkBlueGreenDeploymentController(
7881
FlinkResourceContextFactory ctxFactory,
82+
FlinkConfigManager flinkConfigManager,
7983
StatusRecorder<FlinkBlueGreenDeployment, FlinkBlueGreenDeploymentStatus>
8084
statusRecorder) {
8185
this.ctxFactory = ctxFactory;
8286
this.handlerRegistry = new BlueGreenStateHandlerRegistry();
87+
this.flinkConfigManager = flinkConfigManager;
8388
this.statusRecorder = statusRecorder;
8489
}
8590

@@ -99,6 +104,9 @@ public List<EventSource<?, FlinkBlueGreenDeployment>> prepareEventSources(
99104

100105
eventSources.add(new InformerEventSource<>(config, context));
101106

107+
if (flinkConfigManager.getOperatorConfiguration().isManageIngress()) {
108+
eventSources.add(EventSourceUtils.getBlueGreenIngressInformerEventSource(context));
109+
}
102110
return eventSources;
103111
}
104112

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
3131
import org.apache.flink.kubernetes.operator.controller.FlinkBlueGreenDeployments;
3232
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
33+
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
3334
import org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils;
3435
import org.apache.flink.util.Preconditions;
3536

@@ -733,12 +734,84 @@ public UpdateControl<FlinkBlueGreenDeployment> finalizeBlueGreenDeployment(
733734
context.getDeploymentStatus().setAbortTimestamp(millisToInstantStr(0));
734735
context.getDeploymentStatus().setSavepointTriggerId(null);
735736

737+
updateBlueGreenIngress(context, nextState);
738+
736739
// Finalize status and reschedule immediately so any pending spec changes
737740
// (e.g., suspend requested during transition) are picked up on next reconcile
738741
return patchStatusUpdateControl(context, nextState, JobStatus.RUNNING, null)
739742
.rescheduleAfter(0);
740743
}
741744

745+
/**
746+
* Reconciles ingress for the active deployment in ACTIVE states. This handles ingress spec
747+
* changes that occur while the deployment is stable (not transitioning).
748+
*
749+
* @param context the Blue/Green context
750+
* @param activeDeploymentType which deployment (BLUE or GREEN) is currently active
751+
*/
752+
public void reconcileIngressForActiveDeployment(
753+
BlueGreenContext context, BlueGreenDeploymentType activeDeploymentType) {
754+
FlinkDeployment activeDeployment = context.getDeploymentByType(activeDeploymentType);
755+
if (activeDeployment == null) {
756+
return;
757+
}
758+
759+
var flinkResourceContext =
760+
context.getCtxFactory()
761+
.getResourceContext(activeDeployment, context.getJosdkContext());
762+
763+
if (!flinkResourceContext.getOperatorConfig().isManageIngress()) {
764+
return;
765+
}
766+
767+
IngressUtils.reconcileBlueGreenIngress(
768+
context,
769+
true,
770+
activeDeployment,
771+
flinkResourceContext.getDeployConfig(activeDeployment.getSpec()),
772+
context.getJosdkContext());
773+
774+
LOG.info(
775+
"Successfully reconciled ingress for active deployment: {}",
776+
activeDeployment.getMetadata().getName());
777+
}
778+
779+
/**
780+
* Updates the ingress for Blue/Green deployment during transitions, pointing to the newly
781+
* active deployment.
782+
*
783+
* @param blueGreenContext the Blue/Green context
784+
* @param nextState which deployment (ACTIVE_BLUE or ACTIVE_GREEN) is becoming active
785+
*/
786+
public void updateBlueGreenIngress(
787+
BlueGreenContext blueGreenContext, FlinkBlueGreenDeploymentState nextState) {
788+
FlinkDeployment activeDeployment;
789+
switch (nextState) {
790+
case ACTIVE_BLUE:
791+
activeDeployment = blueGreenContext.getBlueDeployment();
792+
break;
793+
case ACTIVE_GREEN:
794+
activeDeployment = blueGreenContext.getGreenDeployment();
795+
break;
796+
default:
797+
LOG.info("Skipping ingress reconciliation for non-active state: {}", nextState);
798+
return;
799+
}
800+
801+
// Create a FlinkResourceContext for the active deployment to get proper config
802+
var flinkResourceContext =
803+
blueGreenContext
804+
.getCtxFactory()
805+
.getResourceContext(activeDeployment, blueGreenContext.getJosdkContext());
806+
807+
IngressUtils.reconcileBlueGreenIngress(
808+
blueGreenContext,
809+
flinkResourceContext.getOperatorConfig().isManageIngress(),
810+
activeDeployment,
811+
flinkResourceContext.getDeployConfig(activeDeployment.getSpec()),
812+
blueGreenContext.getJosdkContext());
813+
}
814+
742815
// ==================== Common Utility Methods ====================
743816

744817
public static UpdateControl<FlinkBlueGreenDeployment> patchStatusUpdateControl(

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/ActiveStateHandler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ public ActiveStateHandler(
3737
@Override
3838
public UpdateControl<FlinkBlueGreenDeployment> handle(BlueGreenContext context) {
3939
BlueGreenDeploymentType currentType = getCurrentDeploymentType();
40+
41+
// Reconcile ingress for the active deployment (handles spec changes)
42+
deploymentService.reconcileIngressForActiveDeployment(context, currentType);
43+
4044
return deploymentService.checkAndInitiateDeployment(context, currentType);
4145
}
4246

0 commit comments

Comments
 (0)