Apache Flink Connector for Lance Vector Database
flink-connector-lance is a high-performance Apache Flink connector for Lance, a modern columnar data format optimized for machine learning workloads and vector search. It connects Flink's stream and batch processing to Lance's vector storage, so Flink jobs can read, write, index, and search Lance datasets.
| Feature | Description |
| --- | --- |
| Source & Sink | Full read/write support for Lance datasets |
| Table API & SQL | Native Flink SQL DDL/DML support |
| Vector Search | KNN search with L2, Cosine, Dot metrics |
| Index Building | IVF_PQ, IVF_HNSW, IVF_FLAT indexes |
| Exactly-Once | Checkpoint-based exactly-once semantics |
| Predicate Pushdown | Filter pushdown for optimized reads |
| Catalog Support | Lance Catalog for metadata management |
Java 8 or higher
Apache Flink 1.16.x
Maven 3.6+
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-lance</artifactId>
    <version>1.0.0-SNAPSHOT</version>
</dependency>
-- Create a Lance table
CREATE TABLE vectors (
    id BIGINT,
    content STRING,
    embedding ARRAY<FLOAT>
) WITH (
    'connector' = 'lance',
    'path' = '/data/vectors',
    'write.batch-size' = '1024'
);

-- Insert data
INSERT INTO vectors VALUES
    (1, 'Hello World', ARRAY[0.1, 0.2, 0.3, 0.4]);

-- Query data
SELECT * FROM vectors WHERE id > 0;
// Read from Lance
LanceSource source = LanceSource.builder()
    .path("/data/vectors")
    .batchSize(1024)
    .columns(Arrays.asList("id", "embedding"))
    .rowType(rowType)
    .build();
DataStream<RowData> stream = env.addSource(source);

// Write to Lance
LanceSink sink = LanceSink.builder()
    .path("/data/output")
    .batchSize(1024)
    .writeMode(WriteMode.APPEND)
    .rowType(rowType)
    .build();
stream.addSink(sink);
CREATE CATALOG lance_catalog WITH (
    'type' = 'lance',
    'warehouse' = '/path/to/warehouse',
    'default-database' = 'default'
);

USE CATALOG lance_catalog;
| Option | Required | Default | Description |
| --- | --- | --- | --- |
| connector | Yes | - | Must be lance |
| path | Yes | - | Lance dataset path |
| read.batch-size | No | 1024 | Read batch size |
| write.batch-size | No | 1024 | Write batch size |
| write.mode | No | append | append or overwrite |
| write.max-rows-per-file | No | 1000000 | Max rows per file |
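The options above go into the WITH clause of a CREATE TABLE statement. As a minimal sketch, the helper below renders an options map into such a clause using only the JDK. `LanceDdlSketch` and its methods are hypothetical names for illustration and are not part of the connector; the option values are examples, not recommendations.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper (not part of flink-connector-lance) that renders a WITH clause. */
final class LanceDdlSketch {
    private LanceDdlSketch() {}

    /** Renders the options as: WITH ( 'key' = 'value', ... ), one option per line. */
    static String withClause(Map<String, String> options) {
        StringJoiner joiner = new StringJoiner(",\n", "WITH (\n", "\n)");
        for (Map.Entry<String, String> e : options.entrySet()) {
            joiner.add("  '" + escape(e.getKey()) + "' = '" + escape(e.getValue()) + "'");
        }
        return joiner.toString();
    }

    /** Doubles single quotes so values stay valid SQL string literals. */
    private static String escape(String s) {
        return s.replace("'", "''");
    }

    /** Example options: the two required keys plus two optional write settings. */
    static Map<String, String> exampleOptions() {
        Map<String, String> options = new LinkedHashMap<>(); // keeps insertion order
        options.put("connector", "lance");
        options.put("path", "/data/vectors");
        options.put("write.mode", "overwrite");
        options.put("write.max-rows-per-file", "500000");
        return options;
    }

    public static void main(String[] args) {
        System.out.println(withClause(exampleOptions()));
    }
}
```

The rendered clause can be appended to a CREATE TABLE statement and submitted through Flink's `TableEnvironment.executeSql`.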
LanceIndexBuilder builder = LanceIndexBuilder.builder()
    .datasetPath("/data/vectors")
    .columnName("embedding")
    .indexType(IndexType.IVF_PQ)
    .metricType(MetricType.L2)
    .numPartitions(256)
    .numSubVectors(16)
    .build();
builder.buildIndex();
| Index Type | Use Case | Parameters |
| --- | --- | --- |
| IVF_PQ | Large-scale, memory-efficient | num_partitions, num_sub_vectors, num_bits |
| IVF_HNSW | High recall, fast search | num_partitions, m, ef_construction |
| IVF_FLAT | Small datasets, exact search | num_partitions |
Index Selection Guide

| Scenario | Recommended Index | Reason |
| --- | --- | --- |
| < 100K vectors | IVF_FLAT | High accuracy, acceptable performance |
| 100K - 10M vectors | IVF_PQ | Good balance of accuracy and memory |
| > 10M vectors | IVF_PQ (tuned) | Optimize num_partitions and num_sub_vectors |
| High recall required | IVF_HNSW | Best accuracy, higher memory usage |
| Memory constrained | IVF_PQ | Most memory efficient |
| Real-time search | IVF_HNSW | Fastest query latency |
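The selection guide can be read as a small decision function. The sketch below encodes it in plain Java. `IndexSelection` and its local `IndexType` enum are hypothetical stand-ins, not the connector's classes, and the precedence between conflicting requirements (memory limits win over recall and latency) is an assumption the guide itself does not state.

```java
/** Hypothetical encoding of the index selection guide; not part of the connector. */
final class IndexSelection {
    private IndexSelection() {}

    /** Local stand-in for the connector's index types. */
    enum IndexType { IVF_FLAT, IVF_PQ, IVF_HNSW }

    static IndexType recommend(long numVectors,
                               boolean memoryConstrained,
                               boolean highRecall,
                               boolean realTime) {
        // Assumption: a hard memory limit overrides recall and latency goals.
        if (memoryConstrained) {
            return IndexType.IVF_PQ;
        }
        // High recall or low query latency both point to IVF_HNSW in the guide.
        if (highRecall || realTime) {
            return IndexType.IVF_HNSW;
        }
        // Otherwise choose by dataset size: small sets can afford IVF_FLAT.
        if (numVectors < 100_000L) {
            return IndexType.IVF_FLAT;
        }
        // 100K and above: IVF_PQ (tune num_partitions and num_sub_vectors past 10M).
        return IndexType.IVF_PQ;
    }

    public static void main(String[] args) {
        System.out.println(recommend(50_000L, false, false, false));
        System.out.println(recommend(5_000_000L, false, true, false));
    }
}
```

Treat the result as a starting point; the right choice also depends on vector dimension, query volume, and the recall you measure on your own data.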
Index Configuration Options

| Option | Default | Description |
| --- | --- | --- |
| index.type | IVF_PQ | Index type: IVF_PQ, IVF_HNSW, IVF_FLAT |
| index.column | - | Vector column name for indexing |
| index.num-partitions | 256 | Number of IVF partitions |
| index.num-sub-vectors | 16 | PQ sub-vectors (IVF_PQ only) |
| index.num-bits | 8 | Bits per sub-vector (IVF_PQ only) |
| index.m | 16 | HNSW max connections (IVF_HNSW only) |
| index.ef-construction | 100 | HNSW build quality (IVF_HNSW only) |
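To see why IVF_PQ is the memory-efficient choice, it helps to work through the arithmetic behind index.num-sub-vectors and index.num-bits. Product quantization stores one code of num-bits bits per sub-vector, so a compressed vector takes about num-sub-vectors × num-bits / 8 bytes. The sketch below compares that with a raw float32 vector. `PqFootprint` is a hypothetical helper, the 768-dimension embedding is an assumed example, and the estimate ignores codebooks and IVF centroids.

```java
/** Hypothetical back-of-the-envelope calculator; not part of the connector. */
final class PqFootprint {
    private PqFootprint() {}

    /** Bytes for one raw float32 vector: 4 bytes per dimension. */
    static long rawBytes(int dimension) {
        return 4L * dimension;
    }

    /** Bytes for one PQ code: numBits per sub-vector, rounded up to whole bytes. */
    static long pqCodeBytes(int numSubVectors, int numBits) {
        return ((long) numSubVectors * numBits + 7) / 8;
    }

    /** PQ slices a vector into equal parts, so the dimension should divide evenly. */
    static boolean isValidSplit(int dimension, int numSubVectors) {
        return numSubVectors > 0 && dimension % numSubVectors == 0;
    }

    public static void main(String[] args) {
        int dimension = 768;                // assumed embedding size, illustration only
        long raw = rawBytes(dimension);     // 4 * 768
        long code = pqCodeBytes(16, 8);     // table defaults: 16 sub-vectors, 8 bits
        System.out.println("raw=" + raw + " B, pq=" + code + " B, ratio=" + (raw / code) + "x");
    }
}
```

With the table defaults and the assumed 768 dimensions, a 3072-byte vector shrinks to a 16-byte code, roughly a 192x reduction per vector, paid for with some loss of accuracy.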
Index Building SQL Example

-- Create table with IVF_PQ index
CREATE TABLE doc_embeddings (
    doc_id BIGINT,
    title STRING,
    embedding ARRAY<FLOAT>
) WITH (
    'connector' = 'lance',
    'path' = '/data/embeddings',
    'index.type' = 'IVF_PQ',
    'index.column' = 'embedding',
    'index.num-partitions' = '256',
    'index.num-sub-vectors' = '16',
    'vector.metric' = 'COSINE'
);

-- Create table with IVF_HNSW index (high accuracy)
CREATE TABLE high_accuracy_vectors (
    id BIGINT,
    vector ARRAY<FLOAT>
) WITH (
    'connector' = 'lance',
    'path' = '/data/ha_vectors',
    'index.type' = 'IVF_HNSW',
    'index.column' = 'vector',
    'index.num-partitions' = '128',
    'index.m' = '32',
    'index.ef-construction' = '200',
    'vector.metric' = 'L2'
);
LanceVectorSearch search = LanceVectorSearch.builder()
    .datasetPath("/data/vectors")
    .columnName("embedding")
    .metricType(MetricType.COSINE)
    .nprobes(20)
    .build();
search.open();
List<SearchResult> results = search.search(queryVector, 10);
search.close();
| Metric | Description | Range |
| --- | --- | --- |
| L2 | Euclidean distance | [0, ∞) |
| Cosine | Cosine similarity | [-1, 1] |
| Dot | Inner product | (-∞, ∞) |
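For reference, the three metrics in the table can be computed as shown below. This is a plain-Java sketch that does not depend on the connector: `VectorMetrics` is a hypothetical class name, and the way Lance computes or reports these values internally may differ (some libraries, for instance, report cosine distance, 1 minus similarity, instead of similarity).

```java
/** Hypothetical reference implementations of the three metrics; not part of the connector. */
final class VectorMetrics {
    private VectorMetrics() {}

    /** Euclidean (L2) distance, in [0, infinity). */
    static double l2(float[] a, float[] b) {
        checkSameLength(a, b);
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double diff = (double) a[i] - b[i];
            sum += diff * diff;
        }
        return Math.sqrt(sum);
    }

    /** Inner (dot) product, unbounded in both directions. */
    static double dot(float[] a, float[] b) {
        checkSameLength(a, b);
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            sum += (double) a[i] * b[i];
        }
        return sum;
    }

    /** Cosine similarity, in [-1, 1]; undefined for zero-length vectors. */
    static double cosine(float[] a, float[] b) {
        double norms = Math.sqrt(dot(a, a)) * Math.sqrt(dot(b, b));
        if (norms == 0.0) {
            throw new IllegalArgumentException("cosine is undefined for a zero vector");
        }
        return dot(a, b) / norms;
    }

    private static void checkSameLength(float[] a, float[] b) {
        if (a.length != b.length) {
            throw new IllegalArgumentException("vectors must have the same dimension");
        }
    }

    public static void main(String[] args) {
        float[] x = {1f, 2f, 3f};
        float[] y = {4f, 5f, 6f};
        System.out.println("l2=" + l2(x, y) + " dot=" + dot(x, y) + " cosine=" + cosine(x, y));
    }
}
```

Note that for L2 a smaller value means a closer match, while for cosine similarity and inner product a larger value does, so result ordering depends on the metric chosen.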
| Lance/Arrow Type | Flink Type |
| --- | --- |
| Int8/16/32/64 | TINYINT/SMALLINT/INT/BIGINT |
| Float32/64 | FLOAT/DOUBLE |
| String | STRING |
| Boolean | BOOLEAN |
| Binary | BYTES |
| Date32 | DATE |
| Timestamp | TIMESTAMP |
| FixedSizeList<Float> | ARRAY<FLOAT> |
Project Structure

flink-connector-lance/
├── src/main/java/org/apache/flink/connector/lance/
│   ├── LanceSource.java            # Source implementation
│   ├── LanceSink.java              # Sink with checkpointing
│   ├── LanceInputFormat.java       # Batch input format
│   ├── LanceIndexBuilder.java      # Vector index builder
│   ├── LanceVectorSearch.java      # KNN search
│   ├── config/
│   │   └── LanceOptions.java       # Configuration options
│   ├── converter/
│   │   ├── LanceTypeConverter.java
│   │   └── RowDataConverter.java
│   └── table/
│       ├── LanceDynamicTableFactory.java
│       ├── LanceDynamicTableSource.java
│       ├── LanceDynamicTableSink.java
│       ├── LanceCatalog.java
│       └── LanceVectorSearchFunction.java
└── src/test/java/                  # Unit & integration tests
# Build without tests
mvn clean package -DskipTests
# Build with tests
mvn clean package
# Run tests only
mvn test
Contributions are welcome! Please feel free to submit a Pull Request.
Fork the repository
Create your feature branch (git checkout -b feature/AmazingFeature)
Commit your changes (git commit -m 'Add some AmazingFeature')
Push to the branch (git push origin feature/AmazingFeature)
Open a Pull Request
This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
Made with ❤️ by the Flink Connector Lance Contributors