Commit d383f17

[timeseries] Add top fields support
1 parent f4fae65 commit d383f17

13 files changed: +280 additions, -128 deletions

README.rst

Lines changed: 38 additions & 3 deletions
@@ -24,7 +24,7 @@ Available Features
 
 * Collects and displays device status information like uptime, RAM status, CPU load averages,
   Interface addresses, WiFi interface status and associated clients, Neighbors information, DHCP Leases, Disk/Flash status
-* Collection of monitoring information in a timeseries database (currently only influxdb is supported)
+* Collection of monitoring information in a timeseries database (`InfluxDB <https://www.influxdata.com/>`_ and `Elasticsearch <https://www.elastic.co/elasticsearch/>`_ are currently supported)
 * Monitoring charts for uptime, packet loss, round trip time (latency), associated wifi clients, interface traffic,
   RAM usage, CPU load, flash/disk usage
 * Charts can be viewed at resolutions of 1 day, 3 days, a week, a month and a year
@@ -46,6 +46,8 @@ beforehand.
 In case you prefer not to use Docker you can `install InfluxDB <https://docs.influxdata.com/influxdb/v1.8/introduction/install/>`_
 and Redis from your repositories, but keep in mind that the version packaged by your distribution may be different.
 
+If you wish to use ``Elasticsearch`` for storing and retrieving timeseries data then `install Elasticsearch <https://www.elastic.co/guide/en/elasticsearch/reference/current/install-elasticsearch.html>`_.
+
 Install spatialite and sqlite:
 
 .. code-block:: shell
@@ -106,6 +108,19 @@ Follow the setup instructions of `openwisp-controller
         'PORT': '8086',
     }
 
+In case you wish to use ``Elasticsearch`` for timeseries data storage and retrieval,
+make use of the following settings:
+
+.. code-block:: python
+
+    TIMESERIES_DATABASE = {
+        'BACKEND': 'openwisp_monitoring.db.backends.elasticsearch',
+        'USER': 'openwisp',
+        'PASSWORD': 'openwisp',
+        'NAME': 'openwisp2',
+        'HOST': 'localhost',
+        'PORT': '9200',
+    }
+
 ``urls.py``:
 
 .. code-block:: python
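
These settings feed a plain ``host:port`` connection (see ``create_database()`` in the client diff below). A quick way to sanity-check them, assuming the ``elasticsearch-dsl`` library the backend builds on:

    from elasticsearch_dsl import connections

    # HOST and PORT correspond to the TIMESERIES_DATABASE values above.
    es = connections.create_connection(hosts=['localhost:9200'])
    print(es.ping())  # True if Elasticsearch is reachable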
@@ -231,6 +246,9 @@ This data is only used to assess the recent status of devices, keeping
 it for a long time would not add much benefit and would cost a lot more
 in terms of disk space.
 
+**Note**: If you use ``Elasticsearch``, the retention period is taken as an
+integral multiple of a day, i.e. ``36h0m0s`` is interpreted as ``24h0m0s``.
+
 ``OPENWISP_MONITORING_AUTO_PING``
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
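The truncation is plain floor arithmetic; a minimal sketch:

    hours = 36
    days = hours // 24          # 1
    print(f'{days * 24}h0m0s')  # 24h0m0s, matching the note above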
@@ -360,18 +378,30 @@ MB (megabytes) instead of GB (Gigabytes) you can use:
                 "SUM(rx_bytes) / 1000000 AS download FROM {key} "
                 "WHERE time >= '{time}' AND content_type = '{content_type}' "
                 "AND object_id = '{object_id}' GROUP BY time(1d)"
-            )
+            ),
+            'elasticsearch': _make_query({
+                'upload': {'sum': {'field': 'points.fields.tx_bytes'}},
+                'download': {'avg': {'field': 'points.fields.rx_bytes'}},
+            })
         },
     }
 }
 
+# This needs to be declared separately but only for elasticsearch
+OPENWISP_MONITORING_ADDITIONAL_CHARTS_OPERATIONS = {
+    'upload': {'operator': '/', 'value': 1000000},
+    'download': {'operator': '/', 'value': 1000000},
+}
+
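The ``operator``/``value`` pairs describe a per-point transformation applied to chart data. A sketch of how such a rule could be applied (the helper below is hypothetical, not part of openwisp-monitoring):

    import operator

    # Hypothetical helper: apply one 'operator'/'value' rule to a field.
    OPS = {'/': operator.truediv, '*': operator.mul}

    def apply_operation(points, field, rule):
        op = OPS[rule['operator']]
        return [{**p, field: op(p[field], rule['value'])} for p in points]

    points = [{'time': 1589299200, 'download': 52_000_000}]
    print(apply_operation(points, 'download', {'operator': '/', 'value': 1000000}))
    # [{'time': 1589299200, 'download': 52.0}]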
 
 Or if you want to define a new chart configuration, which you can then
 call in your custom code (eg: a custom check class), you can do so as follows:
 
 .. code-block:: python
 
     from django.utils.translation import gettext_lazy as _
+
+    from openwisp_monitoring.db.backends.elasticsearch import _make_query
 
     OPENWISP_MONITORING_CHARTS = {
         'ram': {
             'type': 'line',
@@ -385,7 +415,12 @@ call in your custom code (eg: a custom check class), you can do so as follows:
                 "MEAN(buffered) AS buffered FROM {key} WHERE time >= '{time}' AND "
                 "content_type = '{content_type}' AND object_id = '{object_id}' "
                 "GROUP BY time(1d)"
-            )
+            ),
+            'elasticsearch': _make_query({
+                'total': {'avg': {'field': 'points.fields.total'}},
+                'free': {'avg': {'field': 'points.fields.free'}},
+                'buffered': {'avg': {'field': 'points.fields.buffered'}},
+            })
         },
     }
 }
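The diff never shows the query body ``_make_query`` produces, but the client code further down traverses a fixed aggregation layout ('GroupByTime' -> 'nested' -> 'set_range' -> 'time' -> 'nest'). A sketch of that shape, inferred from those access paths; range and interval values are placeholders later rewritten by ``_group_by``:

    chart_query_shape = {
        'query': {'nested': {'path': 'points', 'query': {'bool': {'must': []}}}},
        'aggs': {
            'GroupByTime': {
                'nested': {'path': 'points'},
                'aggs': {
                    'set_range': {
                        'filter': {
                            'range': {'points.time': {'from': 'now-1d/d', 'to': 'now/d'}}
                        },
                        'aggs': {
                            'time': {
                                'date_histogram': {
                                    'field': 'points.time',
                                    'fixed_interval': '10m',
                                },
                                'aggs': {
                                    'nest': {
                                        'nested': {'path': 'points.fields'},
                                        'aggs': {
                                            'total': {'avg': {'field': 'points.fields.total'}}
                                        },
                                    }
                                },
                            }
                        },
                    }
                },
            }
        },
    }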

openwisp_monitoring/db/__init__.py

Lines changed: 1 addition & 2 deletions
@@ -1,6 +1,5 @@
 from .backends import timeseries_db
 
 chart_query = timeseries_db.queries.chart_query
-device_data_query = timeseries_db.queries.device_data_query
 
-__all__ = ['timeseries_db', 'chart_query', 'device_data_query']
+__all__ = ['timeseries_db', 'chart_query']
openwisp_monitoring/db/backends/elasticsearch/__init__.py

Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
+from .queries import _make_query
+
+__all__ = ['_make_query']

openwisp_monitoring/db/backends/elasticsearch/client.py

Lines changed: 96 additions & 48 deletions
@@ -1,5 +1,6 @@
 import json
 import logging
+from collections import Counter
 from copy import deepcopy
 from datetime import datetime, timedelta
 
@@ -70,7 +71,9 @@ def __init__(self, db_name='metric'):
 
     def create_database(self):
         """ creates connection to elasticsearch """
-        connections.create_connection(hosts=[f"{TIMESERIES_DB['HOST']}:{TIMESERIES_DB['PORT']}"])
+        connections.create_connection(
+            hosts=[f"{TIMESERIES_DB['HOST']}:{TIMESERIES_DB['PORT']}"]
+        )
         db = self.get_db
         # Skip if support for Index Lifecycle Management is disabled or no privileges
         self.ilm_enabled = db.ilm.start()['acknowledged']
@@ -122,15 +125,16 @@ def create_or_alter_retention_policy(self, name, duration=None):
         ilm.put_lifecycle(policy=name, body=policy)
 
     def query(self, query, precision=None):
-        index = query.pop('key')
-        return Search(using=self.get_db, index=index).from_dict(query).execute().to_dict()
+        if 'summary' in query:
+            query.pop('summary')
+        return Search(using=self.get_db).from_dict(query).execute().to_dict()
 
     def write(self, name, values, **kwargs):
         rp = kwargs.get('retention_policy')
         tags = kwargs.get('tags')
         timestamp = kwargs.get('timestamp')
-        metric_id = find_metric(self.get_db, name, tags, rp, add=True)
-        metric_index = MetricIndex.get(metric_id, index=name, using=self.get_db)
+        metric_id, index = find_metric(self.get_db, name, tags, rp, add=True)
+        metric_index = MetricIndex.get(metric_id, index=index, using=self.get_db)
         point = Point(fields=values, time=timestamp or datetime.now())
         metric_index.points.append(point)
         metric_index.save()
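For orientation, a minimal usage sketch of the ``write`` API shown above; the metric name, field values and tags are illustrative:

    from openwisp_monitoring.db import timeseries_db

    # tags identify the owning object; timestamp defaults to now.
    timeseries_db.write(
        'ping',
        {'reachable': 1, 'rtt_avg': 0.51},
        tags={'content_type': 'config.device', 'object_id': '<device-uuid>'},
    )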
@@ -140,10 +144,11 @@ def read(self, key, fields, tags, limit=1, order='-time', **kwargs):
         time_format = kwargs.get('time_format')
         # TODO: It will be of the form 'now() - <int>s'
         # since = kwargs.get('since')
-        metric_id = find_metric(self.get_db, key, tags)
-        if not metric_id:
+        try:
+            metric_id, index = find_metric(self.get_db, key, tags)
+        except TypeError:
             return []
-        metric_index = MetricIndex.get(index=key, id=metric_id, using=self.get_db)
+        metric_index = MetricIndex.get(index=index, id=metric_id, using=self.get_db)
         if order == 'time':
             points = list(metric_index.points[0:limit])
         elif order == '-time':
@@ -195,26 +200,31 @@ def _format_time(self, obj, time_format=None):
 
     def get_list_query(self, query, precision='s'):
         response = self.query(query, precision)
-        points = response['aggregations']['GroupByTime']['buckets']
-        list_points = self._fill_points(
-            query, [self._format(point) for point in points]
-        )
+        try:
+            points = response['aggregations']['GroupByTime']['set_range']['time'][
+                'buckets'
+            ]
+            list_points = self._fill_points(
+                query, [self._format(point) for point in points],
+            )
+        except KeyError:
+            return []
         return list_points
 
     def _fill_points(self, query, points):
-        _range = next(
-            (item for item in query['query']['bool']['must'] if item.get('range')), None
-        )
+        _range = query['aggs']['GroupByTime']['nested']['aggs']['set_range']
         if not _range or not points:
             return points
-        days = int(_range['range']['points.time']['from'][4:-3])
+        days = int(_range['filter']['range']['points.time']['from'][4:-3])
+        interval = _range['aggs']['time']['date_histogram']['fixed_interval']
+        # Check if summary query
+        if f'{days}d' == interval:
+            return points
+        interval_dict = {'10m': 600, '20m': 1200, '1h': 3600, '24h': 86400}
+        interval = interval_dict[interval]
         start_time = datetime.now()
         end_time = start_time - timedelta(days=days)  # include today
         dummy_point = deepcopy(points[0])
-        if len(points) > 2:
-            interval = points[0]['time'] - points[1]['time']
-        else:
-            interval = 600
         start_ts = points[0]['time'] + interval
         end_ts = points[-1]['time'] - interval
         for field in dummy_point.keys():
@@ -223,7 +233,7 @@ def _fill_points(self, query, points):
             dummy_point['time'] = start_ts
             points.insert(0, deepcopy(dummy_point))
             start_ts += interval
-        # TODO: This needs to be fixed and shouldn't be required since intervals are set
+        # TODO: Why is this required since intervals are set?
         while points[-1]['time'] < end_time.timestamp():
             points.pop(-1)
         while end_ts > end_time.timestamp():
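A worked example of the slice used in ``_fill_points``: the range filter's ``from`` value has the form ``now-<N>d/d``, and ``[4:-3]`` extracts ``N``:

    for _from in ('now-1d/d', 'now-7d/d', 'now-30d/d'):
        print(int(_from[4:-3]))  # 1, 7, 30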
@@ -238,8 +248,8 @@ def delete_metric_data(self, key=None, tags=None):
         deletes all metrics if neither provided
         """
         if key and tags:
-            metric_id = find_metric(self.get_db, key, tags)
-            self.get_db.delete(index=key, id=metric_id)
+            metric_id, index = find_metric(self.get_db, key, tags)
+            self.get_db.delete(index=index, id=metric_id)
         elif key:
             self.get_db.indices.delete(index=key, ignore=[400, 404])
         else:
@@ -252,14 +262,19 @@ def validate_query(self, query):
         query = json.loads(query)
         # Elasticsearch currently supports validation of only query section,
         # aggs, size, _source etc. are not supported
-        valid_check = self.get_db.indices.validate_query(body={'query': query['query']}, explain=True)
+        valid_check = self.get_db.indices.validate_query(
+            body={'query': query['query']}, explain=True
+        )
         # Show a helpful message for failure
         if not valid_check['valid']:
-            raise ValidationError(valid_check['explanations'])
+            raise ValidationError(valid_check['error'])
         return self._is_aggregate(query)
 
+    # TODO: This is not covering everything
     def _is_aggregate(self, q):
-        agg_dict = q['aggs']['GroupByTime']['aggs'].values()
+        agg_dict = q['aggs']['GroupByTime']['nested']['aggs']['set_range']['aggs'][
+            'time'
+        ]['aggs']['nest']['nested']['aggs'].values()
         agg = []
         for item in agg_dict:
            agg.append(next(iter(item)))
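In effect ``_is_aggregate`` reaches the innermost ``aggs`` mapping and collects the aggregation names; for example, with input shaped like ``_make_query``'s argument:

    inner_aggs = {
        'total': {'avg': {'field': 'points.fields.total'}},
        'free': {'avg': {'field': 'points.fields.free'}},
    }
    print([next(iter(item)) for item in inner_aggs.values()])  # ['avg', 'avg']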
@@ -276,53 +291,88 @@ def get_query(
         query=None,
         timezone=settings.TIME_ZONE,
     ):
-        query['key'] = params.pop('key')
         query = json.dumps(query)
         for k, v in params.items():
             query = query.replace('{' + k + '}', v)
         query = self._group_by(query, time, chart_type, group_map, strip=summary)
         query = json.loads(query)
-        if summary:
-            _range = next(
-                (item for item in query['query']['bool']['must'] if item.get('range')),
-                None,
+        set_range = query['aggs']['GroupByTime']['nested']['aggs']['set_range']['aggs']['time']
+        if fields:
+            aggregate_dict = set_range['aggs']['nest']['nested']['aggs']
+            agg = deepcopy(aggregate_dict).popitem()[1].popitem()[0]
+            aggregate_dict.update(
+                {
+                    f'{field}': {agg: {'field': f'points.fields.{field}'}}
+                    for field in fields
+                }
             )
-            if _range:
-                query['query']['bool']['must'].remove(_range)
-            query['aggs']['GroupByTime']['date_histogram']['time_zone'] = timezone
+        try:
+            set_range['date_histogram']['time_zone'] = timezone
+        except KeyError:
+            pass
         return query
 
     def _group_by(self, query, time, chart_type, group_map, strip=False):
-        if not self.validate_query(query):
-            return query
+        query = query.replace('1d/d', f'{time}/d')
         if not strip and not chart_type == 'histogram':
             value = group_map[time]
-            query = query.replace('1d/d', f'{time}/d')
             query = query.replace('10m', value)
         if strip:
            query = query.replace('10m', time)
        return query
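``_group_by`` rewrites the serialized query by string substitution: ``1d/d`` marks the range and ``10m`` the histogram interval. For example (the ``group_map`` value ``'7d' -> '1h'`` is illustrative):

    q = '{"from": "now-1d/d", "fixed_interval": "10m"}'
    print(q.replace('1d/d', '7d/d').replace('10m', '1h'))
    # {"from": "now-7d/d", "fixed_interval": "1h"}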
 
-    # TODO:
     def _get_top_fields(
         self,
-        query,
         params,
         chart_type,
         group_map,
         number,
         time,
+        query=None,
         timezone=settings.TIME_ZONE,
+        get_fields=True,
+        **kwargs,
     ):
-        pass
+        """
+        Returns top fields if ``get_fields`` is set to ``True`` (default),
+        else it returns points containing the top fields.
+        """
+        response = self.get_db.indices.get_mapping(index=params['key'])
+        fields = [
+            k
+            for k, v in list(response.values())[0]['mappings']['properties']['points'][
+                'properties'
+            ]['fields']['properties'].items()
+        ]
+        query = self.get_query(
+            chart_type,
+            params,
+            time,
+            group_map,
+            summary=True,
+            fields=fields,
+            query=query,
+            timezone=timezone,
+        )
+        point = self.get_list_query(query)[0]
+        time = point.pop('time')
+        point = Counter(point).most_common(number)
+        if get_fields:
+            return [k for k, v in point]
+        points = [{'time': time}]
+        for k, v in point:
+            points[0].update({k: v})
+        return points
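A hedged sketch of how ``_get_top_fields`` might be invoked; ``client`` stands for an instance of this backend, and all parameter values are illustrative:

    # Returns e.g. the names of the 3 fields with the largest summary values.
    top = client._get_top_fields(
        params={
            'key': 'applications',
            'content_type': 'config.device',
            'object_id': '<device-uuid>',
        },
        chart_type='histogram',
        group_map={'1d': '10m'},
        number=3,
        time='1d',
    )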
 
     def _format(self, point):
         pt = {}
         # Convert time from milliseconds -> seconds precision
         pt['time'] = int(point['key'] / 1000)
         for key, value in point.items():
             if isinstance(value, dict):
-                pt[key] = self._transform_field(key, value['value'])
+                for k, v in value.items():
+                    if isinstance(v, dict):
+                        pt[k] = self._transform_field(k, v['value'])
         return pt
 
     def _transform_field(self, field, value):
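The bucket shape ``_format`` now expects, inferred from the traversal above: a ``date_histogram`` bucket whose nested sub-aggregation holds one ``{'value': ...}`` dict per field:

    bucket = {
        'key': 1589299200000,  # epoch milliseconds
        'doc_count': 120,
        'nest': {'total': {'value': 512.0}, 'free': {'value': 256.0}},
    }
    # _format(bucket) -> {'time': 1589299200, 'total': 512.0, 'free': 256.0}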
@@ -338,12 +388,10 @@ def _transform_field(self, field, value):
     def default_chart_query(self, tags):
         q = deepcopy(default_chart_query)
         if not tags:
-            q['query']['bool']['must'].pop(0)
-            q['query']['bool']['must'].pop(1)
+            q['query']['nested']['query']['bool']['must'].pop(0)
+            q['query']['nested']['query']['bool']['must'].pop(1)
         return q
 
-
-# TODO:
-# Fix Average - currently it's computing average over all fields!
-# Time Interval - fix range
-# Device query
+    def _device_data(self, key, tags, fields, **kwargs):
+        """ returns last snapshot of ``device_data`` """
+        return self.read(key=key, fields=fields, tags=tags, time_format='isoformat',)
