Skip to content

Commit 4699a5c

Browse files
committed
move cursor tracking to separate class
1 parent c04401b commit 4699a5c

File tree

4 files changed

+118
-12
lines changed

4 files changed

+118
-12
lines changed

lib/logstash/inputs/elasticsearch.rb

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
7575

7676
require 'logstash/inputs/elasticsearch/paginated_search'
7777
require 'logstash/inputs/elasticsearch/aggregation'
78+
require 'logstash/inputs/elasticsearch/cursor_tracker'
7879

7980
include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)
8081
include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck
@@ -126,6 +127,22 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
126127
# by this pipeline input.
127128
config :slices, :validate => :number
128129

130+
# Enable tracking the value of a given field to be used as a cursor
131+
# TODO: main concerns
132+
# * schedule overlap needs to be disabled (hardcoded as enabled)
133+
# * using anything other than _event.timestamp easily leads to data loss
134+
# * the first "synchronization" run can take a long time
135+
# * checkpointing is only safe to do after each run (not per document)
136+
config :tracking_field, :validate => :string
137+
138+
# Define the initial seed value of the tracking_field
139+
config :tracking_field_seed, :validate => :string
140+
141+
# The location of where the tracking field value will be stored
142+
# The value is persisted after each scheduled run (and not per result)
143+
# If it's not set it defaults to '${path.data}/plugins/inputs/elasticsearch/last_run_value'
144+
config :last_run_metadata_path, :validate => :string
145+
129146
# If set, include Elasticsearch document information such as index, type, and
130147
# the id in the event.
131148
#
@@ -261,6 +278,10 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
261278
# exactly once.
262279
config :schedule, :validate => :string
263280

281+
# Allow scheduled runs to overlap (enabled by default). Setting to false will
282+
# only start a new scheduled run after the previous one completes.
283+
config :schedule_overlap, :validate => :string
284+
264285
# If set, the _source of each hit will be added nested under the target instead of at the top-level
265286
config :target, :validate => :field_reference
266287

@@ -331,18 +352,30 @@ def register
331352

332353
setup_query_executor
333354

355+
setup_cursor_tracker
356+
334357
@client
335358
end
336359

337360
def run(output_queue)
338361
if @schedule
339-
scheduler.cron(@schedule) { @query_executor.do_run(output_queue) }
362+
scheduler.cron(@schedule, :overlap => @schedule_overlap) do
363+
@query_executor.do_run(output_queue, get_query_object())
364+
@cursor_tracker.checkpoint_cursor
365+
end
340366
scheduler.join
341367
else
342-
@query_executor.do_run(output_queue)
368+
@query_executor.do_run(output_queue, get_query_object())
369+
@cursor_tracker.checkpoint_cursor
343370
end
344371
end
345372

373+
def get_query_object
374+
injected_query = @cursor_tracker.inject_cursor(@query)
375+
@logger.debug("new query is #{injected_query}")
376+
query_object = LogStash::Json.load(injected_query)
377+
end
378+
346379
##
347380
# This can be called externally from the query_executor
348381
public
@@ -351,6 +384,7 @@ def push_hit(hit, output_queue, root_field = '_source')
351384
set_docinfo_fields(hit, event) if @docinfo
352385
decorate(event)
353386
output_queue << event
387+
@cursor_tracker.record_last_value(event)
354388
end
355389

356390
def set_docinfo_fields(hit, event)
@@ -664,6 +698,17 @@ def setup_query_executor
664698
end
665699
end
666700

701+
def setup_cursor_tracker
702+
if @tracking_field
703+
@tracking_field_seed ||= Time.now.utc.iso8601
704+
@cursor_tracker = CursorTracker.new(last_run_metadata_path: @last_run_metadata_path,
705+
tracking_field: @tracking_field,
706+
tracking_field_seed: @tracking_field_seed)
707+
else
708+
@cursor_tracker = NoopCursorTracker.new
709+
end
710+
end
711+
667712
module URIOrEmptyValidator
668713
##
669714
# @override to provide :uri_or_empty validator

lib/logstash/inputs/elasticsearch/aggregation.rb

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,7 @@ def initialize(client, plugin)
1313
@plugin_params = plugin.params
1414

1515
@size = @plugin_params["size"]
16-
@query = @plugin_params["query"]
1716
@retries = @plugin_params["retries"]
18-
@agg_options = {
19-
:index => @index,
20-
:size => 0
21-
}.merge(:body => @query)
2217

2318
@plugin = plugin
2419
end
@@ -33,10 +28,18 @@ def retryable(job_name, &block)
3328
false
3429
end
3530

