Skip to content

Commit 4b9f5ea

Browse files
committed
add :present injection to safeguard against PIT right-edge inconsistencies
1 parent 8329f59 commit 4b9f5ea

File tree

2 files changed

+7
-5
lines changed

2 files changed

+7
-5
lines changed

lib/logstash/inputs/elasticsearch.rb

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,6 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
129129

130130
# Enable tracking the value of a given field to be used as a cursor
131131
# TODO: main concerns
132-
# * schedule overlap needs to be disabled (hardcoded as enabled)
133132
# * using anything other than _event.timestamp easily leads to data loss
134133
# * the first "synchronization" run can take a long time
135134
# * checkpointing is only safe to do after each run (not per document)
@@ -271,7 +270,7 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
271270

272271
# Allow scheduled runs to overlap (enabled by default). Setting to false will
273272
# only start a new scheduled run after the previous one completes.
274-
config :schedule_overlap, :validate => :string
273+
config :schedule_overlap, :validate => :boolean
275274

276275
# If set, the _source of each hit will be added nested under the target instead of at the top-level
277276
config :target, :validate => :field_reference

lib/logstash/inputs/elasticsearch/cursor_tracker.rb

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ def checkpoint_cursor
3535

3636
def converge_last_value
3737
return if @last_value_hashmap.empty?
38-
# TODO this implicitly assumes that the way to converge the value among slices is to pick the highest and we can't assume that
39-
new_last_value = @last_value_hashmap.reduceValues(1, lambda { |v1, v2| Time.parse(v1) < Time.parse(v2) ? v2 : v1 })
38+
new_last_value = @last_value_hashmap.reduceValues(1000, lambda { |v1, v2| Java::java.time.Instant.parse(v1).isBefore(Java::java.time.Instant.parse(v2)) ? v2 : v1 })
39+
logger.trace? && logger.trace("converge_last_value: got #{@last_value_hashmap.values.inspect}. won: #{new_last_value}")
4040
return if new_last_value == @last_value
4141
@last_value = new_last_value
4242
logger.info "New cursor value for field \"#{@tracking_field}\" is: #{new_last_value}"
@@ -49,7 +49,10 @@ def record_last_value(event)
4949
end
5050

5151
def inject_cursor(query_json)
52-
query_json.gsub(":last_value", @last_value.to_s)
52+
# ":present" means "now - 30s" to avoid grabbing partially visible data in the PIT
53+
result = query_json.gsub(":last_value", @last_value.to_s).gsub(":present", Java::java.time.Instant.now.minusSeconds(30).to_s)
54+
logger.debug("inject_cursor: injected values for ':last_value' and ':present'", :query => result)
55+
result
5356
end
5457
end
5558
end; end; end

0 commit comments

Comments
 (0)