7
7
package org .gridsuite .study .server .elasticsearch ;
8
8
9
9
import co .elastic .clients .elasticsearch ._types .FieldValue ;
10
+ import co .elastic .clients .elasticsearch ._types .aggregations .*;
11
+ import co .elastic .clients .elasticsearch ._types .aggregations .Aggregation ;
10
12
import co .elastic .clients .elasticsearch ._types .query_dsl .*;
11
13
12
14
import com .powsybl .iidm .network .VariantManagerConstants ;
13
15
import org .apache .commons .lang3 .StringUtils ;
16
+ import org .apache .commons .lang3 .tuple .Pair ;
17
+ import org .gridsuite .study .server .dto .BasicEquipmentInfos ;
14
18
import org .gridsuite .study .server .dto .EquipmentInfos ;
15
19
import org .gridsuite .study .server .dto .TombstonedEquipmentInfos ;
20
+ import org .slf4j .Logger ;
21
+ import org .slf4j .LoggerFactory ;
16
22
import org .springframework .data .domain .PageRequest ;
17
- import org .springframework .data .elasticsearch .client .elc .NativeQuery ;
18
- import org .springframework .data .elasticsearch .client .elc .NativeQueryBuilder ;
19
- import org .springframework .data .elasticsearch .client .elc .Queries ;
23
+ import org .springframework .data .elasticsearch .client .elc .*;
20
24
import org .springframework .data .elasticsearch .core .ElasticsearchOperations ;
21
25
import org .springframework .data .elasticsearch .core .SearchHit ;
26
+ import org .springframework .data .elasticsearch .core .SearchHits ;
22
27
import org .springframework .lang .NonNull ;
23
28
import org .springframework .stereotype .Service ;
24
29
@@ -42,6 +47,8 @@ public enum FieldSelector {
42
47
43
48
private static final int PAGE_MAX_SIZE = 400 ;
44
49
50
+ private static final int COMPOSITE_AGGREGATION_BATCH_SIZE = 1000 ;
51
+
45
52
public static final Map <String , Integer > EQUIPMENT_TYPE_SCORES = Map .ofEntries (
46
53
entry ("SUBSTATION" , 15 ),
47
54
entry ("VOLTAGE_LEVEL" , 14 ),
@@ -73,6 +80,8 @@ public enum FieldSelector {
73
80
74
81
private final ElasticsearchOperations elasticsearchOperations ;
75
82
83
+ private static final Logger LOGGER = LoggerFactory .getLogger (EquipmentInfosService .class );
84
+
76
85
public EquipmentInfosService (EquipmentInfosRepository equipmentInfosRepository , TombstonedEquipmentInfosRepository tombstonedEquipmentInfosRepository , ElasticsearchOperations elasticsearchOperations ) {
77
86
this .equipmentInfosRepository = equipmentInfosRepository ;
78
87
this .tombstonedEquipmentInfosRepository = tombstonedEquipmentInfosRepository ;
@@ -106,6 +115,101 @@ public long getEquipmentInfosCount() {
106
115
return equipmentInfosRepository .count ();
107
116
}
108
117
118
+ private CompositeAggregation buildCompositeAggregation (String field , Map <String , FieldValue > afterKey ) {
119
+ List <Map <String , CompositeAggregationSource >> sources = List .of (
120
+ Map .of (field , CompositeAggregationSource .of (s -> s .terms (t -> t .field (field + ".keyword" )))
121
+ )
122
+ );
123
+
124
+ CompositeAggregation .Builder compositeAggregationBuilder = new CompositeAggregation .Builder ()
125
+ .size (COMPOSITE_AGGREGATION_BATCH_SIZE )
126
+ .sources (sources );
127
+
128
+ if (afterKey != null ) {
129
+ compositeAggregationBuilder .after (afterKey );
130
+ }
131
+
132
+ return compositeAggregationBuilder .build ();
133
+ }
134
+
135
+ /**
136
+ * Constructs a NativeQuery with a composite aggregation.
137
+ *
138
+ * @param compositeName The name of the composite aggregation.
139
+ * @param compositeAggregation The composite aggregation configuration.
140
+ * @return A NativeQuery object configured with the specified composite aggregation.
141
+ */
142
+ private NativeQuery buildCompositeAggregationQuery (String compositeName , CompositeAggregation compositeAggregation ) {
143
+ Aggregation aggregation = Aggregation .of (a -> a .composite (compositeAggregation ));
144
+
145
+ return new NativeQueryBuilder ()
146
+ .withAggregation (compositeName , aggregation )
147
+ .build ();
148
+ }
149
+
150
+ /**
151
+ * This method is used to extract the results of a composite aggregation from Elasticsearch search hits.
152
+ *
153
+ * @param searchHits The search hits returned from an Elasticsearch query.
154
+ * @param compositeName The name of the composite aggregation.
155
+ * @return A Pair consisting of two elements:
156
+ * The left element of the Pair is a list of maps, where each map represents a bucket's key. Each bucket is a result of the composite aggregation.
157
+ * The right element of the Pair is the afterKey map, which is used for pagination in Elasticsearch.
158
+ * If there are no more pages, the afterKey will be null.
159
+ * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-composite-aggregation.html">Elasticsearch Composite Aggregation Documentation</a>
160
+ */
161
+ private Pair <List <Map <String , FieldValue >>, Map <String , FieldValue >> extractCompositeAggregationResults (SearchHits <EquipmentInfos > searchHits , String compositeName ) {
162
+ ElasticsearchAggregations aggregations = (ElasticsearchAggregations ) searchHits .getAggregations ();
163
+
164
+ List <Map <String , FieldValue >> results = new ArrayList <>();
165
+ if (aggregations != null ) {
166
+ Map <String , ElasticsearchAggregation > aggregationList = aggregations .aggregationsAsMap ();
167
+ if (!aggregationList .isEmpty ()) {
168
+ Aggregate aggregate = aggregationList .get (compositeName ).aggregation ().getAggregate ();
169
+ if (aggregate .isComposite () && aggregate .composite () != null ) {
170
+ for (CompositeBucket bucket : aggregate .composite ().buckets ().array ()) {
171
+ Map <String , FieldValue > key = bucket .key ();
172
+ results .add (key );
173
+ }
174
+ return Pair .of (results , aggregate .composite ().afterKey ());
175
+ }
176
+ }
177
+ }
178
+ return Pair .of (results , null );
179
+ }
180
+
181
+ public List <UUID > getEquipmentInfosDistinctNetworkUuids () {
182
+ List <UUID > networkUuids = new ArrayList <>();
183
+ Map <String , FieldValue > afterKey = null ;
184
+ String compositeName = "composite_agg" ;
185
+ String networkUuidField = BasicEquipmentInfos .Fields .networkUuid ;
186
+
187
+ do {
188
+ CompositeAggregation compositeAggregation = buildCompositeAggregation (networkUuidField , afterKey );
189
+ NativeQuery query = buildCompositeAggregationQuery (compositeName , compositeAggregation );
190
+
191
+ SearchHits <EquipmentInfos > searchHits = elasticsearchOperations .search (query , EquipmentInfos .class );
192
+ Pair <List <Map <String , FieldValue >>, Map <String , FieldValue >> searchResults = extractCompositeAggregationResults (searchHits , compositeName );
193
+
194
+ searchResults .getLeft ().stream ()
195
+ .map (result -> result .get (networkUuidField ))
196
+ .filter (Objects ::nonNull )
197
+ .map (FieldValue ::stringValue )
198
+ .map (UUID ::fromString )
199
+ .forEach (networkUuids ::add );
200
+
201
+ afterKey = searchResults .getRight ();
202
+ } while (afterKey != null && !afterKey .isEmpty ());
203
+
204
+ return networkUuids ;
205
+ }
206
+
207
+ public List <UUID > getOrphanEquipmentInfosNetworkUuids (List <UUID > networkUuidsInDatabase ) {
208
+ List <UUID > networkUuids = getEquipmentInfosDistinctNetworkUuids ();
209
+ networkUuids .removeAll (networkUuidsInDatabase );
210
+ return networkUuids ;
211
+ }
212
+
109
213
public long getTombstonedEquipmentInfosCount () {
110
214
return tombstonedEquipmentInfosRepository .count ();
111
215
}
0 commit comments