Skip to content

Commit cf1ead4

Browse files
authored
a client builder for manifestworks (#273)
Signed-off-by: Wei Liu <[email protected]>
1 parent 3f32850 commit cf1ead4

File tree

3 files changed

+220
-0
lines changed

3 files changed

+220
-0
lines changed

cloudevents/work/agent/client/manifestwork.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,17 @@ var manifestWorkGR = schema.GroupResource{Group: workv1.GroupName, Resource: "ma
4141

4242
var _ workv1client.ManifestWorkInterface = &ManifestWorkAgentClient{}
4343

44+
func NewManifestWorkAgentClient(cloudEventsClient *generic.CloudEventAgentClient[*workv1.ManifestWork], watcher *watcher.ManifestWorkWatcher) *ManifestWorkAgentClient {
45+
return &ManifestWorkAgentClient{
46+
cloudEventsClient: cloudEventsClient,
47+
watcher: watcher,
48+
}
49+
}
50+
51+
func (c *ManifestWorkAgentClient) SetLister(lister workv1lister.ManifestWorkNamespaceLister) {
52+
c.lister = lister
53+
}
54+
4455
func (c *ManifestWorkAgentClient) Create(ctx context.Context, manifestWork *workv1.ManifestWork, opts metav1.CreateOptions) (*workv1.ManifestWork, error) {
4556
return nil, errors.NewMethodNotSupported(manifestWorkGR, "create")
4657
}

cloudevents/work/clientbuilder.go

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
package work
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"k8s.io/client-go/rest"
9+
"k8s.io/klog/v2"
10+
11+
workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
12+
workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1"
13+
workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
14+
workv1informers "open-cluster-management.io/api/client/work/informers/externalversions/work/v1"
15+
"open-cluster-management.io/api/cloudevents/generic"
16+
"open-cluster-management.io/api/cloudevents/generic/options/mqtt"
17+
agentclient "open-cluster-management.io/api/cloudevents/work/agent/client"
18+
agenthandler "open-cluster-management.io/api/cloudevents/work/agent/handler"
19+
"open-cluster-management.io/api/cloudevents/work/internal"
20+
"open-cluster-management.io/api/cloudevents/work/watcher"
21+
workv1 "open-cluster-management.io/api/work/v1"
22+
)
23+
24+
const defaultInformerResyncTime = 10 * time.Minute
25+
26+
// ClientHolder holds a manifestwork client that implements the ManifestWorkInterface based on different configuration
27+
// and a ManifestWorkInformer that is built with the manifestWork client.
28+
//
29+
// ClientHolder also implements the ManifestWorksGetter interface.
30+
type ClientHolder struct {
31+
workClient workv1client.WorkV1Interface
32+
manifestWorkInformer workv1informers.ManifestWorkInformer
33+
}
34+
35+
var _ workv1client.ManifestWorksGetter = &ClientHolder{}
36+
37+
// ManifestWorks returns a ManifestWorkInterface
38+
func (h *ClientHolder) ManifestWorks(namespace string) workv1client.ManifestWorkInterface {
39+
return h.workClient.ManifestWorks(namespace)
40+
}
41+
42+
// ManifestWorkInformer returns a ManifestWorkInformer
43+
func (h *ClientHolder) ManifestWorkInformer() workv1informers.ManifestWorkInformer {
44+
return h.manifestWorkInformer
45+
}
46+
47+
// ClientHolderBuilder builds the ClientHolder with different configuration.
48+
type ClientHolderBuilder struct {
49+
config any
50+
codecs []generic.Codec[*workv1.ManifestWork]
51+
informerOptions []workinformers.SharedInformerOption
52+
informerResyncTime time.Duration
53+
clusterName string
54+
clientID string
55+
}
56+
57+
// NewClientHolderBuilder returns a ClientHolderBuilder with a given configuration.
58+
//
59+
// Available configurations:
60+
// - Kubeconfig (*rest.Config): builds a manifestwork client with kubeconfig
61+
// - MQTTOptions (*mqtt.MQTTOptions): builds a manifestwork client based on cloudevents with MQTT
62+
func NewClientHolderBuilder(clientID string, config any) *ClientHolderBuilder {
63+
return &ClientHolderBuilder{
64+
clientID: clientID,
65+
config: config,
66+
informerResyncTime: defaultInformerResyncTime,
67+
}
68+
}
69+
70+
// WithClusterName set the managed cluster name when building a manifestwork client for an agent.
71+
func (b *ClientHolderBuilder) WithClusterName(clusterName string) *ClientHolderBuilder {
72+
b.clusterName = clusterName
73+
return b
74+
}
75+
76+
// WithCodecs add codecs when building a manifestwork client based on cloudevents.
77+
func (b *ClientHolderBuilder) WithCodecs(codecs ...generic.Codec[*workv1.ManifestWork]) *ClientHolderBuilder {
78+
b.codecs = codecs
79+
return b
80+
}
81+
82+
// WithInformerConfig set the ManifestWorkInformer configs. If the resync time is not set, the default time (10 minutes)
83+
// will be used when building the ManifestWorkInformer.
84+
func (b *ClientHolderBuilder) WithInformerConfig(
85+
resyncTime time.Duration, options ...workinformers.SharedInformerOption) *ClientHolderBuilder {
86+
b.informerResyncTime = resyncTime
87+
b.informerOptions = options
88+
return b
89+
}
90+
91+
// NewClientHolder returns a ClientHolder for works.
92+
func (b *ClientHolderBuilder) NewClientHolder(ctx context.Context) (*ClientHolder, error) {
93+
switch config := b.config.(type) {
94+
case *rest.Config:
95+
kubeWorkClientSet, err := workclientset.NewForConfig(config)
96+
if err != nil {
97+
return nil, err
98+
}
99+
100+
factory := workinformers.NewSharedInformerFactoryWithOptions(kubeWorkClientSet, b.informerResyncTime, b.informerOptions...)
101+
102+
return &ClientHolder{
103+
workClient: kubeWorkClientSet.WorkV1(),
104+
manifestWorkInformer: factory.Work().V1().ManifestWorks(),
105+
}, nil
106+
case *mqtt.MQTTOptions:
107+
if len(b.clusterName) != 0 {
108+
return b.newAgentClients(ctx, config)
109+
}
110+
111+
//TODO build manifestwork clients for source
112+
return nil, nil
113+
default:
114+
return nil, fmt.Errorf("unsupported client configuration type %T", config)
115+
}
116+
}
117+
118+
func (b *ClientHolderBuilder) newAgentClients(ctx context.Context, config *mqtt.MQTTOptions) (*ClientHolder, error) {
119+
workLister := &ManifestWorkLister{}
120+
watcher := watcher.NewManifestWorkWatcher()
121+
agentOptions := mqtt.NewAgentOptions(config, b.clusterName, b.clientID)
122+
cloudEventsClient, err := generic.NewCloudEventAgentClient[*workv1.ManifestWork](
123+
ctx,
124+
agentOptions,
125+
workLister,
126+
ManifestWorkStatusHash,
127+
b.codecs...,
128+
)
129+
if err != nil {
130+
return nil, err
131+
}
132+
133+
manifestWorkClient := agentclient.NewManifestWorkAgentClient(cloudEventsClient, watcher)
134+
workClient := &internal.WorkV1ClientWrapper{ManifestWorkClient: manifestWorkClient}
135+
workClientSet := &internal.WorkClientSetWrapper{WorkV1ClientWrapper: workClient}
136+
factory := workinformers.NewSharedInformerFactoryWithOptions(workClientSet, b.informerResyncTime, b.informerOptions...)
137+
informers := factory.Work().V1().ManifestWorks()
138+
manifestWorkLister := informers.Lister()
139+
namespacedLister := manifestWorkLister.ManifestWorks(b.clusterName)
140+
141+
// Set informer lister back to work lister and client.
142+
workLister.Lister = manifestWorkLister
143+
// TODO the work client and informer share a same store in the current implementation, ideally, the store should be
144+
// only written from the server. we may need to revisit the implementation in the future.
145+
manifestWorkClient.SetLister(namespacedLister)
146+
147+
go func() {
148+
err := cloudEventsClient.Subscribe(ctx, agenthandler.NewManifestWorkAgentHandler(namespacedLister, watcher))
149+
if err != nil {
150+
// TODO (skeeey) consider how to retry to connect the broker again
151+
klog.Errorf("failed to subscribe to %s, %v", config.BrokerHost, err)
152+
}
153+
}()
154+
155+
return &ClientHolder{
156+
workClient: workClient,
157+
manifestWorkInformer: informers,
158+
}, nil
159+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package internal
2+
3+
import (
4+
discovery "k8s.io/client-go/discovery"
5+
"k8s.io/client-go/rest"
6+
7+
workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
8+
workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1"
9+
workv1alpha1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1alpha1"
10+
)
11+
12+
// WorkClientSetWrapper wraps a work client that has a manifestwork client to a work clientset interface, this wrapper
13+
// will helps us to build manifestwork informer factory easily.
14+
type WorkClientSetWrapper struct {
15+
WorkV1ClientWrapper *WorkV1ClientWrapper
16+
}
17+
18+
var _ workclientset.Interface = &WorkClientSetWrapper{}
19+
20+
func (c *WorkClientSetWrapper) WorkV1() workv1client.WorkV1Interface {
21+
return c.WorkV1ClientWrapper
22+
}
23+
24+
func (c *WorkClientSetWrapper) WorkV1alpha1() workv1alpha1client.WorkV1alpha1Interface {
25+
return nil
26+
}
27+
28+
func (c *WorkClientSetWrapper) Discovery() discovery.DiscoveryInterface {
29+
return nil
30+
}
31+
32+
// WorkV1ClientWrapper wraps a manifestwork client to a WorkV1Interface
33+
type WorkV1ClientWrapper struct {
34+
ManifestWorkClient workv1client.ManifestWorkInterface
35+
}
36+
37+
var _ workv1client.WorkV1Interface = &WorkV1ClientWrapper{}
38+
39+
func (c *WorkV1ClientWrapper) ManifestWorks(namespace string) workv1client.ManifestWorkInterface {
40+
// TODO if the ManifestWorkClient is ManifestWorkSourceClient, we need set namespace here
41+
return c.ManifestWorkClient
42+
}
43+
44+
func (c *WorkV1ClientWrapper) AppliedManifestWorks() workv1client.AppliedManifestWorkInterface {
45+
return nil
46+
}
47+
48+
func (c *WorkV1ClientWrapper) RESTClient() rest.Interface {
49+
return nil
50+
}

0 commit comments

Comments
 (0)