36-
def do_run(output_queue)
31+
def aggregation_options(query_object)
32+
{
33+
:index => @index,
34+
:size => 0,
35+
:body => query_object
36+
}
37+
end
38+
39+
def do_run(output_queue, query_object)
3740
logger.info("Aggregation starting")
3841
r = retryable(AGGREGATION_JOB) do
39-
@client.search(@agg_options)
42+
@client.search(aggregation_options(query_object))
4043
end
4144
@plugin.push_hit(r, output_queue, 'aggregations') if r
4245
end
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
require 'fileutils'
2+
3+
module LogStash; module Inputs; class Elasticsearch
4+
# Null-object stand-in used when no tracking_field is configured:
# every cursor operation is a no-op and queries pass through untouched.
class NoopCursorTracker
  include LogStash::Util::Loggable

  # Nothing to persist.
  def checkpoint_cursor; end

  # Nothing to merge.
  def converge_last_value; end

  # Ignore the event entirely.
  def record_last_value(event); end

  # Return the query unchanged.
  def inject_cursor(query_json)
    query_json
  end
end
14+
15+
# Tracks the highest value of a configured event field across a run and
# persists it to disk between scheduled runs, so the next query can resume
# from where the previous one stopped (via the :last_value placeholder).
class CursorTracker
  include LogStash::Util::Loggable

  # Most recently converged cursor value (a string, e.g. an ISO8601 timestamp).
  attr_reader :last_value

  # @param last_run_metadata_path [String, nil] file used to persist the cursor;
  #   defaults to ${path.data}/plugins/inputs/elasticsearch/last_run_value
  # @param tracking_field [String] event field whose value drives the cursor
  # @param tracking_field_seed [String] initial cursor value when none is persisted
  def initialize(last_run_metadata_path:, tracking_field:, tracking_field_seed:)
    @last_run_metadata_path = last_run_metadata_path ||
      ::File.join(LogStash::SETTINGS.get_value("path.data"), "plugins", "inputs", "elasticsearch", "last_run_value")
    FileUtils.mkdir_p ::File.dirname(@last_run_metadata_path)
    # One slot per worker thread; converged into @last_value at checkpoint time.
    @last_value_hashmap = Java::java.util.concurrent.ConcurrentHashMap.new
    # NOTE(review): the original `IO.read(path) rescue nil || seed` leaned on the
    # rescue modifier binding looser than `||` and rescued everything bare; the
    # fallback is now explicit, and an empty checkpoint file counts as unset.
    @last_value = read_checkpoint || tracking_field_seed
    @tracking_field = tracking_field
    logger.info "Starting value for cursor field \"#{@tracking_field}\": #{@last_value}"
  end

  # Converge and persist the cursor value. Only safe to call after a full run
  # (not per document): checkpointing mid-run could advance the cursor past
  # documents that were never emitted.
  def checkpoint_cursor
    converge_last_value
    IO.write(@last_run_metadata_path, @last_value)
    @last_value_hashmap.clear
  end

  # Fold the per-thread values into the single newest one.
  # Assumes stored values parse with Time.parse — TODO confirm this holds for
  # tracking fields that are not timestamps.
  def converge_last_value
    return if @last_value_hashmap.empty?
    new_last_value = @last_value_hashmap.reduceValues(1, lambda { |v1, v2| Time.parse(v1) < Time.parse(v2) ? v2 : v1 })
    return if new_last_value == @last_value
    @last_value = new_last_value
    logger.info "New cursor value for field \"#{@tracking_field}\" is: #{new_last_value}"
  end

  # Remember the tracking-field value of the latest event seen by this thread.
  def record_last_value(event)
    value = event.get(@tracking_field)
    logger.trace? && logger.trace("storing last_value of #{@tracking_field} for #{Thread.current.object_id}: #{value}")
    @last_value_hashmap.put(Thread.current.object_id, value)
  end

  # Substitute the :last_value placeholder inside the raw query JSON string.
  def inject_cursor(query_json)
    query_json.gsub(":last_value", @last_value)
  end

  private

  # Read the persisted cursor value; nil when missing, unreadable, or empty.
  def read_checkpoint
    value = IO.read(@last_run_metadata_path)
    value unless value.empty?
  rescue StandardError
    nil
  end
end
54+
end; end; end

lib/logstash/inputs/elasticsearch/paginated_search.rb

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,14 @@ def initialize(client, plugin)
2121
@pipeline_id = plugin.pipeline_id
2222
end
2323

24-
def do_run(output_queue)
25-
return retryable_search(output_queue) if @slices.nil? || @slices <= 1
24+
def do_run(output_queue, query)
25+
@query = query
2626

27-
retryable_slice_search(output_queue)
27+
if @slices.nil? || @slices <= 1
28+
retryable_search(output_queue)
29+
else
30+
retryable_slice_search(output_queue)
31+
end
2832
end
2933

3034
def retryable(job_name, &block)

0 commit comments

Comments
 (0)