1
1
/*
2
- Copyright 2024 The Aibrix Team.
2
+ Copyright 2025 The Aibrix Team.
3
3
4
4
Licensed under the Apache License, Version 2.0 (the "License");
5
5
you may not use this file except in compliance with the License.
@@ -18,45 +18,138 @@ package roleset
18
18
19
19
import (
20
20
"context"
21
+ "fmt"
22
+ "time"
21
23
24
+ v1 "k8s.io/api/core/v1"
25
+ apiequality "k8s.io/apimachinery/pkg/api/equality"
22
26
"k8s.io/apimachinery/pkg/runtime"
27
+ "k8s.io/client-go/tools/record"
28
+ "k8s.io/klog/v2"
23
29
ctrl "sigs.k8s.io/controller-runtime"
24
30
"sigs.k8s.io/controller-runtime/pkg/client"
25
- "sigs.k8s.io/controller-runtime/pkg/log"
31
+ "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
32
+ "sigs.k8s.io/controller-runtime/pkg/manager"
33
+ "sigs.k8s.io/controller-runtime/pkg/reconcile"
26
34
27
35
orchestrationv1alpha1 "github.com/vllm-project/aibrix/api/orchestration/v1alpha1"
36
+ "github.com/vllm-project/aibrix/pkg/config"
37
+ orchestrationctrl "github.com/vllm-project/aibrix/pkg/controller/util/orchestration"
38
+ "github.com/vllm-project/aibrix/pkg/controller/util/patch"
28
39
)
29
40
41
+ const (
42
+ ControllerName = "roleset-controller"
43
+ PodBurst = 500
44
+ PodOperationInitBatchSize = 16
45
+ RoleSetFinalizer = "roleset-finalizer"
46
+ )
47
+
48
+ // controllerKind contains the schema.GroupVersionKind for this controller type.
49
+ var controllerKind = orchestrationv1alpha1 .SchemeGroupVersion .WithKind (orchestrationv1alpha1 .RoleSetKind )
50
+
51
+ // Add creates a new ModelAdapter Controller and adds it to the Manager with default RBAC.
52
+ // The Manager will set fields on the Controller and Start it when the Manager is Started.
53
+ func Add (mgr manager.Manager , runtimeConfig config.RuntimeConfig ) error {
54
+ r , err := newReconciler (mgr , runtimeConfig )
55
+ if err != nil {
56
+ return err
57
+ }
58
+ return add (mgr , r )
59
+ }
60
+
61
+ // add adds a new Controller to mgr with r as the reconcile.Reconciler
62
+ func add (mgr manager.Manager , r reconcile.Reconciler ) error {
63
+ // use the builder fashion. If we need more fine grain control later, we can switch to `controller.New()`
64
+ err := ctrl .NewControllerManagedBy (mgr ).
65
+ Named (ControllerName ).
66
+ For (& orchestrationv1alpha1.RoleSet {}).
67
+ Owns (& v1.Pod {}).
68
+ // TODO: Bring PodGroup back later
69
+ //Owns(&schedv1alpha1.PodGroup{}).
70
+ Complete (r )
71
+ if err != nil {
72
+ return err
73
+ }
74
+
75
+ klog .InfoS ("Finished to add roleset-controller" )
76
+ return nil
77
+ }
78
+
79
+ // newReconciler returns a new reconcile.Reconciler
80
+ func newReconciler (mgr manager.Manager , runtimeConfig config.RuntimeConfig ) (reconcile.Reconciler , error ) {
81
+ reconciler := & RoleSetReconciler {
82
+ Client : mgr .GetClient (),
83
+ Scheme : mgr .GetScheme (),
84
+ EventRecorder : mgr .GetEventRecorderFor (ControllerName ),
85
+ }
86
+ return reconciler , nil
87
+ }
88
+
30
89
// RoleSetReconciler reconciles a RoleSet object
31
90
type RoleSetReconciler struct {
32
91
client.Client
33
92
Scheme * runtime.Scheme
93
+
94
+ EventRecorder record.EventRecorder
34
95
}
35
96
36
97
//+kubebuilder:rbac:groups=orchestration.aibrix.ai,resources=rolesets,verbs=get;list;watch;create;update;patch;delete
37
98
//+kubebuilder:rbac:groups=orchestration.aibrix.ai,resources=rolesets/status,verbs=get;update;patch
38
99
//+kubebuilder:rbac:groups=orchestration.aibrix.ai,resources=rolesets/finalizers,verbs=update
39
100
40
- // Reconcile is part of the main kubernetes reconciliation loop which aims to
41
- // move the current state of the cluster closer to the desired state.
42
- // TODO(user): Modify the Reconcile function to compare the state specified by
43
- // the RoleSet object against the actual cluster state, and then
44
- // perform operations to make the cluster state reflect the state specified by
45
- // the user.
46
- //
47
- // For more details, check Reconcile and its Result here:
48
- // - https://pkg.go.dev/sigs.k8s.io/[email protected] /pkg/reconcile
49
101
func (r * RoleSetReconciler ) Reconcile (ctx context.Context , req ctrl.Request ) (ctrl.Result , error ) {
50
- _ = log .FromContext (ctx )
102
+ klog .Infof ("Reconciling RoleSet %s" , req .NamespacedName .String ())
103
+ ctx , cancel := context .WithTimeout (ctx , 1 * time .Minute )
104
+ defer cancel ()
105
+ roleSet := & orchestrationv1alpha1.RoleSet {}
106
+ if err := r .Client .Get (ctx , req .NamespacedName , roleSet ); err != nil {
107
+ return ctrl.Result {}, client .IgnoreNotFound (err )
108
+ }
109
+ if roleSet .DeletionTimestamp != nil {
110
+ if done , err := r .finalize (ctx , roleSet ); err != nil {
111
+ klog .Errorf ("Reconciling RoleSet %s finalize error %v" , req .NamespacedName .String (), err )
112
+ return ctrl.Result {RequeueAfter : 30 * time .Second }, err
113
+ } else if ! done {
114
+ return ctrl.Result {RequeueAfter : 30 * time .Second }, nil
115
+ }
116
+ return ctrl.Result {}, nil
117
+ } else if ! controllerutil .ContainsFinalizer (roleSet , RoleSetFinalizer ) {
118
+ // add finalizer if not exist
119
+ if err := orchestrationctrl .Patch (ctx , r .Client , roleSet , patch .AddFinalizerPatch (roleSet , RoleSetFinalizer )); err != nil {
120
+ klog .Errorf ("Adding RoleSet %s finalizer error %v" , req .NamespacedName .String (), err )
121
+ return ctrl.Result {RequeueAfter : 30 * time .Second }, err
122
+ }
123
+ return ctrl.Result {}, nil
124
+ }
51
125
52
- // TODO(user): your logic here
126
+ var managedErrors [] error
53
127
54
- return ctrl.Result {}, nil
55
- }
128
+ // TODO: disable pod group at this moment, consider to switch to open source pod group api later.
129
+ // 1. sync pod group
130
+ //if err := r.syncPodGroup(ctx, roleSet, &roleSet.Spec); err != nil {
131
+ // managedErrors = append(managedErrors, fmt.Errorf("sync pod group error %v", err))
132
+ //}
56
133
57
- // SetupWithManager sets up the controller with the Manager.
58
- func (r * RoleSetReconciler ) SetupWithManager (mgr ctrl.Manager ) error {
59
- return ctrl .NewControllerManagedBy (mgr ).
60
- For (& orchestrationv1alpha1.RoleSet {}).
61
- Complete (r )
134
+ // 2. sync pods
135
+ err := r .syncPods (ctx , roleSet )
136
+ if err != nil {
137
+ managedErrors = append (managedErrors , fmt .Errorf ("sync pod error %v" , err ))
138
+ }
139
+
140
+ // 3. update roleset status
141
+ status , err := r .calculateStatus (ctx , roleSet , managedErrors )
142
+ if err != nil {
143
+ klog .Infof ("roleset %s/%s calculate status error %v" , roleSet .Namespace , roleSet .Name , err )
144
+ return ctrl.Result {RequeueAfter : 1 * time .Minute }, err
145
+ }
146
+ if apiequality .Semantic .DeepEqual (& roleSet .Status , status ) {
147
+ return ctrl.Result {}, nil
148
+ }
149
+ roleSet .Status = * status
150
+ if err := orchestrationctrl .UpdateStatus (ctx , r .Scheme , r .Client , roleSet ); err != nil {
151
+ klog .Infof ("roleset %s/%s update status error %v" , roleSet .Namespace , roleSet .Name , err )
152
+ return ctrl.Result {RequeueAfter : 1 * time .Minute }, err
153
+ }
154
+ return ctrl.Result {}, nil
62
155
}
0 commit comments