Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/128615.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 128615
summary: Fix and test off-heap stats when using direct IO for accessing the raw vectors
area: Vector Search
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,18 @@
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.SuppressForbidden;
import org.apache.lucene.util.hnsw.RandomVectorScorer;
import org.elasticsearch.index.codec.vectors.reflect.OffHeapStats;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;

import static org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsReader.readSimilarityFunction;
import static org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsReader.readVectorEncoding;

/** Copied from Lucene99FlatVectorsReader in Lucene 10.2, then modified to support DirectIOIndexInputSupplier */
@SuppressForbidden(reason = "Copied from lucene")
public class DirectIOLucene99FlatVectorsReader extends FlatVectorsReader {
public class DirectIOLucene99FlatVectorsReader extends FlatVectorsReader implements OffHeapStats {

private static final boolean USE_DIRECT_IO = Boolean.parseBoolean(System.getProperty("vector.rescoring.directio", "true"));

Expand Down Expand Up @@ -282,6 +284,11 @@ public void close() throws IOException {
IOUtils.close(vectorData);
}

/**
 * Reports the off-heap byte sizes for the given field.
 * This implementation ignores {@code fieldInfo} and always reports nothing.
 *
 * @param fieldInfo the field being queried (unused here)
 * @return an empty, immutable map
 */
@Override
public Map<String, Long> getOffHeapByteSize(FieldInfo fieldInfo) {
return Map.of(); // no off-heap
}

private record FieldEntry(
VectorSimilarityFunction similarityFunction,
VectorEncoding vectorEncoding,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,21 @@

import org.apache.lucene.codecs.hnsw.FlatVectorsReader;
import org.apache.lucene.index.ByteVectorValues;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FloatVectorValues;
import org.apache.lucene.search.KnnCollector;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.hnsw.RandomVectorScorer;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.index.codec.vectors.reflect.OffHeapByteSizeUtils;
import org.elasticsearch.index.codec.vectors.reflect.OffHeapStats;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;

class MergeReaderWrapper extends FlatVectorsReader {
class MergeReaderWrapper extends FlatVectorsReader implements OffHeapStats {

private final FlatVectorsReader mainReader;
private final FlatVectorsReader mergeReader;
Expand Down Expand Up @@ -86,4 +90,9 @@ public void search(String field, byte[] target, KnnCollector knnCollector, Bits
public void close() throws IOException {
IOUtils.close(mainReader, mergeReader);
}

/**
 * Reports the off-heap byte sizes for the given field by delegating to
 * {@code OffHeapByteSizeUtils} with {@code mainReader}; {@code mergeReader} is not consulted.
 *
 * @param fieldInfo the field being queried
 * @return whatever {@code OffHeapByteSizeUtils.getOffHeapByteSize} returns for {@code mainReader}
 */
@Override
public Map<String, Long> getOffHeapByteSize(FieldInfo fieldInfo) {
return OffHeapByteSizeUtils.getOffHeapByteSize(mainReader, fieldInfo);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsReader;
import org.apache.lucene.codecs.lucene99.Lucene99ScalarQuantizedVectorsReader;
import org.apache.lucene.index.FieldInfo;
import org.elasticsearch.index.codec.vectors.es818.DirectIOLucene99FlatVectorsReader;

import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -52,9 +51,6 @@ public static Map<String, Long> getOffHeapByteSize(KnnVectorsReader reader, Fiel
case Lucene99FlatVectorsReader flatVectorsReader -> {
return OffHeapReflectionUtils.getOffHeapByteSizeF99FLT(flatVectorsReader, fieldInfo);
}
case DirectIOLucene99FlatVectorsReader flatVectorsReader -> {
return OffHeapReflectionUtils.getOffHeapByteSizeF99FLT(flatVectorsReader, fieldInfo);
}
case Lucene95HnswVectorsReader lucene95HnswVectorsReader -> {
return OffHeapReflectionUtils.getOffHeapByteSizeL95HNSW(lucene95HnswVectorsReader, fieldInfo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.VectorEncoding;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.index.codec.vectors.es818.DirectIOLucene99FlatVectorsReader;

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
Expand All @@ -47,15 +46,12 @@ private OffHeapReflectionUtils() {}
private static final VarHandle RAW_VECTORS_READER_HNDL_SQ;
private static final MethodHandle GET_FIELD_ENTRY_HANDLE_L99FLT;
private static final MethodHandle VECTOR_DATA_LENGTH_HANDLE_L99FLT;
private static final MethodHandle GET_FIELD_ENTRY_HANDLE_DIOL99FLT;
private static final MethodHandle VECTOR_DATA_LENGTH_HANDLE_DIOL99FLT;
private static final MethodHandle GET_FIELD_ENTRY_HANDLE_L99HNSW;
private static final MethodHandle GET_VECTOR_INDEX_LENGTH_HANDLE_L99HNSW;
private static final VarHandle FLAT_VECTORS_READER_HNDL_L99HNSW;

static final Class<?> L99_SQ_VR_CLS = Lucene99ScalarQuantizedVectorsReader.class;
static final Class<?> L99_FLT_VR_CLS = Lucene99FlatVectorsReader.class;
static final Class<?> DIOL99_FLT_VR_CLS = DirectIOLucene99FlatVectorsReader.class;
static final Class<?> L99_HNSW_VR_CLS = Lucene99HnswVectorsReader.class;

// old codecs
Expand Down Expand Up @@ -100,12 +96,6 @@ private OffHeapReflectionUtils() {}
mt = methodType(cls, String.class, VectorEncoding.class);
GET_FIELD_ENTRY_HANDLE_L99FLT = lookup.findVirtual(L99_FLT_VR_CLS, "getFieldEntry", mt);
VECTOR_DATA_LENGTH_HANDLE_L99FLT = lookup.findVirtual(cls, "vectorDataLength", methodType(long.class));
// DirectIOLucene99FlatVectorsReader
cls = Class.forName("org.elasticsearch.index.codec.vectors.es818.DirectIOLucene99FlatVectorsReader$FieldEntry");
lookup = MethodHandles.privateLookupIn(DIOL99_FLT_VR_CLS, MethodHandles.lookup());
mt = methodType(cls, String.class, VectorEncoding.class);
GET_FIELD_ENTRY_HANDLE_DIOL99FLT = lookup.findVirtual(DIOL99_FLT_VR_CLS, "getFieldEntry", mt);
VECTOR_DATA_LENGTH_HANDLE_DIOL99FLT = lookup.findVirtual(cls, "vectorDataLength", methodType(long.class));
// Lucene99HnswVectorsReader
cls = Class.forName("org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsReader$FieldEntry");
lookup = MethodHandles.privateLookupIn(L99_HNSW_VR_CLS, MethodHandles.lookup());
Expand Down Expand Up @@ -182,18 +172,6 @@ static Map<String, Long> getOffHeapByteSizeF99FLT(Lucene99FlatVectorsReader read
throw new AssertionError("should not reach here");
}

/**
 * Looks up the field entry of a {@code DirectIOLucene99FlatVectorsReader} through the
 * {@code getFieldEntry} method handle and reads its {@code vectorDataLength} through a second handle.
 *
 * @param reader the reader whose field entry is looked up
 * @param fieldInfo supplies the field name and vector encoding passed to {@code getFieldEntry}
 * @return a single-entry map from {@code FLAT_VECTOR_DATA_EXTENSION} to the vector data length
 */
@SuppressForbidden(reason = "static type is not accessible")
static Map<String, Long> getOffHeapByteSizeF99FLT(DirectIOLucene99FlatVectorsReader reader, FieldInfo fieldInfo) {
try {
var entry = GET_FIELD_ENTRY_HANDLE_DIOL99FLT.invoke(reader, fieldInfo.name, fieldInfo.getVectorEncoding());
long len = (long) VECTOR_DATA_LENGTH_HANDLE_DIOL99FLT.invoke(entry);
return Map.of(FLAT_VECTOR_DATA_EXTENSION, len);
} catch (Throwable t) {
// NOTE(review): presumably handleThrowable always rethrows - confirm against its definition
handleThrowable(t);
}
// Only reached if handleThrowable returns normally.
throw new AssertionError("should not reach here");
}

@SuppressForbidden(reason = "static type is not accessible")
static Map<String, Long> getOffHeapByteSizeL99HNSW(Lucene99HnswVectorsReader reader, FieldInfo fieldInfo) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,37 @@
import org.apache.lucene.index.KnnVectorValues;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.VectorSimilarityFunction;
import org.apache.lucene.misc.store.DirectIODirectory;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.KnnFloatVectorQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHits;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.tests.index.BaseKnnVectorsFormatTestCase;
import org.apache.lucene.tests.store.MockDirectoryWrapper;
import org.apache.lucene.tests.util.TestUtil;
import org.elasticsearch.common.logging.LogConfigurator;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.codec.vectors.BQVectorUtils;
import org.elasticsearch.index.codec.vectors.OptimizedScalarQuantizer;
import org.elasticsearch.index.codec.vectors.reflect.OffHeapByteSizeUtils;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.FsDirectoryFactory;
import org.elasticsearch.test.IndexSettingsModule;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Locale;
import java.util.OptionalLong;

import static java.lang.String.format;
import static org.apache.lucene.index.VectorSimilarityFunction.DOT_PRODUCT;
Expand Down Expand Up @@ -183,8 +199,28 @@ public void testQuantizedVectorsWriteAndRead() throws IOException {
}

/**
 * Runs the shared off-heap size check against the directory returned by {@code newDirectory()}
 * with a default writer config, expecting the raw vector ("vec") entry to be reported.
 */
public void testSimpleOffHeapSize() throws IOException {
try (Directory dir = newDirectory()) {
testSimpleOffHeapSizeImpl(dir, newIndexWriterConfig(), true);
}
}

/**
 * Runs the shared off-heap size check against the directory built by {@code newFSDirectory()},
 * expecting that no raw vector ("vec") entry is reported. The Direct IO probe runs first.
 */
public void testSimpleOffHeapSizeFSDir() throws IOException {
    checkDirectIOSupported();
    // Compound files are disabled so that direct IO can be used for the vector data.
    final IndexWriterConfig nonCompoundConfig = newIndexWriterConfig().setUseCompoundFile(false);
    try (Directory fsDir = newFSDirectory()) {
        testSimpleOffHeapSizeImpl(fsDir, nonCompoundConfig, false);
    }
}

/**
 * Runs the shared off-heap size check against the directory returned by {@code newMMapDirectory()}
 * with a default writer config, expecting the raw vector ("vec") entry to be reported.
 */
public void testSimpleOffHeapSizeMMapDir() throws IOException {
try (Directory dir = newMMapDirectory()) {
testSimpleOffHeapSizeImpl(dir, newIndexWriterConfig(), true);
}
}

public void testSimpleOffHeapSizeImpl(Directory dir, IndexWriterConfig config, boolean expectVecOffHeap) throws IOException {
float[] vector = randomVector(random().nextInt(12, 500));
try (Directory dir = newDirectory(); IndexWriter w = new IndexWriter(dir, newIndexWriterConfig())) {
try (IndexWriter w = new IndexWriter(dir, config)) {
Document doc = new Document();
doc.add(new KnnFloatVectorField("f", vector, DOT_PRODUCT));
w.addDocument(doc);
Expand All @@ -198,11 +234,54 @@ public void testSimpleOffHeapSize() throws IOException {
}
var fieldInfo = r.getFieldInfos().fieldInfo("f");
var offHeap = OffHeapByteSizeUtils.getOffHeapByteSize(knnVectorsReader, fieldInfo);
assertEquals(2, offHeap.size());
assertEquals(vector.length * Float.BYTES, (long) offHeap.get("vec"));
assertEquals(expectVecOffHeap ? 2 : 1, offHeap.size());
assertTrue(offHeap.get("veb") > 0L);
if (expectVecOffHeap) {
assertEquals(vector.length * Float.BYTES, (long) offHeap.get("vec"));
}
}
}
}
}

/**
 * Creates an {@code MMapDirectory} over a fresh temp dir and, on a random coin flip,
 * wraps it in a {@code MockDirectoryWrapper}.
 *
 * @return the plain or wrapped directory; the caller is responsible for closing it
 */
static Directory newMMapDirectory() throws IOException {
    final Directory mmapDir = new MMapDirectory(createTempDir("ES818BinaryQuantizedVectorsFormatTests"));
    return random().nextBoolean() ? new MockDirectoryWrapper(random(), mmapDir) : mmapDir;
}

/**
 * Builds a directory through {@code FsDirectoryFactory} for an index named "foo" whose store type
 * setting is HYBRIDFS, rooted at a freshly created {@code <tempDir>/<indexUUID>/0} path.
 * On a random coin flip the result is wrapped in a {@code MockDirectoryWrapper}.
 *
 * @return the plain or wrapped directory; the caller is responsible for closing it
 */
private Directory newFSDirectory() throws IOException {
    final String storeType = IndexModule.Type.HYBRIDFS.name().toLowerCase(Locale.ROOT);
    final Settings storeSettings = Settings.builder().put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), storeType).build();
    final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", storeSettings);

    // The same path is used for both path arguments of ShardPath.
    final Path shardDir = createTempDir().resolve(indexSettings.getUUID()).resolve("0");
    Files.createDirectories(shardDir);
    final ShardPath shardPath = new ShardPath(false, shardDir, shardDir, new ShardId(indexSettings.getIndex(), 0));

    final Directory fsDir = new FsDirectoryFactory().newDirectory(indexSettings, shardPath);
    return random().nextBoolean() ? new MockDirectoryWrapper(random(), fsDir) : fsDir;
}

/**
 * Probes for Direct IO support by writing a short string through a {@code DirectIODirectory}
 * opened on a fresh temp dir. An {@code IOException} during the probe is handed to
 * {@code assumeNoException}.
 */
static void checkDirectIOSupported() {
    final Path probeDir = createTempDir("directIOProbe");
    try (Directory directIoDir = open(probeDir); IndexOutput probeOut = directIoDir.createOutput("out", IOContext.DEFAULT)) {
        probeOut.writeString("test");
    } catch (IOException unsupported) {
        assumeNoException("test requires a filesystem that supports Direct IO", unsupported);
    }
}

/**
 * Wraps {@code FSDirectory.open(path)} in a {@code DirectIODirectory} whose
 * {@code useDirectIO} decision is overridden to always answer {@code true}.
 *
 * @param path the filesystem location to open
 * @return the wrapping directory; the caller is responsible for closing it
 * @throws IOException if opening the underlying or wrapping directory fails
 */
static DirectIODirectory open(Path path) throws IOException {
return new DirectIODirectory(FSDirectory.open(path)) {
@Override
protected boolean useDirectIO(String name, IOContext context, OptionalLong fileLength) {
// Opt in unconditionally, whatever the file name, context or length.
return true;
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,37 @@
import org.apache.lucene.index.FloatVectorValues;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.KnnVectorValues;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.VectorSimilarityFunction;
import org.apache.lucene.misc.store.DirectIODirectory;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.tests.index.BaseKnnVectorsFormatTestCase;
import org.apache.lucene.tests.store.MockDirectoryWrapper;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.SameThreadExecutorService;
import org.elasticsearch.common.logging.LogConfigurator;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.codec.vectors.reflect.OffHeapByteSizeUtils;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.FsDirectoryFactory;
import org.elasticsearch.test.IndexSettingsModule;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Locale;
import java.util.OptionalLong;

import static java.lang.String.format;
import static org.apache.lucene.index.VectorSimilarityFunction.DOT_PRODUCT;
Expand Down Expand Up @@ -134,8 +151,28 @@ public void testVectorSimilarityFuncs() {
}

/**
 * Runs the shared off-heap size check against the directory returned by {@code newDirectory()}
 * with a default writer config, expecting the raw vector ("vec") entry to be reported.
 */
public void testSimpleOffHeapSize() throws IOException {
try (Directory dir = newDirectory()) {
testSimpleOffHeapSizeImpl(dir, newIndexWriterConfig(), true);
}
}

/**
 * Runs the shared off-heap size check against the directory built by {@code newFSDirectory()},
 * expecting that no raw vector ("vec") entry is reported. The Direct IO probe runs first.
 */
public void testSimpleOffHeapSizeFSDir() throws IOException {
    checkDirectIOSupported();
    // Compound files are disabled so that direct IO can be used for the vector data.
    final IndexWriterConfig nonCompoundConfig = newIndexWriterConfig().setUseCompoundFile(false);
    try (Directory fsDir = newFSDirectory()) {
        testSimpleOffHeapSizeImpl(fsDir, nonCompoundConfig, false);
    }
}

/**
 * Runs the shared off-heap size check against the directory returned by {@code newMMapDirectory()}
 * with a default writer config, expecting the raw vector ("vec") entry to be reported.
 */
public void testSimpleOffHeapSizeMMapDir() throws IOException {
try (Directory dir = newMMapDirectory()) {
testSimpleOffHeapSizeImpl(dir, newIndexWriterConfig(), true);
}
}

public void testSimpleOffHeapSizeImpl(Directory dir, IndexWriterConfig config, boolean expectVecOffHeap) throws IOException {
float[] vector = randomVector(random().nextInt(12, 500));
try (Directory dir = newDirectory(); IndexWriter w = new IndexWriter(dir, newIndexWriterConfig())) {
try (IndexWriter w = new IndexWriter(dir, config)) {
Document doc = new Document();
doc.add(new KnnFloatVectorField("f", vector, DOT_PRODUCT));
w.addDocument(doc);
Expand All @@ -149,12 +186,55 @@ public void testSimpleOffHeapSize() throws IOException {
}
var fieldInfo = r.getFieldInfos().fieldInfo("f");
var offHeap = OffHeapByteSizeUtils.getOffHeapByteSize(knnVectorsReader, fieldInfo);
assertEquals(3, offHeap.size());
assertEquals(vector.length * Float.BYTES, (long) offHeap.get("vec"));
assertEquals(expectVecOffHeap ? 3 : 2, offHeap.size());
assertEquals(1L, (long) offHeap.get("vex"));
assertTrue(offHeap.get("veb") > 0L);
if (expectVecOffHeap) {
assertEquals(vector.length * Float.BYTES, (long) offHeap.get("vec"));
}
}
}
}
}

/**
 * Creates an {@code MMapDirectory} over a fresh temp dir and, on a random coin flip,
 * wraps it in a {@code MockDirectoryWrapper}.
 *
 * @return the plain or wrapped directory; the caller is responsible for closing it
 */
static Directory newMMapDirectory() throws IOException {
    // NOTE(review): the temp-dir prefix looks copied from another test class - confirm it is intended here.
    final Directory mmapDir = new MMapDirectory(createTempDir("ES818BinaryQuantizedVectorsFormatTests"));
    return random().nextBoolean() ? new MockDirectoryWrapper(random(), mmapDir) : mmapDir;
}

/**
 * Builds a directory through {@code FsDirectoryFactory} for an index named "foo" whose store type
 * setting is HYBRIDFS, rooted at a freshly created {@code <tempDir>/<indexUUID>/0} path.
 * On a random coin flip the result is wrapped in a {@code MockDirectoryWrapper}.
 *
 * @return the plain or wrapped directory; the caller is responsible for closing it
 */
private Directory newFSDirectory() throws IOException {
    final String storeType = IndexModule.Type.HYBRIDFS.name().toLowerCase(Locale.ROOT);
    final Settings storeSettings = Settings.builder().put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), storeType).build();
    final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", storeSettings);

    // The same path is used for both path arguments of ShardPath.
    final Path shardDir = createTempDir().resolve(indexSettings.getUUID()).resolve("0");
    Files.createDirectories(shardDir);
    final ShardPath shardPath = new ShardPath(false, shardDir, shardDir, new ShardId(indexSettings.getIndex(), 0));

    final Directory fsDir = new FsDirectoryFactory().newDirectory(indexSettings, shardPath);
    return random().nextBoolean() ? new MockDirectoryWrapper(random(), fsDir) : fsDir;
}

/**
 * Probes for Direct IO support by writing a short string through a {@code DirectIODirectory}
 * opened on a fresh temp dir. An {@code IOException} during the probe is handed to
 * {@code assumeNoException}.
 */
static void checkDirectIOSupported() {
    final Path probeDir = createTempDir("directIOProbe");
    try (Directory directIoDir = open(probeDir); IndexOutput probeOut = directIoDir.createOutput("out", IOContext.DEFAULT)) {
        probeOut.writeString("test");
    } catch (IOException unsupported) {
        assumeNoException("test requires a filesystem that supports Direct IO", unsupported);
    }
}

/**
 * Wraps {@code FSDirectory.open(path)} in a {@code DirectIODirectory} whose
 * {@code useDirectIO} decision is overridden to always answer {@code true}.
 *
 * @param path the filesystem location to open
 * @return the wrapping directory; the caller is responsible for closing it
 * @throws IOException if opening the underlying or wrapping directory fails
 */
static DirectIODirectory open(Path path) throws IOException {
return new DirectIODirectory(FSDirectory.open(path)) {
@Override
protected boolean useDirectIO(String name, IOContext context, OptionalLong fileLength) {
// Opt in unconditionally, whatever the file name, context or length.
return true;
}
};
}
}
Loading