Skip to content

Commit 1d5ee93

Browse files
committed
Add max_retries option to the configuration
This option makes possible to set a maximum number the plugin will retry writes for. It defaults to retry forever (negative max_retries) to keep backward compatibility.
1 parent 1fb9e92 commit 1d5ee93

File tree

2 files changed

+36
-4
lines changed

2 files changed

+36
-4
lines changed

lib/logstash/outputs/mongodb.rb

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ class LogStash::Outputs::Mongodb < LogStash::Outputs::Base
2929
# The number of seconds to wait after failure before retrying.
3030
config :retry_delay, :validate => :number, :default => 3, :required => false
3131

32+
# The maximum number of times we should retry for.
33+
#
34+
# If not present the plugin will retry forever. This is the default.
35+
config :max_retries, :validate => :number, :default => -1, :required => false
36+
3237
# If true, an "_id" field will be added to the document before insertion.
3338
# The "_id" field will use the timestamp of the event and overwrite an existing
3439
# "_id" field in the event.
@@ -144,6 +149,9 @@ def validate_action(action, filter, update_expressions)
144149
end
145150

146151
def receive(event)
152+
153+
retry_count = 0
154+
147155
action = event.sprintf(@action)
148156

149157
validate_action(action, @filter, @update_expressions)
@@ -194,9 +202,23 @@ def receive(event)
194202
end
195203
else
196204
result = write_to_mongodb(collection, [document])
197-
@logger.debug("Bulk write result", :result => result)
205+
@logger.debug("Bulk write result: #{result.to_s}")
198206
end
207+
199208
rescue => e
209+
logger_data = {:collection => collection,
210+
:document => document,
211+
:action => action,
212+
:filter => document["metadata_mongodb_output_filter"],
213+
:update_expressions => document["metadata_mongodb_output_update_expressions"]}
214+
215+
if (e.is_a? Mongo::Error::BulkWriteError)
216+
logger_data["result"] = e.result
217+
end
218+
219+
@logger.debug("Error: #{e.message}", logger_data)
220+
@logger.trace("Error backtrace", backtrace: e.backtrace)
221+
200222
if e.message =~ /^E11000/
201223
# On a duplicate key error, skip the insert.
202224
# We could check if the duplicate key err is the _id key
@@ -205,9 +227,13 @@ def receive(event)
205227
# to fix the issue.
206228
@logger.warn("Skipping insert because of a duplicate key error", :event => event, :exception => e)
207229
else
208-
@logger.warn("Failed to send event to MongoDB retrying in #{@retry_delay.to_s} seconds", :result=> e.result, :message => e.message)
209-
sleep(@retry_delay)
210-
retry
230+
# if max_retries is negative we retry forever
231+
if (@max_retries < 0 || retry_count < @max_retries)
232+
retry_count += 1
233+
@logger.warn("Failed to send event to MongoDB retrying (#{retry_count.to_s}) in #{@retry_delay.to_s} seconds")
234+
sleep(@retry_delay)
235+
retry
236+
end
211237
end
212238
end
213239
end

spec/spec_helper.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22
require "logstash/devutils/rspec/spec_helper"
33
require "logstash/outputs/mongodb"
44

5+
if ENV["TEST_DEBUG"]
6+
LogStash::Logging::Logger::configure_logging("DEBUG")
7+
else
8+
LogStash::Logging::Logger::configure_logging("OFF")
9+
end
10+
511
RSpec.configure do |config|
612
config.example_status_persistence_file_path = 'spec/test-report.txt'
713
end

0 commit comments

Comments
 (0)