Skip to content

Commit 184f7a6

Browse files
committed
[timeseries] Add index lifecycle policy to elasticsearch
1 parent bd51b94 commit 184f7a6

File tree

5 files changed

+149
-52
lines changed

5 files changed

+149
-52
lines changed

openwisp_monitoring/db/backends/elasticsearch/client.py

Lines changed: 53 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from .. import TIMESERIES_DB
1818
from .index import MetricIndex, Point, find_metric
1919
from .queries import default_chart_query, math_map, operator_lookup
20+
from .retention_policies import _make_policy, default_rp_policy
2021

2122
logger = logging.getLogger(__name__)
2223

@@ -70,7 +71,10 @@ def __init__(self, db_name='metric'):
7071
def create_database(self):
7172
""" creates connection to elasticsearch """
7273
connections.create_connection(hosts=[TIMESERIES_DB['HOST']])
73-
self.get_db
74+
db = self.get_db
75+
# Skip if support for Index Lifecycle Management is disabled or no privileges
76+
self.ilm_enabled = db.ilm.start()['acknowledged']
77+
self.create_or_alter_retention_policy(name='default')
7478

7579
def drop_database(self):
7680
""" deletes all indices """
@@ -81,27 +85,48 @@ def drop_database(self):
8185
@cached_property
8286
def get_db(self):
8387
""" Returns an ``Elasticsearch Client`` instance """
84-
# TODO: AUTHENTICATION remains see `SecurityClient`
8588
return Elasticsearch(
8689
[f"{TIMESERIES_DB['HOST']}:{TIMESERIES_DB['PORT']}"],
8790
http_auth=(TIMESERIES_DB['USER'], TIMESERIES_DB['PASSWORD']),
8891
retry_on_timeout=True,
8992
)
9093

91-
def create_or_alter_retention_policy(self, name, duration):
92-
""" creates or alters existing retention policy if necessary """
93-
# TODO
94-
pass
94+
def create_or_alter_retention_policy(self, name, duration=None):
95+
"""
96+
creates or alters existing retention policy if necessary
97+
98+
Note: default retention policy can't be altered with this function
99+
"""
100+
if not self.ilm_enabled:
101+
return
102+
ilm = self.get_db.ilm
103+
if not duration:
104+
ilm.put_lifecycle(policy=name, body=default_rp_policy)
105+
return
106+
days = f'{int(duration.split("h")[0]) / 24}d'
107+
duration_changed = False
108+
try:
109+
policy = ilm.get_lifecycle('default')
110+
exists = True
111+
current_duration = policy['default']['policy']['phases']['hot']['actions'][
112+
'rollover'
113+
]['max_age']
114+
duration_changed = current_duration != days
115+
except NotFoundError:
116+
exists = False
117+
if not exists or duration_changed:
118+
policy = _make_policy(days)
119+
ilm.put_lifecycle(policy=name, body=policy)
95120

96121
def query(self, query, precision=None):
97122
index = query.pop('key')
98123
return Search(index=index).from_dict(query).execute().to_dict()
99124

