Skip to content

Commit 0bbddf6

Browse files
committed
Merge branch 'cassandra-5.0' into trunk
* cassandra-5.0: Refactor SAI ANN query execution to use score ordered iterators for correctness and speed
2 parents 4454ab8 + 23ec1c8 commit 0bbddf6

File tree

71 files changed

+3671
-1135
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

71 files changed

+3671
-1135
lines changed

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,7 @@
308308
* Add the ability to disable bulk loading of SSTables (CASSANDRA-18781)
309309
* Clean up obsolete functions and simplify cql_version handling in cqlsh (CASSANDRA-18787)
310310
Merged from 5.0:
311+
* Refactor SAI ANN query execution to use score ordered iterators for correctness and speed (CASSANDRA-20086)
311312
* Fix ConcurrentModificationException in compaction garbagecollect (CASSANDRA-21065)
312313
* Dynamically skip sharding L0 when SAI Vector index present (CASSANDRA-19661)
313314
* Optionally force IndexStatusManager to use the optimized index status format (CASSANDRA-21132)

src/java/org/apache/cassandra/config/CassandraRelevantProperties.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -507,15 +507,21 @@ public enum CassandraRelevantProperties
507507
/** Whether to allow the user to specify custom options to the hnsw index */
508508
SAI_VECTOR_ALLOW_CUSTOM_PARAMETERS("cassandra.sai.vector.allow_custom_parameters", "false"),
509509

510-
/** Controls the maximum top-k limit for vector search */
511-
SAI_VECTOR_SEARCH_MAX_TOP_K("cassandra.sai.vector_search.max_top_k", "1000"),
512-
513510
/**
514-
* Controls the maximum number of PrimaryKeys that will be read into memory at one time when ordering/limiting
515-
* the results of an ANN query constrained by non-ANN predicates.
511+
* The maximum number of primary keys that a WHERE clause may materialize before the query planner switches
512+
* from a search-then-sort execution strategy to an order-by-then-filter strategy. Increasing this limit allows
513+
* more primary keys to be buffered in memory, enabling either (a) brute-force sorting or (b) graph traversal
514+
* with a restrictive filter that admits only nodes whose primary keys matched the WHERE clause.
515+
*
516+
* Note also that the SAI_INTERSECTION_CLAUSE_LIMIT is applied to the WHERE clause before using a search to
517+
* build a potential result set for search-then-sort query execution.
516518
*/
517-
SAI_VECTOR_SEARCH_ORDER_CHUNK_SIZE("cassandra.sai.vector_search.order_chunk_size", "100000"),
519+
SAI_VECTOR_SEARCH_MAX_MATERIALIZE_KEYS("cassandra.sai.vector_search.max_materialized_keys", "16000"),
520+
521+
/** Controls the maximum top-k limit for vector search */
522+
SAI_VECTOR_SEARCH_MAX_TOP_K("cassandra.sai.vector_search.max_top_k", "1000"),
518523

524+
SCHEMA_PULL_INTERVAL_MS("cassandra.schema_pull_interval_ms", "60000"),
519525
SCHEMA_UPDATE_HANDLER_FACTORY_CLASS("cassandra.schema.update_handler_factory.class"),
520526
SEARCH_CONCURRENCY_FACTOR("cassandra.search_concurrency_factor", "1"),
521527

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.db;
20+
21+
/**
22+
* A cell's source data object. Can be used to determine if two cells originated from the same object, e.g. memtable
23+
* or sstable.
24+
*/
25+
public interface CellSourceIdentifier
26+
{
27+
/**
28+
* Returns true iff this and other CellSourceIdentifier are equal, indicating that the cell are from the same
29+
* source.
30+
* @param other the other source with which to compare
31+
* @return true if the two sources are equal
32+
*/
33+
default boolean isEqualSource(CellSourceIdentifier other)
34+
{
35+
return this.equals(other);
36+
}
37+
}

src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.apache.cassandra.db.partitions.PartitionIterators;
5757
import org.apache.cassandra.db.partitions.SingletonUnfilteredPartitionIterator;
5858
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
59+
import org.apache.cassandra.db.rows.BaseRowIterator;
5960
import org.apache.cassandra.db.rows.Cell;
6061
import org.apache.cassandra.db.rows.Row;
6162
import org.apache.cassandra.db.rows.Rows;
@@ -726,10 +727,26 @@ public UnfilteredRowIterator queryMemtableAndDisk(ColumnFamilyStore cfs, ReadExe
726727
assert executionController != null && executionController.validForReadOn(cfs);
727728
Tracing.trace("Executing single-partition query on {}", cfs.name);
728729

729-
return queryMemtableAndDiskInternal(cfs, executionController);
730+
Tracing.trace("Acquiring sstable references");
731+
ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
732+
return queryMemtableAndDiskInternal(cfs, view, null, executionController);
733+
}
734+
735+
public UnfilteredRowIterator queryMemtableAndDisk(ColumnFamilyStore cfs,
736+
ColumnFamilyStore.ViewFragment view,
737+
Function<CellSourceIdentifier, Transformation<BaseRowIterator<?>>> rowTransformer,
738+
ReadExecutionController executionController)
739+
{
740+
assert executionController != null && executionController.validForReadOn(cfs);
741+
Tracing.trace("Executing single-partition query on {}", cfs.name);
742+
743+
return queryMemtableAndDiskInternal(cfs, view, rowTransformer, executionController);
730744
}
731745

