Commit 5062a8c

[Feature][RayCluster]: Implement the HeadReady condition (#2261)

1 parent: 28c729f
File tree: 9 files changed, +205 −11 lines


helm-chart/kuberay-operator/templates/_helpers.tpl

Lines changed: 16 additions & 0 deletions
@@ -55,6 +55,22 @@ Create the name of the service account to use
 {{- end -}}
 {{- end -}}
 
+
+{{/*
+FeatureGates
+*/}}
+{{- define "kuberay.featureGates" -}}
+{{- $features := "" }}
+{{- range .Values.featureGates }}
+{{- $str := printf "%s=%t," .name .enabled }}
+{{- $features = print $features $str }}
+{{- end }}
+{{- with .Values.featureGates }}
+--feature-gates={{ $features | trimSuffix "," }}
+{{- end }}
+{{- end }}
+
+
 {{/*
 Create a template to ensure consistency for Role and ClusterRole.
 */}}
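
With the default featureGates entry this commit adds to values.yaml, this helper renders --feature-gates=RayClusterStatusConditions=false: the range loop accumulates comma-separated name=enabled pairs, trimSuffix "," drops the trailing comma, and the enclosing with block suppresses the flag entirely when featureGates is empty.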

helm-chart/kuberay-operator/templates/deployment.yaml

Lines changed: 1 addition & 0 deletions
@@ -56,6 +56,7 @@ spec:
         - /manager
         args:
 {{- $argList := list -}}
+{{- $argList = append $argList (include "kuberay.featureGates" . | trim) -}}
 {{- if .Values.batchScheduler.enabled -}}
 {{- $argList = append $argList "--enable-batch-scheduler" -}}
 {{- end -}}
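
Under the chart's default values, the rendered manager args therefore begin with --feature-gates=RayClusterStatusConditions=false, since the helper's output is appended to $argList before the batch-scheduler flag.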

helm-chart/kuberay-operator/values.yaml

Lines changed: 5 additions & 0 deletions
@@ -58,6 +58,11 @@ readinessProbe:
 batchScheduler:
   enabled: false
 
+featureGates:
+  - name: RayClusterStatusConditions
+    enabled: false
+
+
 # Set up `securityContext` to improve Pod security.
 # See https://github.com/ray-project/kuberay/blob/master/docs/guidance/pod-security.md for further guidance.
 podSecurityContext: {}
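
Usage note (illustrative, not from the commit): the gate can be flipped at install time with standard Helm list-index syntax, for example helm install kuberay-operator . --set featureGates[0].enabled=true, or by editing values.yaml directly.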

ray-operator/apis/ray/v1/raycluster_types.go

Lines changed: 10 additions & 2 deletions
@@ -166,9 +166,17 @@ type RayClusterStatus struct {
 
 type RayClusterConditionType string
 
+// Custom Reason for RayClusterCondition
 const (
-    // HeadReady is added in a RayCluster when its Head Pod is ready for requests.
-    HeadReady RayClusterConditionType = "HeadReady"
+    // PodRunningAndReady says that the pod is running and ready.
+    PodRunningAndReady = "PodRunningAndReady"
+    // UnknownReason says that the reason for the condition is unknown.
+    UnknownReason = "Unknown"
+)
+
+const (
+    // HeadPodReady is added in a RayCluster when its Head Pod is ready for requests.
+    HeadPodReady RayClusterConditionType = "HeadPodReady"
     // RayClusterReplicaFailure is added in a RayCluster when one of its pods fails to be created or deleted.
     RayClusterReplicaFailure RayClusterConditionType = "ReplicaFailure"
 )
ray-operator/controllers/ray/raycluster_controller.go

Lines changed: 20 additions & 0 deletions
@@ -1201,6 +1201,26 @@ func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *ra
         newInstance.Status.State = rayv1.Ready
     }
 
+    // Check if the head node is running and ready by checking the head pod's status.
+    if features.Enabled(features.RayClusterStatusConditions) {
+        headPod, err := common.GetRayClusterHeadPod(ctx, r, newInstance)
+        if err != nil {
+            return nil, err
+        }
+        // GetRayClusterHeadPod can return nil, nil when pod is not found, we handle it separately.
+        if headPod == nil {
+            meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
+                Type:    string(rayv1.HeadPodReady),
+                Status:  metav1.ConditionFalse,
+                Reason:  "HeadPodNotFound",
+                Message: "Head Pod not found",
+            })
+        } else {
+            replicaHeadPodReadyCondition := utils.FindPodReadyCondition(headPod, rayv1.HeadPodReady)
+            meta.SetStatusCondition(&newInstance.Status.Conditions, replicaHeadPodReadyCondition)
+        }
+    }
+
     if newInstance.Spec.Suspend != nil && *newInstance.Spec.Suspend && len(runtimePods.Items) == 0 {
         newInstance.Status.State = rayv1.Suspended
     }
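
Note the behavior once the gate is on: the condition is always written. A missing head Pod yields HeadPodReady=False with reason HeadPodNotFound, while an existing Pod has its PodReady condition mirrored through utils.FindPodReadyCondition (defined in util.go below), so downstream consumers can rely on the condition being present instead of fetching the Pod themselves.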

ray-operator/controllers/ray/raycluster_controller_unit_test.go

Lines changed: 40 additions & 1 deletion
@@ -1676,6 +1676,12 @@ func TestCalculateStatus(t *testing.T) {
         Status: corev1.PodStatus{
             PodIP: headNodeIP,
             Phase: corev1.PodRunning,
+            Conditions: []corev1.PodCondition{
+                {
+                    Type:   corev1.PodReady,
+                    Status: corev1.ConditionTrue,
+                },
+            },
         },
     }
     runtimeObjects := []runtime.Object{headPod, headService}
@@ -1705,8 +1711,41 @@
     assert.Nil(t, err)
     assert.Empty(t, newInstance.Status.Conditions)
 
-    // Test reconcilePodsErr with the feature gate enabled
+    // enable feature gate for the following tests
     defer features.SetFeatureGateDuringTest(t, features.RayClusterStatusConditions, true)()
+
+    // Test CheckRayHeadRunningAndReady with head pod running and ready
+    newInstance, _ = r.calculateStatus(ctx, testRayCluster, nil)
+    assert.True(t, meta.IsStatusConditionPresentAndEqual(newInstance.Status.Conditions, string(rayv1.HeadPodReady), metav1.ConditionTrue))
+    condition := meta.FindStatusCondition(newInstance.Status.Conditions, string(rayv1.HeadPodReady))
+    assert.Equal(t, metav1.ConditionTrue, condition.Status)
+
+    // Test CheckRayHeadRunningAndReady with head pod not ready
+    headPod.Status.Conditions = []corev1.PodCondition{
+        {
+            Type:   corev1.PodReady,
+            Status: corev1.ConditionFalse,
+        },
+    }
+    runtimeObjects = []runtime.Object{headPod, headService}
+    fakeClient = clientFake.NewClientBuilder().WithScheme(newScheme).WithRuntimeObjects(runtimeObjects...).Build()
+    r.Client = fakeClient
+    newInstance, _ = r.calculateStatus(ctx, testRayCluster, nil)
+    assert.True(t, meta.IsStatusConditionPresentAndEqual(newInstance.Status.Conditions, string(rayv1.HeadPodReady), metav1.ConditionFalse))
+    condition = meta.FindStatusCondition(newInstance.Status.Conditions, string(rayv1.HeadPodReady))
+    assert.Equal(t, metav1.ConditionFalse, condition.Status)
+
+    // Test CheckRayHeadRunningAndReady with head pod not running
+    headPod.Status.Phase = corev1.PodFailed
+    runtimeObjects = []runtime.Object{headPod, headService}
+    fakeClient = clientFake.NewClientBuilder().WithScheme(newScheme).WithRuntimeObjects(runtimeObjects...).Build()
+    r.Client = fakeClient
+    newInstance, _ = r.calculateStatus(ctx, testRayCluster, nil)
+    assert.True(t, meta.IsStatusConditionPresentAndEqual(newInstance.Status.Conditions, string(rayv1.HeadPodReady), metav1.ConditionFalse))
+    condition = meta.FindStatusCondition(newInstance.Status.Conditions, string(rayv1.HeadPodReady))
+    assert.Equal(t, metav1.ConditionFalse, condition.Status)
+
+    // Test reconcilePodsErr with the feature gate enabled
     newInstance, err = r.calculateStatus(ctx, testRayCluster, utils.ErrFailedCreateHeadPod)
     assert.Nil(t, err)
     assert.True(t, meta.IsStatusConditionPresentAndEqual(newInstance.Status.Conditions, string(rayv1.RayClusterReplicaFailure), metav1.ConditionTrue))
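
Worth noting about the third case: calculateStatus derives the condition solely from the Pod's PodReady condition, so setting Phase to PodFailed does not by itself flip anything; the False status in that test still comes from the PodReady=False condition carried over from the previous mutation.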

ray-operator/controllers/ray/rayservice_controller.go

Lines changed: 20 additions & 8 deletions
@@ -15,23 +15,26 @@ import (
     networkingv1 "k8s.io/api/networking/v1"
 
     "github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
+    "github.com/ray-project/kuberay/ray-operator/pkg/features"
 
     cmap "github.com/orcaman/concurrent-map/v2"
 
     "github.com/go-logr/logr"
     fmtErrors "github.com/pkg/errors"
     corev1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/errors"
+    "k8s.io/apimachinery/pkg/api/meta"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+    "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
+
     "k8s.io/client-go/tools/record"
     "sigs.k8s.io/controller-runtime/pkg/builder"
     "sigs.k8s.io/controller-runtime/pkg/controller"
     "sigs.k8s.io/controller-runtime/pkg/manager"
     "sigs.k8s.io/controller-runtime/pkg/predicate"
     "sigs.k8s.io/controller-runtime/pkg/reconcile"
 
-    "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
-
     "k8s.io/apimachinery/pkg/runtime"
     ctrl "sigs.k8s.io/controller-runtime"
     "sigs.k8s.io/controller-runtime/pkg/client"
@@ -1055,13 +1058,22 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns
     // after the head pod is running and ready. Hence, some requests to the Dashboard (e.g. `UpdateDeployments`) may fail.
     // This is not an issue since `UpdateDeployments` is an idempotent operation.
     logger.Info("Check the head Pod status of the pending RayCluster", "RayCluster name", rayClusterInstance.Name)
-    if isRunningAndReady, err := r.isHeadPodRunningAndReady(ctx, rayClusterInstance); err != nil || !isRunningAndReady {
-        if err != nil {
-            logger.Error(err, "Failed to check if head Pod is running and ready!")
-        } else {
-            logger.Info("Skipping the update of Serve deployments because the Ray head Pod is not ready.")
+
+    // check the latest condition of the head Pod to see if it is ready.
+    if features.Enabled(features.RayClusterStatusConditions) {
+        if !meta.IsStatusConditionTrue(rayClusterInstance.Status.Conditions, string(rayv1.HeadPodReady)) {
+            logger.Info("The head Pod is not ready, requeue the resource event to avoid redundant custom resource status updates.")
+            return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, false, nil
+        }
+    } else {
+        if isRunningAndReady, err := r.isHeadPodRunningAndReady(ctx, rayClusterInstance); err != nil || !isRunningAndReady {
+            if err != nil {
+                logger.Error(err, "Failed to check if head Pod is running and ready!")
+            } else {
+                logger.Info("Skipping the update of Serve deployments because the Ray head Pod is not ready.")
+            }
+            return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, false, err
         }
-        return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, false, err
     }
 
     // TODO(architkulkarni): Check the RayVersion. If < 2.8.0, error.
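
The apparent motivation for the gated path, per the new log message: rather than fetching the head Pod on every pass through isHeadPodRunningAndReady, the RayService controller trusts the HeadPodReady condition the RayCluster controller already computed and simply requeues until it turns true.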

ray-operator/controllers/ray/utils/util.go

Lines changed: 32 additions & 0 deletions
@@ -71,6 +71,38 @@ func IsCreated(pod *corev1.Pod) bool {
     return pod.Status.Phase != ""
 }
 
+func FindPodReadyCondition(pod *corev1.Pod, condType rayv1.RayClusterConditionType) metav1.Condition {
+    replicaPodReadyCondition := metav1.Condition{
+        Type:   string(condType),
+        Status: metav1.ConditionFalse,
+        Reason: rayv1.UnknownReason,
+    }
+
+    for _, cond := range pod.Status.Conditions {
+        if cond.Type != corev1.PodReady {
+            continue
+        }
+        // Set the status based on the PodReady condition
+        replicaPodReadyCondition.Status = metav1.ConditionStatus(cond.Status)
+        replicaPodReadyCondition.Message = cond.Message
+
+        // Determine the reason; default to PodRunningAndReady if the pod is ready but no specific reason is provided
+        reason := cond.Reason
+        if cond.Status == corev1.ConditionTrue && reason == "" {
+            reason = rayv1.PodRunningAndReady
+        }
+
+        // Update the reason if it's not empty
+        if reason != "" {
+            replicaPodReadyCondition.Reason = reason
+        }
+
+        // Since we're only interested in the PodReady condition, break after processing it
+        break
+    }
+    return replicaPodReadyCondition
+}
+
 // IsRunningAndReady returns true if pod is in the PodRunning Phase, if it has a condition of PodReady.
 func IsRunningAndReady(pod *corev1.Pod) bool {
     if pod.Status.Phase != corev1.PodRunning {
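
A minimal usage sketch for the new helper (assumed setup; only the fields FindPodReadyCondition inspects are populated). Per the defaulting logic above, a ready Pod with an empty reason reports PodRunningAndReady:

package sketch

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"

    rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
    "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
)

func Example() {
    // A head Pod reduced to the fields the helper reads.
    pod := &corev1.Pod{
        Status: corev1.PodStatus{
            Conditions: []corev1.PodCondition{
                {Type: corev1.PodReady, Status: corev1.ConditionTrue},
            },
        },
    }
    cond := utils.FindPodReadyCondition(pod, rayv1.HeadPodReady)
    fmt.Println(cond.Type, cond.Status, cond.Reason)
    // Output: HeadPodReady True PodRunningAndReady
}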

ray-operator/controllers/ray/utils/util_test.go

Lines changed: 61 additions & 0 deletions
@@ -265,6 +265,31 @@ func createSomePodWithCondition(typ corev1.PodConditionType, status corev1.Condi
     }
 }
 
+func createRayHeadPodWithPhaseAndCondition(phase corev1.PodPhase, typ corev1.PodConditionType, status corev1.ConditionStatus) (pod *corev1.Pod) {
+    return &corev1.Pod{
+        TypeMeta: metav1.TypeMeta{
+            APIVersion: "v1",
+            Kind:       "Pod",
+        },
+        ObjectMeta: metav1.ObjectMeta{
+            Name:      "raycluster-sample-head",
+            Namespace: "default",
+            Labels: map[string]string{
+                "ray.io/node-type": string(rayv1.HeadNode),
+            },
+        },
+        Status: corev1.PodStatus{
+            Phase: phase,
+            Conditions: []corev1.PodCondition{
+                {
+                    Type:   typ,
+                    Status: status,
+                },
+            },
+        },
+    }
+}
+
 func TestGetHeadGroupServiceAccountName(t *testing.T) {
     tests := map[string]struct {
         input *rayv1.RayCluster
@@ -588,6 +613,42 @@ env_vars:
     }
 }
 
+func TestFindHeadPodReadyCondition(t *testing.T) {
+    tests := map[string]struct {
+        pod      *corev1.Pod
+        expected metav1.Condition
+    }{
+        "condition true if Ray head pod is running and ready": {
+            pod: createRayHeadPodWithPhaseAndCondition(corev1.PodRunning, corev1.PodReady, corev1.ConditionTrue),
+            expected: metav1.Condition{
+                Type:   string(rayv1.HeadPodReady),
+                Status: metav1.ConditionTrue,
+            },
+        },
+        "condition false if Ray head pod is not running": {
+            pod: createRayHeadPodWithPhaseAndCondition(corev1.PodPending, corev1.PodReady, corev1.ConditionFalse),
+            expected: metav1.Condition{
+                Type:   string(rayv1.HeadPodReady),
+                Status: metav1.ConditionFalse,
+            },
+        },
+        "condition false if Ray head pod is not ready": {
+            pod: createRayHeadPodWithPhaseAndCondition(corev1.PodRunning, corev1.PodReady, corev1.ConditionFalse),
+            expected: metav1.Condition{
+                Type:   string(rayv1.HeadPodReady),
+                Status: metav1.ConditionFalse,
+            },
+        },
+    }
+
+    for name, tc := range tests {
+        t.Run(name, func(t *testing.T) {
+            replicaHeadPodReadyCondition := FindPodReadyCondition(tc.pod, rayv1.HeadPodReady)
+            assert.Equal(t, tc.expected.Status, replicaHeadPodReadyCondition.Status)
+        })
+    }
+}
+
 func TestErrRayClusterReplicaFailureReason(t *testing.T) {
     assert.Equal(t, RayClusterReplicaFailureReason(ErrFailedDeleteAllPods), "FailedDeleteAllPods")
     assert.Equal(t, RayClusterReplicaFailureReason(ErrFailedDeleteHeadPod), "FailedDeleteHeadPod")
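
One caveat on coverage: the table test asserts only Status; the Reason defaulting in FindPodReadyCondition (PodRunningAndReady for a ready Pod, Unknown otherwise) is implied but not checked here, and the phase argument only shapes the fixture, since the helper itself never reads Pod.Status.Phase.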
