Skip to content

Commit 4e72b9a

Browse files
[9.0] [ML] Prevent retention classes from failing when deleting documents in read-only indices (#125408) (#128747)
* [ML] Prevent retention classes from failing when deleting documents in read-only indices (#125408) Classes like UnusedStatsRemover delete orphaned documents without an associated job. When the indices are made read-only it will start failing as read-only means no delete. This PR ensures that the non-writable indices are not included in the delete-by-query requests. (cherry picked from commit c822a57) * [CI] Auto commit changes from spotless * fit build failure --------- Co-authored-by: elasticsearchmachine <[email protected]>
1 parent 97d0821 commit 4e72b9a

19 files changed

+546
-109
lines changed

docs/changelog/125408.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 125408
2+
summary: Prevent ML data retention logic from failing when deleting documents in read-only
3+
indices
4+
area: Machine Learning
5+
type: bug
6+
issues: []

x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/UnusedStatsRemoverIT.java

Lines changed: 90 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.action.index.IndexRequest;
1111
import org.elasticsearch.action.support.PlainActionFuture;
1212
import org.elasticsearch.client.internal.OriginSettingClient;
13+
import org.elasticsearch.common.settings.Settings;
1314
import org.elasticsearch.common.unit.ByteSizeValue;
1415
import org.elasticsearch.indices.TestIndexNameExpressionResolver;
1516
import org.elasticsearch.tasks.TaskId;
@@ -63,91 +64,118 @@ public void createComponents() {
6364
}
6465

6566
public void testRemoveUnusedStats() throws Exception {
67+
String modelId = "model-with-stats";
68+
putDFA(modelId);
6669

67-
prepareIndex("foo").setId("some-empty-doc").setSource("{}", XContentType.JSON).get();
68-
69-
PutDataFrameAnalyticsAction.Request request = new PutDataFrameAnalyticsAction.Request(
70-
new DataFrameAnalyticsConfig.Builder().setId("analytics-with-stats")
71-
.setModelMemoryLimit(ByteSizeValue.ofGb(1))
72-
.setSource(new DataFrameAnalyticsSource(new String[] { "foo" }, null, null, null))
73-
.setDest(new DataFrameAnalyticsDest("bar", null))
74-
.setAnalysis(new Regression("prediction"))
75-
.build()
76-
);
77-
client.execute(PutDataFrameAnalyticsAction.INSTANCE, request).actionGet();
78-
79-
client.execute(
80-
PutTrainedModelAction.INSTANCE,
81-
new PutTrainedModelAction.Request(
82-
TrainedModelConfig.builder()
83-
.setModelId("model-with-stats")
84-
.setInferenceConfig(RegressionConfig.EMPTY_PARAMS)
85-
.setInput(new TrainedModelInput(Arrays.asList("foo", "bar")))
86-
.setParsedDefinition(
87-
new TrainedModelDefinition.Builder().setPreProcessors(Collections.emptyList())
88-
.setTrainedModel(
89-
Tree.builder()
90-
.setFeatureNames(Arrays.asList("foo", "bar"))
91-
.setRoot(TreeNode.builder(0).setLeafValue(42))
92-
.build()
93-
)
94-
)
95-
.validate(true)
96-
.build(),
97-
false
98-
)
99-
).actionGet();
100-
70+
// Existing analytics and models
10171
indexStatDocument(new DataCounts("analytics-with-stats", 1, 1, 1), DataCounts.documentId("analytics-with-stats"));
102-
indexStatDocument(new DataCounts("missing-analytics-with-stats", 1, 1, 1), DataCounts.documentId("missing-analytics-with-stats"));
72+
indexStatDocument(new InferenceStats(1, 1, 1, 1, modelId, "test", Instant.now()), InferenceStats.docId(modelId, "test"));
10373
indexStatDocument(
10474
new InferenceStats(1, 1, 1, 1, TrainedModelProvider.MODELS_STORED_AS_RESOURCE.iterator().next(), "test", Instant.now()),
10575
InferenceStats.docId(TrainedModelProvider.MODELS_STORED_AS_RESOURCE.iterator().next(), "test")
10676
);
77+
78+
// Unused analytics/model stats
79+
indexStatDocument(new DataCounts("missing-analytics-with-stats", 1, 1, 1), DataCounts.documentId("missing-analytics-with-stats"));
10780
indexStatDocument(
10881
new InferenceStats(1, 1, 1, 1, "missing-model", "test", Instant.now()),
10982
InferenceStats.docId("missing-model", "test")
11083
);
84+
85+
refreshStatsIndex();
86+
runUnusedStatsRemover();
87+
88+
final String index = MlStatsIndex.TEMPLATE_NAME + "-000001";
89+
90+
// Validate expected docs
91+
assertDocExists(index, InferenceStats.docId(modelId, "test"));
92+
assertDocExists(index, DataCounts.documentId("analytics-with-stats"));
93+
assertDocExists(index, InferenceStats.docId(TrainedModelProvider.MODELS_STORED_AS_RESOURCE.iterator().next(), "test"));
94+
95+
// Validate removed docs
96+
assertDocDoesNotExist(index, InferenceStats.docId("missing-model", "test"));
97+
assertDocDoesNotExist(index, DataCounts.documentId("missing-analytics-with-stats"));
98+
}
99+
100+
public void testRemovingUnusedStatsFromReadOnlyIndexShouldFailSilently() throws Exception {
101+
String modelId = "model-with-stats";
102+
putDFA(modelId);
103+
111104
indexStatDocument(
112-
new InferenceStats(1, 1, 1, 1, "model-with-stats", "test", Instant.now()),
113-
InferenceStats.docId("model-with-stats", "test")
105+
new InferenceStats(1, 1, 1, 1, "missing-model", "test", Instant.now()),
106+
InferenceStats.docId("missing-model", "test")
114107
);
115-
client().admin().indices().prepareRefresh(MlStatsIndex.indexPattern()).get();
108+
makeIndexReadOnly();
109+
refreshStatsIndex();
116110

117-
PlainActionFuture<Boolean> deletionListener = new PlainActionFuture<>();
118-
UnusedStatsRemover statsRemover = new UnusedStatsRemover(client, new TaskId("test", 0L));
119-
statsRemover.remove(10000.0f, deletionListener, () -> false);
120-
deletionListener.actionGet();
111+
runUnusedStatsRemover();
112+
refreshStatsIndex();
121113

122-
client().admin().indices().prepareRefresh(MlStatsIndex.indexPattern()).get();
114+
final String index = MlStatsIndex.TEMPLATE_NAME + "-000001";
115+
assertDocExists(index, InferenceStats.docId("missing-model", "test")); // should still exist
116+
}
123117

124-
final String initialStateIndex = MlStatsIndex.TEMPLATE_NAME + "-000001";
118+
private void putDFA(String modelId) {
119+
prepareIndex("foo").setId("some-empty-doc").setSource("{}", XContentType.JSON).get();
125120

126-
// Make sure that stats that should exist still exist
127-
assertTrue(client().prepareGet(initialStateIndex, InferenceStats.docId("model-with-stats", "test")).get().isExists());
128-
assertTrue(
129-
client().prepareGet(
130-
initialStateIndex,
131-
InferenceStats.docId(TrainedModelProvider.MODELS_STORED_AS_RESOURCE.iterator().next(), "test")
132-
).get().isExists()
121+
PutDataFrameAnalyticsAction.Request analyticsRequest = new PutDataFrameAnalyticsAction.Request(
122+
new DataFrameAnalyticsConfig.Builder().setId("analytics-with-stats")
123+
.setModelMemoryLimit(ByteSizeValue.ofGb(1))
124+
.setSource(new DataFrameAnalyticsSource(new String[] { "foo" }, null, null, null))
125+
.setDest(new DataFrameAnalyticsDest("bar", null))
126+
.setAnalysis(new Regression("prediction"))
127+
.build()
133128
);
134-
assertTrue(client().prepareGet(initialStateIndex, DataCounts.documentId("analytics-with-stats")).get().isExists());
135-
136-
// make sure that unused stats were deleted
137-
assertFalse(client().prepareGet(initialStateIndex, DataCounts.documentId("missing-analytics-with-stats")).get().isExists());
138-
assertFalse(client().prepareGet(initialStateIndex, InferenceStats.docId("missing-model", "test")).get().isExists());
129+
client.execute(PutDataFrameAnalyticsAction.INSTANCE, analyticsRequest).actionGet();
130+
131+
TrainedModelDefinition.Builder definition = new TrainedModelDefinition.Builder().setPreProcessors(Collections.emptyList())
132+
.setTrainedModel(
133+
Tree.builder().setFeatureNames(Arrays.asList("foo", "bar")).setRoot(TreeNode.builder(0).setLeafValue(42)).build()
134+
);
135+
136+
TrainedModelConfig modelConfig = TrainedModelConfig.builder()
137+
.setModelId(modelId)
138+
.setInferenceConfig(RegressionConfig.EMPTY_PARAMS)
139+
.setInput(new TrainedModelInput(Arrays.asList("foo", "bar")))
140+
.setParsedDefinition(definition)
141+
.validate(true)
142+
.build();
143+
144+
client.execute(PutTrainedModelAction.INSTANCE, new PutTrainedModelAction.Request(modelConfig, false)).actionGet();
139145
}
140146

141147
private void indexStatDocument(ToXContentObject object, String docId) throws Exception {
142-
ToXContent.Params params = new ToXContent.MapParams(
143-
Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, Boolean.toString(true))
144-
);
145-
IndexRequest doc = new IndexRequest(MlStatsIndex.writeAlias());
146-
doc.id(docId);
148+
IndexRequest doc = new IndexRequest(MlStatsIndex.writeAlias()).id(docId);
147149
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
148-
object.toXContent(builder, params);
150+
object.toXContent(builder, new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")));
149151
doc.source(builder);
150152
client.index(doc).actionGet();
151153
}
152154
}
155+
156+
private void refreshStatsIndex() {
157+
client().admin().indices().prepareRefresh(MlStatsIndex.indexPattern()).get();
158+
}
159+
160+
private void runUnusedStatsRemover() {
161+
PlainActionFuture<Boolean> deletionListener = new PlainActionFuture<>();
162+
new UnusedStatsRemover(client, new TaskId("test", 0L)).remove(10000.0f, deletionListener, () -> false);
163+
deletionListener.actionGet();
164+
}
165+
166+
private void makeIndexReadOnly() {
167+
client().admin()
168+
.indices()
169+
.prepareUpdateSettings(MlStatsIndex.indexPattern())
170+
.setSettings(Settings.builder().put("index.blocks.write", true))
171+
.get();
172+
}
173+
174+
private void assertDocExists(String index, String docId) {
175+
assertTrue(client().prepareGet(index, docId).get().isExists());
176+
}
177+
178+
private void assertDocDoesNotExist(String index, String docId) {
179+
assertFalse(client().prepareGet(index, docId).get().isExists());
180+
}
153181
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,7 @@
365365
import org.elasticsearch.xpack.ml.job.process.normalizer.NativeNormalizerProcessFactory;
366366
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
367367
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerProcessFactory;
368+
import org.elasticsearch.xpack.ml.job.retention.WritableIndexExpander;
368369
import org.elasticsearch.xpack.ml.job.snapshot.upgrader.SnapshotUpgradeTaskExecutor;
369370
import org.elasticsearch.xpack.ml.job.task.OpenJobPersistentTasksExecutor;
370371
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
@@ -922,6 +923,9 @@ public Collection<?> createComponents(PluginServices services) {
922923
IndexNameExpressionResolver indexNameExpressionResolver = services.indexNameExpressionResolver();
923924
TelemetryProvider telemetryProvider = services.telemetryProvider();
924925

926+
// Initialize WritableIndexExpander
927+
WritableIndexExpander.initialize(clusterService, indexNameExpressionResolver);
928+
925929
if (enabled == false) {
926930
// Holders for @link(MachineLearningFeatureSetUsage) which needs access to job manager and ML extension,
927931
// both empty if ML is disabled

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@ private List<MlDataRemover> createDataRemovers(
240240
TaskId parentTaskId,
241241
AnomalyDetectionAuditor anomalyDetectionAuditor
242242
) {
243+
243244
return Arrays.asList(
244245
new ExpiredResultsRemover(
245246
originClient,
@@ -252,8 +253,8 @@ private List<MlDataRemover> createDataRemovers(
252253
new ExpiredModelSnapshotsRemover(
253254
originClient,
254255
new WrappedBatchedJobsIterator(new SearchAfterJobsIterator(originClient)),
255-
threadPool,
256256
parentTaskId,
257+
threadPool,
257258
jobResultsProvider,
258259
anomalyDetectionAuditor
259260
),
@@ -277,8 +278,8 @@ private List<MlDataRemover> createDataRemovers(List<Job> jobs, TaskId parentTask
277278
new ExpiredModelSnapshotsRemover(
278279
client,
279280
new VolatileCursorIterator<>(jobs),
280-
threadPool,
281281
parentTaskId,
282+
threadPool,
282283
jobResultsProvider,
283284
anomalyDetectionAuditor
284285
),

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java

Lines changed: 74 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import org.elasticsearch.xpack.core.ml.job.results.Result;
6767
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
6868
import org.elasticsearch.xpack.core.security.user.InternalUsers;
69+
import org.elasticsearch.xpack.ml.job.retention.WritableIndexExpander;
6970
import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
7071

7172
import java.util.ArrayList;
@@ -132,7 +133,10 @@ public void deleteModelSnapshots(List<ModelSnapshot> modelSnapshots, ActionListe
132133
indices.add(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()));
133134
}
134135

135-
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indices.toArray(new String[0])).setRefresh(true)
136+
String[] indicesToQuery = removeReadOnlyIndices(new ArrayList<>(indices), listener, "model snapshots", null);
137+
if (indicesToQuery.length == 0) return;
138+
139+
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indicesToQuery).setRefresh(true)
136140
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
137141
.setQuery(QueryBuilders.idsQuery().addIds(idsToDelete.toArray(new String[0])));
138142

@@ -180,7 +184,16 @@ public void deleteAnnotations(
180184
boolQuery.filter(QueryBuilders.termsQuery(Annotation.EVENT.getPreferredName(), eventsToDelete));
181185
}
182186
QueryBuilder query = QueryBuilders.constantScoreQuery(boolQuery);
183-
DeleteByQueryRequest dbqRequest = new DeleteByQueryRequest(AnnotationIndex.READ_ALIAS_NAME).setQuery(query)
187+
188+
String[] indicesToQuery = removeReadOnlyIndices(
189+
List.of(AnnotationIndex.READ_ALIAS_NAME),
190+
listener,
191+
"annotations",
192+
() -> listener.onResponse(true)
193+
);
194+
if (indicesToQuery.length == 0) return;
195+
196+
DeleteByQueryRequest dbqRequest = new DeleteByQueryRequest(indicesToQuery).setQuery(query)
184197
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
185198
.setAbortOnVersionConflict(false)
186199
.setRefresh(true)
@@ -198,6 +211,28 @@ public void deleteAnnotations(
198211
);
199212
}
200213

214+
private <T> String[] removeReadOnlyIndices(
215+
List<String> indicesToQuery,
216+
ActionListener<T> listener,
217+
String entityType,
218+
Runnable onEmpty
219+
) {
220+
try {
221+
indicesToQuery = WritableIndexExpander.getInstance().getWritableIndices(indicesToQuery);
222+
} catch (Exception e) {
223+
logger.error("Failed to get writable indices for [" + jobId + "]", e);
224+
listener.onFailure(e);
225+
return new String[0];
226+
}
227+
if (indicesToQuery.isEmpty()) {
228+
logger.info("No writable {} indices found for [{}] job. No {} to remove.", entityType, jobId, entityType);
229+
if (onEmpty != null) {
230+
onEmpty.run();
231+
}
232+
}
233+
return indicesToQuery.toArray(String[]::new);
234+
}
235+
201236
/**
202237
* Asynchronously delete all result types (Buckets, Records, Influencers) from {@code cutOffTime}.
203238
* Forecasts are <em>not</em> deleted, as they will not be automatically regenerated after
@@ -219,7 +254,14 @@ public void deleteResultsFromTime(long cutoffEpochMs, ActionListener<Boolean> li
219254
)
220255
)
221256
.filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).gte(cutoffEpochMs));
222-
DeleteByQueryRequest dbqRequest = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)).setQuery(query)
257+
String[] indicesToQuery = removeReadOnlyIndices(
258+
List.of(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)),
259+
listener,
260+
"results",
261+
() -> listener.onResponse(true)
262+
);
263+
if (indicesToQuery.length == 0) return;
264+
DeleteByQueryRequest dbqRequest = new DeleteByQueryRequest(indicesToQuery).setQuery(query)
223265
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
224266
.setAbortOnVersionConflict(false)
225267
.setRefresh(true)
@@ -264,9 +306,14 @@ public void deleteInterimResults() {
264306
* @param listener Response listener
265307
*/
266308
public void deleteDatafeedTimingStats(ActionListener<BulkByScrollResponse> listener) {
267-
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)).setRefresh(
268-
true
269-
)
309+
String[] indicesToQuery = removeReadOnlyIndices(
310+
List.of(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)),
311+
listener,
312+
"datafeed timing stats",
313+
null
314+
);
315+
if (indicesToQuery.length == 0) return;
316+
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indicesToQuery).setRefresh(true)
270317
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
271318
.setQuery(QueryBuilders.idsQuery().addIds(DatafeedTimingStats.documentId(jobId)));
272319

@@ -456,7 +503,9 @@ private void deleteResultsByQuery(
456503
ActionListener<BroadcastResponse> refreshListener = ActionListener.wrap(refreshResponse -> {
457504
logger.info("[{}] running delete by query on [{}]", jobId, String.join(", ", indices));
458505
ConstantScoreQueryBuilder query = new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId));
459-
DeleteByQueryRequest request = new DeleteByQueryRequest(indices).setQuery(query)
506+
String[] indicesToQuery = removeReadOnlyIndices(List.of(indices), listener, "results", null);
507+
if (indicesToQuery.length == 0) return;
508+
DeleteByQueryRequest request = new DeleteByQueryRequest(indicesToQuery).setQuery(query)
460509
.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpenHidden()))
461510
.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES)
462511
.setAbortOnVersionConflict(false)
@@ -526,7 +575,16 @@ private static IndicesAliasesRequest buildRemoveAliasesRequest(GetAliasesRespons
526575
private void deleteQuantiles(@SuppressWarnings("HiddenField") String jobId, ActionListener<Boolean> finishedHandler) {
527576
// Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
528577
IdsQueryBuilder query = new IdsQueryBuilder().addIds(Quantiles.documentId(jobId));
529-
DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern()).setQuery(query)
578+
579+
String[] indicesToQuery = removeReadOnlyIndices(
580+
List.of(AnomalyDetectorsIndex.jobStateIndexPattern()),
581+
finishedHandler,
582+
"quantiles",
583+
() -> finishedHandler.onResponse(true)
584+
);
585+
if (indicesToQuery.length == 0) return;
586+
587+
DeleteByQueryRequest request = new DeleteByQueryRequest(indicesToQuery).setQuery(query)
530588
.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()))
531589
.setAbortOnVersionConflict(false)
532590
.setRefresh(true);
@@ -556,7 +614,14 @@ private void deleteCategorizerState(
556614
) {
557615
// Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
558616
IdsQueryBuilder query = new IdsQueryBuilder().addIds(CategorizerState.documentId(jobId, docNum));
559-
DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern()).setQuery(query)
617+
String[] indicesToQuery = removeReadOnlyIndices(
618+
List.of(AnomalyDetectorsIndex.jobStateIndexPattern()),
619+
finishedHandler,
620+
"categorizer state",
621+
() -> finishedHandler.onResponse(true)
622+
);
623+
if (indicesToQuery.length == 0) return;
624+
DeleteByQueryRequest request = new DeleteByQueryRequest(indicesToQuery).setQuery(query)
560625
.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()))
561626
.setAbortOnVersionConflict(false)
562627
.setRefresh(true);

0 commit comments

Comments
 (0)