Skip to content

Commit 9f978ae

Browse files
committed
Add dask and ray clusters logic
1 parent d09a672 commit 9f978ae

103 files changed

Lines changed: 39443 additions & 31684 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

PROJECT

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,47 @@
33
# and allow the plugins properly work.
44
# More info: https://book.kubebuilder.io/reference/project-config.html
55
cliVersion: 4.8.0
6-
domain: polyaxon.com
6+
domain: com
77
layout:
8-
- go.kubebuilder.io/v4
8+
- go.kubebuilder.io/v4
99
multigroup: true
1010
projectName: mloperator
1111
repo: github.com/polyaxon/mloperator
1212
resources:
13-
- api:
14-
crdVersion: v1
15-
namespaced: true
16-
controller: true
17-
domain: polyaxon.com
18-
group: core
19-
kind: Operation
20-
path: github.com/polyaxon/mloperator/api/v1
21-
version: v1
13+
- api:
14+
crdVersion: v1
15+
namespaced: true
16+
controller: true
17+
domain: com
18+
group: polyaxon
19+
kind: Job
20+
path: github.com/polyaxon/mloperator/api/v1
21+
version: v1
22+
- api:
23+
crdVersion: v1
24+
namespaced: true
25+
controller: true
26+
domain: com
27+
group: polyaxon
28+
kind: Service
29+
path: github.com/polyaxon/mloperator/api/v1
30+
version: v1
31+
- api:
32+
crdVersion: v1
33+
namespaced: true
34+
controller: true
35+
domain: com
36+
group: polyaxon
37+
kind: KfJob
38+
path: github.com/polyaxon/mloperator/api/v1
39+
version: v1
40+
- api:
41+
crdVersion: v1
42+
namespaced: true
43+
controller: true
44+
domain: com
45+
group: polyaxon
46+
kind: Cluster
47+
path: github.com/polyaxon/mloperator/api/v1
48+
version: v1
2249
version: "3"

