Skip to content

Commit a7b5785

Browse files
authored
Make sure all timestamps are UTC (#40)
1 parent 3143e8c commit a7b5785

File tree

4 files changed

+51
-11
lines changed

4 files changed

+51
-11
lines changed

pool/node.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -541,7 +541,7 @@ func (node *Node) routeWorkerEvent(ctx context.Context, ev *streaming.Event) err
541541
node.logger.Debug("routed", "event", ev.EventName, "id", ev.ID, "worker", wid, "worker-event-id", eventID)
542542

543543
// Record the event in the pending events map for future ack.
544-
node.pendingEvents[wid+":"+eventID] = ev
544+
node.pendingEvents[pendingEventKey(wid, eventID)] = ev
545545

546546
return nil
547547
}
@@ -583,7 +583,7 @@ func (node *Node) ackWorkerEvent(ctx context.Context, ev *streaming.Event) {
583583

584584
workerID, payload := unmarshalEnvelope(ev.Payload)
585585
ack := unmarshalAck(payload)
586-
key := workerID + ":" + ack.EventID
586+
key := pendingEventKey(workerID, ack.EventID)
587587
pending, ok := node.pendingEvents[key]
588588
if !ok {
589589
node.logger.Error(fmt.Errorf("ackWorkerEvent: received unknown event %s from worker %s", ack.EventID, workerID))
@@ -619,7 +619,7 @@ func (node *Node) ackWorkerEvent(ctx context.Context, ev *streaming.Event) {
619619
}
620620
}
621621
for _, key := range staleKeys {
622-
node.logger.Error(fmt.Errorf("ackWorkerEvent: stale event, removing from pending events"), "event", node.pendingEvents[key].EventName, "id", key)
622+
node.logger.Error(fmt.Errorf("ackWorkerEvent: stale event, removing from pending events"), "event", node.pendingEvents[key].EventName, "id", node.pendingEvents[key].ID, "since", time.Since(node.pendingEvents[key].CreatedAt()), "TTL", 2*node.pendingJobTTL)
623623
delete(node.pendingEvents, key)
624624
}
625625
}
@@ -1028,3 +1028,9 @@ func poolStreamName(pool string) string {
10281028
func nodeStreamName(pool, nodeID string) string {
10291029
return fmt.Sprintf("%s:node:%s", pool, nodeID)
10301030
}
1031+
1032+
// pendingEventKey computes the key of a pending event from a worker ID and a
1033+
// stream event ID.
1034+
func pendingEventKey(workerID, eventID string) string {
1035+
return fmt.Sprintf("%s:%s", workerID, eventID)
1036+
}

pool/node_test.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -519,7 +519,7 @@ func TestStaleEventsAreRemoved(t *testing.T) {
519519
},
520520
},
521521
}
522-
node.pendingEvents["worker:stale-event-id"] = staleEvent
522+
node.pendingEvents[pendingEventKey("worker", staleEventID)] = staleEvent
523523

524524
// Add a fresh event
525525
freshEventID := fmt.Sprintf("%d-0", time.Now().Add(-time.Second).UnixNano()/int64(time.Millisecond))
@@ -533,35 +533,36 @@ func TestStaleEventsAreRemoved(t *testing.T) {
533533
},
534534
},
535535
}
536-
node.pendingEvents["worker:fresh-event-id"] = freshEvent
536+
node.pendingEvents[pendingEventKey("worker", freshEventID)] = freshEvent
537537

538538
// Create a mock event to trigger the ackWorkerEvent function
539+
mockEventID := "mock-event-id"
539540
mockEvent := &streaming.Event{
540-
ID: "mock-event-id",
541+
ID: mockEventID,
541542
EventName: evAck,
542-
Payload: marshalEnvelope("worker", marshalAck(&ack{EventID: "mock-event-id"})),
543+
Payload: marshalEnvelope("worker", marshalAck(&ack{EventID: mockEventID})),
543544
Acker: &mockAcker{
544545
XAckFunc: func(ctx context.Context, streamKey, sinkName string, ids ...string) *redis.IntCmd {
545546
return redis.NewIntCmd(ctx, 0)
546547
},
547548
},
548549
}
549-
node.pendingEvents["worker:mock-event-id"] = mockEvent
550+
node.pendingEvents[pendingEventKey("worker", mockEventID)] = mockEvent
550551

551552
// Call ackWorkerEvent to trigger the stale event cleanup
552553
node.ackWorkerEvent(ctx, mockEvent)
553554

554555
assert.Eventually(t, func() bool {
555556
node.lock.Lock()
556557
defer node.lock.Unlock()
557-
_, exists := node.pendingEvents["worker:stale-event-id"]
558+
_, exists := node.pendingEvents[pendingEventKey("worker", staleEventID)]
558559
return !exists
559560
}, max, delay, "Stale event should have been removed")
560561

561562
assert.Eventually(t, func() bool {
562563
node.lock.Lock()
563564
defer node.lock.Unlock()
564-
_, exists := node.pendingEvents["worker:fresh-event-id"]
565+
_, exists := node.pendingEvents[pendingEventKey("worker", freshEventID)]
565566
return exists
566567
}, max, delay, "Fresh event should still be present")
567568
}

streaming/reader.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ func (e *Event) CreatedAt() time.Time {
288288
ts, _ := strconv.ParseInt(tss, 10, 64)
289289
seconds := ts / 1000
290290
nanos := (ts % 1000) * 1_000_000
291-
return time.Unix(seconds, nanos)
291+
return time.Unix(seconds, nanos).UTC()
292292
}
293293

294294
// streamEvents filters and streams the Redis messages as events to c.

streaming/reader_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
package streaming
22

33
import (
4+
"strconv"
45
"strings"
56
"testing"
7+
"time"
68

79
"github.com/stretchr/testify/assert"
810
"github.com/stretchr/testify/require"
911

12+
"github.com/redis/go-redis/v9"
1013
"goa.design/pulse/pulse"
1114
"goa.design/pulse/streaming/options"
1215
ptesting "goa.design/pulse/testing"
@@ -185,3 +188,33 @@ func TestRemoveReaderStream(t *testing.T) {
185188
assert.Equal(t, "event3", read.EventName)
186189
assert.Equal(t, []byte("payload3"), read.Payload)
187190
}
191+
192+
func TestEventCreatedAt(t *testing.T) {
193+
rdb := ptesting.NewRedisClient(t)
194+
defer ptesting.CleanupRedis(t, rdb, false, "")
195+
ctx := ptesting.NewTestContext(t)
196+
197+
// Use Redis to create a new event ID
198+
eventID, err := rdb.XAdd(ctx, &redis.XAddArgs{
199+
Stream: "test-stream",
200+
Values: map[string]interface{}{"key": "value"},
201+
}).Result()
202+
require.NoError(t, err)
203+
204+
event := &Event{ID: eventID}
205+
206+
// Call CreatedAt() method
207+
createdAt := event.CreatedAt()
208+
209+
// Parse the timestamp from the event ID
210+
parts := strings.Split(eventID, "-")
211+
ts, err := strconv.ParseInt(parts[0], 10, 64)
212+
require.NoError(t, err)
213+
expectedTime := time.UnixMilli(ts).UTC()
214+
215+
// Assert that the returned time is exactly the expected time
216+
assert.Equal(t, expectedTime, createdAt)
217+
218+
// Assert that the returned time is in UTC
219+
assert.Equal(t, time.UTC, createdAt.Location())
220+
}

0 commit comments

Comments
 (0)