[FLINK-35459] Use Incremental Source Framework in Flink tidb CDC Source Connector #3952
Mrart wants to merge 5 commits into
This pull request has been automatically marked as stale because it has not had recent activity for 120 days. It will be closed in 60 days if no further activity occurs.
@lvyanquan Please help reopen this PR; I will complete it.
…ce Connector
# Conflicts:
# flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/pom.xml
# flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiKVRichParallelSourceFunction.java
# flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/table/TiDBTableSourceFactoryTest.java
# flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/table/utils/UriHostMappingTest.java
Pull request overview
This PR migrates the TiDB CDC source connector from the legacy TiKV-based implementation to the Incremental Source Framework (JDBC incremental snapshot + Debezium-based streaming), updating connector configuration, shading, and test coverage accordingly.
Changes:
- Replace legacy TiKV SourceFunction/table-factory code with a new Incremental Source Framework implementation (dialect, split reader tasks, offsets, schema discovery).
- Introduce new TiDB table factory/options and update e2e/integration tests to pass JDBC connection properties (hostname/port/username/password).
- Adjust build/shading to include the required Debezium/MySQL/TiDB-related dependencies.
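To make the second bullet concrete, here is a minimal sketch of what a table DDL carrying the JDBC connection fields might look like. The helper class `TidbDdlSketch`, the column list, and all option values are hypothetical. The option keys `hostname`, `port`, `username`, `password` and `pd-addresses` follow the names mentioned in this overview; `connector`, `database-name` and `table-name` are assumed to keep their legacy spelling, which this page does not confirm.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper, not part of this PR: renders a Flink SQL DDL string. */
class TidbDdlSketch {

    /** Builds a CREATE TABLE statement whose WITH clause lists the given options in order. */
    static String buildDdl(String table, Map<String, String> options) {
        // Values are inserted verbatim; a real helper would escape single quotes.
        String with = options.entrySet().stream()
                .map(e -> "  '" + e.getKey() + "' = '" + e.getValue() + "'")
                .collect(Collectors.joining(",\n"));
        return "CREATE TABLE " + table + " (\n"
                + "  id INT,\n"
                + "  name STRING,\n"
                + "  PRIMARY KEY (id) NOT ENFORCED\n"
                + ") WITH (\n" + with + "\n)";
    }

    /** Example options; keys are assumptions based on the overview above, values are made up. */
    static Map<String, String> exampleOptions() {
        Map<String, String> o = new LinkedHashMap<>();
        o.put("connector", "tidb-cdc");
        // JDBC connection fields that the updated tests now pass.
        o.put("hostname", "localhost");
        o.put("port", "4000");
        o.put("username", "root");
        o.put("password", "");
        o.put("pd-addresses", "localhost:2379");
        o.put("database-name", "inventory");
        o.put("table-name", "products");
        return o;
    }

    public static void main(String[] args) {
        System.out.println(buildDdl("tidb_source", exampleOptions()));
    }
}
```

The resulting string could be handed to a `TableEnvironment.executeSql` call in a test; the authoritative list of required and optional keys is whatever `TiDBSourceOptions` and `TiDBTableFactory` define in the PR.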
Reviewed changes
Copilot reviewed 80 out of 81 changed files in this pull request and generated 12 comments.
| File | Description |
|---|---|
| flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java | Improves job-running wait loop diagnostics and adds container log dumping on failures/timeouts. |
| flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/TiDBE2eITCase.java | Updates TiDB table DDL options to include JDBC connection fields. |
| flink-cdc-e2e-tests/flink-cdc-e2e-utils/src/test/java/org/apache/flink/cdc/common/test/utils/JdbcProxy.java | Avoids mutating expected results list during sorting; improves result assertion. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-sql-connector-tidb-cdc/pom.xml | Expands shaded artifacts/relocations and adds filters for Kafka resources. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/resources/ddl/customer.sql | Adds test DDL/data for additional database/table coverage. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/TiDBTestBase.java | Refactors test base and adds helper for building TiDBSourceConfigFactory. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/testutils/RecordsFormatter.java | Adds utility to format SourceRecords into comparable strings for assertions. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/table/utils/UriHostMappingTest.java | Updates tests for host-mapping behavior using the new options package. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/table/TiDBTableSourceFactoryTest.java | Updates table factory unit tests to new factory/source constructor/options. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/table/TiDBConnectorRegionITCase.java | Updates IT case DDL to include JDBC connection fields for TiDB. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/table/TiDBConnectorITCase.java | Updates IT cases to include JDBC connection fields and adjusts expected records order/values. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/source/TiDBSourceExampleTest.java | Adds example test for incremental snapshot source consumption. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/source/TiDBDialectTest.java | Adds dialect test for multi-database table discovery. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/source/reader/TiDBStreamSplitReaderTest.java | Adds stream split reader test for change-event consumption. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/source/offset/EventOffsetCheckpointTest.java | Adds checkpoint round-trip test for TiDB stream offsets. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/source/fetch/TiDBStreamLifecycleTest.java | Adds idempotent close/lifecycle tests for stream reader/task. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory | Switches service registration to the new TiDB table factory. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/utils/UriHostMapping.java | Moves UriHostMapping into a non-table utils package. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/utils/TiDBConnectionUtils.java | Adds utilities for TiDB variable querying and value converter construction. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/utils/TableKeyRangeUtils.java | Moves TableKeyRangeUtils into a non-table utils package. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/utils/TableDiscoveryUtils.java | Adds JDBC-based table discovery with Debezium filters. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiKVSnapshotEventDeserializationSchema.java | Removes legacy TiKV snapshot deserialization API. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiKVChangeEventDeserializationSchema.java | Removes legacy TiKV change-event deserialization API. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiDBSource.java | Removes legacy TiDB SourceFunction builder. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TDBSourceOptions.java | Removes legacy table/source options class; replaced by new config package. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/utils/OptionUtils.java | Removes legacy option-print utility (replaced by base OptionUtils usage). |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/TiKVReadableMetadata.java | Removes legacy TiKV metadata model. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/TiKVMetadataConverter.java | Removes legacy TiKV metadata converter API. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/TiKVAppendMetadataCollector.java | Removes legacy TiKV metadata collector. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/TiDBTableSourceFactory.java | Removes legacy table source factory (replaced by TiDBTableFactory). |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/TiDBTableFactory.java | Adds new DynamicTableSourceFactory wiring incremental snapshot framework + Debezium properties. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/TiDBReadableMetadata.java | Adds Debezium-based readable metadata enum for TiDB table source. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/TiDBDeserializationConverterFactory.java | Adds TiDB-specific converters for string/array/tinyint handling (geometry/SET/boolean). |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/StartupOptions.java | Removes legacy startup options type (uses base StartupOptions now). |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/RowDataTiKVSnapshotEventDeserializationSchema.java | Removes legacy TiKV snapshot RowData deserializer. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/RowDataTiKVChangeEventDeserializationSchema.java | Removes legacy TiKV change RowData deserializer. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/TiDBSourceBuilder.java | Adds builder for JdbcIncrementalSource-based TiDB incremental source. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/TiDBDialect.java | Adds TiDB JdbcDataSourceDialect implementation for incremental snapshot framework. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/splitter/TiDBChunkSplitter.java | Adds TiDB chunk splitter backed by TiDBUtils queries. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/schema/TiDBTableDefinition.java | Adds helper for synthesizing DDL for schema parsing fallback paths. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/schema/TiDBSchema.java | Adds schema cache and schema fetching via SHOW CREATE TABLE / DESC parsing. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/schema/TiDBFieldDefinition.java | Adds model for field metadata extracted from DESC. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/schema/TiDBDatabaseSchema.java | Adds Debezium relational database schema integration for TiDB. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/offset/TiDBSourceInfoStructMaker.java | Adds Debezium SourceInfo struct maker for TiDB events. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/offset/TiDBSourceInfo.java | Adds TiDB source info model used for event metadata/source struct. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/offset/EventOffsetUtils.java | Adds utility for loading EventOffsetContext from stored offsets. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/offset/EventOffsetFactory.java | Adds OffsetFactory for TiDB EventOffset. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/offset/EventOffsetContext.java | Adds TiDB offset context implementation used by Debezium pipeline integration. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/offset/EventOffset.java | Adds TiDB offset type based on timestamp + commit version. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/handler/TiDBSchemaChangeEventHandler.java | Replaces legacy placeholder type with SchemaChangeEventHandler implementation stub. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/handler/TiDBErrorHandler.java | Adds error handler with retry logic for specific TiDB/MySQL errors. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/fetch/TiDBStreamFetchTask.java | Adds stream fetch task implementation for stream splits. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/fetch/TiDBSourceFetchTaskContext.java | Adds TiDB fetch task context wiring schema, dispatcher, offsets, queue, error handler. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/fetch/TiDBScanFetchTask.java | Adds snapshot split scan task implementation for incremental snapshot reads. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/fetch/StoppableChangeEventSourceContext.java | Replaces legacy enum content with a stoppable ChangeEventSourceContext implementation. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/fetch/EventEmitter.java | Adds relational change record emitter adapter for TiDB. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/connection/TiDBConnectionPoolFactory.java | Adds JDBC URL factory for pooled TiDB connections. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceOptions.java | Adds new connector options (port/pd-addresses/heartbeat/table-list/host-mapping/jdbc driver). |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceConfigFactory.java | Adds config factory producing TiDBSourceConfig for incremental source. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceConfig.java | Adds TiDBSourceConfig extending JdbcSourceConfig with TiDB-specific fields. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/metrics/TiDBSourceMetrics.java | Removes references to legacy TiKV source function from metrics. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/io/debezium/connector/tidb/TidbTopicSelector.java | Adds topic selector for TiDB Debezium integration. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/io/debezium/connector/tidb/TiDBTaskContext.java | Adds Debezium task context wrapper for TiDB. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/io/debezium/connector/tidb/TiDBPartition.java | Adds Debezium partition implementation for TiDB. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/io/debezium/connector/tidb/TiDBEventMetadataProvider.java | Adds event metadata provider for TiDB Debezium pipeline integration. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/io/debezium/connector/tidb/Listeners/TiDBAntlrDdlParserListener.java | Adds Antlr DDL parser listener wiring for TiDB DDL parsing. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/pom.xml | Updates dependencies (geometry, Debezium MySQL, flink-cdc-base) and adjusts testcontainers exclusions. |
| .gitignore | Ignores VS Code project files. |
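The `JdbcProxy.java` row above mentions avoiding mutation of the expected results list during sorting. The actual change is not shown on this page, so the following is only a sketch of that general pattern, using a hypothetical class `SortedCopySketch`: sort defensive copies and compare those, leaving the caller's lists untouched.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical illustration, not the code from this PR. */
class SortedCopySketch {

    /** Returns a sorted copy; the input list is never modified. */
    static List<String> sortedCopy(List<String> input) {
        List<String> copy = new ArrayList<>(input);
        Collections.sort(copy);
        return copy;
    }

    /** Order-insensitive comparison that works even for unmodifiable inputs. */
    static boolean sameRowsIgnoringOrder(List<String> expected, List<String> actual) {
        return sortedCopy(expected).equals(sortedCopy(actual));
    }

    public static void main(String[] args) {
        List<String> expected = Collections.unmodifiableList(Arrays.asList("2,b", "1,a"));
        List<String> actual = Arrays.asList("1,a", "2,b");
        System.out.println(sameRowsIgnoringOrder(expected, actual));
        // The caller's list keeps its original order.
        System.out.println(expected);
    }
}
```

Sorting a copy matters when the same expected list is reused across retries or assertions, or when a caller passes an immutable list, where an in-place sort would throw.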
[FLINK-35459] add stream fetch. [FLINK-35459] add stream fetch,fixed test [FLINK-35459] fixed TiDBTestBase version fixed conflict fixd git fixed checkstyle fixed rat error. fixed ut test
…he#4322)
# Conflicts:
# flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiKVRichParallelSourceFunction.java
…ce Connector [FLINK-35459] add splitReader & scanFetch test. [FLINK-35459] add stream fetch. [FLINK-35459] fixed TiDBTestBase version fixed conflict fixd git fixed checkstyle fixed rat error. fixed ut test add .vscode ide is more popular in AI ide fixed spotless fixed split key method error. fixed CatalogTableAdapter error fixed FactoryUtilAdapter fixed reader fixed ut fixed ut error fixed spotless:apply fixed e2e fixed test fixed test fixed review fixed alltype s test fixed all fail ut
67339bf to 9c3bb92
spotless:apply fixed testStreamSplitReader error fixed streamsplit error
Use Incremental Source Framework in Flink CDC TiKV Source Connector