api/v1/batch_job.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,6 @@ import (
1111
// +k8s:openapi-gen=true
1212
type BatchJobSpec struct {
1313
// Template describes the pods that will be created.
14+
// +kubebuilder:pruning:PreserveUnknownFields
1415
Template corev1.PodTemplateSpec `json:"template" protobuf:"bytes,1,opt,name=template"`
1516
}

api/v1/cluster_types.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package v1
2+
3+
import (
4+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
5+
)
6+
7+
// +kubebuilder:object:root=true
8+
9+
// Cluster is the Schema for the clusters API
10+
// +k8s:openapi-gen=true
11+
// +kubebuilder:subresource:status
12+
type Cluster struct {
13+
metav1.TypeMeta `json:",inline"`
14+
metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
15+
16+
// Specifies the number of retries before marking this job failed.
17+
// +optional
18+
Termination TerminationSpec `json:"termination,omitempty" protobuf:"bytes,2,opt,name=termination"`
19+
20+
// Flag to set a finalizer for collecting logs
21+
// +optional
22+
CollectLogs bool `json:"collectLogs" protobuf:"bytes,3,opt,name=collectLogs"`
23+
24+
// Flag to tell if Polyaxon should sync statuses with the control plane
25+
// +optional
26+
SyncStatuses bool `json:"syncStatuses" protobuf:"bytes,4,opt,name=syncStatuses"`
27+
28+
// Specification of the desired behavior of a DaskCluster.
29+
DaskClusterSpec *DaskClusterSpec `json:"daskClusterSpec,omitempty" protobuf:"bytes,14,opt,name=daskClusterSpec"`
30+
31+
// Specification of the desired behavior of a RayCluster.
32+
RayClusterSpec *RayClusterSpec `json:"rayClusterSpec,omitempty" protobuf:"bytes,15,opt,name=rayClusterSpec"`
33+
34+
// Current status of an op.
35+
// +optional
36+
Status OperationStatus `json:"status,omitempty" protobuf:"bytes,11,opt,name=status"`
37+
}
38+
39+
// +kubebuilder:object:root=true
40+
41+
// ClusterList contains a list of Cluster
42+
type ClusterList struct {
43+
metav1.TypeMeta `json:",inline"`
44+
metav1.ListMeta `json:"metadata,omitempty"`
45+
Items []Cluster `json:"items"`
46+
}
47+
48+
func init() {
49+
SchemeBuilder.Register(&Cluster{}, &ClusterList{})
50+
}

api/v1/condition.go

Lines changed: 151 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -150,32 +150,165 @@ func hasLastOperationCondition(status OperationStatus, condType OperationConditi
150150
return false
151151
}
152152

153-
// isOperationStarting checks if an ml operation status is in starting condition
154-
func isOperationStarting(status OperationStatus) bool {
155-
return hasLastOperationCondition(status, OperationStarting)
153+
// IsStarting checks if an ml operation status is in starting condition
154+
func (status *OperationStatus) IsStarting() bool {
155+
return hasLastOperationCondition(*status, OperationStarting)
156156
}
157157

158-
// isOperationRunning checks if an ml operation status is in running condition
159-
func isOperationRunning(status OperationStatus) bool {
160-
return hasLastOperationCondition(status, OperationRunning)
158+
// IsRunning checks if an ml operation status is in running condition
159+
func (status *OperationStatus) IsRunning() bool {
160+
return hasLastOperationCondition(*status, OperationRunning)
161161
}
162162

163-
// isOperationWarning checks if an ml operation status is in warning condition
164-
func isOperationWarning(status OperationStatus) bool {
165-
return hasLastOperationCondition(status, OperationWarning)
163+
// IsWarning checks if an ml operation status is in warning condition
164+
func (status *OperationStatus) IsWarning() bool {
165+
return hasLastOperationCondition(*status, OperationWarning)
166166
}
167167

168-
// isOperationSucceeded checks if an ml operation status is succeeded
169-
func isOperationSucceeded(status OperationStatus) bool {
170-
return hasOperationCondition(status, OperationSucceeded)
168+
// IsSucceeded checks if an ml operation status is succeeded
169+
func (status *OperationStatus) IsSucceeded() bool {
170+
return hasOperationCondition(*status, OperationSucceeded)
171171
}
172172

173-
// isOperationFailed checks if an ml operation status is failed
174-
func isOperationFailed(status OperationStatus) bool {
175-
return hasOperationCondition(status, OperationFailed)
173+
// IsFailed checks if an ml operation status is failed
174+
func (status *OperationStatus) IsFailed() bool {
175+
return hasOperationCondition(*status, OperationFailed)
176176
}
177177

178-
// isOperationStopped checks if an ml operation status is stopped
179-
func isOperationStopped(status OperationStatus) bool {
180-
return hasOperationCondition(status, OperationStopped)
178+
// IsStopped checks if an ml operation status is stopped
179+
func (status *OperationStatus) IsStopped() bool {
180+
return hasOperationCondition(*status, OperationStopped)
181+
}
182+
183+
// IsDone checks if the Operation reached a final condition
184+
func (status *OperationStatus) IsDone() bool {
185+
return status.IsSucceeded() || status.IsFailed() || status.IsStopped()
186+
}
187+
188+
// IsOperationBeingDeleted checks if a Kubernetes resource is being deleted
189+
// This works with any type that implements metav1.Object (Job, Service, Operation, etc.)
190+
func IsOperationBeingDeleted[T metav1.Object](obj T) bool {
191+
return !obj.GetDeletionTimestamp().IsZero()
192+
}
193+
194+
// removeCondition removes a condition of the specified type from the status
195+
func removeCondition(status *OperationStatus, conditionType OperationConditionType) {
196+
var newConditions []OperationCondition
197+
for _, c := range status.Conditions {
198+
if c.Type == conditionType {
199+
continue
200+
}
201+
newConditions = append(newConditions, c)
202+
}
203+
status.Conditions = newConditions
204+
}
205+
206+
// logCondition logs a condition to the status
207+
func logCondition(status *OperationStatus, condType OperationConditionType, conditionStatus corev1.ConditionStatus, reason, message string) bool {
208+
currentCond := getLastEntityCondition(*status, condType)
209+
cond, isUpdated := getOrUpdateOperationCondition(currentCond, condType, conditionStatus, reason, message)
210+
if isUpdated && cond != nil {
211+
removeCondition(status, condType)
212+
status.Conditions = append(status.Conditions, *cond)
213+
return true
214+
}
215+
return false
216+
}
217+
218+
// LogStarting sets Operation to starting
219+
func (status *OperationStatus) LogStarting() bool {
220+
return logCondition(status, OperationStarting, corev1.ConditionTrue, "OperatorController", "Operation is starting")
221+
}
222+
223+
// LogRunning sets Operation to running
224+
func (status *OperationStatus) LogRunning() bool {
225+
return logCondition(status, OperationRunning, corev1.ConditionTrue, "OperatorController", "Operation is running")
226+
}
227+
228+
// LogWarning sets Operation to warning
229+
func (status *OperationStatus) LogWarning(reason, message string) bool {
230+
if reason == "" {
231+
reason = "OperatorController"
232+
}
233+
if message == "" {
234+
message = "Underlaying job has an issue"
235+
}
236+
return logCondition(status, OperationWarning, corev1.ConditionTrue, reason, message)
237+
}
238+
239+
// LogSucceeded sets Operation to succeeded
240+
func (status *OperationStatus) LogSucceeded() bool {
241+
return logCondition(status, OperationSucceeded, corev1.ConditionTrue, "OperatorController", "Operation has succeeded")
242+
}
243+
244+
// LogFailed sets Operation to failed
245+
func (status *OperationStatus) LogFailed(reason, message string) bool {
246+
return logCondition(status, OperationFailed, corev1.ConditionTrue, reason, message)
247+
}
248+
249+
// LogStopped sets Operation to stopped
250+
func (status *OperationStatus) LogStopped(reason, message string) bool {
251+
return logCondition(status, OperationStopped, corev1.ConditionTrue, reason, message)
252+
}
253+
254+
// ShouldMarkJobAsDeleted checks if a job that doesn't exist should be marked as deleted
255+
// rather than being recreated. It returns true if the job was previously created and running
256+
// but is now missing (deleted externally).
257+
//
258+
// This function checks if the operation has progressed beyond initial creation by examining
259+
// the status conditions. If the last condition indicates the job was running or had warnings,
260+
// it means the job existed before and should not be recreated.
261+
//
262+
// Returns:
263+
// - shouldMarkDeleted: true if the job should be marked as deleted instead of recreated
264+
// - isDone: true if the operation is already in a terminal state
265+
func (status *OperationStatus) ShouldMarkJobAsDeleted() (shouldMarkDeleted bool, isDone bool) {
266+
// If operation is already done, don't recreate or mark as deleted
267+
if status.IsDone() {
268+
return false, true
269+
}
270+
271+
// Check if the job was previously created (status has progressed beyond creation)
272+
if len(status.Conditions) == 0 {
273+
// No conditions mean the job was never created, so it's safe to create it
274+
return false, false
275+
}
276+
277+
lastCond := status.Conditions[len(status.Conditions)-1]
278+
279+
// If the operation was previously running or had warnings, it means
280+
// the job existed before and was deleted externally
281+
if lastCond.Type == OperationRunning || lastCond.Type == OperationWarning {
282+
return true, false
283+
}
284+
285+
// For other states (Starting, etc.), allow job creation
286+
return false, false
287+
}
288+
289+
// MarkJobAsDeleted marks an operation as failed or stopped due to external job deletion.
290+
// This is a helper function that sets the appropriate status when a job is detected
291+
// as deleted externally.
292+
//
293+
// Parameters:
294+
// - jobType: a descriptive name for the job type (e.g., "Kubernetes Job", "TFJob")
295+
// - useFailed: if true, marks as Failed; if false, marks as Stopped
296+
//
297+
// Returns true if the status was updated
298+
func (status *OperationStatus) MarkJobAsDeleted(jobType string, useFailed bool) bool {
299+
now := metav1.Now()
300+
message := "The underlying " + jobType + " was deleted externally"
301+
302+
var updated bool
303+
if useFailed {
304+
updated = status.LogFailed("JobDeleted", message)
305+
} else {
306+
updated = status.LogStopped("JobDeleted", message)
307+
}
308+
309+
if updated {
310+
status.CompletionTime = &now
311+
}
312+
313+
return updated
181314
}

0 commit comments

Comments
 (0)