
Add RayClusterReplicaSet initial implementation #165

Merged · 3 commits · Sep 12, 2024
2 changes: 1 addition & 1 deletion .golangci.yml
@@ -36,7 +36,7 @@ linters:
- lll
- misspell
- nakedret
- prealloc
# - prealloc
- staticcheck
- typecheck
- unconvert
13 changes: 11 additions & 2 deletions api/orchestration/v1alpha1/rayclusterreplicaset_types.go
@@ -21,8 +21,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.
// Since the RayClusterReplicaSet design is highly similar to the upstream ReplicaSet, most of the spec API fields are
// taken from the upstream code. The only difference is that it manages a RayCluster template instead of a pod template.
// Note that some advanced ReplicaSet features may not be supported here.
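The spec fields themselves are collapsed in this diff view. For orientation only, here is a sketch of what a ReplicaSet-style spec looks like once the pod template is swapped for a RayCluster template; the field names, json tags, and the template type below are assumptions modeled on the upstream ReplicaSetSpec, not the exact fields merged in this PR.

package sketch

import (
	rayclusterv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// ExampleRayClusterReplicaSetSpec is a hypothetical illustration, not the merged API.
type ExampleRayClusterReplicaSetSpec struct {
	// Desired number of RayClusters managed by the replicaset.
	Replicas *int32 `json:"replicas,omitempty"`
	// Label selector over the RayClusters owned by this replicaset.
	Selector *metav1.LabelSelector `json:"selector"`
	// Template for the RayClusters created to satisfy the replica count;
	// this takes the place of the pod template in the upstream ReplicaSet.
	Template rayclusterv1.RayClusterSpec `json:"template,omitempty"`
}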

// RayClusterReplicaSetSpec defines the desired state of RayClusterReplicaSet
type RayClusterReplicaSetSpec struct {
@@ -83,6 +84,14 @@ type RayClusterReplicaSetStatus struct {
Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type" protobuf:"bytes,6,rep,name=conditions"`
}

// These are valid conditions of a replica set.
const (
// RayClusterReplicaSetReplicaFailure is added to a replica set when one of its RayClusters fails to be created
// (e.g. due to insufficient quota, limit ranges, pod security policy, or node selectors) or fails to be deleted
// (e.g. due to the kubelet being down or finalizers failing).
RayClusterReplicaSetReplicaFailure string = "ReplicaFailure"
)
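For context, a condition of this type would normally be recorded on Status.Conditions via apimachinery's condition helpers. A minimal sketch, assuming the usual meta.SetStatusCondition flow; the helper name and reason string are illustrative and not taken from this PR.

package sketch

import (
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// setReplicaFailure shows how a ReplicaFailure condition could be attached to a
// status' Conditions slice.
func setReplicaFailure(conditions *[]metav1.Condition, err error) {
	meta.SetStatusCondition(conditions, metav1.Condition{
		Type:    "ReplicaFailure", // i.e. RayClusterReplicaSetReplicaFailure
		Status:  metav1.ConditionTrue,
		Reason:  "FailedCreate", // assumed reason, not from this diff
		Message: err.Error(),
	})
}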

// +genclient
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
26 changes: 26 additions & 0 deletions config/rbac/role.yaml
@@ -212,3 +212,29 @@ rules:
  - get
  - patch
  - update
- apiGroups:
  - ray.io
  resources:
  - rayclusters
  verbs:
  - create
  - delete
  - get
  - list
  - patch
  - update
  - watch
- apiGroups:
  - ray.io
  resources:
  - rayclusters/finalizers
  verbs:
  - update
- apiGroups:
  - ray.io
  resources:
  - rayclusters/status
  verbs:
  - get
  - patch
  - update
190 changes: 155 additions & 35 deletions pkg/controller/rayclusterreplicaset/rayclusterreplicaset_controller.go
@@ -18,14 +18,17 @@ package rayclusterreplicaset

import (
"context"
"fmt"
"sync"
"time"

modelv1alpha1 "github.com/aibrix/aibrix/api/model/v1alpha1"
"github.com/aibrix/aibrix/pkg/controller/util/expectation"
apierrors "k8s.io/apimachinery/pkg/api/errors"

rayclusterv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
clientv1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

@@ -37,10 +40,17 @@ import (
orchestrationv1alpha1 "github.com/aibrix/aibrix/api/orchestration/v1alpha1"
)

var (
controllerName = "raycluster-replicaset-controller"
defaultRequeueDurationForWaitingExpectation = 5 * time.Second
controllerKind = modelv1alpha1.GroupVersion.WithKind("RayClusterReplicaSet")
)

// Add creates a new RayClusterReplicaSet Controller and adds it to the Manager with default RBAC.
// The Manager will set fields on the Controller and Start it when the Manager is Started.
func Add(mgr manager.Manager) error {
// TODO: check whether the CRD exists; if not, fail here directly instead of moving forward.
// This is meant to validate that KubeRay is installed.

r, err := newReconciler(mgr)
if err != nil {
@@ -51,22 +61,11 @@ func Add(mgr manager.Manager) error {

// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) {
// get kubernetes client from manager
config := mgr.GetConfig()
k8sClient, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Fatalf("Unable to create Kubernetes client: %v", err)
}

eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: k8sClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(mgr.GetScheme(), corev1.EventSource{Component: "model-adapter-controller"})

reconciler := &RayClusterReplicaSetReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: recorder,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor(controllerName),
Expectations: expectation.NewControllerExpectations(),
}
return reconciler, nil
}
@@ -76,10 +75,10 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
// use the builder pattern. If we need more fine-grained control later, we can switch to `controller.New()`
err := ctrl.NewControllerManagedBy(mgr).
For(&orchestrationv1alpha1.RayClusterReplicaSet{}).
Watches(&rayclusterv1.RayCluster{}, &handler.EnqueueRequestForObject{}).
Owns(&rayclusterv1.RayCluster{}).
Complete(r)

klog.V(4).InfoS("Finished to add model-adapter-controller")
klog.V(4).InfoS("Finished to add raycluster-replicaset-controller")
return err
}

@@ -90,25 +89,146 @@ type RayClusterReplicaSetReconciler struct {
client.Client
Scheme *runtime.Scheme
Recorder record.EventRecorder
// Expectations is a cache of in-flight RayCluster creates/deletes per RayClusterReplicaSet.
// We use the replicaset's namespace/name as the expectation key.
// For example, for a RayClusterReplicaSet with namespace "aibrix", name "llama7b" and 3 replicas,
// we create the expectation:
// - "aibrix/llama7b", expects 3 adds.
Expectations expectation.ControllerExpectationsInterface
}
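The Expectations field above is what keeps the controller from over-creating or over-deleting while earlier requests are still in flight. Below is a rough sketch of that lifecycle, using only methods that appear in this diff and assuming the package mirrors the upstream controller-expectations semantics; it is an illustration, not code from this PR.

package sketch

import (
	"fmt"

	"github.com/aibrix/aibrix/pkg/controller/util/expectation"
)

// expectationLifecycle is an illustrative walkthrough of how the expectation
// cache gates re-syncs of a RayClusterReplicaSet.
func expectationLifecycle() {
	exp := expectation.NewControllerExpectations()
	rsKey := "aibrix/llama7b" // replicaset namespace/name, as in the comment above

	// Before issuing creates, record how many creations we expect to observe.
	_ = exp.ExpectCreations(rsKey, 3)

	// While 3 adds are still outstanding, the replicaset should not be re-synced.
	fmt.Println(exp.SatisfiedExpectations(rsKey)) // expected: false

	// Each observed (or failed) creation lowers the outstanding count.
	for i := 0; i < 3; i++ {
		exp.CreationObserved(rsKey)
	}
	fmt.Println(exp.SatisfiedExpectations(rsKey)) // expected: true, safe to compute a new diff
}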

//+kubebuilder:rbac:groups=orchestration.aibrix.ai,resources=rayclusterreplicasets,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=orchestration.aibrix.ai,resources=rayclusterreplicasets/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=orchestration.aibrix.ai,resources=rayclusterreplicasets/finalizers,verbs=update

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the RayClusterReplicaSet object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
// +kubebuilder:rbac:groups=orchestration.aibrix.ai,resources=rayclusterreplicasets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=orchestration.aibrix.ai,resources=rayclusterreplicasets/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=orchestration.aibrix.ai,resources=rayclusterreplicasets/finalizers,verbs=update
// +kubebuilder:rbac:groups=ray.io,resources=rayclusters,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=ray.io,resources=rayclusters/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=ray.io,resources=rayclusters/finalizers,verbs=update

// Reconcile method moves the RayClusterReplicaSet to desired State
func (r *RayClusterReplicaSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
_ = log.FromContext(ctx)

// TODO(user): your logic here
replicaset := &orchestrationv1alpha1.RayClusterReplicaSet{}
if err := r.Get(ctx, req.NamespacedName, replicaset); err != nil {
klog.ErrorS(err, "unable to fetch raycluster-replicaset")
return ctrl.Result{}, client.IgnoreNotFound(err)
}

rsKey := req.NamespacedName.String()
if !r.Expectations.SatisfiedExpectations(rsKey) {
klog.InfoS("Expectations not met, requeuing", "replicaset", rsKey)
return ctrl.Result{RequeueAfter: defaultRequeueDurationForWaitingExpectation}, nil
}

// fetch current ray cluster associated with this replicaset
rayclusterList := &rayclusterv1.RayClusterList{}
listOpts := []client.ListOption{
client.InNamespace(replicaset.Namespace),
// Note: we simplify the case a little bit and
// there's no need to follow replicaset's implementation to list all clusters here.
client.MatchingLabels(replicaset.Spec.Selector.MatchLabels),
}

if err := r.Client.List(ctx, rayclusterList, listOpts...); err != nil {
klog.ErrorS(err, "unable to list rayclusters")
return ctrl.Result{}, err
}

filteredClusters := filterActiveClusters(rayclusterList.Items)
currentReplicas := int32(len(filteredClusters))

// Determine the scaling operation (scale up or down)
desiredReplicas := *replicaset.Spec.Replicas
var scaleError error
if currentReplicas < desiredReplicas {
diff := desiredReplicas - currentReplicas
_ = r.Expectations.ExpectCreations(rsKey, int(diff))
scaleError = r.scaleUp(ctx, replicaset, int(diff))
} else if currentReplicas > desiredReplicas {
diff := currentReplicas - desiredReplicas
_ = r.Expectations.ExpectDeletions(rsKey, int(diff))
scaleError = r.scaleDown(ctx, filteredClusters, int(diff))
}

newStatus := calculateStatus(replicaset, filteredClusters, scaleError)
if err := r.updateReplicaSetStatus(replicaset, newStatus, rsKey); err != nil {
return reconcile.Result{}, err
}

return ctrl.Result{}, nil
}
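filterActiveClusters, calculateStatus, and isStatusSame are defined elsewhere in this file and are not part of the hunks shown here. The sketch below captures only an assumption about the filtering step, modeled on how the upstream ReplicaSet ignores terminating pods; the merged helper may differ.

package sketch

import (
	rayclusterv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

// filterActiveClustersSketch drops clusters that are already being deleted;
// the merged helper may use a different notion of "active".
func filterActiveClustersSketch(clusters []rayclusterv1.RayCluster) []rayclusterv1.RayCluster {
	active := make([]rayclusterv1.RayCluster, 0, len(clusters))
	for _, c := range clusters {
		if c.DeletionTimestamp.IsZero() {
			active = append(active, c)
		}
	}
	return active
}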

// scaleUp handles RayCluster creation logic when scaling up
func (r *RayClusterReplicaSetReconciler) scaleUp(ctx context.Context, replicaset *orchestrationv1alpha1.RayClusterReplicaSet, diff int) error {
for i := 0; i < diff; i++ {
newCluster := constructRayCluster(replicaset)
if err := r.Create(ctx, newCluster); err != nil {
r.Expectations.CreationObserved(replicaset.Name)
return fmt.Errorf("failed to create pod: %w", err)
}
}
return nil
}
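constructRayCluster is also not shown in these hunks. Since add() wires the controller with Owns(&rayclusterv1.RayCluster{}), the created clusters need a controller owner reference back to the replicaset for their events to reach this reconciler. Here is a minimal sketch of such a helper; the signature, label handling, and template copying are assumptions rather than the merged code.

package sketch

import (
	orchestrationv1alpha1 "github.com/aibrix/aibrix/api/orchestration/v1alpha1"
	rayclusterv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

// constructRayClusterSketch builds a RayCluster owned by the replicaset so that
// Owns(&rayclusterv1.RayCluster{}) enqueues the replicaset on cluster events.
func constructRayClusterSketch(rs *orchestrationv1alpha1.RayClusterReplicaSet, scheme *runtime.Scheme) (*rayclusterv1.RayCluster, error) {
	cluster := &rayclusterv1.RayCluster{
		ObjectMeta: metav1.ObjectMeta{
			GenerateName: rs.Name + "-",
			Namespace:    rs.Namespace,
			// Labels must match the selector used when listing clusters in Reconcile.
			Labels: rs.Spec.Selector.MatchLabels,
		},
		// Spec would be copied from the replicaset's template field (field name assumed).
	}
	if err := controllerutil.SetControllerReference(rs, cluster, scheme); err != nil {
		return nil, err
	}
	return cluster, nil
}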

// scaleDown handles RayCluster deletion logic when scaling down
func (r *RayClusterReplicaSetReconciler) scaleDown(ctx context.Context, clusters []rayclusterv1.RayCluster, diff int) error {
var wg sync.WaitGroup
errCh := make(chan error, diff)

for i := 0; i < diff; i++ {
cluster := clusters[i]
wg.Add(1)
go func(cluster rayclusterv1.RayCluster) {
defer wg.Done()
if err := r.Delete(ctx, &cluster); err != nil {
r.Expectations.DeletionObserved(cluster.Name)
if !apierrors.IsNotFound(err) {
errCh <- fmt.Errorf("failed to delete pod: %w", err)
}
}
}(cluster)
}

wg.Wait()

select {
case err := <-errCh:
return err
default:
return nil
}
}

// updateReplicaSetStatus attempts to update the Status.Replicas of the given ReplicaSet, with a single GET/PUT retry.
func (r *RayClusterReplicaSetReconciler) updateReplicaSetStatus(rs *orchestrationv1alpha1.RayClusterReplicaSet, newStatus orchestrationv1alpha1.RayClusterReplicaSetStatus, rsKey string) error {
// Check if the expectations have been fulfilled for this ReplicaSet
if !r.Expectations.SatisfiedExpectations(rsKey) {
klog.V(4).Info("Expectations not yet fulfilled for ReplicaSet, delaying status update", "replicaSet", rsKey)
return nil
}

if same := isStatusSame(rs, newStatus); same {
return nil
}

// Update the observed generation to ensure the status reflects the latest changes
newStatus.ObservedGeneration = rs.Generation

// Log the status update
klog.V(4).Info(fmt.Sprintf("Updating status for %v: %s/%s, ", rs.Kind, rs.Namespace, rs.Name) +
fmt.Sprintf("replicas %d->%d (need %d), ", rs.Status.Replicas, newStatus.Replicas, *(rs.Spec.Replicas)) +
fmt.Sprintf("fullyLabeledReplicas %d->%d, ", rs.Status.FullyLabeledReplicas, newStatus.FullyLabeledReplicas) +
fmt.Sprintf("readyReplicas %d->%d, ", rs.Status.ReadyReplicas, newStatus.ReadyReplicas) +
fmt.Sprintf("availableReplicas %d->%d, ", rs.Status.AvailableReplicas, newStatus.AvailableReplicas) +
fmt.Sprintf("observedGeneration %v->%v", rs.Status.ObservedGeneration, newStatus.ObservedGeneration))

// Update ReplicaSet status if necessary
newInstance := rs.DeepCopy()
newInstance.Status = newStatus
if err := r.Status().Update(context.Background(), newInstance); err != nil {
klog.ErrorS(err, "unable to update ReplicaSet status")
return err
}

return nil
}
@@ -19,6 +19,8 @@ package rayclusterreplicaset
import (
"context"

"github.com/aibrix/aibrix/pkg/controller/util/expectation"

rayclusterv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
corev1 "k8s.io/api/core/v1"

@@ -97,8 +99,9 @@ var _ = Describe("RayClusterReplicaSet Controller", func() {
It("should successfully reconcile the resource", func() {
By("Reconciling the created resource")
controllerReconciler := &RayClusterReplicaSetReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
Client: k8sClient,
Scheme: k8sClient.Scheme(),
Expectations: expectation.NewControllerExpectations(),
}

_, err := controllerReconciler.Reconcile(ctx, reconcile.Request{