Skip to content

Commit 1b45f76

Browse files
authored
add rate limit for cloudevents clients (#277)
Signed-off-by: Wei Liu <[email protected]>
1 parent 886b462 commit 1b45f76

File tree

7 files changed

+105
-23
lines changed

7 files changed

+105
-23
lines changed

cloudevents/generic/agentclient.go

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@ import (
44
"context"
55
"fmt"
66
"strconv"
7+
"time"
78

89
cloudevents "github.com/cloudevents/sdk-go/v2"
910

11+
"k8s.io/client-go/util/flowcontrol"
1012
"k8s.io/klog/v2"
1113

1214
"open-cluster-management.io/api/cloudevents/generic/options"
@@ -18,15 +20,14 @@ import (
1820
//
1921
// An agent is a component that handles the deployment of requested resources on the managed cluster and status report
2022
// to the source.
21-
//
22-
// TODO support limiting the message sending rate with a configuration.
2323
type CloudEventAgentClient[T ResourceObject] struct {
2424
cloudEventsOptions options.CloudEventsOptions
2525
sender cloudevents.Client
2626
receiver cloudevents.Client
2727
lister Lister[T]
2828
codecs map[types.CloudEventsDataType]Codec[T]
2929
statusHashGetter StatusHashGetter[T]
30+
rateLimiter flowcontrol.RateLimiter
3031
agentID string
3132
clusterName string
3233
}
@@ -67,6 +68,7 @@ func NewCloudEventAgentClient[T ResourceObject](
6768
lister: lister,
6869
codecs: evtCodes,
6970
statusHashGetter: statusHashGetter,
71+
rateLimiter: NewRateLimiter(agentOptions.EventRateLimit),
7072
agentID: agentOptions.AgentID,
7173
clusterName: agentOptions.ClusterName,
7274
}, nil
@@ -111,8 +113,8 @@ func (c *CloudEventAgentClient[T]) Resync(ctx context.Context) error {
111113
return err
112114
}
113115

114-
if result := c.sender.Send(sendingContext, evt); cloudevents.IsUndelivered(result) {
115-
return fmt.Errorf("failed to send event %s, %v", evt, result)
116+
if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.sender, evt); err != nil {
117+
return err
116118
}
117119

118120
klog.V(4).Infof("Sent resync request:\n%s", evt)
@@ -141,8 +143,8 @@ func (c *CloudEventAgentClient[T]) Publish(ctx context.Context, eventType types.
141143
return err
142144
}
143145

144-
if result := c.sender.Send(sendingContext, *evt); cloudevents.IsUndelivered(result) {
145-
return fmt.Errorf("failed to send event %s, %v", evt, result)
146+
if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.sender, *evt); err != nil {
147+
return err
146148
}
147149

148150
klog.V(4).Infof("Sent event:\n%s", evt)
@@ -294,6 +296,28 @@ func (c *CloudEventAgentClient[T]) specAction(source string, obj T) (evt types.R
294296
return types.Modified, nil
295297
}
296298

299+
func sendEventWithLimit(sendingCtx context.Context, limiter flowcontrol.RateLimiter,
300+
sender cloudevents.Client, evt cloudevents.Event) error {
301+
now := time.Now()
302+
303+
err := limiter.Wait(sendingCtx)
304+
if err != nil {
305+
return fmt.Errorf("client rate limiter Wait returned an error: %w", err)
306+
}
307+
308+
latency := time.Since(now)
309+
if latency > longThrottleLatency {
310+
klog.Warningf(fmt.Sprintf("Waited for %v due to client-side throttling, not priority and fairness, request: %s",
311+
latency, evt))
312+
}
313+
314+
if result := sender.Send(sendingCtx, evt); cloudevents.IsUndelivered(result) {
315+
return fmt.Errorf("failed to send event %s, %v", evt, result)
316+
}
317+
318+
return nil
319+
}
320+
297321
func getObj[T ResourceObject](resourceID string, objs []T) (obj T, exists bool) {
298322
for _, obj := range objs {
299323
if string(obj.GetUID()) == resourceID {

cloudevents/generic/options/options.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,17 @@ type CloudEventsOptions interface {
2323
Receiver(ctx context.Context) (cloudevents.Client, error)
2424
}
2525

26+
// EventRateLimit for limiting the event sending rate.
27+
type EventRateLimit struct {
28+
// QPS indicates the maximum QPS to send the event.
29+
// If it's less than or equal to zero, the DefaultQPS (50) will be used.
30+
QPS float32
31+
32+
// Maximum burst for throttle.
33+
// If it's less than or equal to zero, the DefaultBurst (100) will be used.
34+
Burst int
35+
}
36+
2637
// CloudEventsSourceOptions provides the required options to build a source CloudEventsClient
2738
type CloudEventsSourceOptions struct {
2839
// CloudEventsOptions provides cloudevents clients to send/receive cloudevents based on different event protocol.
@@ -32,6 +43,9 @@ type CloudEventsSourceOptions struct {
3243
// URL and appending the controller name. Similarly, a RESTful service can select a unique name or generate a unique
3344
// ID in the associated database for its source identification.
3445
SourceID string
46+
47+
// EventRateLimit limits the event sending rate.
48+
EventRateLimit EventRateLimit
3549
}
3650

3751
// CloudEventsAgentOptions provides the required options to build an agent CloudEventsClient
@@ -45,4 +59,7 @@ type CloudEventsAgentOptions struct {
4559

4660
// ClusterName is the name of a managed cluster on which the agent runs.
4761
ClusterName string
62+
63+
// EventRateLimit limits the event sending rate.
64+
EventRateLimit EventRateLimit
4865
}

cloudevents/generic/ratelimiter.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package generic
2+
3+
import (
4+
"time"
5+
6+
"k8s.io/client-go/util/flowcontrol"
7+
8+
"open-cluster-management.io/api/cloudevents/generic/options"
9+
)
10+
11+
// longThrottleLatency defines threshold for logging requests. All requests being
12+
// throttled (via the provided rateLimiter) for more than longThrottleLatency will
13+
// be logged.
14+
const longThrottleLatency = 1 * time.Second
15+
16+
const (
17+
// TODO we may adjust these after performance test
18+
DefaultQPS float32 = 50.0
19+
DefaultBurst int = 100
20+
)
21+
22+
func NewRateLimiter(limit options.EventRateLimit) flowcontrol.RateLimiter {
23+
qps := limit.QPS
24+
if qps <= 0.0 {
25+
qps = DefaultQPS
26+
}
27+
28+
burst := limit.Burst
29+
if burst <= 0 {
30+
burst = DefaultBurst
31+
}
32+
33+
return flowcontrol.NewTokenBucketRateLimiter(qps, burst)
34+
}

cloudevents/generic/sourceclient.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
cloudevents "github.com/cloudevents/sdk-go/v2"
99

1010
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11+
"k8s.io/client-go/util/flowcontrol"
1112
"k8s.io/klog/v2"
1213

1314
"open-cluster-management.io/api/cloudevents/generic/options"
@@ -19,15 +20,14 @@ import (
1920
//
2021
// A source is a component that runs on a server, it can be a controller on the hub cluster or a RESTful service
2122
// handling resource requests.
22-
//
23-
// TODO support limiting the message sending rate with a configuration.
2423
type CloudEventSourceClient[T ResourceObject] struct {
2524
cloudEventsOptions options.CloudEventsOptions
2625
sender cloudevents.Client
2726
receiver cloudevents.Client
2827
lister Lister[T]
2928
codecs map[types.CloudEventsDataType]Codec[T]
3029
statusHashGetter StatusHashGetter[T]
30+
rateLimiter flowcontrol.RateLimiter
3131
sourceID string
3232
}
3333

@@ -67,6 +67,7 @@ func NewCloudEventSourceClient[T ResourceObject](
6767
lister: lister,
6868
codecs: evtCodes,
6969
statusHashGetter: statusHashGetter,
70+
rateLimiter: NewRateLimiter(sourceOptions.EventRateLimit),
7071
sourceID: sourceOptions.SourceID,
7172
}, nil
7273
}
@@ -105,8 +106,13 @@ func (c *CloudEventSourceClient[T]) Resync(ctx context.Context) error {
105106
return fmt.Errorf("failed to set data to cloud event: %v", err)
106107
}
107108

108-
if result := c.sender.Send(ctx, evt); cloudevents.IsUndelivered(result) {
109-
return fmt.Errorf("failed to send: %v", result)
109+
sendingContext, err := c.cloudEventsOptions.WithContext(ctx, evt.Context)
110+
if err != nil {
111+
return err
112+
}
113+
114+
if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.sender, evt); err != nil {
115+
return err
110116
}
111117

112118
klog.V(4).Infof("Sent resync request:\n%s", evt)
@@ -135,8 +141,8 @@ func (c *CloudEventSourceClient[T]) Publish(ctx context.Context, eventType types
135141
return err
136142
}
137143

138-
if result := c.sender.Send(sendingContext, *evt); cloudevents.IsUndelivered(result) {
139-
return fmt.Errorf("failed to send event %s, %v", evt, result)
144+
if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.sender, *evt); err != nil {
145+
return err
140146
}
141147

142148
klog.V(4).Infof("Sent event:\n%s", evt)
@@ -280,8 +286,8 @@ func (c *CloudEventSourceClient[T]) respondResyncSpecRequest(
280286
return err
281287
}
282288

283-
if result := c.sender.Send(sendingContext, evt); cloudevents.IsUndelivered(result) {
284-
return fmt.Errorf("failed to send event %s, %v", evt, result)
289+
if err := sendEventWithLimit(sendingContext, c.rateLimiter, c.sender, evt); err != nil {
290+
return err
285291
}
286292
}
287293

test/integration-test.mk

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,13 @@ clean-integration-test:
2828

2929
clean: clean-integration-test
3030

31+
test-integration: test-api-integration test-cloudevents-integration
32+
.PHONY: test-integration
33+
3134
test-api-integration: ensure-kubebuilder-tools
3235
go test -c ./test/integration/api
3336
./api.test -ginkgo.slowSpecThreshold=15 -ginkgo.v -ginkgo.failFast
34-
.PHONY: test-integration
37+
.PHONY: test-api-integration
3538

3639
test-cloudevents-integration: ensure-kubebuilder-tools
3740
go test -c ./test/integration/cloudevents

test/integration/cloudevents/cloudevents_test.go

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,6 @@ import (
2323
)
2424

2525
var _ = ginkgo.Describe("Cloudevents clients test", func() {
26-
ginkgo.BeforeEach(func() {
27-
ginkgo.By("init resource source store", func() {
28-
source.GetStore().Add(source.NewResource("cluster1", "resource1"))
29-
source.GetStore().Add(source.NewResource("cluster2", "resource1"))
30-
})
31-
})
32-
3326
ginkgo.Context("Resync resources", func() {
3427
ginkgo.It("resync resources between source and agent", func() {
3528
ginkgo.By("start an agent on cluster1")
@@ -72,7 +65,7 @@ var _ = ginkgo.Describe("Cloudevents clients test", func() {
7265
return nil
7366
}, 10*time.Second, 1*time.Second).Should(gomega.Succeed())
7467

75-
// resync the status from source
68+
ginkgo.By("resync the status from source")
7669
err = sourceCloudEventsClient.Resync(context.TODO())
7770
gomega.Expect(err).ToNot(gomega.HaveOccurred())
7871

test/integration/cloudevents/suite_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ var _ = ginkgo.BeforeSuite(func(done ginkgo.Done) {
4343
gomega.Expect(err).ToNot(gomega.HaveOccurred())
4444
}()
4545

46+
ginkgo.By("init the resource source store")
47+
source.GetStore().Add(source.NewResource("cluster1", "resource1"))
48+
source.GetStore().Add(source.NewResource("cluster2", "resource1"))
49+
50+
ginkgo.By("start the resource source")
4651
mqttOptions = mqtt.NewMQTTOptions()
4752
mqttOptions.BrokerHost = mqttBrokerHost
4853
sourceCloudEventsClient, err = source.StartResourceSourceClient(context.TODO(), mqttOptions)

0 commit comments

Comments
 (0)