Skip to content

Commit 2577e6f

Browse files
jsvd and MikhailMS authored
Support aggregations (#202)
Add `response_type` configuration option to allow processing result of aggregations. The default `hits` will generate one event per returned document (i.e. "hit"), which is the current behavior. When set to `aggregations`, a single Logstash event will be generated with the contents of the `aggregations` object of the query's response. In this case the `hits` object will be ignored. The parameter `size` will always be set to 0 regardless of the default or user-defined value set in this plugin. --------- Co-authored-by: MikhailMS <[email protected]>
1 parent fac5191 commit 2577e6f

File tree

6 files changed

+153
-23
lines changed

6 files changed

+153
-23
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 4.20.0
2+
- Added `response_type` configuration option to allow processing result of aggregations [#202](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/202)
3+
14
## 4.19.1
25
- Plugin version bump to pick up docs fix in [#199](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/199) required to clear build error in docgen. [#200](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/200)
36

docs/index.asciidoc

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ
115115
| <<plugins-{type}s-{plugin}-password>> |<<password,password>>|No
116116
| <<plugins-{type}s-{plugin}-proxy>> |<<uri,uri>>|No
117117
| <<plugins-{type}s-{plugin}-query>> |<<string,string>>|No
118+
| <<plugins-{type}s-{plugin}-response_type>> |<<string,string>>, one of `["hits","aggregations"]`|No
118119
| <<plugins-{type}s-{plugin}-request_timeout_seconds>> | <<number,number>>|No
119120
| <<plugins-{type}s-{plugin}-schedule>> |<<string,string>>|No
120121
| <<plugins-{type}s-{plugin}-scroll>> |<<string,string>>|No
@@ -337,6 +338,20 @@ documentation] for more information.
337338
When <<plugins-{type}s-{plugin}-search_api>> resolves to `search_after` and the query does not specify `sort`,
338339
the default sort `'{ "sort": { "_shard_doc": "asc" } }'` will be added to the query. Please refer to the {ref}/paginate-search-results.html#search-after[Elasticsearch search_after] parameter to know more.
339340

341+
[id="plugins-{type}s-{plugin}-response_type"]
342+
===== `response_type`
343+
344+
* Value can be any of: `hits`, `aggregations`
345+
* Default value is `hits`
346+
347+
Which part of the result to transform into Logstash events when processing the
348+
response from the query.
349+
The default `hits` will generate one event per returned document (i.e. "hit").
350+
When set to `aggregations`, a single Logstash event will be generated with the
351+
contents of the `aggregations` object of the query's response. In this case the
352+
`hits` object will be ignored. The parameter `size` will always be set to
353+
0 regardless of the default or user-defined value set in this plugin.
354+
340355
[id="plugins-{type}s-{plugin}-request_timeout_seconds"]
341356
===== `request_timeout_seconds`
342357

lib/logstash/inputs/elasticsearch.rb

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
7575

7676
require 'logstash/inputs/elasticsearch/paginated_search'
77+
require 'logstash/inputs/elasticsearch/aggregation'
7778

7879
include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)
7980
include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck
@@ -101,6 +102,11 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
101102
# https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html
102103
config :query, :validate => :string, :default => '{ "sort": [ "_doc" ] }'
103104

105+
# This allows you to specify the response type: either hits or aggregations
106+
# where hits: normal search request
107+
# aggregations: aggregation request
108+
config :response_type, :validate => ['hits', 'aggregations'], :default => 'hits'
109+
104110
# This allows you to set the maximum number of hits returned per scroll.
105111
config :size, :validate => :number, :default => 1000
106112

@@ -282,11 +288,6 @@ def register
282288
fill_hosts_from_cloud_id
283289
setup_ssl_params!
284290

285-
@options = {
286-
:index => @index,
287-
:scroll => @scroll,
288-
:size => @size
289-
}
290291
@base_query = LogStash::Json.load(@query)
291292
if @slices
292293
@base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
@@ -328,21 +329,25 @@ def register
328329

329330
setup_search_api
330331

332+
setup_query_executor
333+
331334
@client
332335
end
333336

334-
335337
def run(output_queue)
336338
if @schedule
337-
scheduler.cron(@schedule) { @paginated_search.do_run(output_queue) }
339+
scheduler.cron(@schedule) { @query_executor.do_run(output_queue) }
338340
scheduler.join
339341
else
340-
@paginated_search.do_run(output_queue)
342+
@query_executor.do_run(output_queue)
341343
end
342344
end
343345

344-
def push_hit(hit, output_queue)
345-
event = targeted_event_factory.new_event hit['_source']
346+
##
347+
# This can be called externally from the query_executor
348+
public
349+
def push_hit(hit, output_queue, root_field = '_source')
350+
event = targeted_event_factory.new_event hit[root_field]
346351
set_docinfo_fields(hit, event) if @docinfo
347352
decorate(event)
348353
output_queue << event
@@ -643,13 +648,20 @@ def setup_search_api
643648
@search_api
644649
end
645650

651+
end
646652

647-
@paginated_search = if @resolved_search_api == "search_after"
653+
def setup_query_executor
654+
@query_executor = case @response_type
655+
when 'hits'
656+
if @resolved_search_api == "search_after"
648657
LogStash::Inputs::Elasticsearch::SearchAfter.new(@client, self)
649658
else
650659
logger.warn("scroll API is no longer recommended for pagination. Consider using search_after instead.") if es_major_version >= 8
651660
LogStash::Inputs::Elasticsearch::Scroll.new(@client, self)
652661
end
662+
when 'aggregations'
663+
LogStash::Inputs::Elasticsearch::Aggregation.new(@client, self)
664+
end
653665
end
654666

655667
module URIOrEmptyValidator
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
require 'logstash/helpers/loggable_try'

module LogStash
  module Inputs
    class Elasticsearch
      # Query executor used when `response_type => aggregations`.
      #
      # Runs the configured query once (per scheduled run) with `size: 0` and
      # hands the whole response to the plugin, which turns the response's
      # `aggregations` object into a single Logstash event.
      class Aggregation
        include LogStash::Util::Loggable

        AGGREGATION_JOB = "aggregation"

        # @param client [Elasticsearch::Client] connected ES client
        # @param plugin [LogStash::Inputs::Elasticsearch] owning plugin; its
        #   `params` supply query, index and retry configuration, and its
        #   `push_hit` turns the response into an event.
        def initialize(client, plugin)
          @client = client
          @plugin = plugin
          @plugin_params = plugin.params

          @query = @plugin_params["query"]
          @retries = @plugin_params["retries"]
          @agg_options = {
            # Fix: the original read `@index`, which is never assigned in this
            # class and therefore always resolved to nil, silently ignoring the
            # user's `index` setting. Read it from the plugin params instead
            # (mirrors the removed `:index => @index` in the plugin's register).
            :index => @plugin_params["index"],
            # size is forced to 0: only the `aggregations` object is consumed;
            # individual hits are intentionally never emitted.
            :size => 0
          }.merge(:body => @query)
        end

        # Runs +block+ up to `@retries + 1` times via LoggableTry.
        # Returns the block's value on success; logs and returns false once
        # all attempts are exhausted (never raises to the caller).
        def retryable(job_name, &block)
          stud_try = ::LogStash::Helpers::LoggableTry.new(logger, job_name)
          stud_try.try((@retries + 1).times) { yield }
        rescue => e
          error_details = {:message => e.message, :cause => e.cause}
          error_details[:backtrace] = e.backtrace if logger.debug?
          logger.error("Tried #{job_name} unsuccessfully", error_details)
          # Explicit failure sentinel so do_run can tell success from failure;
          # previously the rescue leaked logger.error's return value.
          false
        end

        # Executes the aggregation search and pushes one event built from the
        # response's `aggregations` object onto +output_queue+.
        def do_run(output_queue)
          logger.info("Aggregation starting")
          r = retryable(AGGREGATION_JOB) do
            @client.search(@agg_options)
          end
          # Only emit when the search succeeded; on exhausted retries r is
          # false and indexing into it would raise.
          @plugin.push_hit(r, output_queue, 'aggregations') if r
        end
      end
    end
  end
end

logstash-input-elasticsearch.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Gem::Specification.new do |s|
22

33
s.name = 'logstash-input-elasticsearch'
4-
s.version = '4.19.1'
4+
s.version = '4.20.0'
55
s.licenses = ['Apache License (2.0)']
66
s.summary = "Reads query results from an Elasticsearch cluster"
77
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

spec/inputs/elasticsearch_spec.rb

Lines changed: 66 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -112,14 +112,14 @@
112112
context "ES 8" do
113113
let(:es_version) { "8.10.0" }
114114
it "resolves `auto` to `search_after`" do
115-
expect(plugin.instance_variable_get(:@paginated_search)).to be_a LogStash::Inputs::Elasticsearch::SearchAfter
115+
expect(plugin.instance_variable_get(:@query_executor)).to be_a LogStash::Inputs::Elasticsearch::SearchAfter
116116
end
117117
end
118118

119119
context "ES 7" do
120120
let(:es_version) { "7.17.0" }
121121
it "resolves `auto` to `scroll`" do
122-
expect(plugin.instance_variable_get(:@paginated_search)).to be_a LogStash::Inputs::Elasticsearch::Scroll
122+
expect(plugin.instance_variable_get(:@query_executor)).to be_a LogStash::Inputs::Elasticsearch::Scroll
123123
end
124124
end
125125
end
@@ -268,7 +268,7 @@
268268
before { plugin.register }
269269

270270
it 'runs just one slice' do
271-
expect(plugin.instance_variable_get(:@paginated_search)).to receive(:search).with(duck_type(:<<), nil)
271+
expect(plugin.instance_variable_get(:@query_executor)).to receive(:search).with(duck_type(:<<), nil)
272272
expect(Thread).to_not receive(:new)
273273

274274
plugin.run([])
@@ -280,7 +280,7 @@
280280
before { plugin.register }
281281

282282
it 'runs just one slice' do
283-
expect(plugin.instance_variable_get(:@paginated_search)).to receive(:search).with(duck_type(:<<), nil)
283+
expect(plugin.instance_variable_get(:@query_executor)).to receive(:search).with(duck_type(:<<), nil)
284284
expect(Thread).to_not receive(:new)
285285

286286
plugin.run([])
@@ -295,7 +295,7 @@
295295
it "runs #{slice_count} independent slices" do
296296
expect(Thread).to receive(:new).and_call_original.exactly(slice_count).times
297297
slice_count.times do |slice_id|
298-
expect(plugin.instance_variable_get(:@paginated_search)).to receive(:search).with(duck_type(:<<), slice_id)
298+
expect(plugin.instance_variable_get(:@query_executor)).to receive(:search).with(duck_type(:<<), slice_id)
299299
end
300300

301301
plugin.run([])
@@ -423,8 +423,8 @@ def synchronize_method!(object, method_name)
423423
expect(client).to receive(:search).with(hash_including(:body => slice1_query)).and_return(slice1_response0)
424424
expect(client).to receive(:scroll).with(hash_including(:body => { :scroll_id => slice1_scroll1 })).and_return(slice1_response1)
425425

426-
synchronize_method!(plugin.instance_variable_get(:@paginated_search), :next_page)
427-
synchronize_method!(plugin.instance_variable_get(:@paginated_search), :initial_search)
426+
synchronize_method!(plugin.instance_variable_get(:@query_executor), :next_page)
427+
synchronize_method!(plugin.instance_variable_get(:@query_executor), :initial_search)
428428
end
429429

430430
let(:client) { Elasticsearch::Client.new }
@@ -493,14 +493,14 @@ def synchronize_method!(object, method_name)
493493
expect(client).to receive(:search).with(hash_including(:body => slice1_query)).and_return(slice1_response0)
494494
expect(client).to receive(:scroll).with(hash_including(:body => { :scroll_id => slice1_scroll1 })).and_raise("boom")
495495

496-
synchronize_method!(plugin.instance_variable_get(:@paginated_search), :next_page)
497-
synchronize_method!(plugin.instance_variable_get(:@paginated_search), :initial_search)
496+
synchronize_method!(plugin.instance_variable_get(:@query_executor), :next_page)
497+
synchronize_method!(plugin.instance_variable_get(:@query_executor), :initial_search)
498498
end
499499

500500
let(:client) { Elasticsearch::Client.new }
501501

502502
it 'insert event to queue without waiting other slices' do
503-
expect(plugin.instance_variable_get(:@paginated_search)).to receive(:search).twice.and_wrap_original do |m, *args|
503+
expect(plugin.instance_variable_get(:@query_executor)).to receive(:search).twice.and_wrap_original do |m, *args|
504504
q = args[0]
505505
slice_id = args[1]
506506
if slice_id == 0
@@ -1020,7 +1020,7 @@ def wait_receive_request
10201020

10211021
it "should properly schedule" do
10221022
begin
1023-
expect(plugin.instance_variable_get(:@paginated_search)).to receive(:do_run) {
1023+
expect(plugin.instance_variable_get(:@query_executor)).to receive(:do_run) {
10241024
queue << LogStash::Event.new({})
10251025
}.at_least(:twice)
10261026
runner = Thread.start { plugin.run(queue) }
@@ -1033,7 +1033,62 @@ def wait_receive_request
10331033
runner.join if runner
10341034
end
10351035
end
1036+
end
1037+
1038+
context "aggregations" do
1039+
let(:config) do
1040+
{
1041+
'hosts' => ["localhost"],
1042+
'query' => '{ "query": {}, "size": 0, "aggs":{"total_count": { "value_count": { "field": "type" }}, "empty_count": { "sum": { "field": "_meta.empty_event" }}}}',
1043+
'response_type' => 'aggregations',
1044+
'size' => 0
1045+
}
1046+
end
10361047

1048+
let(:mock_response) do
1049+
{
1050+
"took" => 27,
1051+
"timed_out" => false,
1052+
"_shards" => {
1053+
"total" => 169,
1054+
"successful" => 169,
1055+
"skipped" => 0,
1056+
"failed" => 0
1057+
},
1058+
"hits" => {
1059+
"total" => 10,
1060+
"max_score" => 1.0,
1061+
"hits" => []
1062+
},
1063+
"aggregations" => {
1064+
"total_counter" => {
1065+
"value" => 10
1066+
},
1067+
"empty_counter" => {
1068+
"value" => 5
1069+
},
1070+
}
1071+
}
1072+
end
1073+
1074+
before(:each) do
1075+
client = Elasticsearch::Client.new
1076+
1077+
expect(Elasticsearch::Client).to receive(:new).with(any_args).and_return(client)
1078+
expect(client).to receive(:search).with(any_args).and_return(mock_response)
1079+
expect(client).to receive(:ping)
1080+
end
1081+
1082+
before { plugin.register }
1083+
1084+
it 'creates the events from the aggregations' do
1085+
plugin.run queue
1086+
event = queue.pop
1087+
1088+
expect(event).to be_a(LogStash::Event)
1089+
expect(event.get("[total_counter][value]")).to eql 10
1090+
expect(event.get("[empty_counter][value]")).to eql 5
1091+
end
10371092
end
10381093

10391094
context "retries" do

0 commit comments

Comments
 (0)