Skip to content

Commit be49865

Browse files
committed
[timeseries] Add index lifecycle policy to elasticsearch
1 parent bd51b94 commit be49865

File tree

4 files changed

+140
-50
lines changed

4 files changed

+140
-50
lines changed

openwisp_monitoring/db/backends/elasticsearch/client.py

Lines changed: 51 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from .. import TIMESERIES_DB
1818
from .index import MetricIndex, Point, find_metric
1919
from .queries import default_chart_query, math_map, operator_lookup
20+
from .retention_policies import _make_policy, default_rp_policy
2021

2122
logger = logging.getLogger(__name__)
2223

@@ -70,7 +71,10 @@ def __init__(self, db_name='metric'):
def create_database(self):
    """ creates connection to elasticsearch """
    connections.create_connection(hosts=[TIMESERIES_DB['HOST']])
    db = self.get_db
    # ILM must be started explicitly; 'acknowledged' is falsy when Index
    # Lifecycle Management is disabled or privileges are missing, in which
    # case retention-policy operations are skipped
    ack = db.ilm.start()
    self.ilm_enabled = ack['acknowledged']
    self.create_or_alter_retention_policy(name='default')
7478

7579
def drop_database(self):
7680
""" deletes all indices """
@cached_property
def get_db(self):
    """ Returns an ``Elasticsearch Client`` instance """
    host = f"{TIMESERIES_DB['HOST']}:{TIMESERIES_DB['PORT']}"
    credentials = (TIMESERIES_DB['USER'], TIMESERIES_DB['PASSWORD'])
    return Elasticsearch([host], http_auth=credentials, retry_on_timeout=True)
9093

def create_or_alter_retention_policy(self, name, duration=None):
    """
    creates or alters existing retention policy if necessary

    Note: default retention policy can't be altered with this function

    :param name: name of the index lifecycle policy
    :param duration: retention duration as an hour string (e.g. ``'168h'``),
                     converted to whole days for the rollover ``max_age``;
                     when ``None`` the default policy body is applied
    """
    if not self.ilm_enabled:
        return
    ilm = self.get_db.ilm
    if not duration:
        ilm.put_lifecycle(policy=name, body=default_rp_policy)
        return
    days = f'{int(duration.split("h")[0]) // 24}d'
    duration_changed = False
    try:
        # fix: look up the policy being altered (``name``), not the
        # hardcoded 'default' one — otherwise the existence/duration
        # check is wrong for any non-default policy
        policy = ilm.get_lifecycle(name)
        exists = True
        current_duration = policy[name]['policy']['phases']['hot']['actions'][
            'rollover'
        ]['max_age']
        duration_changed = current_duration != days
    except NotFoundError:
        exists = False
    if not exists or duration_changed:
        policy = _make_policy(days)
        ilm.put_lifecycle(policy=name, body=policy)
95120

96121
def query(self, query, precision=None):
    # the metric key doubles as the index name
    index = query.pop('key')
    response = Search(index=index).from_dict(query).execute()
    return response.to_dict()
99124