732-
private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, ReadExecutionController controller)
746+
private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs,
747+
ColumnFamilyStore.ViewFragment view,
748+
Function<CellSourceIdentifier, Transformation<BaseRowIterator<?>>> rowTransformer,
749+
ReadExecutionController controller)
733750
{
734751
/*
735752
* We have 2 main strategies:
@@ -753,11 +770,9 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs
753770
&& !queriesMulticellType()
754771
&& !controller.isTrackingRepairedStatus())
755772
{
756-
return queryMemtableAndSSTablesInTimestampOrder(cfs, (ClusteringIndexNamesFilter)clusteringIndexFilter(), controller);
773+
return queryMemtableAndSSTablesInTimestampOrder(cfs, view, rowTransformer, (ClusteringIndexNamesFilter)clusteringIndexFilter(), controller);
757774
}
758775

759-
Tracing.trace("Acquiring sstable references");
760-
ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
761776
view.sstables.sort(SSTableReader.maxTimestampDescending);
762777
ClusteringIndexFilter filter = clusteringIndexFilter();
763778
long minTimestamp = Long.MAX_VALUE;
@@ -776,6 +791,9 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs
776791
if (memtable.getMinTimestamp() != Memtable.NO_MIN_TIMESTAMP)
777792
minTimestamp = Math.min(minTimestamp, memtable.getMinTimestamp());
778793

794+
if (rowTransformer != null)
795+
iter = Transformation.apply(iter, rowTransformer.apply(memtable));
796+
779797
// Memtable data is always considered unrepaired
780798
controller.updateMinOldestUnrepairedTombstone(memtable.getMinLocalDeletionTime());
781799
inputCollector.addMemtableIterator(RTBoundValidator.validate(iter, RTBoundValidator.Stage.MEMTABLE, false));
@@ -835,6 +853,9 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs
835853
UnfilteredRowIterator iter = intersects ? makeRowIteratorWithLowerBound(cfs, sstable, metricsCollector)
836854
: makeRowIteratorWithSkippedNonStaticContent(cfs, sstable, metricsCollector);
837855

856+
if (rowTransformer != null)
857+
iter = Transformation.apply(iter, rowTransformer.apply(sstable.getId()));
858+
838859
inputCollector.addSSTableIterator(sstable, iter);
839860
mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
840861
iter.partitionLevelDeletion().markedForDeleteAt());
@@ -857,6 +878,10 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs
857878
{
858879
if (!sstable.isRepaired())
859880
controller.updateMinOldestUnrepairedTombstone(sstable.getMinLocalDeletionTime());
881+
882+
if (rowTransformer != null)
883+
iter = Transformation.apply(iter, rowTransformer.apply(sstable.getId()));
884+
860885
inputCollector.addSSTableIterator(sstable, iter);
861886
includedDueToTombstones++;
862887
mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
@@ -996,11 +1021,8 @@ private boolean queriesMulticellType()
9961021
* no collection or counters are included).
9971022
* This method assumes the filter is a {@code ClusteringIndexNamesFilter}.
9981023
*/
999-
private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFamilyStore cfs, ClusteringIndexNamesFilter filter, ReadExecutionController controller)
1024+
private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFamilyStore cfs, ColumnFamilyStore.ViewFragment view, Function<CellSourceIdentifier, Transformation<BaseRowIterator<?>>> rowTransformer, ClusteringIndexNamesFilter filter, ReadExecutionController controller)
10001025
{
1001-
Tracing.trace("Acquiring sstable references");
1002-
ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
1003-
10041026
ImmutableBTreePartition result = null;
10051027
SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector();
10061028

@@ -1012,7 +1034,9 @@ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFam
10121034
if (iter == null)
10131035
continue;
10141036

1015-
result = add(RTBoundValidator.validate(iter, RTBoundValidator.Stage.MEMTABLE, false),
1037+
UnfilteredRowIterator wrapped = rowTransformer != null ? Transformation.apply(iter, rowTransformer.apply(memtable))
1038+
: iter;
1039+
result = add(RTBoundValidator.validate(wrapped, RTBoundValidator.Stage.MEMTABLE, false),
10161040
result,
10171041
filter,
10181042
false,
@@ -1067,7 +1091,10 @@ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFam
10671091
}
10681092
else
10691093
{
1070-
result = add(RTBoundValidator.validate(iter, RTBoundValidator.Stage.SSTABLE, false),
1094+
UnfilteredRowIterator wrapped = rowTransformer != null ? Transformation.apply(iter, rowTransformer.apply(sstable.getId()))
1095+
: iter;
1096+
1097+
result = add(RTBoundValidator.validate(wrapped, RTBoundValidator.Stage.SSTABLE, false),
10711098
result,
10721099
filter,
10731100
sstable.isRepaired(),
@@ -1082,8 +1109,9 @@ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFam
10821109
{
10831110
if (iter.isEmpty())
10841111
continue;
1085-
1086-
result = add(RTBoundValidator.validate(iter, RTBoundValidator.Stage.SSTABLE, false),
1112+
UnfilteredRowIterator wrapped = rowTransformer != null ? Transformation.apply(iter, rowTransformer.apply(sstable.getId()))
1113+
: iter;
1114+
result = add(RTBoundValidator.validate(wrapped, RTBoundValidator.Stage.SSTABLE, false),
10871115
result,
10881116
filter,
10891117
sstable.isRepaired(),

src/java/org/apache/cassandra/db/lifecycle/Tracker.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,7 @@ public Memtable switchMemtable(boolean truncating, Memtable newMemtable)
419419
if (truncating)
420420
notifyRenewed(newMemtable);
421421
else
422-
notifySwitched(result.left.getCurrentMemtable());
422+
notifySwitched(result.left.getCurrentMemtable(), result.right.getCurrentMemtable());
423423

424424
return result.left.getCurrentMemtable();
425425
}
@@ -577,9 +577,9 @@ public void notifyRenewed(Memtable renewed)
577577
notify(new MemtableRenewedNotification(renewed));
578578
}
579579

580-
public void notifySwitched(Memtable previous)
580+
public void notifySwitched(Memtable previous, Memtable next)
581581
{
582-
notify(new MemtableSwitchedNotification(previous));
582+
notify(new MemtableSwitchedNotification(previous, next));
583583
}
584584

585585
public void notifyDiscarded(Memtable discarded)

src/java/org/apache/cassandra/db/memtable/Memtable.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import javax.annotation.concurrent.NotThreadSafe;
2626

27+
import org.apache.cassandra.db.CellSourceIdentifier;
2728
import org.apache.cassandra.db.ColumnFamilyStore;
2829
import org.apache.cassandra.db.PartitionPosition;
2930
import org.apache.cassandra.db.RegularAndStaticColumns;
@@ -56,7 +57,7 @@
5657
*
5758
* See Memtable_API.md for details on implementing and using alternative memtable implementations.
5859
*/
59-
public interface Memtable extends Comparable<Memtable>, UnfilteredSource
60+
public interface Memtable extends Comparable<Memtable>, UnfilteredSource, CellSourceIdentifier
6061
{
6162
public static final long NO_MIN_TIMESTAMP = -1;
6263

src/java/org/apache/cassandra/index/sai/QueryContext.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,6 @@ public class QueryContext
7171
* */
7272
public boolean hasUnrepairedMatches = false;
7373

74-
private VectorQueryContext vectorContext;
75-
7674
public QueryContext(ReadCommand readCommand, long executionQuotaMs)
7775
{
7876
this.readCommand = readCommand;
@@ -94,10 +92,8 @@ public void checkpoint()
9492
}
9593
}
9694

97-
public VectorQueryContext vectorContext()
95+
public int limit()
9896
{
99-
if (vectorContext == null)
100-
vectorContext = new VectorQueryContext(readCommand);
101-
return vectorContext;
97+
return readCommand.limits().count();
10298
}
10399
}

src/java/org/apache/cassandra/index/sai/StorageAttachedIndexGroup.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.apache.cassandra.notifications.INotificationConsumer;
6464
import org.apache.cassandra.notifications.MemtableDiscardedNotification;
6565
import org.apache.cassandra.notifications.MemtableRenewedNotification;
66+
import org.apache.cassandra.notifications.MemtableSwitchedNotification;
6667
import org.apache.cassandra.notifications.SSTableAddedNotification;
6768
import org.apache.cassandra.notifications.SSTableListChangedNotification;
6869
import org.apache.cassandra.schema.TableMetadata;
@@ -277,6 +278,10 @@ else if (notification instanceof MemtableRenewedNotification)
277278
{
278279
indexes.forEach(index -> index.memtableIndexManager().renewMemtable(((MemtableRenewedNotification) notification).renewed));
279280
}
281+
else if (notification instanceof MemtableSwitchedNotification)
282+
{
283+
indexes.forEach(index -> index.memtableIndexManager().maybeInitializeMemtableIndex(((MemtableSwitchedNotification) notification).next));
284+
}
280285
else if (notification instanceof MemtableDiscardedNotification)
281286
{
282287
indexes.forEach(index -> index.memtableIndexManager().discardMemtable(((MemtableDiscardedNotification) notification).memtable));

0 commit comments

Comments
 (0)