@@ -18,14 +18,17 @@ package rayclusterreplicaset
import (
    "context"
+   "fmt"
+   "sync"
+   "time"
+
+   modelv1alpha1 "github.com/aibrix/aibrix/api/model/v1alpha1"
+   "github.com/aibrix/aibrix/pkg/controller/util/expectation"
+   apierrors "k8s.io/apimachinery/pkg/api/errors"

    rayclusterv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
-   corev1 "k8s.io/api/core/v1"
-   "k8s.io/client-go/kubernetes"
-   clientv1core "k8s.io/client-go/kubernetes/typed/core/v1"
    "k8s.io/client-go/tools/record"
    "k8s.io/klog/v2"
-   "sigs.k8s.io/controller-runtime/pkg/handler"
    "sigs.k8s.io/controller-runtime/pkg/manager"
    "sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -37,10 +40,17 @@ import (
    orchestrationv1alpha1 "github.com/aibrix/aibrix/api/orchestration/v1alpha1"
)

+ var (
+     controllerName                              = "raycluster-replicaset-controller"
+     defaultRequeueDurationForWaitingExpectation = 5 * time.Second
+     controllerKind                              = modelv1alpha1.GroupVersion.WithKind("RayClusterReplicaSet")
+ )
+
// Add creates a new RayClusterReplicaSet Controller and adds it to the Manager with default RBAC.
// The Manager will set fields on the Controller and Start it when the Manager is Started.
func Add(mgr manager.Manager) error {
    // TODO: check crd exists or not. If not, we should fail here directly without moving forward.
+   // For now, this check also serves to validate that KubeRay is installed.

    r, err := newReconciler(mgr)
    if err != nil {
@@ -51,22 +61,11 @@ func Add(mgr manager.Manager) error {
// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) {
-   // get kubernetes client from manager
-   config := mgr.GetConfig()
-   k8sClient, err := kubernetes.NewForConfig(config)
-   if err != nil {
-       klog.Fatalf("Unable to create Kubernetes client: %v", err)
-   }
-
-   eventBroadcaster := record.NewBroadcaster()
-   eventBroadcaster.StartLogging(klog.Infof)
-   eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: k8sClient.CoreV1().Events("")})
-   recorder := eventBroadcaster.NewRecorder(mgr.GetScheme(), corev1.EventSource{Component: "model-adapter-controller"})
-
    reconciler := &RayClusterReplicaSetReconciler{
-       Client:   mgr.GetClient(),
-       Scheme:   mgr.GetScheme(),
-       Recorder: recorder,
+       Client:       mgr.GetClient(),
+       Scheme:       mgr.GetScheme(),
+       Recorder:     mgr.GetEventRecorderFor(controllerName),
+       Expectations: expectation.NewControllerExpectations(),
    }
    return reconciler, nil
}
@@ -76,10 +75,10 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
    // use the builder fashion. If we need more fine-grained control later, we can switch to `controller.New()`
    err := ctrl.NewControllerManagedBy(mgr).
        For(&orchestrationv1alpha1.RayClusterReplicaSet{}).
-       Watches(&rayclusterv1.RayCluster{}, &handler.EnqueueRequestForObject{}).
+       Owns(&rayclusterv1.RayCluster{}).
        Complete(r)

-   klog.V(4).InfoS("Finished to add model-adapter-controller")
+   klog.V(4).InfoS("Finished adding raycluster-replicaset-controller")
    return err
}
@@ -90,25 +89,146 @@ type RayClusterReplicaSetReconciler struct {
    client.Client
    Scheme   *runtime.Scheme
    Recorder record.EventRecorder
+   // Expectations is a cache of the RayCluster creations/deletions that each
+   // RayClusterReplicaSet still expects to observe, keyed by the replicaset's namespace/name.
+   // For example, for a RayClusterReplicaSet "llama7b" in namespace "aibrix" with 3 replicas,
+   // we record the expectation:
+   //   - "aibrix/llama7b" expects 3 adds.
+   Expectations expectation.ControllerExpectationsInterface
}
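The Expectations field follows the expectation-tracking pattern of the upstream Kubernetes ReplicaSet controller. A minimal sketch of the lifecycle, using only the methods that appear in this diff and assuming the aibrix expectation package mirrors the upstream controller expectations (the key and counts below are illustrative, not taken from the PR):

exp := expectation.NewControllerExpectations()
rsKey := "aibrix/llama7b" // namespace/name of the RayClusterReplicaSet

// Before issuing 3 creates, record that 3 "add" events are expected for this key.
_ = exp.ExpectCreations(rsKey, 3)

// Each observed (or permanently failed) creation lowers the outstanding count by one.
exp.CreationObserved(rsKey)
exp.CreationObserved(rsKey)
exp.CreationObserved(rsKey)

// Once nothing is outstanding, the next reconcile may compute a fresh diff;
// until then Reconcile requeues after defaultRequeueDurationForWaitingExpectation.
if exp.SatisfiedExpectations(rsKey) {
    // safe to list RayClusters and scale up or down again
}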
- //+kubebuilder:rbac:groups=orchestration.aibrix.ai,resources=rayclusterreplicasets,verbs=get;list;watch;create;update;patch;delete
- //+kubebuilder:rbac:groups=orchestration.aibrix.ai,resources=rayclusterreplicasets/status,verbs=get;update;patch
- //+kubebuilder:rbac:groups=orchestration.aibrix.ai,resources=rayclusterreplicasets/finalizers,verbs=update
-
- // Reconcile is part of the main kubernetes reconciliation loop which aims to
- // move the current state of the cluster closer to the desired state.
- // TODO(user): Modify the Reconcile function to compare the state specified by
- // the RayClusterReplicaSet object against the actual cluster state, and then
- // perform operations to make the cluster state reflect the state specified by
- // the user.
- //
- // For more details, check Reconcile and its Result here:
- // - https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/reconcile
+ // +kubebuilder:rbac:groups=orchestration.aibrix.ai,resources=rayclusterreplicasets,verbs=get;list;watch;create;update;patch;delete
+ // +kubebuilder:rbac:groups=orchestration.aibrix.ai,resources=rayclusterreplicasets/status,verbs=get;update;patch
+ // +kubebuilder:rbac:groups=orchestration.aibrix.ai,resources=rayclusterreplicasets/finalizers,verbs=update
+ // +kubebuilder:rbac:groups=ray.io,resources=rayclusters,verbs=get;list;watch;create;update;patch;delete
+ // +kubebuilder:rbac:groups=ray.io,resources=rayclusters/status,verbs=get;update;patch
+ // +kubebuilder:rbac:groups=ray.io,resources=rayclusters/finalizers,verbs=update
+
+ // Reconcile moves the current state of the cluster towards the desired state of the RayClusterReplicaSet.
func (r *RayClusterReplicaSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    _ = log.FromContext(ctx)

-   // TODO(user): your logic here
+   replicaset := &orchestrationv1alpha1.RayClusterReplicaSet{}
+   if err := r.Get(ctx, req.NamespacedName, replicaset); err != nil {
+       klog.ErrorS(err, "unable to fetch raycluster-replicaset")
+       return ctrl.Result{}, client.IgnoreNotFound(err)
+   }
+
+   rsKey := req.NamespacedName.String()
+   if !r.Expectations.SatisfiedExpectations(rsKey) {
+       klog.InfoS("Expectations not met, requeuing", "replicaset", rsKey)
+       return ctrl.Result{RequeueAfter: defaultRequeueDurationForWaitingExpectation}, nil
+   }
+
+   // fetch the RayClusters currently associated with this replicaset
+   rayclusterList := &rayclusterv1.RayClusterList{}
+   listOpts := []client.ListOption{
+       client.InNamespace(replicaset.Namespace),
+       // Note: we simplify the case a little bit and
+       // there's no need to follow replicaset's implementation to list all clusters here.
+       client.MatchingLabels(replicaset.Spec.Selector.MatchLabels),
+   }
+
+   if err := r.Client.List(ctx, rayclusterList, listOpts...); err != nil {
+       klog.ErrorS(err, "unable to list rayclusters")
+       return ctrl.Result{}, err
+   }
+
+   filteredClusters := filterActiveClusters(rayclusterList.Items)
+   currentReplicas := int32(len(filteredClusters))
+
+   // Determine the scaling operation (scale up or down)
+   desiredReplicas := *replicaset.Spec.Replicas
+   var scaleError error
+   if currentReplicas < desiredReplicas {
+       diff := desiredReplicas - currentReplicas
+       _ = r.Expectations.ExpectCreations(rsKey, int(diff))
+       scaleError = r.scaleUp(ctx, replicaset, rsKey, int(diff))
+   } else if currentReplicas > desiredReplicas {
+       diff := currentReplicas - desiredReplicas
+       _ = r.Expectations.ExpectDeletions(rsKey, int(diff))
+       scaleError = r.scaleDown(ctx, rsKey, filteredClusters, int(diff))
+   }
+
+   newStatus := calculateStatus(replicaset, filteredClusters, scaleError)
+   if err := r.updateReplicaSetStatus(replicaset, newStatus, rsKey); err != nil {
+       return ctrl.Result{}, err
+   }

    return ctrl.Result{}, nil
}
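filterActiveClusters is referenced above but defined outside this hunk. A hypothetical sketch of such a helper, assuming "active" simply means "not already being deleted", which is what its use for computing currentReplicas suggests:

func filterActiveClusters(clusters []rayclusterv1.RayCluster) []rayclusterv1.RayCluster {
    // Drop RayClusters that already carry a deletion timestamp so they no longer
    // count toward the current replica count.
    active := make([]rayclusterv1.RayCluster, 0, len(clusters))
    for i := range clusters {
        if clusters[i].DeletionTimestamp.IsZero() {
            active = append(active, clusters[i])
        }
    }
    return active
}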
+
+ // scaleUp handles RayCluster creation when scaling up
+ func (r *RayClusterReplicaSetReconciler) scaleUp(ctx context.Context, replicaset *orchestrationv1alpha1.RayClusterReplicaSet, rsKey string, diff int) error {
+     for i := 0; i < diff; i++ {
+         newCluster := constructRayCluster(replicaset)
+         if err := r.Create(ctx, newCluster); err != nil {
+             r.Expectations.CreationObserved(rsKey)
+             return fmt.Errorf("failed to create raycluster: %w", err)
+         }
+     }
+     return nil
+ }
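constructRayCluster is called in scaleUp but is not part of this hunk. A hypothetical sketch of such a helper, assuming the RayClusterReplicaSet spec mirrors appsv1.ReplicaSetSpec with a Template field and that metav1 (k8s.io/apimachinery/pkg/apis/meta/v1) is imported; the owner reference is what lets Owns(&rayclusterv1.RayCluster{}) route RayCluster events back to the parent replicaset:

func constructRayCluster(rs *orchestrationv1alpha1.RayClusterReplicaSet) *rayclusterv1.RayCluster {
    return &rayclusterv1.RayCluster{
        ObjectMeta: metav1.ObjectMeta{
            Namespace:    rs.Namespace,
            GenerateName: rs.Name + "-",               // let the API server pick a unique suffix
            Labels:       rs.Spec.Selector.MatchLabels, // must match the selector used in Reconcile
            OwnerReferences: []metav1.OwnerReference{
                // controllerKind is the GVK variable declared at the top of this file
                *metav1.NewControllerRef(rs, controllerKind),
            },
        },
        Spec: rs.Spec.Template.Spec, // assumed Template field, not shown in this diff
    }
}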
+
+ // scaleDown handles RayCluster deletion when scaling down
+ func (r *RayClusterReplicaSetReconciler) scaleDown(ctx context.Context, rsKey string, clusters []rayclusterv1.RayCluster, diff int) error {
+     var wg sync.WaitGroup
+     errCh := make(chan error, diff)
+
+     for i := 0; i < diff; i++ {
+         cluster := clusters[i]
+         wg.Add(1)
+         go func(cluster rayclusterv1.RayCluster) {
+             defer wg.Done()
+             if err := r.Delete(ctx, &cluster); err != nil {
+                 r.Expectations.DeletionObserved(rsKey)
+                 if !apierrors.IsNotFound(err) {
+                     errCh <- fmt.Errorf("failed to delete raycluster: %w", err)
+                 }
+             }
+         }(cluster)
+     }
+
+     wg.Wait()
+
+     select {
+     case err := <-errCh:
+         return err
+     default:
+         return nil
+     }
+ }
+
+ // updateReplicaSetStatus updates the status of the given RayClusterReplicaSet once its expectations are satisfied and the status has actually changed.
+ func (r *RayClusterReplicaSetReconciler) updateReplicaSetStatus(rs *orchestrationv1alpha1.RayClusterReplicaSet, newStatus orchestrationv1alpha1.RayClusterReplicaSetStatus, rsKey string) error {
+     // Check if the expectations have been fulfilled for this ReplicaSet
+     if !r.Expectations.SatisfiedExpectations(rsKey) {
+         klog.V(4).InfoS("Expectations not yet fulfilled for ReplicaSet, delaying status update", "replicaSet", rsKey)
+         return nil
+     }
+
+     if same := isStatusSame(rs, newStatus); same {
+         return nil
+     }
+
+     // Update the observed generation to ensure the status reflects the latest changes
+     newStatus.ObservedGeneration = rs.Generation
+
+     // Log the status update
+     klog.V(4).Info(fmt.Sprintf("Updating status for %v: %s/%s, ", rs.Kind, rs.Namespace, rs.Name) +
+         fmt.Sprintf("replicas %d->%d (need %d), ", rs.Status.Replicas, newStatus.Replicas, *(rs.Spec.Replicas)) +
+         fmt.Sprintf("fullyLabeledReplicas %d->%d, ", rs.Status.FullyLabeledReplicas, newStatus.FullyLabeledReplicas) +
+         fmt.Sprintf("readyReplicas %d->%d, ", rs.Status.ReadyReplicas, newStatus.ReadyReplicas) +
+         fmt.Sprintf("availableReplicas %d->%d, ", rs.Status.AvailableReplicas, newStatus.AvailableReplicas) +
+         fmt.Sprintf("observedGeneration %v->%v", rs.Status.ObservedGeneration, newStatus.ObservedGeneration))
+
+     // Update ReplicaSet status if necessary
+     newInstance := rs.DeepCopy()
+     newInstance.Status = newStatus
+     if err := r.Status().Update(context.Background(), newInstance); err != nil {
+         klog.ErrorS(err, "unable to update ReplicaSet status")
+         return err
+     }
+
+     return nil
+ }
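isStatusSame and calculateStatus are used above but live outside this hunk. A hypothetical sketch of isStatusSame, assuming it compares exactly the fields that the log statement above tracks:

func isStatusSame(rs *orchestrationv1alpha1.RayClusterReplicaSet, newStatus orchestrationv1alpha1.RayClusterReplicaSetStatus) bool {
    return rs.Status.Replicas == newStatus.Replicas &&
        rs.Status.FullyLabeledReplicas == newStatus.FullyLabeledReplicas &&
        rs.Status.ReadyReplicas == newStatus.ReadyReplicas &&
        rs.Status.AvailableReplicas == newStatus.AvailableReplicas &&
        rs.Generation == rs.Status.ObservedGeneration
}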