Skip to content

Commit d0a4952

Browse files
authored
combine the sender and receiver to one client (#282)
Signed-off-by: Wei Liu <[email protected]>
1 parent ad02e82 commit d0a4952

File tree

9 files changed

+31
-77
lines changed

9 files changed

+31
-77
lines changed

cloudevents/generic/agentclient.go

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@ import (
2222
// to the source.
2323
type CloudEventAgentClient[T ResourceObject] struct {
2424
cloudEventsOptions options.CloudEventsOptions
25-
sender cloudevents.Client
26-
receiver cloudevents.Client
25+
cloudEventsClient cloudevents.Client
2726
lister Lister[T]
2827
codecs map[types.CloudEventsDataType]Codec[T]
2928
statusHashGetter StatusHashGetter[T]
@@ -46,12 +45,7 @@ func NewCloudEventAgentClient[T ResourceObject](
4645
statusHashGetter StatusHashGetter[T],
4746
codecs ...Codec[T],
4847
) (*CloudEventAgentClient[T], error) {
49-
sender, err := agentOptions.CloudEventsOptions.Sender(ctx)
50-
if err != nil {
51-
return nil, err
52-
}
53-
54-
receiver, err := agentOptions.CloudEventsOptions.Receiver(ctx)
48+
cloudEventsClient, err := agentOptions.CloudEventsOptions.Client(ctx)
5549
if err != nil {
5650
return nil, err
5751
}
@@ -63,8 +57,7 @@ func NewCloudEventAgentClient[T ResourceObject](
6357

6458
return &CloudEventAgentClient[T]{
6559
cloudEventsOptions: agentOptions.CloudEventsOptions,
66-
sender: sender,
67-
receiver: receiver,
60+
cloudEventsClient: cloudEventsClient,
6861
lister: lister,
6962
codecs: evtCodes,
7063
statusHashGetter: statusHashGetter,
@@ -113,12 +106,11 @@ func (c *CloudEventAgentClient[T]) Resync(ctx context.Context) error {
113106
return err
114107
}
115108

116-
if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.sender, evt); err != nil {
109+
if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.cloudEventsClient, evt); err != nil {
117110
return err
118111
}
119-
120-
klog.V(4).Infof("Sent resync request:\n%s", evt)
121112
}
113+
122114
return nil
123115
}
124116

@@ -143,19 +135,18 @@ func (c *CloudEventAgentClient[T]) Publish(ctx context.Context, eventType types.
143135
return err
144136
}
145137

146-
if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.sender, *evt); err != nil {
138+
if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.cloudEventsClient, *evt); err != nil {
147139
return err
148140
}
149141

150-
klog.V(4).Infof("Sent event:\n%s", evt)
151142
return nil
152143
}
153144

154145
// Subscribe the events that are from the source status resync request or source resource spec request.
155146
// For status resync request, agent publish the current resources status back as response.
156147
// For resource spec request, agent receives resource spec and handles the spec with resource handlers.
157148
func (c *CloudEventAgentClient[T]) Subscribe(ctx context.Context, handlers ...ResourceHandler[T]) error {
158-
return c.receiver.StartReceiver(ctx, func(evt cloudevents.Event) {
149+
return c.cloudEventsClient.StartReceiver(ctx, func(evt cloudevents.Event) {
159150
klog.V(4).Infof("Received event:\n%s", evt)
160151

161152
eventType, err := types.ParseCloudEventsType(evt.Type())
@@ -311,6 +302,8 @@ func sendEventWithLimit(sendingCtx context.Context, limiter flowcontrol.RateLimi
311302
latency, evt))
312303
}
313304

305+
klog.V(4).Infof("Sent event: %v\n%s", sendingCtx, evt)
306+
314307
if result := sender.Send(sendingCtx, evt); cloudevents.IsUndelivered(result) {
315308
return fmt.Errorf("failed to send event %s, %v", evt, result)
316309
}

cloudevents/generic/options/fake/fakeoptions.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,7 @@ func (o *CloudEventsFakeOptions) WithContext(ctx context.Context, evtCtx cloudev
3434
return ctx, nil
3535
}
3636

37-
func (o *CloudEventsFakeOptions) Sender(ctx context.Context) (cloudevents.Client, error) {
38-
return o.client, nil
39-
}
40-
41-
func (o *CloudEventsFakeOptions) Receiver(ctx context.Context) (cloudevents.Client, error) {
37+
func (o *CloudEventsFakeOptions) Client(ctx context.Context) (cloudevents.Client, error) {
4238
return o.client, nil
4339
}
4440

cloudevents/generic/options/mqtt/agentoptions.go

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -55,22 +55,11 @@ func (o *mqttAgentOptions) WithContext(ctx context.Context, evtCtx cloudevents.E
5555
return cloudeventscontext.WithTopic(ctx, statusTopic), nil
5656
}
5757

58-
func (o *mqttAgentOptions) Sender(ctx context.Context) (cloudevents.Client, error) {
59-
sender, err := o.GetCloudEventsClient(
60-
ctx,
61-
fmt.Sprintf("%s-pub-client", o.agentID),
62-
cloudeventsmqtt.WithPublish(&paho.Publish{QoS: byte(o.PubQoS)}),
63-
)
64-
if err != nil {
65-
return nil, err
66-
}
67-
return sender, nil
68-
}
69-
70-
func (o *mqttAgentOptions) Receiver(ctx context.Context) (cloudevents.Client, error) {
58+
func (o *mqttAgentOptions) Client(ctx context.Context) (cloudevents.Client, error) {
7159
receiver, err := o.GetCloudEventsClient(
7260
ctx,
73-
fmt.Sprintf("%s-sub-client", o.agentID),
61+
fmt.Sprintf("%s-client", o.agentID),
62+
cloudeventsmqtt.WithPublish(&paho.Publish{QoS: byte(o.PubQoS)}),
7463
cloudeventsmqtt.WithSubscribe(
7564
&paho.Subscribe{
7665
Subscriptions: map[string]paho.SubscribeOptions{

cloudevents/generic/options/mqtt/options.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,7 @@ func (o *MQTTOptions) GetMQTTConnectOption(clientID string) *paho.Connect {
124124
func (o *MQTTOptions) GetCloudEventsClient(
125125
ctx context.Context,
126126
clientID string,
127-
clientOpt cloudeventsmqtt.Option,
128-
) (cloudevents.Client, error) {
127+
clientOpts ...cloudeventsmqtt.Option) (cloudevents.Client, error) {
129128
netConn, err := o.GetNetConn()
130129
if err != nil {
131130
return nil, err
@@ -136,8 +135,9 @@ func (o *MQTTOptions) GetCloudEventsClient(
136135
Conn: netConn,
137136
}
138137

139-
connectOpt := cloudeventsmqtt.WithConnect(o.GetMQTTConnectOption(clientID))
140-
protocol, err := cloudeventsmqtt.New(ctx, config, connectOpt, clientOpt)
138+
opts := []cloudeventsmqtt.Option{cloudeventsmqtt.WithConnect(o.GetMQTTConnectOption(clientID))}
139+
opts = append(opts, clientOpts...)
140+
protocol, err := cloudeventsmqtt.New(ctx, config, opts...)
141141
if err != nil {
142142
return nil, err
143143
}

cloudevents/generic/options/mqtt/sourceoptions.go

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -51,22 +51,11 @@ func (o *mqttSourceOptions) WithContext(ctx context.Context, evtCtx cloudevents.
5151
return cloudeventscontext.WithTopic(ctx, specTopic), nil
5252
}
5353

54-
func (o *mqttSourceOptions) Sender(ctx context.Context) (cloudevents.Client, error) {
55-
sender, err := o.GetCloudEventsClient(
56-
ctx,
57-
fmt.Sprintf("%s-pub-client", o.sourceID),
58-
cloudeventsmqtt.WithPublish(&paho.Publish{QoS: byte(o.PubQoS)}),
59-
)
60-
if err != nil {
61-
return nil, err
62-
}
63-
return sender, nil
64-
}
65-
66-
func (o *mqttSourceOptions) Receiver(ctx context.Context) (cloudevents.Client, error) {
54+
func (o *mqttSourceOptions) Client(ctx context.Context) (cloudevents.Client, error) {
6755
receiver, err := o.GetCloudEventsClient(
6856
ctx,
69-
fmt.Sprintf("%s-sub-client", o.sourceID),
57+
fmt.Sprintf("%s-client", o.sourceID),
58+
cloudeventsmqtt.WithPublish(&paho.Publish{QoS: byte(o.PubQoS)}),
7059
cloudeventsmqtt.WithSubscribe(
7160
&paho.Subscribe{
7261
Subscriptions: map[string]paho.SubscribeOptions{

cloudevents/generic/options/options.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,8 @@ type CloudEventsOptions interface {
1616
// the MQTT topic, for Kafka, the context should contain the message key, etc.
1717
WithContext(ctx context.Context, evtContext cloudevents.EventContext) (context.Context, error)
1818

19-
// Sender returns a cloudevents client for sending the cloudevents
20-
Sender(ctx context.Context) (cloudevents.Client, error)
21-
22-
// Receiver returns a cloudevents client for receiving the cloudevents
23-
Receiver(ctx context.Context) (cloudevents.Client, error)
19+
// Client returns a cloudevents client for sending and receiving cloudevents
20+
Client(ctx context.Context) (cloudevents.Client, error)
2421
}
2522

2623
// EventRateLimit for limiting the event sending rate.

cloudevents/generic/sourceclient.go

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@ import (
2222
// handling resource requests.
2323
type CloudEventSourceClient[T ResourceObject] struct {
2424
cloudEventsOptions options.CloudEventsOptions
25-
sender cloudevents.Client
26-
receiver cloudevents.Client
25+
cloudEventsClient cloudevents.Client
2726
lister Lister[T]
2827
codecs map[types.CloudEventsDataType]Codec[T]
2928
statusHashGetter StatusHashGetter[T]
@@ -45,12 +44,7 @@ func NewCloudEventSourceClient[T ResourceObject](
4544
statusHashGetter StatusHashGetter[T],
4645
codecs ...Codec[T],
4746
) (*CloudEventSourceClient[T], error) {
48-
sender, err := sourceOptions.CloudEventsOptions.Sender(ctx)
49-
if err != nil {
50-
return nil, err
51-
}
52-
53-
receiver, err := sourceOptions.CloudEventsOptions.Receiver(ctx)
47+
cloudEventsClient, err := sourceOptions.CloudEventsOptions.Client(ctx)
5448
if err != nil {
5549
return nil, err
5650
}
@@ -62,8 +56,7 @@ func NewCloudEventSourceClient[T ResourceObject](
6256

6357
return &CloudEventSourceClient[T]{
6458
cloudEventsOptions: sourceOptions.CloudEventsOptions,
65-
sender: sender,
66-
receiver: receiver,
59+
cloudEventsClient: cloudEventsClient,
6760
lister: lister,
6861
codecs: evtCodes,
6962
statusHashGetter: statusHashGetter,
@@ -111,12 +104,11 @@ func (c *CloudEventSourceClient[T]) Resync(ctx context.Context) error {
111104
return err
112105
}
113106

114-
if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.sender, evt); err != nil {
107+
if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.cloudEventsClient, evt); err != nil {
115108
return err
116109
}
117-
118-
klog.V(4).Infof("Sent resync request:\n%s", evt)
119110
}
111+
120112
return nil
121113
}
122114

@@ -141,19 +133,18 @@ func (c *CloudEventSourceClient[T]) Publish(ctx context.Context, eventType types
141133
return err
142134
}
143135

144-
if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.sender, *evt); err != nil {
136+
if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.cloudEventsClient, *evt); err != nil {
145137
return err
146138
}
147139

148-
klog.V(4).Infof("Sent event:\n%s", evt)
149140
return nil
150141
}
151142

152143
// Subscribe the events that are from the agent spec resync request or agent resource status request.
153144
// For spec resync request, source publish the current resources spec back as response.
154145
// For resource status request, source receives resource status and handles the status with resource handlers.
155146
func (c *CloudEventSourceClient[T]) Subscribe(ctx context.Context, handlers ...ResourceHandler[T]) error {
156-
if err := c.receiver.StartReceiver(ctx, func(evt cloudevents.Event) {
147+
if err := c.cloudEventsClient.StartReceiver(ctx, func(evt cloudevents.Event) {
157148
klog.V(4).Infof("Received event:\n%s", evt)
158149

159150
eventType, err := types.ParseCloudEventsType(evt.Type())
@@ -286,7 +277,7 @@ func (c *CloudEventSourceClient[T]) respondResyncSpecRequest(
286277
return err
287278
}
288279

289-
if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.sender, evt); err != nil {
280+
if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.cloudEventsClient, evt); err != nil {
290281
return err
291282
}
292283
}

cloudevents/work/agent/client/manifestwork.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func (c *ManifestWorkAgentClient) Get(ctx context.Context, name string, opts met
7979

8080
func (c *ManifestWorkAgentClient) List(ctx context.Context, opts metav1.ListOptions) (*workv1.ManifestWorkList, error) {
8181
klog.V(4).Infof("sync manifestworks")
82-
// send resync request to fetch manifestworks from source when the ManifestWorkInformer status
82+
// send resync request to fetch manifestworks from source when the ManifestWorkInformer starts
8383
if err := c.cloudEventsClient.Resync(ctx); err != nil {
8484
return nil, err
8585
}

cloudevents/work/clientbuilder.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,6 @@ func (b *ClientHolderBuilder) newAgentClients(ctx context.Context, config *mqtt.
147147
go func() {
148148
err := cloudEventsClient.Subscribe(ctx, agenthandler.NewManifestWorkAgentHandler(namespacedLister, watcher))
149149
if err != nil {
150-
// TODO (skeeey) consider how to retry to connect the broker again
151150
klog.Errorf("failed to subscribe to %s, %v", config.BrokerHost, err)
152151
}
153152
}()

0 commit comments

Comments
 (0)