Skip to content

Commit f18bc0c

Browse files
committed
minor bugfix for ctx value; additional streamdal docs
1 parent 6dd23f0 commit f18bc0c

File tree

5 files changed

+2648
-133
lines changed

5 files changed

+2648
-133
lines changed

README.STREAMDAL.md

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
Confluent's Golang Client for Apache Kafka<sup>TM</sup> (instrumented with Streamdal)
2+
=====================================================================================
3+
4+
This library has been instrumented with [Streamdal's Go SDK](https://github.com/streamdal/streamdal/tree/main/sdks/go).
5+
6+
## Getting Started
7+
8+
The following environment variables must be set before launching a producer or consumer:
9+
10+
1. `STREAMDAL_ADDRESS`
11+
- Address for the streamdal server (Example: `localhost:8082`)
12+
1. `STREAMDAL_AUTH_TOKEN`
13+
- Authentication token used by the server (Example: `1234`)
14+
1. `STREAMDAL_SERVICE_NAME`
15+
- How this application/service will be identified in Streamdal Console (Example: `kafkacat`)
16+
17+
By default, the library will not have Streamdal instrumentation enabled; to enable it,
18+
you will need to set `EnableStreamdal` in either
19+
[`kafka.ReaderConfig`](https://github.com/streamdal/segmentio-kafka-go/blob/main/reader.go#L531) or [`kafka.WriterConfig{}`](https://github.com/streamdal/segmentio-kafka-go/blob/main/writer.go#L334).
20+
21+
🎉 That's it - you're ready to run the example! 🎉
22+
23+
<sub>For more in-depth explanation of the changes and available settings, see [What's changed?](#whats-changed).</sub>
24+
25+
## Example
26+
27+
A fully working example is provided in [examples/go-kafkacat-streamdal](examples/go-kafkacat-streamdal).
28+
29+
To run the example:
30+
31+
1. Change directory to `examples/go-kafkacat-streamdal`
32+
1. Start a local Kafka instance: `docker-compose up -d`
33+
1. Install & start Streamdal: `curl -sSL https://sh.streamdal.com | sh`
34+
1. Open a browser to verify you can see the streamdal UI at: `http://localhost:8080`
35+
- _It should look like this:_ ![streamdal-console-1](./assets/streamdal-console-1.png)
36+
1. Launch a consumer:
37+
```
38+
STREAMDAL_ADDRESS=localhost:8082 \
39+
STREAMDAL_AUTH_TOKEN=1234 \
40+
STREAMDAL_SERVICE_NAME=kafkacat \
41+
go run go-kafkacat-streamdal.go --broker localhost consume --group testgroup test
42+
```
43+
1. In another terminal, launch a producer:
44+
```
45+
STREAMDAL_ADDRESS=localhost:8082 \
46+
STREAMDAL_AUTH_TOKEN=1234 \
47+
STREAMDAL_SERVICE_NAME=kafkacat \
48+
go run go-kafkacat-streamdal.go produce --broker localhost --topic test --key-delim=":"
49+
```
50+
1. In the `producer` terminal, produce some data by pasting: `testKey:{"email":"[email protected]"}`
51+
1. In the `consumer` terminal, you should see: `{"email":"[email protected]"}`
52+
1. Open the Streamdal Console in a browser [https://localhost:8080](https://localhost:8080)
53+
- _It should look like this:_ ![streamdal-console-2](./assets/streamdal-console-2.png)
54+
1. Create a pipeline that detects and masks PII fields & attach it to the consumer
55+
- ![streamdal-console-3](./assets/streamdal-console-3.gif)
56+
1. Produce a message in producer terminal: `testKey:{"email":"[email protected]"}`
57+
1. You should see a masked message in the consumer terminal: `{"email":"fo*********"}`
58+
- _**Tip**: If you detach the pipeline from the consumer and paste the same message again, you
59+
will see the original, unmasked message._
60+
61+
## Passing "runtime" settings to the shim
62+
By default, the shim will set the `ComponentName` to "kafka" and the `OperationName`
63+
to the name of the topic you are producing to or consuming from.
64+
65+
Also, by default, if the shim runs into any errors executing `streamdal.Process()`,
66+
it will swallow the errors and return the original value.
67+
68+
You can change this behavior at runtime by passing a context with a value containing a [`StreamdalRuntimeConfig`](https://github.com/streamdal/segmentio-kafka-go/blob/main/streamdal.go#L27) to
69+
the `ReadMessage(ctx)`, `FetchMessage(ctx)`, and `WriteMessages(ctx, msgs...)` methods.
70+
71+
You can see an example of this in the [go-kafkacat-streamdal example](examples/go-kafkacat-streamdal/main.go#L164).
72+
73+
```go
74+
// StreamdalRuntimeConfig is an optional configuration structure that can be
75+
// passed to kafka.FetchMessage() and kafka.WriteMessage() methods to influence
76+
// streamdal shim behavior.
77+
//
78+
// NOTE: This struct is intended to be passed as a value in a context.Context.
79+
// This is done this way to avoid having to change FetchMessage() and WriteMessages()
80+
// signatures.
81+
type StreamdalRuntimeConfig struct {
82+
// StrictErrors will cause the shim to return a kafka.Error if Streamdal.Process()
83+
// runs into an unrecoverable error. Default: swallow error and return original value.
84+
StrictErrors bool
85+
86+
// Audience is used to specify a custom audience when the shim calls on
87+
// streamdal.Process(); if nil, a default ComponentName and OperationName
88+
// will be used. Only non-blank values will be used to override audience defaults.
89+
Audience *streamdal.Audience
90+
}
91+
```
92+
93+
## What's changed?
94+
95+
The goal of any shim is to make minimally invasive changes so that the original
96+
library remains backwards-compatible and does not present any "surprises" at
97+
runtime.
98+
99+
The following changes have been made to the original library:
100+
101+
1. `ReaderConfig` and `WriterConfig` have been updated to include a new field: `EnableStreamdal (bool)`
102+
- This is how you enable the Streamdal instrumentation in the library.
103+
1. `ReadMessage(ctx)`, `FetchMessage(ctx)`, `WriteMessages(ctx, msgs...)` have
104+
been updated to run `streamdal.Process()` if `EnableStreamdal` is set to `true`
105+
in either `kafka.ReaderConfig` or `kafka.WriterConfig`.
106+
1. A new file [./streamdal.go](./streamdal.go) has been added to the library that
107+
contains helper funcs, structs and vars used for simplifying Streamdal
108+
instrumentation in the core library.

0 commit comments

Comments
 (0)