|
| 1 | +# Cloudevents Clients |
| 2 | + |
| 3 | +We have implemented the [cloudevents](https://cloudevents.io/)-based clients in this package to assist developers in |
| 4 | +easily implementing the [Event Based Manifestwork](https://github.com/open-cluster-management-io/enhancements/tree/main/enhancements/sig-architecture/224-event-based-manifestwork) |
| 5 | +proposal. |
| 6 | + |
| 7 | +## Generic Clients |
| 8 | + |
| 9 | +The generic client (`generic.CloudEventsClient`) is used to resync/publish/subscribe resource objects between sources |
| 10 | +and agents with cloudevents. |
| 11 | + |
| 12 | +A resource object can be any object that implements the `generic.ResourceObject` interface. |
| 13 | + |
| 14 | +### Building a generic client on a source |
| 15 | + |
| 16 | +Developers can use `generic.NewCloudEventSourceClient` method to build a generic client on the source. To build this |
| 17 | +client the developers need to provide |
| 18 | + |
| 19 | +1. A cloudevents source options (`options.CloudEventsSourceOptions`), this options have two parts |
| 20 | + - `sourceID`, it is a unique identifier for a source, for example, it can generate a source ID by hashing the hub |
| 21 | + cluster URL and appending the controller name. Similarly, a RESTful service can select a unique name or generate a |
| 22 | + unique ID in the associated database for its source identification. |
| 23 | + - `CloudEventsOptions`, it provides cloudevents clients to send/receive cloudevents based on different event |
| 24 | + protocol. We have supported the MQTT protocol (`mqtt.NewSourceOptions`), developers can use it directly. |
| 25 | + |
| 26 | +2. A resource lister (`generic.Lister`), it is used to list the resource objects on the source when resyncing the |
| 27 | +resources between sources and agents, for example, a hub controller can list the resources from the resource informers, |
| 28 | +and a RESTful service can list its resources from a database. |
| 29 | + |
| 30 | +3. A resource status hash getter method (`generic.StatusHashGetter`), this method will be used to calculate the resource |
| 31 | +status hash when resyncing the resource status between sources and agents. |
| 32 | + |
| 33 | +4. Codecs (`generic.Codec`), they are used to encode a resource object into a cloudevent and decode a cloudevent into a |
| 34 | +resource object with a given cloudevent data type. We have provided two data types (`io.open-cluster-management.works.v1alpha1.manifests` |
| 35 | +that contains a single resource object in the cloudevent payload and `io.open-cluster-management.works.v1alpha1.manifestbundles` |
| 36 | +that contains a list of resource objects in the cloudevent payload) for `ManifestWork`, they can be found in the `work/payload` |
| 37 | +package. |
| 38 | + |
| 39 | +5. Resource handler methods (`generic.ResourceHandler`), they are used to handle the resources status after the client |
| 40 | +received the resources status from agents. |
| 41 | + |
| 42 | +for example: |
| 43 | + |
| 44 | +```golang |
| 45 | +// build a client for the source1 |
| 46 | +client, err := generic.NewCloudEventSourceClient[*CustomerResource]( |
| 47 | + ctx, |
| 48 | + mqtt.NewSourceOptions(mqtt.NewMQTTOptions(), "source1"), |
| 49 | + customerResourceLister, |
| 50 | + customerResourceStatusHashGetter, |
| 51 | + customerResourceCodec, |
| 52 | + ) |
| 53 | + |
| 54 | +// start a go routine to receive the resources status from agents |
| 55 | +go func() { |
| 56 | + if err := client.Subscribe(ctx, customerResourceHandler); err != nil { |
| 57 | + //TODO handle this error when subscribing the cloudevents failed |
| 58 | + } |
| 59 | +}() |
| 60 | +``` |
| 61 | + |
| 62 | +You may refer to the [cloudevents client integration test](../test/integration/cloudevents/source) as an example. |
| 63 | + |
| 64 | +### Building a generic client on a manged cluster |
| 65 | + |
| 66 | +Developers can use `generic.NewCloudEventAgentClient` method to build a generic client on a managed cluster. To build |
| 67 | +this client the developers need to provide |
| 68 | + |
| 69 | +1. A cloudevents agent options (`options.CloudEventsAgentOptions`), this options have three parts |
| 70 | + - `agentID`, it is a unique identifier for an agent, for example, it can consist of a managed cluster name and an |
| 71 | + agent name. |
| 72 | + - `clusterName`, it is the name of a managed cluster on which the agent runs. |
| 73 | + - `CloudEventsOptions`, it provides cloudevents clients to send/receive cloudevents based on different event |
| 74 | + protocol. We have supported the MQTT protocol (`mqtt.NewAgentOptions`), developers can use it directly. |
| 75 | + |
| 76 | +2. A resource lister (`generic.Lister`), it is used to list the resource objects on a managed cluster when resyncing the |
| 77 | +resources between sources and agents, for example, a work agent can list its works from its work informers. |
| 78 | + |
| 79 | +3. A resource status hash getter method (`generic.StatusHashGetter`), this method will be used to calculate the resource |
| 80 | +status hash when resyncing the resource status between sources and agents. |
| 81 | + |
| 82 | +4. Codecs (`generic.Codec`), they are used to encode a resource object into a cloudevent and decode a cloudevent into a |
| 83 | +resource object with a given cloudevent data type. We have provided two data types (`io.open-cluster-management.works.v1alpha1.manifests` |
| 84 | +that contains a single resource object in the cloudevent payload and `io.open-cluster-management.works.v1alpha1.manifestbundles` |
| 85 | +that contains a list of resource objects in the cloudevent payload) for `ManifestWork`, they can be found in the `work/payload` |
| 86 | +package. |
| 87 | + |
| 88 | +5. Resource handler methods (`generic.ResourceHandler`), they are used to handle the resources after the client received |
| 89 | +the resources from sources. |
| 90 | + |
| 91 | +for example: |
| 92 | + |
| 93 | +```golang |
| 94 | +// build a client for a work agent on the cluster1 |
| 95 | +client, err := generic.NewCloudEventAgentClient[*CustomerResource]( |
| 96 | + ctx, |
| 97 | + mqtt.NewAgentOptions(mqtt.NewMQTTOptions(), "cluster1", "cluster1-work-agent"), |
| 98 | + &ManifestWorkLister{}, |
| 99 | + ManifestWorkStatusHash, |
| 100 | + &ManifestBundleCodec{}, |
| 101 | + ) |
| 102 | + |
| 103 | +// start a go routine to receive the resources from sources |
| 104 | +go func() { |
| 105 | + if err := client.Subscribe(ctx, NewManifestWorkAgentHandler()); err != nil { |
| 106 | + //TODO handle this error when subscribing the cloudevents failed |
| 107 | + } |
| 108 | +}() |
| 109 | +``` |
| 110 | + |
| 111 | +## Work Clients |
| 112 | + |
| 113 | +We have provided a builder to build the `ManifestWork` client (`ManifestWorkInterface`) and informer (`ManifestWorkInformer`) |
| 114 | +based on the generic client. |
| 115 | + |
| 116 | +### Building work client for work controllers on the hub cluster |
| 117 | + |
| 118 | +TODO |
| 119 | + |
| 120 | +### Building work client for work agent on the managed cluster |
| 121 | + |
| 122 | +Developers can use the builder to build the `ManifestWork` client and informer with the cluster name. |
| 123 | + |
| 124 | +```golang |
| 125 | + |
| 126 | +clusterName := "cluster1" |
| 127 | +// Building the clients based on cloudevents with MQTT |
| 128 | +config := mqtt.NewMQTTOptions() |
| 129 | + |
| 130 | +clientHolder, err := work.NewClientHolderBuilder(fmt.Sprintf("%s-work-agent", clusterName), config). |
| 131 | + WithClusterName(clusterName). |
| 132 | + // Supports two event data types for ManifestWork |
| 133 | + WithCodecs(codec.NewManifestBundleCodec(), codec.NewManifestCodec(restMapper)). |
| 134 | + NewClientHolder(ctx) |
| 135 | +if err != nil { |
| 136 | + return err |
| 137 | +} |
| 138 | + |
| 139 | +manifestWorkClient := clientHolder.ManifestWorks(clusterName) |
| 140 | +manifestWorkInformer := clientHolder.ManifestWorkInformer() |
| 141 | + |
| 142 | +// Building controllers with ManifestWork client and informer ... |
| 143 | + |
| 144 | +// Start the ManifestWork informer |
| 145 | +go manifestWorkInformer.Informer().Run(ctx.Done()) |
| 146 | + |
| 147 | +``` |
0 commit comments