Skip to content

Commit 0ac0a13

Browse files
committed
Add cursor to follow new data on an index through a tracked field
1 parent 5f6e8da commit 0ac0a13

File tree

4 files changed

+118
-13
lines changed

4 files changed

+118
-13
lines changed

lib/logstash/inputs/elasticsearch.rb

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
7575

7676
require 'logstash/inputs/elasticsearch/paginated_search'
7777
require 'logstash/inputs/elasticsearch/aggregation'
78+
require 'logstash/inputs/elasticsearch/cursor_tracker'
7879

7980
include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)
8081
include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck
@@ -126,6 +127,22 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
126127
# by this pipeline input.
127128
config :slices, :validate => :number
128129

130+
# Enable tracking the value of a given field to be used as a cursor
131+
# TODO: main concerns
132+
# * schedule overlap needs to be disabled (hardcoded as enabled)
133+
# * using anything other than _event.timestamp easily leads to data loss
134+
# * the first "synchronization" run can take a long time
135+
# * checkpointing is only safe to do after each run (not per document)
136+
config :tracking_field, :validate => :string
137+
138+
# Define the initial seed value of the tracking_field
139+
config :tracking_field_seed, :validate => :string
140+
141+
# The location of where the tracking field value will be stored
142+
# The value is persisted after each scheduled run (and not per result)
143+
# If it's not set it defaults to '${path.data}/plugins/inputs/elasticsearch/last_run_value'
144+
config :last_run_metadata_path, :validate => :string
145+
129146
# If set, include Elasticsearch document information such as index, type, and
130147
# the id in the event.
131148
#
@@ -252,6 +269,10 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
252269
# exactly once.
253270
config :schedule, :validate => :string
254271

272+
# Allow scheduled runs to overlap (enabled by default). Setting to false will
273+
# only start a new scheduled run after the previous one completes.
274+
config :schedule_overlap, :validate => :string
275+
255276
# If set, the _source of each hit will be added nested under the target instead of at the top-level
256277
config :target, :validate => :field_reference
257278

@@ -330,25 +351,38 @@ def register
330351

331352
setup_query_executor
332353

354+
setup_cursor_tracker
355+
333356
@client
334357
end
335358

336359
# Plugin entry point: execute the query either on a cron schedule or
# exactly once. After every completed run, the cursor tracker checkpoints
# the last observed tracking-field value so the next run resumes from it.
#
# @param output_queue [Queue] pipeline queue that receives generated events
def run(output_queue)
  if @schedule
    # :overlap controls whether a new scheduled run may start while the
    # previous one is still in flight.
    # NOTE(review): @schedule_overlap is declared :validate => :string, so a
    # configured value like "false" is a truthy String here — consider
    # :validate => :boolean for the config option.
    scheduler.cron(@schedule, :overlap => @schedule_overlap) do
      @query_executor.do_run(output_queue, get_query_object)
      @cursor_tracker.checkpoint_cursor
    end
    scheduler.join
  else
    @query_executor.do_run(output_queue, get_query_object)
    @cursor_tracker.checkpoint_cursor
  end
end
344371

372+
# Build the query object for the current run, with the live cursor value
# substituted into any :last_value placeholder by the cursor tracker.
#
# @return [Hash] the parsed query, ready to be passed to the query executor
def get_query_object
  injected_query = @cursor_tracker.inject_cursor(@query)
  @logger.debug("new query is #{injected_query}")
  # Return the parsed JSON directly; the original assigned it to an unused
  # local (`query_object = ...`) as the final expression.
  LogStash::Json.load(injected_query)
end
377+
345378
##
346379
# This can be called externally from the query_executor
347380
public
348381
# Convert a single search hit into an event, decorate it, and emit it to
# the pipeline. root_field selects which part of the hit becomes the event
# body ('_source' for regular documents, 'aggregations' for aggregation
# responses — see aggregation.rb's call site).
def push_hit(hit, output_queue, root_field = '_source')
  new_event = event_from_hit(hit, root_field)
  decorate(new_event)
  output_queue << new_event
  # Remember the tracking-field value so checkpoint_cursor can persist
  # progress after the run completes.
  @cursor_tracker.record_last_value(new_event)
end
353387

354388
def event_from_hit(hit, root_field)
@@ -642,6 +676,17 @@ def setup_query_executor
642676
end
643677
end
644678

679+
# Choose the cursor implementation: a real CursorTracker when a
# tracking_field is configured, otherwise a no-op object so callers can
# invoke cursor methods unconditionally (null-object pattern).
def setup_cursor_tracker
  @cursor_tracker =
    if @tracking_field
      # Default the seed to the current UTC time when none was configured.
      @tracking_field_seed ||= Time.now.utc.iso8601
      CursorTracker.new(last_run_metadata_path: @last_run_metadata_path,
                        tracking_field: @tracking_field,
                        tracking_field_seed: @tracking_field_seed)
    else
      NoopCursorTracker.new
    end
end
689+
645690
module URIOrEmptyValidator
646691
##
647692
# @override to provide :uri_or_empty validator

lib/logstash/inputs/elasticsearch/aggregation.rb

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,7 @@ def initialize(client, plugin)
1313
@plugin_params = plugin.params
1414

1515
@size = @plugin_params["size"]
16-
@query = @plugin_params["query"]
1716
@retries = @plugin_params["retries"]
18-
@agg_options = {
19-
:index => @plugin_params["index"],
20-
:size => 0
21-
}.merge(:body => @query)
22-
2317
@plugin = plugin
2418
end
2519

