Skip to content

Commit 44d96b3

Browse files
authored
first available implementation for pytorch-operator-extension (#4)
* Download the PyTorchJob APIs from Kubeflow
* Use the codegen tool to regenerate the clientset for PyTorchJob
* Add the main function and other functions
* Modify the const definitions and add a nil check for qu.Spec.ConsumerRef
1 parent 9e0abca commit 44d96b3

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+3428
-0
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,5 @@
1313

1414
# Dependency directories (remove the comment below to include it)
1515
# vendor/
16+
17+
*.idea

Dockerfile

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# Build stage: compile the extension binary with the Go toolchain.
FROM golang:1.15.3-alpine3.12 AS build
# make drives the build. --no-cache keeps the apk package index out of the layer.
RUN apk add --no-cache make git
# COPY rather than ADD: only a plain copy of the build context is needed.
COPY . /go/src/github.com/kube-queue/pytorch-operator-extension

WORKDIR /go/src/github.com/kube-queue/pytorch-operator-extension
RUN make

# Runtime stage: carries only the binary produced by the build stage.
FROM alpine:3.12
COPY --from=build /go/src/github.com/kube-queue/pytorch-operator-extension/bin/pytorch-operator-extension /usr/bin/pytorch-operator-extension
RUN chmod +x /usr/bin/pytorch-operator-extension
ENTRYPOINT ["/usr/bin/pytorch-operator-extension"]

Makefile

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# Target platform derived from the build host: GOOS is the lower-cased kernel
# name, GOARCH maps the machine name reported by uname onto Go's names
# (x86_64 -> amd64, i?86 -> 386, aarch64 -> arm64).
COMMONENVVAR=GOOS=$(shell uname -s | tr A-Z a-z) GOARCH=$(subst aarch64,arm64,$(subst x86_64,amd64,$(patsubst i%86,386,$(shell uname -m))))
# Disable cgo for the build.
BUILDENVVAR=CGO_ENABLED=0

.PHONY: all
all: build

.PHONY: build
build: build-extension

.PHONY: build-extension
build-extension: fixcodec
	$(COMMONENVVAR) $(BUILDENVVAR) go build -mod vendor -ldflags '-w' -o bin/pytorch-operator-extension cmd/main.go

# Without an explicit rule line the recipe below belongs to .PHONY itself and
# is never executed, leaving fixcodec a no-op prerequisite of build-extension.
# NOTE(review): the script must be executable and its interpreter must exist
# in the build image - confirm before relying on it in the Docker build.
.PHONY: fixcodec
fixcodec:
	hack/fix-codec-factory.sh

cmd/app/options/options.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package options
2+
3+
import "flag"
4+
5+
// ServerOption holds the command-line configuration of the extension server.
type ServerOption struct {
	// KubeConfig is the value of the -kubeconfig flag: the path of a
	// kubeconfig file. It is empty when the flag is not given.
	KubeConfig string
}

// NewServerOption returns a ServerOption with every field at its zero value.
func NewServerOption() *ServerOption {
	return &ServerOption{}
}

// AddFlags registers the flags backing s on fs. The fields of s are filled in
// when fs is parsed.
func (s *ServerOption) AddFlags(fs *flag.FlagSet) {
	fs.StringVar(&s.KubeConfig, "kubeconfig", "", "The path of kubeconfig file")
}

cmd/app/server.go

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
package app
2+
3+
import (
4+
v1 "github.com/kube-queue/pytorch-operator-extension/pkg/pytorch-operator/apis/pytorch/v1"
5+
"k8s.io/client-go/rest"
6+
"k8s.io/client-go/tools/cache"
7+
"k8s.io/client-go/tools/clientcmd"
8+
"k8s.io/sample-controller/pkg/signals"
9+
10+
"github.com/kube-queue/api/pkg/apis/scheduling/v1alpha1"
11+
queueversioned "github.com/kube-queue/api/pkg/client/clientset/versioned"
12+
queueinformers "github.com/kube-queue/api/pkg/client/informers/externalversions"
13+
"github.com/kube-queue/pytorch-operator-extension/cmd/app/options"
14+
"github.com/kube-queue/pytorch-operator-extension/pkg/contorller"
15+
pytorchjobversioned "github.com/kube-queue/pytorch-operator-extension/pkg/pytorch-operator/client/clientset/versioned"
16+
pytorchjobinformers "github.com/kube-queue/pytorch-operator-extension/pkg/pytorch-operator/client/informers/externalversions"
17+
"k8s.io/klog/v2"
18+
)
19+
20+
const (
21+
ConsumerRefKind = v1.Kind
22+
ConsumerRefAPIVersion = v1.GroupName + "/" + v1.GroupVersion
23+
)
24+
25+
// Run runs the server.
26+
func Run(opt *options.ServerOption) error {
27+
var restConfig *rest.Config
28+
var err error
29+
// Set up signals so we handle the first shutdown signal gracefully.
30+
stopCh := signals.SetupSignalHandler()
31+
32+
if restConfig, err = rest.InClusterConfig(); err != nil {
33+
if restConfig, err = clientcmd.BuildConfigFromFlags("", opt.KubeConfig); err != nil {
34+
return err
35+
}
36+
}
37+
38+
queueClient, err := queueversioned.NewForConfig(restConfig)
39+
if err != nil {
40+
return err
41+
}
42+
43+
pytorchJobClient, err := pytorchjobversioned.NewForConfig(restConfig)
44+
if err != nil {
45+
return err
46+
}
47+
48+
queueInformerFactory := queueinformers.NewSharedInformerFactory(queueClient, 0)
49+
queueInformer := queueInformerFactory.Scheduling().V1alpha1().QueueUnits().Informer()
50+
pytorchJobInformerFactory := pytorchjobinformers.NewSharedInformerFactory(pytorchJobClient, 0)
51+
pytorchJobInformer := pytorchJobInformerFactory.Kubeflow().V1().PyTorchJobs().Informer()
52+
53+
pytorchExtensionController := contorller.NewPyTorchExtensionController(queueInformerFactory.Scheduling().V1alpha1().QueueUnits(),
54+
queueClient,
55+
pytorchJobInformerFactory.Kubeflow().V1().PyTorchJobs(),
56+
pytorchJobClient,
57+
)
58+
59+
queueInformer.AddEventHandler(
60+
cache.FilteringResourceEventHandler{
61+
FilterFunc: func(obj interface{}) bool {
62+
switch qu := obj.(type) {
63+
case *v1alpha1.QueueUnit:
64+
if qu.Spec.ConsumerRef != nil &&
65+
qu.Spec.ConsumerRef.Kind == ConsumerRefKind &&
66+
qu.Spec.ConsumerRef.APIVersion == ConsumerRefAPIVersion {
67+
return true
68+
}
69+
return false
70+
default:
71+
return false
72+
}
73+
},
74+
Handler: cache.ResourceEventHandlerFuncs{
75+
AddFunc: pytorchExtensionController.AddQueueUnit,
76+
UpdateFunc: pytorchExtensionController.UpdateQueueUnit,
77+
DeleteFunc: pytorchExtensionController.DeleteQueueUnit,
78+
},
79+
},
80+
)
81+
82+
pytorchJobInformer.AddEventHandler(
83+
cache.FilteringResourceEventHandler{
84+
FilterFunc: func(obj interface{}) bool {
85+
switch obj.(type) {
86+
case *v1.PyTorchJob:
87+
return true
88+
default:
89+
return false
90+
}
91+
},
92+
Handler: cache.ResourceEventHandlerFuncs{
93+
AddFunc: pytorchExtensionController.AddPytorchJob,
94+
UpdateFunc: pytorchExtensionController.UpdatePytorchJob,
95+
DeleteFunc: pytorchExtensionController.DeletePytorchJob,
96+
},
97+
},
98+
)
99+
100+
// start queueunit informer
101+
go queueInformerFactory.Start(stopCh)
102+
// start pytorchjob informer
103+
go pytorchJobInformerFactory.Start(stopCh)
104+
105+
err = pytorchExtensionController.Run(2, stopCh)
106+
if err != nil {
107+
klog.Fatalf("Error running pytorchExtensionController", err.Error())
108+
return err
109+
}
110+
111+
return nil
112+
}

cmd/main.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package main
2+
3+
import (
4+
"flag"
5+
6+
"github.com/kube-queue/pytorch-operator-extension/cmd/app"
7+
"github.com/kube-queue/pytorch-operator-extension/cmd/app/options"
8+
"k8s.io/klog/v2"
9+
)
10+
11+
func main() {
12+
s := options.NewServerOption()
13+
s.AddFlags(flag.CommandLine)
14+
flag.Parse()
15+
16+
if err := app.Run(s); err != nil {
17+
klog.Fatalf("Failed to run: %v", err)
18+
}
19+
}

go.mod

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
module github.com/kube-queue/pytorch-operator-extension
2+
3+
go 1.15
4+
5+
require (
6+
github.com/kube-queue/api v0.0.0-20210623033849-bffe1acb5aa9
7+
k8s.io/api v0.21.2
8+
k8s.io/apimachinery v0.21.2
9+
k8s.io/client-go v0.21.2
10+
k8s.io/code-generator v0.21.2
11+
k8s.io/klog/v2 v2.8.0
12+
k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7
13+
k8s.io/sample-controller v0.21.2
14+
)
15+
16+
replace (
17+
k8s.io/api => k8s.io/api v0.18.5
18+
k8s.io/apimachinery => k8s.io/apimachinery v0.18.5
19+
k8s.io/client-go => k8s.io/client-go v0.18.5
20+
k8s.io/code-generator => k8s.io/code-generator v0.18.5
21+
k8s.io/sample-controller => k8s.io/sample-controller v0.18.5
22+
)

0 commit comments

Comments
 (0)