From 859ec0fd31132bd77d31d1677c23ab2137114cdb Mon Sep 17 00:00:00 2001 From: ChrisHegarty Date: Mon, 26 May 2025 14:44:08 +0100 Subject: [PATCH 1/2] [9.x] Add an integration test to verify DirectIO is used for at least one case --- .../elasticsearch/index/store/DirectIOIT.java | 92 +++++++++++++++++++ .../DirectIOLucene99FlatVectorsReader.java | 5 +- .../index/store/FsDirectoryFactory.java | 2 +- 3 files changed, 96 insertions(+), 3 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/elasticsearch/index/store/DirectIOIT.java diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/store/DirectIOIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/store/DirectIOIT.java new file mode 100644 index 0000000000000..b5ba406b68abf --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/store/DirectIOIT.java @@ -0,0 +1,92 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.store; + +import org.apache.logging.log4j.Level; +import org.apache.lucene.tests.util.LuceneTestCase; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.search.vectors.KnnSearchBuilder; +import org.elasticsearch.search.vectors.VectorData; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalSettingsPlugin; +import org.elasticsearch.test.MockLog; +import org.elasticsearch.test.junit.annotations.TestLogging; + +import java.util.Collection; +import java.util.List; +import java.util.stream.IntStream; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; + +@LuceneTestCase.SuppressCodecs("*") // only use our own codecs +public class DirectIOIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return List.of(InternalSettingsPlugin.class); + } + + private void indexVectors() { + assertAcked( + prepareCreate("foo-vectors").setSettings(Settings.builder().put(InternalSettingsPlugin.USE_COMPOUND_FILE.getKey(), false)) + .setMapping(""" + { + "properties": { + "fooVector": { + "type": "dense_vector", + "dims": 64, + "element_type": "float", + "index": true, + "similarity": "l2_norm", + "index_options": { + "type": "bbq_flat" + } + } + } + } + """) + ); + ensureGreen("foo-vectors"); + + for (int i = 0; i < 1000; i++) { + indexDoc("foo-vectors", Integer.toString(i), "fooVector", IntStream.range(0, 64).mapToDouble(d -> randomFloat()).toArray()); + } + refresh(); + } + + @TestLogging(value = "org.elasticsearch.index.store.FsDirectoryFactory:DEBUG", reason = "to capture trace logging for direct IO") + public void testDirectIOUsed() { + try (MockLog mockLog = MockLog.capture(FsDirectoryFactory.class)) { + // we're just looking for some evidence direct IO is used + mockLog.addExpectation( + new MockLog.PatternSeenEventExpectation( + "Direct IO used", + FsDirectoryFactory.class.getCanonicalName(), + Level.DEBUG, + "Opening .*\\.vec with direct IO" + ) + ); + + indexVectors(); + + // do a search + var knn = List.of(new KnnSearchBuilder("fooVector", new VectorData(null, new byte[64]), 10, 20, null, null)); + assertHitCount(prepareSearch("foo-vectors").setKnnSearch(knn), 10); + mockLog.assertAllExpectationsMatched(); + } + } + + @Override + protected boolean addMockFSIndexStore() { + return false; // we require to always use the "real" hybrid directory + } +} diff --git a/server/src/main/java/org/elasticsearch/index/codec/vectors/es818/DirectIOLucene99FlatVectorsReader.java b/server/src/main/java/org/elasticsearch/index/codec/vectors/es818/DirectIOLucene99FlatVectorsReader.java index 61426d9f33008..8de208628025b 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/vectors/es818/DirectIOLucene99FlatVectorsReader.java +++ b/server/src/main/java/org/elasticsearch/index/codec/vectors/es818/DirectIOLucene99FlatVectorsReader.java @@ -36,6 +36,7 @@ import org.apache.lucene.index.VectorSimilarityFunction; import org.apache.lucene.internal.hppc.IntObjectHashMap; import org.apache.lucene.store.ChecksumIndexInput; +import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.ReadAdvice; @@ -86,7 +87,7 @@ public DirectIOLucene99FlatVectorsReader(SegmentReadState state, FlatVectorsScor } public static boolean shouldUseDirectIO(SegmentReadState state) { - return USE_DIRECT_IO && state.directory instanceof DirectIOIndexInputSupplier; + return USE_DIRECT_IO && FilterDirectory.unwrap(state.directory) instanceof DirectIOIndexInputSupplier; } private int readMetadata(SegmentReadState state) throws IOException { @@ -126,7 +127,7 @@ private static IndexInput openDataInput( ) throws IOException { String fileName = IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, fileExtension); // use direct IO for accessing raw vector data for searches - IndexInput in = USE_DIRECT_IO && state.directory instanceof DirectIOIndexInputSupplier did + IndexInput in = USE_DIRECT_IO && FilterDirectory.unwrap(state.directory) instanceof DirectIOIndexInputSupplier did ? did.openInputDirect(fileName, context) : state.directory.openInput(fileName, context); boolean success = false; diff --git a/server/src/main/java/org/elasticsearch/index/store/FsDirectoryFactory.java b/server/src/main/java/org/elasticsearch/index/store/FsDirectoryFactory.java index 9c186c74102d7..a3ab206191f9e 100644 --- a/server/src/main/java/org/elasticsearch/index/store/FsDirectoryFactory.java +++ b/server/src/main/java/org/elasticsearch/index/store/FsDirectoryFactory.java @@ -195,7 +195,7 @@ public IndexInput openInputDirect(String name, IOContext context) throws IOExcep // we need to do these checks on the outer directory since the inner doesn't know about pending deletes ensureOpen(); ensureCanRead(name); - + Log.debug("Opening {} with direct IO", name); return directIODelegate.openInput(name, context); } From 453c42f0bd7f162067c15ba35c38d06e619137a7 Mon Sep 17 00:00:00 2001 From: ChrisHegarty Date: Tue, 27 May 2025 17:53:03 +0100 Subject: [PATCH 2/2] itr --- .../elasticsearch/index/store/DirectIOIT.java | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/store/DirectIOIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/store/DirectIOIT.java index b5ba406b68abf..cfb7de6c81d88 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/store/DirectIOIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/store/DirectIOIT.java @@ -10,6 +10,11 @@ package org.elasticsearch.index.store; import org.apache.logging.log4j.Level; +import org.apache.lucene.misc.store.DirectIODirectory; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexOutput; import org.apache.lucene.tests.util.LuceneTestCase; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.plugins.Plugin; @@ -19,9 +24,13 @@ import org.elasticsearch.test.InternalSettingsPlugin; import org.elasticsearch.test.MockLog; import org.elasticsearch.test.junit.annotations.TestLogging; +import org.junit.BeforeClass; +import java.io.IOException; +import java.nio.file.Path; import java.util.Collection; import java.util.List; +import java.util.OptionalLong; import java.util.stream.IntStream; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -30,6 +39,25 @@ @LuceneTestCase.SuppressCodecs("*") // only use our own codecs public class DirectIOIT extends ESIntegTestCase { + @BeforeClass + public static void checkSupported() throws IOException { + Path path = createTempDir("directIOProbe"); + try (Directory dir = open(path); IndexOutput out = dir.createOutput("out", IOContext.DEFAULT)) { + out.writeString("test"); + } catch (IOException e) { + assumeNoException("test requires filesystem that supports Direct IO", e); + } + } + + static DirectIODirectory open(Path path) throws IOException { + return new DirectIODirectory(FSDirectory.open(path)) { + @Override + protected boolean useDirectIO(String name, IOContext context, OptionalLong fileLength) { + return true; + } + }; + } + @Override protected Collection> nodePlugins() { return List.of(InternalSettingsPlugin.class);