# kafka-connect-toolkit
- Build & setup
  - [Build](#build)
  - [Setup](#setup)
- Toolkit
  - [DropSchemaless](#dropschemaless)
  - [HeaderFromField](#headerfromfield)
  - [NormalizeFieldValue](#normalizefieldvalue)
  - [NormalizeFieldName](#normalizefieldname)
- Debezium
  - [TimestampConverter](#timestampconverter-postgres)
  - [SchemaRename](#schemarename)

## Build & setup
### Build
To build the jars, run:
```shell
make package
```

The jars will be placed in:
- toolkit: `./modules/toolkit/build/libs`
- debezium: `./modules/debezium/build/libs`

### Setup
Built jars should be placed on each Kafka Connect node:
- Custom **transforms** should be placed in a path defined in `plugin.path` (see the sketch below; [more info](https://docs.confluent.io/platform/current/connect/transforms/custom.html))
- Custom **converters** should be placed in each connector's directory ([more info](https://debezium.io/documentation/reference/stable/development/converters.html))
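
A minimal worker-configuration sketch for transforms (the paths are illustrative):
```properties
# connect-distributed.properties on each worker node
plugin.path=/usr/share/java,/opt/kafka-connect/plugins
```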

## Toolkit
### DropSchemaless
Allows dropping a record if its key or value schema is null.
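
A minimal configuration sketch; the fully qualified class name here is an assumption, inferred by analogy with the debezium module's transforms:
```properties
transforms=dropSchemaless
# assumed class name -- check the toolkit jar for the actual package
transforms.dropSchemaless.type=com.nryanov.kafka.connect.toolkit.transforms.DropSchemaless
```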

## Debezium

### TimestampConverter (Postgres)
The goal of this converter is to get predictable results from the database for temporal types, especially when using Avro schemas.
By default, Debezium generates the following output for these field types (for the types with a time zone, the source values were stored in GMT+3):

| jdbc type                   | raw input           | output                      | output type (avro)                                                                      |
|:----------------------------|:--------------------|:----------------------------|:----------------------------------------------------------------------------------------|
| TIMESTAMP WITHOUT TIME ZONE | 2025-01-01 12:00:00 | 1735732800000000            | {"type":"long","connect.version":1,"connect.name":"io.debezium.time.MicroTimestamp"}    |
| TIMESTAMP WITH TIME ZONE    | 2025-01-01 12:00:00 | 2025-01-01T09:00:00.000000Z | {"type":"string","connect.version":1,"connect.name":"io.debezium.time.ZonedTimestamp"}  |
| DATE                        | 2025-01-01          | 20089                       | {"type":"int","connect.version":1,"connect.name":"io.debezium.time.Date"}               |
| TIME WITHOUT TIME ZONE      | 12:00:00            | 43200000000                 | {"type":"long","connect.version":1,"connect.name":"io.debezium.time.MicroTime"}         |
| TIME WITH TIME ZONE         | 12:00:00            | 09:00:00Z                   | {"type":"string","connect.version":1,"connect.name":"io.debezium.time.ZonedTime"}       |

As you can see, none of the Avro schemas carries a logicalType. Types **with a time zone** are returned as strings (which may be fine in some cases), while the other types are returned as plain epoch-based numerics (e.g. 20089 is the number of days since 1970-01-01, and 43200000000 is 12 hours in microseconds).
In the end this affects how the values are saved in the target system (an Iceberg table, a Parquet/ORC file, etc.), and the fields lose all information about their original temporal nature.

One way to overcome this is to use a custom converter. By default, without any additional configuration, it generates the following results for the same types:

| jdbc type                   | raw input           | output                  | output type (avro)                                                                                                                      |
|:----------------------------|:--------------------|:------------------------|:----------------------------------------------------------------------------------------------------------------------------------------|
| TIMESTAMP WITHOUT TIME ZONE | 2025-01-01 12:00:00 | 2025-01-01T12:00:00.000 | ["null","string"]                                                                                                                       |
| TIMESTAMP WITH TIME ZONE    | 2025-01-01 12:00:00 | 1735722000000           | ["null",{"type":"long","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Timestamp","logicalType":"timestamp-millis"}]  |
| DATE                        | 2025-01-01          | 20089                   | ["null",{"type":"int","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Date","logicalType":"date"}]                    |
| TIME WITHOUT TIME ZONE      | 12:00:00            | 12:00:00.000            | ["null","string"]                                                                                                                       |
| TIME WITH TIME ZONE         | 12:00:00            | 32400000                | ["null",{"type":"int","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Time","logicalType":"time-millis"}]             |

With this converter and no additional settings, date, timestamptz, and timetz get a concrete logicalType that any Avro library can handle.
For the other types a plain string is returned in a predefined format. Keep in mind that this string representation of time **is not in UTC**; it should be read as **a point in time in some local time zone**.
The reason is that Postgres does not store any information about the time zone that was in effect when a value of a type without a time zone was saved.

If you know for certain that, for example, timestamp and time values were saved in GMT+3, you can convert even these types to their logical counterparts:
```properties
converters=timestampConverter
timestampConverter.type=com.nryanov.kafka.connect.toolkit.debezium.converters.TimestampConverter
timestampConverter.time.shift=-03:00
timestampConverter.timestamp.shift=-03:00
timestampConverter.timestamp.type=TIMESTAMP
timestampConverter.time.type=TIME
```

Using this configuration, you can:
- Shift temporal types without a time zone (and get UTC if needed)
- Choose the final type that should be used instead of a string

With this configuration, **timestamp** and **time** will look like this:

| jdbc type                   | raw input           | output        | output type (avro)                                                                                                                      |
|:----------------------------|:--------------------|:--------------|:----------------------------------------------------------------------------------------------------------------------------------------|
| TIMESTAMP WITHOUT TIME ZONE | 2025-01-01 12:00:00 | 1735722000000 | ["null",{"type":"long","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Timestamp","logicalType":"timestamp-millis"}]  |
| TIME WITHOUT TIME ZONE      | 12:00:00            | 32400000      | ["null",{"type":"int","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Time","logicalType":"time-millis"}]             |

If you want everything returned as strings, set the final output of every JDBC type to `STRING` and configure the corresponding pattern, as sketched below.
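
A sketch that forces string output for the types whose default is a logical type (the patterns shown are this converter's defaults; timestamp and time already default to `STRING`):
```properties
converters=timestampConverter
timestampConverter.type=com.nryanov.kafka.connect.toolkit.debezium.converters.TimestampConverter
timestampConverter.timestamptz.type=STRING
timestampConverter.timestamptz.pattern=yyyy-MM-dd'T'HH:mm:ss.SSS'Z'
timestampConverter.date.type=STRING
timestampConverter.date.pattern=yyyy-MM-dd
timestampConverter.timetz.type=STRING
timestampConverter.timetz.pattern=HH:mm:ss.SSS'Z'
```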

Complete list of properties:
```properties
converters=timestampConverter
timestampConverter.type=com.nryanov.kafka.connect.toolkit.debezium.converters.TimestampConverter

# optional properties
## allows to shift timestamp and time
timestampConverter.time.shift=+03:00 # default: +00:00
timestampConverter.timestamp.shift=+03:00 # default: +00:00

## allows to choose output type for timestamp without timezone
timestampConverter.timestamp.type={STRING|TIMESTAMP} # default: STRING
timestampConverter.timestamp.pattern={PATTERN -- when type=STRING} # default: yyyy-MM-dd'T'HH:mm:ss.SSS

## allows to choose output type for timestamp with timezone
timestampConverter.timestamptz.type={STRING|TIMESTAMP} # default: TIMESTAMP
timestampConverter.timestamptz.pattern={PATTERN -- when type=STRING} # default: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'

## allows to choose output type for date
timestampConverter.date.type={STRING|DATE} # default: DATE
timestampConverter.date.pattern={PATTERN -- when type=STRING} # default: yyyy-MM-dd

## allows to choose output type for time without timezone
timestampConverter.time.type={STRING|TIME} # default: STRING
timestampConverter.time.pattern={PATTERN -- when type=STRING} # default: HH:mm:ss.SSS

## allows to choose output type for time with timezone
timestampConverter.timetz.type={STRING|TIME} # default: TIME
timestampConverter.timetz.pattern={PATTERN -- when type=STRING} # default: HH:mm:ss.SSS'Z'
```

### SchemaRename
With sharding and/or hypertables (TimescaleDB), Debezium generates different names for the before/after schemas even for the same table (e.g. in different shards).
Different names affect how new schemas are saved in Schema Registry (SR) if the compatibility type is anything other than **NONE**.
To overcome this, a dedicated transform is used to rename the internal schemas:

```properties
transforms=schemaRename
transforms.schemaRename.type=com.nryanov.kafka.connect.toolkit.debezium.transforms.SchemaRename
transforms.schemaRename.internal.name={new_name} # if not set, the transform will not change any records
```

In general, this transform should be used together with the built-in [SetSchemaMetadata](https://docs.confluent.io/kafka-connectors/transforms/current/setschemametadata.html) transform: the built-in one renames the outer schema, while this one renames the internal schemas.
This lets you control schema evolution and avoid errors when schemas are updated in the source database. A sketch of the combined configuration follows.
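
A sketch of the combined configuration (the schema names are illustrative):
```properties
transforms=setSchemaMetadata,schemaRename
# built-in transform: renames the outer (value) schema
transforms.setSchemaMetadata.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Value
transforms.setSchemaMetadata.schema.name=my.namespace.orders.Envelope
# this toolkit's transform: renames the internal before/after schemas
transforms.schemaRename.type=com.nryanov.kafka.connect.toolkit.debezium.transforms.SchemaRename
transforms.schemaRename.internal.name=my.namespace.orders
```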