
[FLINK-35459] Use Incremental Source Framework in Flink tidb CDC Source Connector #3952

Open
Mrart wants to merge 5 commits into apache:master from Mrart:flink-35459

Conversation

@Mrart Mrart commented Mar 17, 2025

Contributor

Use Incremental Source Framework in Flink CDC TiKV Source Connector

@Mrart Mrart marked this pull request as draft March 17, 2025 02:03
@Mrart Mrart marked this pull request as ready for review March 18, 2025 03:21
@Mrart Mrart changed the title [FLINK-35459] Use Incremental Source Framework in Flink CDC Source Connector [FLINK-35459] Use Incremental Source Framework in Flink tidb CDC Source Connector Apr 21, 2025
@github-actions


This pull request has been automatically marked as stale because it has not had recent activity for 120 days. It will be closed in 60 days if no further activity occurs.

@github-actions github-actions Bot added the Stale label Aug 20, 2025
@lvyanquan lvyanquan removed the Stale label Sep 28, 2025
@lvyanquan lvyanquan added this to the V3.6.0 milestone Sep 28, 2025
@github-actions


This pull request has been automatically marked as stale because it has not had recent activity for 120 days. It will be closed in 60 days if no further activity occurs.

@github-actions github-actions Bot added the Stale label Mar 11, 2026
@github-actions github-actions Bot added the build label Mar 26, 2026
@Mrart Mrart closed this Mar 26, 2026
@Mrart Mrart deleted the flink-35459 branch March 26, 2026 10:56
@Mrart Mrart restored the flink-35459 branch March 26, 2026 10:56
Mrart commented Jun 30, 2026

Contributor Author

@lvyanquan Please help reopen this PR; I will complete it.

…ce Connector

# Conflicts:
#	flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/pom.xml
#	flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiKVRichParallelSourceFunction.java
#	flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/table/TiDBTableSourceFactoryTest.java
#	flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/table/utils/UriHostMappingTest.java

Copilot AI left a comment

Contributor


Pull request overview

This PR migrates the TiDB CDC source connector from the legacy TiKV-based implementation to the Incremental Source Framework (JDBC incremental snapshot + Debezium-based streaming), updating connector configuration, shading, and test coverage accordingly.
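As a rough illustration of the "JDBC incremental snapshot" phase mentioned in the overview: incremental snapshot reading generally divides a table into key-range chunks that can be scanned independently. The sketch below is not taken from this PR. The class `ChunkSplitSketch`, the method `splitEvenly`, and the half-open `[start, end)` range convention are hypothetical, and it assumes a numeric split key whose values are roughly evenly distributed.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of even chunk splitting; not the PR's TiDBChunkSplitter. */
class ChunkSplitSketch {

    /** A half-open key range [start, end). */
    static final class Chunk {
        final long start;
        final long end;

        Chunk(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** Splits the inclusive key range [min, max] into chunks of at most chunkSize keys. */
    static List<Chunk> splitEvenly(long min, long max, long chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        if (min > max || max == Long.MAX_VALUE) {
            throw new IllegalArgumentException("need min <= max < Long.MAX_VALUE");
        }
        List<Chunk> chunks = new ArrayList<>();
        long start = min;
        while (start <= max) {
            // subtractExact fails loudly if the range is too wide for a long.
            long remaining = Math.subtractExact(max, start);
            // The last chunk is clamped so it ends just past max.
            long end = remaining < chunkSize ? max + 1 : start + chunkSize;
            chunks.add(new Chunk(start, end));
            start = end;
        }
        return chunks;
    }

    public static void main(String[] args) {
        // Keys 1..10 with a chunk size of 4.
        System.out.println(splitEvenly(1, 10, 4));
    }
}
```

Each chunk could then be read with a bounded query on the split key, which is what lets snapshot splits be distributed across parallel readers.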

Changes:

  • Replace legacy TiKV SourceFunction/table-factory code with a new Incremental Source Framework implementation (dialect, split reader tasks, offsets, schema discovery).
  • Introduce new TiDB table factory/options and update e2e/integration tests to pass JDBC connection properties (hostname/port/username/password).
  • Adjust build/shading to include the required Debezium/MySQL/TiDB-related dependencies.
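To make the second bullet concrete, here is a minimal sketch of how a test might assemble table DDL that carries the JDBC connection fields. It only builds a string and does not call Flink. The option keys `hostname`, `port`, `username`, and `password` follow the names listed in that bullet; the `tidb-cdc` connector identifier, the `pd-addresses`, `database-name`, and `table-name` keys, the column list, and every value are assumptions for illustration and may not match the PR's final option set.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper that renders a CREATE TABLE statement; not code from the PR. */
class TiDbDdlSketch {

    /** Renders the WITH options in insertion order, escaping single quotes in values. */
    static String buildDdl(String tableName, String columns, Map<String, String> options) {
        String with = options.entrySet().stream()
                .map(e -> "  '" + e.getKey() + "' = '" + e.getValue().replace("'", "''") + "'")
                .collect(Collectors.joining(",\n"));
        return "CREATE TABLE " + tableName + " (\n" + columns + "\n) WITH (\n" + with + "\n)";
    }

    /** Example options; every key and value here is a placeholder. */
    static Map<String, String> exampleOptions() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "tidb-cdc");        // assumed connector identifier
        options.put("hostname", "localhost");        // JDBC connection fields
        options.put("port", "4000");
        options.put("username", "root");
        options.put("password", "");
        options.put("pd-addresses", "localhost:2379"); // assumed key and value
        options.put("database-name", "inventory");     // assumed key and value
        options.put("table-name", "products");         // assumed key and value
        return options;
    }

    public static void main(String[] args) {
        String columns = "  id INT,\n  name STRING,\n  PRIMARY KEY (id) NOT ENFORCED";
        System.out.println(buildDdl("tidb_source", columns, exampleOptions()));
    }
}
```

Keeping the options in a `LinkedHashMap` makes the rendered DDL deterministic, which is convenient when tests compare or log the statement.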

Reviewed changes

Copilot reviewed 80 out of 81 changed files in this pull request and generated 12 comments.

| File | Description |
| --- | --- |
| flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java | Improves job-running wait loop diagnostics and adds container log dumping on failures/timeouts. |
| flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/TiDBE2eITCase.java | Updates TiDB table DDL options to include JDBC connection fields. |
| flink-cdc-e2e-tests/flink-cdc-e2e-utils/src/test/java/org/apache/flink/cdc/common/test/utils/JdbcProxy.java | Avoids mutating expected results list during sorting; improves result assertion. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-sql-connector-tidb-cdc/pom.xml | Expands shaded artifacts/relocations and adds filters for Kafka resources. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/resources/ddl/customer.sql | Adds test DDL/data for additional database/table coverage. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/TiDBTestBase.java | Refactors test base and adds helper for building TiDBSourceConfigFactory. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/testutils/RecordsFormatter.java | Adds utility to format SourceRecords into comparable strings for assertions. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/table/utils/UriHostMappingTest.java | Updates tests for host-mapping behavior using the new options package. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/table/TiDBTableSourceFactoryTest.java | Updates table factory unit tests to new factory/source constructor/options. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/table/TiDBConnectorRegionITCase.java | Updates IT case DDL to include JDBC connection fields for TiDB. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/table/TiDBConnectorITCase.java | Updates IT cases to include JDBC connection fields and adjusts expected records order/values. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/source/TiDBSourceExampleTest.java | Adds example test for incremental snapshot source consumption. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/source/TiDBDialectTest.java | Adds dialect test for multi-database table discovery. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/source/reader/TiDBStreamSplitReaderTest.java | Adds stream split reader test for change-event consumption. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/source/offset/EventOffsetCheckpointTest.java | Adds checkpoint round-trip test for TiDB stream offsets. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/source/fetch/TiDBStreamLifecycleTest.java | Adds idempotent close/lifecycle tests for stream reader/task. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory | Switches service registration to the new TiDB table factory. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/utils/UriHostMapping.java | Moves UriHostMapping into a non-table utils package. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/utils/TiDBConnectionUtils.java | Adds utilities for TiDB variable querying and value converter construction. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/utils/TableKeyRangeUtils.java | Moves TableKeyRangeUtils into a non-table utils package. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/utils/TableDiscoveryUtils.java | Adds JDBC-based table discovery with Debezium filters. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiKVSnapshotEventDeserializationSchema.java | Removes legacy TiKV snapshot deserialization API. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiKVChangeEventDeserializationSchema.java | Removes legacy TiKV change-event deserialization API. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiDBSource.java | Removes legacy TiDB SourceFunction builder. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TDBSourceOptions.java | Removes legacy table/source options class; replaced by new config package. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/utils/OptionUtils.java | Removes legacy option-print utility (replaced by base OptionUtils usage). |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/TiKVReadableMetadata.java | Removes legacy TiKV metadata model. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/TiKVMetadataConverter.java | Removes legacy TiKV metadata converter API. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/TiKVAppendMetadataCollector.java | Removes legacy TiKV metadata collector. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/TiDBTableSourceFactory.java | Removes legacy table source factory (replaced by TiDBTableFactory). |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/TiDBTableFactory.java | Adds new DynamicTableSourceFactory wiring incremental snapshot framework + Debezium properties. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/TiDBReadableMetadata.java | Adds Debezium-based readable metadata enum for TiDB table source. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/TiDBDeserializationConverterFactory.java | Adds TiDB-specific converters for string/array/tinyint handling (geometry/SET/boolean). |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/StartupOptions.java | Removes legacy startup options type (uses base StartupOptions now). |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/RowDataTiKVSnapshotEventDeserializationSchema.java | Removes legacy TiKV snapshot RowData deserializer. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/table/RowDataTiKVChangeEventDeserializationSchema.java | Removes legacy TiKV change RowData deserializer. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/TiDBSourceBuilder.java | Adds builder for JdbcIncrementalSource-based TiDB incremental source. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/TiDBDialect.java | Adds TiDB JdbcDataSourceDialect implementation for incremental snapshot framework. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/splitter/TiDBChunkSplitter.java | Adds TiDB chunk splitter backed by TiDBUtils queries. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/schema/TiDBTableDefinition.java | Adds helper for synthesizing DDL for schema parsing fallback paths. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/schema/TiDBSchema.java | Adds schema cache and schema fetching via SHOW CREATE TABLE / DESC parsing. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/schema/TiDBFieldDefinition.java | Adds model for field metadata extracted from DESC. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/schema/TiDBDatabaseSchema.java | Adds Debezium relational database schema integration for TiDB. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/offset/TiDBSourceInfoStructMaker.java | Adds Debezium SourceInfo struct maker for TiDB events. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/offset/TiDBSourceInfo.java | Adds TiDB source info model used for event metadata/source struct. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/offset/EventOffsetUtils.java | Adds utility for loading EventOffsetContext from stored offsets. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/offset/EventOffsetFactory.java | Adds OffsetFactory for TiDB EventOffset. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/offset/EventOffsetContext.java | Adds TiDB offset context implementation used by Debezium pipeline integration. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/offset/EventOffset.java | Adds TiDB offset type based on timestamp + commit version. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/handler/TiDBSchemaChangeEventHandler.java | Replaces legacy placeholder type with SchemaChangeEventHandler implementation stub. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/handler/TiDBErrorHandler.java | Adds error handler with retry logic for specific TiDB/MySQL errors. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/fetch/TiDBStreamFetchTask.java | Adds stream fetch task implementation for stream splits. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/fetch/TiDBSourceFetchTaskContext.java | Adds TiDB fetch task context wiring schema, dispatcher, offsets, queue, error handler. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/fetch/TiDBScanFetchTask.java | Adds snapshot split scan task implementation for incremental snapshot reads. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/fetch/StoppableChangeEventSourceContext.java | Replaces legacy enum content with a stoppable ChangeEventSourceContext implementation. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/fetch/EventEmitter.java | Adds relational change record emitter adapter for TiDB. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/connection/TiDBConnectionPoolFactory.java | Adds JDBC URL factory for pooled TiDB connections. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceOptions.java | Adds new connector options (port/pd-addresses/heartbeat/table-list/host-mapping/jdbc driver). |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceConfigFactory.java | Adds config factory producing TiDBSourceConfig for incremental source. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceConfig.java | Adds TiDBSourceConfig extending JdbcSourceConfig with TiDB-specific fields. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/metrics/TiDBSourceMetrics.java | Removes references to legacy TiKV source function from metrics. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/io/debezium/connector/tidb/TidbTopicSelector.java | Adds topic selector for TiDB Debezium integration. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/io/debezium/connector/tidb/TiDBTaskContext.java | Adds Debezium task context wrapper for TiDB. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/io/debezium/connector/tidb/TiDBPartition.java | Adds Debezium partition implementation for TiDB. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/io/debezium/connector/tidb/TiDBEventMetadataProvider.java | Adds event metadata provider for TiDB Debezium pipeline integration. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/io/debezium/connector/tidb/Listeners/TiDBAntlrDdlParserListener.java | Adds Antlr DDL parser listener wiring for TiDB DDL parsing. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/pom.xml | Updates dependencies (geometry, Debezium MySQL, flink-cdc-base) and adjusts testcontainers exclusions. |
| .gitignore | Ignores VS Code project files. |
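
Several entries in the file summary concern stream offsets: `EventOffset` is described as "based on timestamp + commit version", and `EventOffsetCheckpointTest` as a checkpoint round-trip test. The following is a hedged sketch of that idea, not the PR's code. The class `OffsetSketch`, its field names, the map keys, and the choice to order by commit version before timestamp are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical offset model; the real EventOffset in the PR may differ. */
class OffsetSketch implements Comparable<OffsetSketch> {

    // Hypothetical keys used when the offset is stored as a string map.
    static final String TIMESTAMP_KEY = "timestamp";
    static final String COMMIT_VERSION_KEY = "commit_version";

    final long timestampMs;
    final long commitVersion;

    OffsetSketch(long timestampMs, long commitVersion) {
        this.timestampMs = timestampMs;
        this.commitVersion = commitVersion;
    }

    /** Serializes the offset into a string map, as a checkpoint might store it. */
    Map<String, String> toMap() {
        Map<String, String> map = new HashMap<>();
        map.put(TIMESTAMP_KEY, Long.toString(timestampMs));
        map.put(COMMIT_VERSION_KEY, Long.toString(commitVersion));
        return map;
    }

    /** Restores an offset from the map produced by toMap(). */
    static OffsetSketch fromMap(Map<String, String> map) {
        return new OffsetSketch(
                Long.parseLong(map.get(TIMESTAMP_KEY)),
                Long.parseLong(map.get(COMMIT_VERSION_KEY)));
    }

    /** Assumed ordering: commit version first, timestamp as a tie-breaker. */
    @Override
    public int compareTo(OffsetSketch other) {
        int byVersion = Long.compare(commitVersion, other.commitVersion);
        return byVersion != 0 ? byVersion : Long.compare(timestampMs, other.timestampMs);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof OffsetSketch)) {
            return false;
        }
        OffsetSketch that = (OffsetSketch) o;
        return timestampMs == that.timestampMs && commitVersion == that.commitVersion;
    }

    @Override
    public int hashCode() {
        return Objects.hash(timestampMs, commitVersion);
    }

    public static void main(String[] args) {
        OffsetSketch original = new OffsetSketch(1_700_000_000_000L, 42L);
        OffsetSketch restored = fromMap(original.toMap());
        System.out.println("round-trip equal: " + original.equals(restored));
    }
}
```

A round-trip test in this style asserts that the restored offset equals the original, which is what lets a job resume streaming from the checkpointed position.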


Mrart and others added 3 commits July 3, 2026 15:34
[FLINK-35459] add stream fetch.

 [FLINK-35459] add stream fetch,fixed test

[FLINK-35459] fixed TiDBTestBase version

fixed conflict

fixd git

fixed checkstyle

fixed rat error.

fixed ut test
…he#4322)

# Conflicts:
#	flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiKVRichParallelSourceFunction.java
…ce Connector

[FLINK-35459] add splitReader & scanFetch test.

[FLINK-35459] add stream fetch.

[FLINK-35459] fixed TiDBTestBase version

fixed conflict

fixd git

fixed checkstyle

fixed rat error.

fixed ut test

add .vscode ide is more popular in AI ide

fixed spotless

fixed split key method error.

fixed CatalogTableAdapter error

fixed FactoryUtilAdapter

fixed reader

fixed ut

fixed ut error

fixed spotless:apply

fixed e2e

fixed test

fixed test

fixed review

fixed alltype s test

fixed all fail ut
@Mrart Mrart force-pushed the flink-35459 branch 2 times, most recently from 67339bf to 9c3bb92 Compare July 3, 2026 11:49
spotless:apply

fixed testStreamSplitReader error

fixed streamsplit error

4 participants