Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/128615.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 128615
summary: Fix and test off-heap stats when using direct IO for accessing the raw vectors
area: Vector Search
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,18 @@
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.SuppressForbidden;
import org.apache.lucene.util.hnsw.RandomVectorScorer;
import org.elasticsearch.index.codec.vectors.reflect.OffHeapStats;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;

import static org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsReader.readSimilarityFunction;
import static org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsReader.readVectorEncoding;

/** Copied from Lucene99FlatVectorsReader in Lucene 10.2, then modified to support DirectIOIndexInputSupplier */
@SuppressForbidden(reason = "Copied from lucene")
public class DirectIOLucene99FlatVectorsReader extends FlatVectorsReader {
public class DirectIOLucene99FlatVectorsReader extends FlatVectorsReader implements OffHeapStats {

private static final boolean USE_DIRECT_IO = Boolean.parseBoolean(System.getProperty("vector.rescoring.directio", "true"));

Expand Down Expand Up @@ -282,6 +284,11 @@ public void close() throws IOException {
IOUtils.close(vectorData);
}

@Override
public Map<String, Long> getOffHeapByteSize(FieldInfo fieldInfo) {
return Map.of(); // no off-heap
}

private record FieldEntry(
VectorSimilarityFunction similarityFunction,
VectorEncoding vectorEncoding,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,21 @@

import org.apache.lucene.codecs.hnsw.FlatVectorsReader;
import org.apache.lucene.index.ByteVectorValues;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FloatVectorValues;
import org.apache.lucene.search.KnnCollector;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.hnsw.RandomVectorScorer;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.index.codec.vectors.reflect.OffHeapByteSizeUtils;
import org.elasticsearch.index.codec.vectors.reflect.OffHeapStats;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;

class MergeReaderWrapper extends FlatVectorsReader {
class MergeReaderWrapper extends FlatVectorsReader implements OffHeapStats {

private final FlatVectorsReader mainReader;
private final FlatVectorsReader mergeReader;
Expand Down Expand Up @@ -86,4 +90,9 @@ public void search(String field, byte[] target, KnnCollector knnCollector, Bits
public void close() throws IOException {
IOUtils.close(mainReader, mergeReader);
}

@Override
public Map<String, Long> getOffHeapByteSize(FieldInfo fieldInfo) {
return OffHeapByteSizeUtils.getOffHeapByteSize(mainReader, fieldInfo);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsReader;
import org.apache.lucene.codecs.lucene99.Lucene99ScalarQuantizedVectorsReader;
import org.apache.lucene.index.FieldInfo;
import org.elasticsearch.index.codec.vectors.es818.DirectIOLucene99FlatVectorsReader;

import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -52,9 +51,6 @@ public static Map<String, Long> getOffHeapByteSize(KnnVectorsReader reader, Fiel
case Lucene99FlatVectorsReader flatVectorsReader -> {
return OffHeapReflectionUtils.getOffHeapByteSizeF99FLT(flatVectorsReader, fieldInfo);
}
case DirectIOLucene99FlatVectorsReader flatVectorsReader -> {
return OffHeapReflectionUtils.getOffHeapByteSizeF99FLT(flatVectorsReader, fieldInfo);
}
case Lucene95HnswVectorsReader lucene95HnswVectorsReader -> {
return OffHeapReflectionUtils.getOffHeapByteSizeL95HNSW(lucene95HnswVectorsReader, fieldInfo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.VectorEncoding;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.index.codec.vectors.es818.DirectIOLucene99FlatVectorsReader;

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
Expand All @@ -47,15 +46,12 @@ private OffHeapReflectionUtils() {}
private static final VarHandle RAW_VECTORS_READER_HNDL_SQ;
private static final MethodHandle GET_FIELD_ENTRY_HANDLE_L99FLT;
private static final MethodHandle VECTOR_DATA_LENGTH_HANDLE_L99FLT;
private static final MethodHandle GET_FIELD_ENTRY_HANDLE_DIOL99FLT;
private static final MethodHandle VECTOR_DATA_LENGTH_HANDLE_DIOL99FLT;
private static final MethodHandle GET_FIELD_ENTRY_HANDLE_L99HNSW;
private static final MethodHandle GET_VECTOR_INDEX_LENGTH_HANDLE_L99HNSW;
private static final VarHandle FLAT_VECTORS_READER_HNDL_L99HNSW;

static final Class<?> L99_SQ_VR_CLS = Lucene99ScalarQuantizedVectorsReader.class;
static final Class<?> L99_FLT_VR_CLS = Lucene99FlatVectorsReader.class;
static final Class<?> DIOL99_FLT_VR_CLS = DirectIOLucene99FlatVectorsReader.class;
static final Class<?> L99_HNSW_VR_CLS = Lucene99HnswVectorsReader.class;

// old codecs
Expand Down Expand Up @@ -100,12 +96,6 @@ private OffHeapReflectionUtils() {}
mt = methodType(cls, String.class, VectorEncoding.class);
GET_FIELD_ENTRY_HANDLE_L99FLT = lookup.findVirtual(L99_FLT_VR_CLS, "getFieldEntry", mt);
VECTOR_DATA_LENGTH_HANDLE_L99FLT = lookup.findVirtual(cls, "vectorDataLength", methodType(long.class));
// DirectIOLucene99FlatVectorsReader
cls = Class.forName("org.elasticsearch.index.codec.vectors.es818.DirectIOLucene99FlatVectorsReader$FieldEntry");
lookup = MethodHandles.privateLookupIn(DIOL99_FLT_VR_CLS, MethodHandles.lookup());
mt = methodType(cls, String.class, VectorEncoding.class);
GET_FIELD_ENTRY_HANDLE_DIOL99FLT = lookup.findVirtual(DIOL99_FLT_VR_CLS, "getFieldEntry", mt);
VECTOR_DATA_LENGTH_HANDLE_DIOL99FLT = lookup.findVirtual(cls, "vectorDataLength", methodType(long.class));
// Lucene99HnswVectorsReader
cls = Class.forName("org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsReader$FieldEntry");
lookup = MethodHandles.privateLookupIn(L99_HNSW_VR_CLS, MethodHandles.lookup());
Expand Down Expand Up @@ -182,18 +172,6 @@ static Map<String, Long> getOffHeapByteSizeF99FLT(Lucene99FlatVectorsReader read
throw new AssertionError("should not reach here");
}

@SuppressForbidden(reason = "static type is not accessible")
static Map<String, Long> getOffHeapByteSizeF99FLT(DirectIOLucene99FlatVectorsReader reader, FieldInfo fieldInfo) {
try {
var entry = GET_FIELD_ENTRY_HANDLE_DIOL99FLT.invoke(reader, fieldInfo.name, fieldInfo.getVectorEncoding());
long len = (long) VECTOR_DATA_LENGTH_HANDLE_DIOL99FLT.invoke(entry);
return Map.of(FLAT_VECTOR_DATA_EXTENSION, len);
} catch (Throwable t) {
handleThrowable(t);
}
throw new AssertionError("should not reach here");
}

@SuppressForbidden(reason = "static type is not accessible")
static Map<String, Long> getOffHeapByteSizeL99HNSW(Lucene99HnswVectorsReader reader, FieldInfo fieldInfo) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,25 @@
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHits;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.tests.index.BaseKnnVectorsFormatTestCase;
import org.apache.lucene.tests.store.MockDirectoryWrapper;
import org.apache.lucene.tests.util.TestUtil;
import org.elasticsearch.common.logging.LogConfigurator;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.codec.vectors.BQVectorUtils;
import org.elasticsearch.index.codec.vectors.OptimizedScalarQuantizer;
import org.elasticsearch.index.codec.vectors.reflect.OffHeapByteSizeUtils;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.FsDirectoryFactory;
import org.elasticsearch.test.IndexSettingsModule;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Locale;

import static java.lang.String.format;
Expand All @@ -57,6 +68,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.oneOf;

@com.carrotsearch.randomizedtesting.annotations.Repeat(iterations = 100)
public class ES818BinaryQuantizedVectorsFormatTests extends BaseKnnVectorsFormatTestCase {

static {
Expand Down Expand Up @@ -183,8 +195,27 @@ public void testQuantizedVectorsWriteAndRead() throws IOException {
}

public void testSimpleOffHeapSize() throws IOException {
try (Directory dir = newDirectory()) {
testSimpleOffHeapSizeImpl(dir, newIndexWriterConfig(), true);
}
}

public void testSimpleOffHeapSizeFSDir() throws IOException {
var config = newIndexWriterConfig().setUseCompoundFile(false); // avoid compound files to allow directIO
try (Directory dir = newFSDirectory()) {
testSimpleOffHeapSizeImpl(dir, config, false);
}
}

public void testSimpleOffHeapSizeMMapDir() throws IOException {
try (Directory dir = newMMapDirectory()) {
testSimpleOffHeapSizeImpl(dir, newIndexWriterConfig(), true);
}
}

public void testSimpleOffHeapSizeImpl(Directory dir, IndexWriterConfig config, boolean expectVecOffHeap) throws IOException {
float[] vector = randomVector(random().nextInt(12, 500));
try (Directory dir = newDirectory(); IndexWriter w = new IndexWriter(dir, newIndexWriterConfig())) {
try (IndexWriter w = new IndexWriter(dir, config)) {
Document doc = new Document();
doc.add(new KnnFloatVectorField("f", vector, DOT_PRODUCT));
w.addDocument(doc);
Expand All @@ -198,11 +229,36 @@ public void testSimpleOffHeapSize() throws IOException {
}
var fieldInfo = r.getFieldInfos().fieldInfo("f");
var offHeap = OffHeapByteSizeUtils.getOffHeapByteSize(knnVectorsReader, fieldInfo);
assertEquals(2, offHeap.size());
assertEquals(vector.length * Float.BYTES, (long) offHeap.get("vec"));
assertEquals(expectVecOffHeap ? 2 : 1, offHeap.size());
assertTrue(offHeap.get("veb") > 0L);
if (expectVecOffHeap) {
assertEquals(vector.length * Float.BYTES, (long) offHeap.get("vec"));
}
}
}
}
}

static Directory newMMapDirectory() throws IOException {
Directory dir = new MMapDirectory(createTempDir("ES818BinaryQuantizedVectorsFormatTests"));
if (random().nextBoolean()) {
dir = new MockDirectoryWrapper(random(), dir);
}
return dir;
}

private Directory newFSDirectory() throws IOException {
Settings settings = Settings.builder()
.put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.HYBRIDFS.name().toLowerCase(Locale.ROOT))
.build();
IndexSettings idxSettings = IndexSettingsModule.newIndexSettings("foo", settings);
Path tempDir = createTempDir().resolve(idxSettings.getUUID()).resolve("0");
Files.createDirectories(tempDir);
ShardPath path = new ShardPath(false, tempDir, tempDir, new ShardId(idxSettings.getIndex(), 0));
Directory dir = (new FsDirectoryFactory()).newDirectory(idxSettings, path);
if (random().nextBoolean()) {
dir = new MockDirectoryWrapper(random(), dir);
}
return dir;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,30 @@
import org.apache.lucene.index.FloatVectorValues;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.KnnVectorValues;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.VectorSimilarityFunction;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.tests.index.BaseKnnVectorsFormatTestCase;
import org.apache.lucene.tests.store.MockDirectoryWrapper;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.SameThreadExecutorService;
import org.elasticsearch.common.logging.LogConfigurator;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.codec.vectors.reflect.OffHeapByteSizeUtils;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.FsDirectoryFactory;
import org.elasticsearch.test.IndexSettingsModule;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Locale;

Expand Down Expand Up @@ -134,8 +146,27 @@ public void testVectorSimilarityFuncs() {
}

public void testSimpleOffHeapSize() throws IOException {
try (Directory dir = newDirectory()) {
testSimpleOffHeapSizeImpl(dir, newIndexWriterConfig(), true);
}
}

public void testSimpleOffHeapSizeFSDir() throws IOException {
var config = newIndexWriterConfig().setUseCompoundFile(false); // avoid compound files to allow directIO
try (Directory dir = newFSDirectory()) {
testSimpleOffHeapSizeImpl(dir, config, false);
}
}

public void testSimpleOffHeapSizeMMapDir() throws IOException {
try (Directory dir = newMMapDirectory()) {
testSimpleOffHeapSizeImpl(dir, newIndexWriterConfig(), true);
}
}

public void testSimpleOffHeapSizeImpl(Directory dir, IndexWriterConfig config, boolean expectVecOffHeap) throws IOException {
float[] vector = randomVector(random().nextInt(12, 500));
try (Directory dir = newDirectory(); IndexWriter w = new IndexWriter(dir, newIndexWriterConfig())) {
try (IndexWriter w = new IndexWriter(dir, config)) {
Document doc = new Document();
doc.add(new KnnFloatVectorField("f", vector, DOT_PRODUCT));
w.addDocument(doc);
Expand All @@ -149,12 +180,37 @@ public void testSimpleOffHeapSize() throws IOException {
}
var fieldInfo = r.getFieldInfos().fieldInfo("f");
var offHeap = OffHeapByteSizeUtils.getOffHeapByteSize(knnVectorsReader, fieldInfo);
assertEquals(3, offHeap.size());
assertEquals(vector.length * Float.BYTES, (long) offHeap.get("vec"));
assertEquals(expectVecOffHeap ? 3 : 2, offHeap.size());
assertEquals(1L, (long) offHeap.get("vex"));
assertTrue(offHeap.get("veb") > 0L);
if (expectVecOffHeap) {
assertEquals(vector.length * Float.BYTES, (long) offHeap.get("vec"));
}
}
}
}
}

static Directory newMMapDirectory() throws IOException {
Directory dir = new MMapDirectory(createTempDir("ES818BinaryQuantizedVectorsFormatTests"));
if (random().nextBoolean()) {
dir = new MockDirectoryWrapper(random(), dir);
}
return dir;
}

private Directory newFSDirectory() throws IOException {
Settings settings = Settings.builder()
.put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.HYBRIDFS.name().toLowerCase(Locale.ROOT))
.build();
IndexSettings idxSettings = IndexSettingsModule.newIndexSettings("foo", settings);
Path tempDir = createTempDir().resolve(idxSettings.getUUID()).resolve("0");
Files.createDirectories(tempDir);
ShardPath path = new ShardPath(false, tempDir, tempDir, new ShardId(idxSettings.getIndex(), 0));
Directory dir = (new FsDirectoryFactory()).newDirectory(idxSettings, path);
if (random().nextBoolean()) {
dir = new MockDirectoryWrapper(random(), dir);
}
return dir;
}
}
Loading