100125
def write(self, name, values, **kwargs):
101-
# TODO: Add support for retention policy
126+
rp = kwargs.get('retention_policy')
102127
tags = kwargs.get('tags')
103128
timestamp = kwargs.get('timestamp')
104-
metric_id = find_metric(name, tags, add=True)
129+
metric_id = find_metric(self.get_db, name, tags, rp, add=True)
105130
metric_index = MetricIndex().get(metric_id, index=name)
106131
point = Point(fields=values, time=timestamp or datetime.now())
107132
metric_index.points.append(point)
@@ -111,13 +136,10 @@ def read(self, key, fields, tags, limit=1, order='-time', **kwargs):
111136
extra_fields = kwargs.get('extra_fields')
112137
time_format = kwargs.get('time_format')
113138
# since = kwargs.get('since')
114-
metric_id = find_metric(key, tags)
139+
metric_id = find_metric(self.get_db, key, tags)
115140
if not metric_id:
116-
return list()
117-
try:
118-
metric_index = MetricIndex().get(metric_id, index=key)
119-
except NotFoundError:
120141
return []
142+
metric_index = self.get_db.get(index=key, id=metric_id)
121143
if order == 'time':
122144
points = list(metric_index.points[0:limit])
123145
elif order == '-time':
@@ -127,33 +149,27 @@ def read(self, key, fields, tags, limit=1, order='-time', **kwargs):
127149
f'Invalid order "{order}" passed.\nYou may pass "time" / "-time" to get '
128150
'result sorted in ascending /descending order respectively.'
129151
)
130-
if not points:
131-
return list()
132152
# distinguish between traffic and clients
133153
for point in list(points):
134154
if fields not in point.fields.to_dict():
135155
points.remove(point)
136156
if extra_fields and extra_fields != '*':
137157
assert isinstance(extra_fields, list)
138-
_points = []
139-
for point in points:
140-
point = point.to_dict()
141-
_point = {
158+
for count, point in enumerate(points):
159+
fields_dict = point.to_dict()['fields']
160+
point = {
142161
'time': self._format_time(point['time'], time_format),
143-
fields: point['fields'][fields],
162+
fields: fields_dict[fields],
144163
}
145164
for extra_field in extra_fields:
146-
if point['fields'].get(extra_field) is not None:
147-
_point.update({extra_field: point['fields'][extra_field]})
148-
_points.append(_point)
149-
points = _points
165+
if fields_dict.get(extra_field) is not None:
166+
point.update({extra_field: fields_dict[extra_field]})
167+
points[count] = point
150168
elif extra_fields == '*':
151-
points = [
152-
deep_merge_dicts(
153-
p.fields.to_dict(), {'time': self._format_time(p.time, time_format)}
169+
for count, point in enumerate(points):
170+
points[count] = deep_merge_dicts(
171+
point.fields.to_dict(), {'time': self._format_time(point.time, time_format)}
154172
)
155-
for p in points
156-
]
157173
else:
158174
points = [
159175
deep_merge_dicts(
@@ -210,12 +226,14 @@ def _fill_points(self, query, points):
210226

211227
def delete_metric_data(self, key=None, tags=None):
    """
    deletes a specific metric based on given key and tags;
    deletes all metrics if neither provided
    """
    if key and tags:
        # delete the single document matching key & tags
        metric_id = find_metric(self.get_db, key, tags)
        self.get_db.delete(index=key, id=metric_id)
        return
    if key:
        # drop the whole index for this key
        self.get_db.indices.delete(index=key, ignore=[400, 404])
        return
    # wipe every index
    self.get_db.indices.delete(index='*', ignore=[400, 404])
221239

@@ -317,7 +335,7 @@ def default_chart_query(self, tags):
317335
return q
318336

319337

320-
# Old data - delete by query (inefficient) / retention policy - Index lifecycle management
338+
# TODO:
321339
# Fix Average - currently it's computing average over all fields!
322340
# Time Interval - fix range
323341
# Device query
Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
import uuid
22

33
from django.conf import settings
4-
from elasticsearch import Elasticsearch
54
from elasticsearch.exceptions import NotFoundError
65
from elasticsearch.helpers import bulk
76
from elasticsearch_dsl import Date, Document, InnerDoc, Nested, Q, Search
87
from swapper import load_model
98

9+
# TODO: Remove this. Current Structure
10+
# Index Templates <-- Indexes <-- Documents <-- Points
11+
1012

1113
class Point(InnerDoc):
1214
time = Date(required=True, default_timezone=settings.TIME_ZONE)
@@ -15,20 +17,19 @@ class Point(InnerDoc):
1517

1618
class MetricIndex(Document):
    """
    Document holding a metric's tags and its nested measurement points.
    """

    tags = Nested(dynamic=True, required=True, multi=True)
    points = Nested(Point)

    class Index:
        # base index name; per-metric indices are cloned from this template
        name = 'metric'
        settings = {
            'number_of_shards': 1,
            'number_of_replicas': 0,
            'lifecycle.name': 'default',
            'lifecycle.rollover_alias': 'metric',
        }
2830

2931

30-
def find_metric(index, tags, add=False):
31-
client = Elasticsearch()
32+
def find_metric(client, index, tags, retention_policy=None, add=False):
3233
search = Search(using=client, index=index)
3334
if tags:
3435
tags_dict = dict()
@@ -40,28 +41,56 @@ def find_metric(index, tags, add=False):
4041
try:
4142
return list(search.query(q).execute())[0].meta['id']
4243
except (NotFoundError, AttributeError, IndexError):
43-
return add_index(index, tags)['_id'] if add else None
44+
return (
45+
add_doc(client, index, tags, retention_policy=retention_policy)['_id']
46+
if add
47+
else None
48+
)
4449

4550

def add_doc(client, key, tags, _id=None, retention_policy=None):
    """
    Adds a document with the given ``tags`` to the index named ``key``,
    creating the index, its template and rollover alias on first use.
    If no ``_id`` is provided a random ``uuid`` is used.

    :param client: ``Elasticsearch`` client instance
    :param key: metric key; used as the index/alias name
    :param tags: tags dict stored on the document
    :param _id: optional document id
    :param retention_policy: optional lifecycle policy name to attach
    :returns: dict containing at least the document's ``_id``
    """
    _id = str(_id or uuid.uuid1())
    # Fast path: index (or its write alias) already exists — just insert
    if client.indices.exists(index=key):
        client.create(index=key, id=_id, body={'tags': tags})
        return {'_id': _id}
    # Create a new index if it doesn't exist; '-000001' suffix is the
    # rollover naming convention expected by ILM
    name = f'{key}-000001'
    obj = MetricIndex()
    obj._index = obj._index.clone(name)
    # Create a new index template if it doesn't exist
    if not client.indices.exists_template(name=key):
        obj._index.settings(**{'lifecycle.rollover_alias': key})
        if retention_policy:
            obj._index.settings(**{'lifecycle.name': retention_policy})
        # index pattern is added for Index Lifecycle Management
        obj._index.as_template(key, f'{key}-*').save(using=client)
    obj.init(using=client, index=name)
    obj.meta = {'id': _id, 'index': name}
    obj.tags = tags
    obj.save(using=client, index=name)
    if retention_policy:
        client.indices.put_settings(
            body={'lifecycle.name': retention_policy}, index=key
        )
    client.indices.put_settings(body={'lifecycle.rollover_alias': key}, index=name)
    client.indices.put_alias(index=name, name=key, body={'is_write_index': True})
    return obj.to_dict(include_meta=True)
5583

5684

5785
def bulk_indexing():
    """ Index all existing metrics """
    from ... import timeseries_db

    Metric = load_model('monitoring', 'Metric')
    client = timeseries_db.get_db
    # one document per distinct metric key
    docs = (
        add_doc(client, metric.key, metric.tags, metric.id)
        for metric in Metric.objects.all().distinct('key').iterator()
    )
    bulk(client=client, actions=docs)
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
# By default age is calculated from the date the index is created, but if the
# index has been rolled over then the rollover date is used to calculate the age
default_rp_policy = {
    'policy': {
        'phases': {
            # hot: actively written; roll over at 30 days or 90G
            'hot': {
                'actions': {
                    'rollover': {'max_age': '30d', 'max_size': '90G'},
                    'set_priority': {'priority': 100},
                }
            },
            # warm: post-rollover; merge segments and drop replicas
            'warm': {
                'min_age': '30d',
                'actions': {
                    'forcemerge': {'max_num_segments': 1},
                    'allocate': {'number_of_replicas': 0},
                    'set_priority': {'priority': 50},
                },
            },
            # cold: rarely queried — freeze to minimize memory footprint
            'cold': {'min_age': '150d', 'actions': {'freeze': {}}},
            # delete: drop data outliving the retention window
            'delete': {'min_age': '335d', 'actions': {'delete': {}}},
        }
    }
}
26+
27+
28+
def _make_policy(max_age):
29+
return {
30+
'policy': {
31+
'phases': {
32+
'hot': {
33+
'actions': {
34+
'rollover': {'max_age': max_age},
35+
'set_priority': {'priority': 100},
36+
}
37+
},
38+
'delete': {'actions': {'delete': {}}},
39+
}
40+
}
41+
}

openwisp_monitoring/monitoring/tests/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
Metric = load_model('monitoring', 'Metric')
1717
AlertSettings = load_model('monitoring', 'AlertSettings')
1818

19+
# TODO: Queries relating to top_fields aren't implemented yet
1920
# this custom chart configuration is used for automated testing purposes
2021
charts = {
2122
'histogram': {
@@ -125,7 +126,8 @@
125126
"SELECT {fields|MEAN} FROM {key} "
126127
"WHERE time >= '{time}' AND content_type = "
127128
"'{content_type}' AND object_id = '{object_id}'"
128-
)
129+
),
130+
'elasticsearch': _make_query(),
129131
},
130132
},
131133
}

0 commit comments

Comments
 (0)