Skip to content

Commit 550ac68

Browse files
authored
add sequence id for status update event (#308)
Signed-off-by: Wei Liu <[email protected]>
1 parent ccd7a2d commit 550ac68

File tree

12 files changed

+582
-13
lines changed

12 files changed

+582
-13
lines changed

cloudevents/generic/types/types.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ const (
4848
// ExtensionResourceVersion is the cloud event extension key of the resource version.
4949
ExtensionResourceVersion = "resourceversion"
5050

51+
// ExtensionStatusUpdateSequenceID is the cloud event extension key of the status update event sequence ID.
52+
// The status update event sequence id represents the order in which status update events occur on a single agent.
53+
ExtensionStatusUpdateSequenceID = "sequenceid"
54+
5155
// ExtensionDeletionTimestamp is the cloud event extension key of the deletion timestamp.
5256
ExtensionDeletionTimestamp = "deletiontimestamp"
5357

@@ -159,6 +163,7 @@ type EventBuilder struct {
159163
clusterName string
160164
originalSource string
161165
resourceID string
166+
sequenceID string
162167
resourceVersion *int64
163168
eventType CloudEventsType
164169
deletionTimestamp time.Time
@@ -181,6 +186,11 @@ func (b *EventBuilder) WithResourceVersion(resourceVersion int64) *EventBuilder
181186
return b
182187
}
183188

189+
func (b *EventBuilder) WithStatusUpdateSequenceID(sequenceID string) *EventBuilder {
190+
b.sequenceID = sequenceID
191+
return b
192+
}
193+
184194
func (b *EventBuilder) WithClusterName(clusterName string) *EventBuilder {
185195
b.clusterName = clusterName
186196
return b
@@ -211,6 +221,10 @@ func (b *EventBuilder) NewEvent() cloudevents.Event {
211221
evt.SetExtension(ExtensionResourceVersion, *b.resourceVersion)
212222
}
213223

224+
if len(b.sequenceID) != 0 {
225+
evt.SetExtension(ExtensionStatusUpdateSequenceID, b.sequenceID)
226+
}
227+
214228
if len(b.clusterName) != 0 {
215229
evt.SetExtension(ExtensionClusterName, b.clusterName)
216230
}

cloudevents/work/agent/codec/manifest.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55
"strconv"
66

7+
"github.com/bwmarrin/snowflake"
78
cloudevents "github.com/cloudevents/sdk-go/v2"
89
cloudeventstypes "github.com/cloudevents/sdk-go/v2/types"
910

@@ -27,6 +28,18 @@ const (
2728
CloudEventsOriginalSourceAnnotationKey = "cloudevents.open-cluster-management.io/originalsource"
2829
)
2930

31+
var sequenceGenerator *snowflake.Node
32+
33+
func init() {
34+
// init the snowflake id generator with node id 1 for each single agent. Each single agent has its own consumer id
35+
// to be identified, and we can ensure the order of status update event from the same agent via sequence id. The
36+
// events from different agents are independent, hence the ordering among them needs not to be guaranteed.
37+
//
38+
// The snowflake `NewNode` returns error only when the snowflake node id is less than 1 or great than 1024, so the
39+
// error can be ignored here.
40+
sequenceGenerator, _ = snowflake.NewNode(1)
41+
}
42+
3043
// ManifestCodec is a codec to encode/decode a ManifestWork/cloudevent with ManifestBundle for an agent.
3144
type ManifestCodec struct {
3245
restMapper meta.RESTMapper
@@ -65,6 +78,7 @@ func (c *ManifestCodec) Encode(source string, eventType types.CloudEventsType, w
6578

6679
evt := types.NewEventBuilder(source, eventType).
6780
WithResourceID(string(work.UID)).
81+
WithStatusUpdateSequenceID(sequenceGenerator.Generate().String()).
6882
WithResourceVersion(resourceVersion).
6983
WithClusterName(work.Namespace).
7084
WithOriginalSource(originalSource).

cloudevents/work/agent/codec/manifest_test.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818

1919
func TestManifestEventDataType(t *testing.T) {
2020
codec := NewManifestCodec(nil)
21-
2221
if codec.EventDataType() != payload.ManifestEventDataType {
2322
t.Errorf("unexpected event data type %s", codec.EventDataType())
2423
}
@@ -143,9 +142,7 @@ func TestManifestEncode(t *testing.T) {
143142

144143
for _, c := range cases {
145144
t.Run(c.name, func(t *testing.T) {
146-
codec := NewManifestCodec(nil)
147-
148-
_, err := codec.Encode("cluster1-work-agent", c.eventType, c.work)
145+
_, err := NewManifestCodec(nil).Encode("cluster1-work-agent", c.eventType, c.work)
149146
if c.expectedErr {
150147
if err == nil {
151148
t.Errorf("expected an error, but failed")
@@ -283,9 +280,7 @@ func TestManifestDecode(t *testing.T) {
283280

284281
for _, c := range cases {
285282
t.Run(c.name, func(t *testing.T) {
286-
codec := NewManifestCodec(nil)
287-
288-
_, err := codec.Decode(c.event)
283+
_, err := NewManifestCodec(nil).Decode(c.event)
289284
if c.expectedErr {
290285
if err == nil {
291286
t.Errorf("expected an error, but failed")

cloudevents/work/agent/codec/manifestbundle.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ func (c *ManifestBundleCodec) Encode(source string, eventType types.CloudEventsT
4646

4747
evt := types.NewEventBuilder(source, eventType).
4848
WithResourceID(string(work.UID)).
49+
WithStatusUpdateSequenceID(sequenceGenerator.Generate().String()).
4950
WithResourceVersion(resourceVersion).
5051
WithClusterName(work.Namespace).
5152
WithOriginalSource(originalSource).

cloudevents/work/agent/codec/manifestbundle_test.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,7 @@ func TestManifestBundleEncode(t *testing.T) {
9292

9393
for _, c := range cases {
9494
t.Run(c.name, func(t *testing.T) {
95-
codec := NewManifestBundleCodec()
96-
97-
_, err := codec.Encode("cluster1-work-agent", c.eventType, c.work)
95+
_, err := NewManifestBundleCodec().Encode("cluster1-work-agent", c.eventType, c.work)
9896
if c.expectedErr {
9997
if err == nil {
10098
t.Errorf("expected an error, but failed")
@@ -231,9 +229,7 @@ func TestManifestBundleDecode(t *testing.T) {
231229

232230
for _, c := range cases {
233231
t.Run(c.name, func(t *testing.T) {
234-
codec := NewManifestBundleCodec()
235-
236-
_, err := codec.Decode(c.event)
232+
_, err := NewManifestBundleCodec().Decode(c.event)
237233
if c.expectedErr {
238234
if err == nil {
239235
t.Errorf("expected an error, but failed")

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module open-cluster-management.io/api
33
go 1.20
44

55
require (
6+
github.com/bwmarrin/snowflake v0.3.0
67
github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-20231030012137-0836a524e995
78
github.com/cloudevents/sdk-go/v2 v2.14.0
89
github.com/eclipse/paho.golang v0.11.0

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
22
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
33
github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM=
44
github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ=
5+
github.com/bwmarrin/snowflake v0.3.0 h1:xm67bEhkKh6ij1790JB83OujPR5CzNe8QuQqAgISZN0=
6+
github.com/bwmarrin/snowflake v0.3.0/go.mod h1:NdZxfVWX+oR6y2K0o6qAYv6gIOP9rjG0/E9WsDpxqwE=
57
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
68
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
79
github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-20231030012137-0836a524e995 h1:pXyRKZ0T5WoB6X9QnHS5cEyW0Got39bNQIECxGUKVO4=

vendor/github.com/bwmarrin/snowflake/.travis.yml

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/bwmarrin/snowflake/LICENSE

Lines changed: 23 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/bwmarrin/snowflake/README.md

Lines changed: 143 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)