@@ -33,10 +27,18 @@ def retryable(job_name, &block)
3327
false
3428
end
3529

36-
def do_run(output_queue)
30+
# Build the options hash for the aggregation search request; the body is
# the (possibly cursor-injected) query object and :size is 0 because only
# the aggregation results are consumed, never the hits.
# NOTE(review): relies on @index being assigned elsewhere in this class —
# the visible initialize no longer sets it after this refactor; confirm it
# is still populated (e.g. from @plugin_params["index"]).
def aggregation_options(query_object)
  { :index => @index, :size => 0, :body => query_object }
end
37+
38+
# Execute one aggregation run: perform the search (with retries), then hand
# the whole response to the plugin as a single event rooted at
# 'aggregations'. Nothing is pushed when all retries fail.
def do_run(output_queue, query_object)
  logger.info("Aggregation starting")
  response = retryable(AGGREGATION_JOB) do
    @client.search(aggregation_options(query_object))
  end
  @plugin.push_hit(response, output_queue, 'aggregations') if response
end
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
require 'fileutils'
require 'time'
2+
3+
module LogStash; module Inputs; class Elasticsearch
4+
# Null-object cursor tracker used when no tracking_field is configured:
# every operation is a no-op, so the plugin can call cursor methods
# unconditionally without nil checks.
class NoopCursorTracker
  include LogStash::Util::Loggable

  def checkpoint_cursor; end

  def converge_last_value; end

  def record_last_value(event); end

  # Pass the query through untouched — there is no cursor to inject.
  def inject_cursor(query_json)
    query_json
  end
end
14+
15+
# Tracks the highest observed value of a configured event field (the
# "cursor") across a run and persists it to disk, so the next scheduled run
# — or a pipeline restart — resumes from the last checkpointed value.
class CursorTracker
  include LogStash::Util::Loggable

  attr_reader :last_value

  # @param last_run_metadata_path [String, nil] file where the cursor value
  #   is persisted; defaults to
  #   ${path.data}/plugins/inputs/elasticsearch/last_run_value
  # @param tracking_field [String] name of the event field being tracked
  # @param tracking_field_seed [String] initial cursor value used when no
  #   previously persisted value can be read
  def initialize(last_run_metadata_path:, tracking_field:, tracking_field_seed:)
    @last_run_metadata_path = last_run_metadata_path ||
      ::File.join(LogStash::SETTINGS.get_value("path.data"), "plugins", "inputs", "elasticsearch", "last_run_value")
    FileUtils.mkdir_p ::File.dirname(@last_run_metadata_path)
    # One slot per worker thread; converged into a single value at checkpoint time.
    @last_value_hashmap = Java::java.util.concurrent.ConcurrentHashMap.new
    # Parenthesized on purpose: the original `IO.read(...) rescue nil || seed`
    # only worked because the rescue-modifier body was `nil || seed`; this
    # form states the fallback intent explicitly.
    @last_value = (IO.read(@last_run_metadata_path) rescue nil) || tracking_field_seed
    @tracking_field = tracking_field
    logger.info "Starting value for cursor field \"#{@tracking_field}\": #{@last_value}"
  end

  # Converge the per-thread values and persist the winner to disk. Only
  # safe to call between runs (checkpointing is per run, not per document).
  def checkpoint_cursor
    converge_last_value
    IO.write(@last_run_metadata_path, @last_value)
    @last_value_hashmap.clear
  end

  # Reduce the per-thread values to the chronologically-largest one.
  # NOTE(review): assumes every recorded value is a Time.parse-able string;
  # a non-timestamp tracking field would raise here — confirm intended scope.
  def converge_last_value
    return if @last_value_hashmap.empty?
    new_last_value = @last_value_hashmap.reduceValues(1, lambda { |v1, v2| Time.parse(v1) < Time.parse(v2) ? v2 : v1 })
    return if new_last_value == @last_value
    @last_value = new_last_value
    logger.info "New cursor value for field \"#{@tracking_field}\" is: #{new_last_value}"
  end

  # Remember the tracking-field value of the latest event seen by the
  # current worker thread.
  def record_last_value(event)
    value = event.get(@tracking_field)
    logger.trace? && logger.trace("storing last_value of #{@tracking_field} for #{Thread.current.object_id}: #{value}")
    @last_value_hashmap.put(Thread.current.object_id, value)
  end

  # Substitute the current cursor value for every :last_value placeholder
  # in the raw query JSON string.
  def inject_cursor(query_json)
    # Block form avoids gsub's special interpretation of backslashes and
    # back-references in a string replacement.
    query_json.gsub(":last_value") { @last_value }
  end
end
54+
end; end; end

lib/logstash/inputs/elasticsearch/paginated_search.rb

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,14 @@ def initialize(client, plugin)
2121
@pipeline_id = plugin.pipeline_id
2222
end
2323

24-
def do_run(output_queue)
25-
return retryable_search(output_queue) if @slices.nil? || @slices <= 1
24+
# Run one search pass against the stored query. The (possibly
# cursor-injected) query object is received per run because the cursor
# value can change between scheduled runs.
def do_run(output_queue, query)
  @query = query

  # No slicing configured (nil or a single slice) → plain search.
  return retryable_search(output_queue) if @slices.nil? || @slices <= 1

  retryable_slice_search(output_queue)
end
2933

3034
def retryable(job_name, &block)

0 commit comments

Comments
 (0)