Skip to content

Commit 4d167d4

Browse files
test: Example for on_success e2e test (#217)
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com> Signed-off-by: adarsh0728 <gooneriitk@gmail.com> Co-authored-by: Adarsh Jain <gooneriitk@gmail.com>
1 parent 8e6eec7 commit 4d167d4

File tree

9 files changed

+135
-3
lines changed

9 files changed

+135
-3
lines changed

examples/mapper/cat_metadata/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Cat Metadata
1+
# Forward Metadata
22

33
This example demonstrates a User Defined Map Function that simply forwards the message payload (like `cat`) but adds metadata.
44

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
# LogMetadata
1+
# Log Metadata
22

33
An example User Defined Sink that prints the user metadata of datum.
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
####################################################################################################
2+
# base
3+
####################################################################################################
4+
FROM alpine:3.20 AS base
5+
ARG TARGETARCH
6+
RUN apk update && apk upgrade && \
7+
apk add ca-certificates && \
8+
apk --no-cache add tzdata
9+
10+
COPY dist/on-success-log-${TARGETARCH} /bin/on-success-log
11+
RUN chmod +x /bin/on-success-log
12+
13+
####################################################################################################
14+
# on_success_log
15+
####################################################################################################
16+
FROM scratch AS on_success_log
17+
COPY --from=base /usr/share/zoneinfo /usr/share/zoneinfo
18+
COPY --from=base /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
19+
COPY --from=base /bin/on-success-log /bin/on-success-log
20+
ENTRYPOINT [ "/bin/on-success-log" ]
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
TAG ?= stable
2+
PUSH ?= false
3+
IMAGE_REGISTRY = quay.io/numaio/numaflow-go/on-success-log:${TAG}
4+
ARCHITECTURES = amd64 arm64
5+
6+
.PHONY: build
7+
build:
8+
for arch in $(ARCHITECTURES); do \
9+
CGO_ENABLED=0 GOOS=linux GOARCH=$${arch} go build -v -o ./dist/on-success-log-$${arch} main.go; \
10+
done
11+
12+
.PHONY: image-push
13+
image-push: build
14+
docker buildx build -t ${IMAGE_REGISTRY} --platform linux/amd64,linux/arm64 --target on_success_log . --push
15+
16+
.PHONY: image-load
17+
image-load: build
18+
docker buildx build -t ${IMAGE_REGISTRY} --platform linux/arm64,linux/amd64 --target on_success_log . --load
19+
20+
.PHONY: image
21+
image: build
22+
docker build -t ${IMAGE_REGISTRY} --target on_success_log .
23+
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi
24+
25+
clean:
26+
-rm -rf ./dist
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# OnSuccess Sink Logger Example
2+
3+
An example User Defined Sink that simulates primary sink write successes and returns a
4+
* OnSuccess response on write successes so that the payload will be sent to the onSuccess sink.
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
module on_success_log
2+
3+
go 1.24.0
4+
5+
toolchain go1.24.4
6+
7+
replace github.com/numaproj/numaflow-go => ../../..
8+
9+
require github.com/numaproj/numaflow-go v0.10.1
10+
11+
require (
12+
golang.org/x/net v0.46.0 // indirect
13+
golang.org/x/sync v0.17.0 // indirect
14+
golang.org/x/sys v0.37.0 // indirect
15+
golang.org/x/text v0.30.0 // indirect
16+
google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda // indirect
17+
google.golang.org/grpc v1.76.0 // indirect
18+
google.golang.org/protobuf v1.36.10 // indirect
19+
)
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
2+
golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8=
3+
golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
4+
golang.org/x/net v0.46.0 h1:giFlY12I07fugqwPuWJi68oOnpfqFnJIJzaIIm2JVV4=
5+
golang.org/x/net v0.46.0/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210=
6+
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
7+
golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
8+
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
9+
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
10+
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
11+
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
12+
golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
13+
golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
14+
golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
15+
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
16+
golang.org/x/text v0.30.0 h1:yznKA/E9zq54KzlzBEAWn1NXSQ8DIp/NYMy88xJjl4k=
17+
golang.org/x/text v0.30.0/go.mod h1:yDdHFIX9t+tORqspjENWgzaCVXgk0yYnYuSZ8UzzBVM=
18+
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
19+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b h1:zPKJod4w6F1+nRGDI9ubnXYhU9NSWoFAijkHkUXeTK8=
20+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
21+
google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda h1:i/Q+bfisr7gq6feoJnS/DlpdwEL4ihp41fvRiM3Ork0=
22+
google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
23+
google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA=
24+
google.golang.org/grpc v1.76.0 h1:UnVkv1+uMLYXoIz6o7chp59WfQUYA2ex/BXQ9rHZu7A=
25+
google.golang.org/grpc v1.76.0/go.mod h1:Ju12QI8M6iQJtbcsV+awF5a4hfJMLi4X0JLo94ULZ6c=
26+
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
27+
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
28+
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
29+
google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE=
30+
google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
8+
sinksdk "github.com/numaproj/numaflow-go/pkg/sinker"
9+
)
10+
11+
// onSuccessLogSink is a sinker implementation that logs the input to stdout
12+
type onSuccessLogSink struct {
13+
}
14+
15+
func (l *onSuccessLogSink) Sink(ctx context.Context, datumStreamCh <-chan sinksdk.Datum) sinksdk.Responses {
16+
result := sinksdk.ResponsesBuilder()
17+
for d := range datumStreamCh {
18+
_ = d.EventTime()
19+
_ = d.Watermark()
20+
id := d.ID()
21+
fmt.Println("Primary sink write succeeded, writing to onSuccess sink - ", string(d.Value()))
22+
// write to onSuccess sink
23+
result = result.Append(sinksdk.ResponseOnSuccess(id, sinksdk.NewMessage([]byte("on-success-message"))))
24+
}
25+
return result
26+
}
27+
28+
func main() {
29+
err := sinksdk.NewServer(&onSuccessLogSink{}).Start(context.Background())
30+
if err != nil {
31+
log.Panic("Failed to start sink function server: ", err)
32+
}
33+
}

examples/sourcetransformer/metadata_event_time/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Metadata Event Time
1+
# Event Time Metadata
22

33
This example demonstrates a Source Transformer that updates the event time of each message to the current time. It also adds this new timestamp to the message's metadata.
44

0 commit comments

Comments
 (0)