100125
def write(self, name, values, **kwargs):
101-
# TODO: Add support for retention policy
126+
rp = kwargs.get('retention_policy')
102127
tags = kwargs.get('tags')
103128
timestamp = kwargs.get('timestamp')
104-
metric_id = find_metric(name, tags, add=True)
129+
metric_id = find_metric(self.get_db, name, tags, rp, add=True)
105130
metric_index = MetricIndex().get(metric_id, index=name)
106131
point = Point(fields=values, time=timestamp or datetime.now())
107132
metric_index.points.append(point)
@@ -110,14 +135,12 @@ def write(self, name, values, **kwargs):
110135
def read(self, key, fields, tags, limit=1, order='-time', **kwargs):
111136
extra_fields = kwargs.get('extra_fields')
112137
time_format = kwargs.get('time_format')
138+
# TODO: It will be of the form 'now() - <int>s'
113139
# since = kwargs.get('since')
114-
metric_id = find_metric(key, tags)
140+
metric_id = find_metric(self.get_db, key, tags)
115141
if not metric_id:
116-
return list()
117-
try:
118-
metric_index = MetricIndex().get(metric_id, index=key)
119-
except NotFoundError:
120142
return []
143+
metric_index = self.get_db.get(index=key, id=metric_id)
121144
if order == 'time':
122145
points = list(metric_index.points[0:limit])
123146
elif order == '-time':
@@ -127,33 +150,28 @@ def read(self, key, fields, tags, limit=1, order='-time', **kwargs):
127150
f'Invalid order "{order}" passed.\nYou may pass "time" / "-time" to get '
128151
'result sorted in ascending /descending order respectively.'
129152
)
130-
if not points:
131-
return list()
132153
# distinguish between traffic and clients
133154
for point in list(points):
134155
if fields not in point.fields.to_dict():
135156
points.remove(point)
136157
if extra_fields and extra_fields != '*':
137158
assert isinstance(extra_fields, list)
138-
_points = []
139-
for point in points:
140-
point = point.to_dict()
141-
_point = {
159+
for count, point in enumerate(points):
160+
fields_dict = point.to_dict()['fields']
161+
point = {
142162
'time': self._format_time(point['time'], time_format),
143-
fields: point['fields'][fields],
163+
fields: fields_dict[fields],
144164
}
145165
for extra_field in extra_fields:
146-
if point['fields'].get(extra_field) is not None:
147-
_point.update({extra_field: point['fields'][extra_field]})
148-
_points.append(_point)
149-
points = _points
166+
if fields_dict.get(extra_field) is not None:
167+
point.update({extra_field: fields_dict[extra_field]})
168+
points[count] = point
150169
elif extra_fields == '*':
151-
points = [
152-
deep_merge_dicts(
153-
p.fields.to_dict(), {'time': self._format_time(p.time, time_format)}
170+
for count, point in enumerate(points):
171+
points[count] = deep_merge_dicts(
172+
point.fields.to_dict(),
173+
{'time': self._format_time(point.time, time_format)},
154174
)
155-
for p in points
156-
]
157175
else:
158176
points = [
159177
deep_merge_dicts(
@@ -210,12 +228,14 @@ def _fill_points(self, query, points):
210228

211229
def delete_metric_data(self, key=None, tags=None):
212230
"""
213-
deletes a specific metric based on the key and tags
214-
provided, you may also choose to delete all metrics
231+
deletes a specific metric based on given key and tags;
232+
deletes all metrics if neither provided
215233
"""
216234
if key and tags:
217-
metric_id = find_metric(key, tags)
235+
metric_id = find_metric(self.get_db, key, tags)
218236
self.get_db.delete(index=key, id=metric_id)
237+
elif key:
238+
self.get_db.indices.delete(index=key, ignore=[400, 404])
219239
else:
220240
self.get_db.indices.delete(index='*', ignore=[400, 404])
221241

@@ -317,7 +337,7 @@ def default_chart_query(self, tags):
317337
return q
318338

319339

320-
# Old data - delete by query (inefficient) / retention policy - Index lifecycle management
340+
# TODO:
321341
# Fix Average - currently it's computing average over all fields!
322342
# Time Interval - fix range
323343
# Device query
Lines changed: 52 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
import uuid
22

33
from django.conf import settings
4-
from elasticsearch import Elasticsearch
54
from elasticsearch.exceptions import NotFoundError
65
from elasticsearch.helpers import bulk
76
from elasticsearch_dsl import Date, Document, InnerDoc, Nested, Q, Search
87
from swapper import load_model
98

9+
# TODO: Remove this. Current Structure
10+
# Index Templates <-- Indexes <-- Documents <-- Points
11+
1012

1113
class Point(InnerDoc):
1214
time = Date(required=True, default_timezone=settings.TIME_ZONE)
@@ -15,53 +17,86 @@ class Point(InnerDoc):
1517

1618
class MetricIndex(Document):
1719
tags = Nested(dynamic=True, required=True, multi=True)
18-
# returns an empty list if not present
1920
points = Nested(Point)
2021

2122
class Index:
22-
# name gets replaced with metric's key
2323
name = 'metric'
2424
settings = {
25-
"number_of_shards": 1,
26-
"number_of_replicas": 0,
25+
'number_of_shards': 1,
26+
'number_of_replicas': 0,
27+
'lifecycle.name': 'default',
28+
'lifecycle.rollover_alias': 'metric',
2729
}
2830

2931

30-
def find_metric(index, tags, add=False):
31-
client = Elasticsearch()
32+
def find_metric(client, index, tags, retention_policy=None, add=False):
3233
search = Search(using=client, index=index)
3334
if tags:
3435
tags_dict = dict()
3536
for key, value in tags.items():
3637
tags_dict[f'tags.{key}'] = value
37-
q = Q('bool', must=[Q('match', **{k: str(v)}) for k, v in tags_dict.items()])
38+
q = Q(
39+
'nested',
40+
path='tags',
41+
query=Q(
42+
'bool', must=[Q('match', **{k: str(v)}) for k, v in tags_dict.items()]
43+
),
44+
)
3845
else:
3946
q = Q()
4047
try:
4148
return list(search.query(q).execute())[0].meta['id']
4249
except (NotFoundError, AttributeError, IndexError):
43-
return add_index(index, tags)['_id'] if add else None
50+
return (
51+
add_doc(client, index, tags, retention_policy=retention_policy)['_id']
52+
if add
53+
else None
54+
)
4455

4556

46-
def add_index(key, tags, id=None):
57+
def add_doc(client, key, tags, _id=None, retention_policy=None):
4758
"""
4859
Add index to elasticsearch using ``keys``, ``tags`` and ``id`` provided.
4960
If no ``id`` is provided a random ``uuid`` would be used.
5061
"""
51-
obj = MetricIndex(meta={'id': id or uuid.uuid1()}, tags=tags)
52-
obj.meta.index = key
53-
obj.save()
62+
_id = str(_id or uuid.uuid1())
63+
# Check if index exists
64+
if client.indices.exists(index=key):
65+
client.create(index=key, id=_id, body={'tags': tags})
66+
return {'_id': _id}
67+
# Create a new index if it doesn't exist
68+
name = f'{key}-000001'
69+
obj = MetricIndex()
70+
obj._index = obj._index.clone(name)
71+
# Create a new index template if it doesn't exist
72+
if not client.indices.exists_template(name=key):
73+
obj._index.settings(**{'lifecycle.rollover_alias': key})
74+
if retention_policy:
75+
obj._index.settings(**{'lifecycle.name': retention_policy})
76+
# index pattern is added for Index Lifecycle Management
77+
obj._index.as_template(key, f'{key}-*').save(using=client)
78+
obj.init(using=client, index=name)
79+
obj.meta = {'id': _id, 'index': name}
80+
obj.tags = tags
81+
obj.save(using=client, index=name)
82+
if retention_policy:
83+
client.indices.put_settings(
84+
body={'lifecycle.name': retention_policy}, index=key
85+
)
86+
client.indices.put_settings(body={'lifecycle.rollover_alias': key}, index=name)
87+
client.indices.put_alias(index=name, name=key, body={'is_write_index': True})
5488
return obj.to_dict(include_meta=True)
5589

5690

5791
def bulk_indexing():
5892
""" Index all existing metrics """
93+
from ... import timeseries_db
94+
5995
Metric = load_model('monitoring', 'Metric')
60-
MetricIndex.init()
61-
es = Elasticsearch()
6296
bulk(
63-
client=es,
97+
client=timeseries_db.get_db,
6498
actions=(
65-
add_index(m.key, m.tags, m.id) for m in Metric.objects.all().iterator()
99+
add_doc(timeseries_db.get_db, m.key, m.tags, m.id)
100+
for m in Metric.objects.all().distinct('key').iterator()
66101
),
67102
)
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# By default age is calculated from the date the index is created but if the
2+
# index has been rolled over, then the rollover date is used to calculate the age
3+
4+
default_rp_policy = {
5+
'policy': {
6+
'phases': {
7+
'hot': {
8+
'actions': {
9+
'rollover': {'max_age': '30d', 'max_size': '90G'},
10+
'set_priority': {'priority': 100},
11+
}
12+
},
13+
'warm': {
14+
'min_age': '30d',
15+
'actions': {
16+
'forcemerge': {'max_num_segments': 1},
17+
'allocate': {'number_of_replicas': 0},
18+
'set_priority': {'priority': 50},
19+
},
20+
},
21+
'cold': {'min_age': '150d', 'actions': {'freeze': {}}},
22+
'delete': {'min_age': '335d', 'actions': {'delete': {}}},
23+
}
24+
}
25+
}
26+
27+
28+
def _make_policy(max_age):
29+
return {
30+
'policy': {
31+
'phases': {
32+
'hot': {
33+
'actions': {
34+
'rollover': {'max_age': max_age},
35+
'set_priority': {'priority': 100},
36+
}
37+
},
38+
'delete': {'actions': {'delete': {}}},
39+
}
40+
}
41+
}

openwisp_monitoring/monitoring/base/models.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -509,7 +509,6 @@ def _is_crossed_by(self, current_value, time=None):
509509
continue
510510
if not self._value_crossed(point[self.metric.field_name]):
511511
return False
512-
print(point['time'])
513512
if self._time_crossed(
514513
make_aware(datetime.fromtimestamp(point['time']))
515514
):

openwisp_monitoring/monitoring/tests/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
Metric = load_model('monitoring', 'Metric')
1717
AlertSettings = load_model('monitoring', 'AlertSettings')
1818

19+
# TODO: Queries relating to top_fields aren't yet supported
1920
# this custom chart configuration is used for automated testing purposes
2021
charts = {
2122
'histogram': {
@@ -125,7 +126,8 @@
125126
"SELECT {fields|MEAN} FROM {key} "
126127
"WHERE time >= '{time}' AND content_type = "
127128
"'{content_type}' AND object_id = '{object_id}'"
128-
)
129+
),
130+
'elasticsearch': _make_query(),
129131
},
130132
},
131133
}

0 commit comments

Comments
 (0)