Skip to content

Commit e4f2209

Browse files
authored
Fix: DLQ expects us to pass down an event (#1012)
resolves a DLQ regression shipped in 11.0.0 (#1013)
1 parent 57fccbd commit e4f2209

File tree

5 files changed

+72
-13
lines changed

5 files changed

+72
-13
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
## 11.0.1
2+
- Fix: DLQ regression shipped in 11.0.0 [#1012](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1012)
23
- [DOC] Fixed broken link in list item [#1011](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1011)
34

45
## 11.0.0

lib/logstash/outputs/elasticsearch.rb

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,21 @@ def event_action_tuple(event)
392392
params[:version] = event.sprintf(@version) if @version
393393
params[:version_type] = event.sprintf(@version_type) if @version_type
394394

395-
[action, params, event.to_hash]
395+
EventActionTuple.new(action, params, event)
396+
end
397+
398+
class EventActionTuple < Array # TODO: acting as an array for compatibility
399+
400+
def initialize(action, params, event, event_data = nil)
401+
super(3)
402+
self[0] = action
403+
self[1] = params
404+
self[2] = event_data || event.to_hash
405+
@event = event
406+
end
407+
408+
attr_reader :event
409+
396410
end
397411

398412
# @return Hash (initial) parameters for given event

lib/logstash/outputs/elasticsearch/data_stream_support.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ def data_stream_default(data_stream_params, valid_data_stream_config)
146146
def data_stream_event_action_tuple(event)
147147
event_data = event.to_hash
148148
data_stream_event_sync(event_data) if data_stream_sync_fields
149-
['create', common_event_params(event), event_data] # action always 'create'
149+
EventActionTuple.new('create', common_event_params(event), event, event_data)
150150
end
151151

152152
DATA_STREAM_SYNC_FIELDS = [ 'type', 'dataset', 'namespace' ].freeze

lib/logstash/plugin_mixins/elasticsearch/common.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,8 +200,9 @@ def next_sleep_interval(current_interval)
200200
def handle_dlq_status(message, action, status, response)
201201
# To support bwc, we check if DLQ exists. otherwise we log and drop event (previous behavior)
202202
if @dlq_writer
203+
event, action = action.event, [action[0], action[1], action[2]]
203204
# TODO: Change this to send a map with { :status => status, :action => action } in the future
204-
@dlq_writer.write(action[2], "#{message} status: #{status}, action: #{action}, response: #{response}")
205+
@dlq_writer.write(event, "#{message} status: #{status}, action: #{action}, response: #{response}")
205206
else
206207
if dig_value(response, 'index', 'error', 'type') == 'invalid_index_name_exception'
207208
level = :error

spec/unit/outputs/elasticsearch_spec.rb

Lines changed: 53 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@
343343
}
344344
}]
345345
}
346-
end
346+
end
347347

348348
before(:each) do
349349
allow(subject.client).to receive(:bulk_send).with(instance_of(StringIO), instance_of(Array)) do |stream, actions|
@@ -771,9 +771,9 @@
771771

772772
context 'when getting any other exception' do
773773
it 'should log at WARN level' do
774-
dlog = double_logger = double("logger").as_null_object
775-
subject.instance_variable_set(:@logger, dlog)
776-
expect(dlog).to receive(:warn).with(/Could not index/, hash_including(:status, :action, :response))
774+
logger = double("logger").as_null_object
775+
subject.instance_variable_set(:@logger, logger)
776+
expect(logger).to receive(:warn).with(/Could not index/, hash_including(:status, :action, :response))
777777
mock_response = { 'index' => { 'error' => { 'type' => 'illegal_argument_exception' } } }
778778
subject.handle_dlq_status("Could not index event to Elasticsearch.",
779779
[:action, :params, :event], :some_status, mock_response)
@@ -782,9 +782,9 @@
782782

783783
context 'when the response does not include [error]' do
784784
it 'should not fail, but just log a warning' do
785-
dlog = double_logger = double("logger").as_null_object
786-
subject.instance_variable_set(:@logger, dlog)
787-
expect(dlog).to receive(:warn).with(/Could not index/, hash_including(:status, :action, :response))
785+
logger = double("logger").as_null_object
786+
subject.instance_variable_set(:@logger, logger)
787+
expect(logger).to receive(:warn).with(/Could not index/, hash_including(:status, :action, :response))
788788
mock_response = { 'index' => {} }
789789
expect do
790790
subject.handle_dlq_status("Could not index event to Elasticsearch.",
@@ -804,12 +804,55 @@
804804
# We should still log when sending to the DLQ.
805805
# This shall be solved by another issue, however: logstash-output-elasticsearch#772
806806
it 'should send the event to the DLQ instead, and not log' do
807-
expect(dlq_writer).to receive(:write).once.with(:event, /Could not index/)
807+
event = LogStash::Event.new("foo" => "bar")
808+
expect(dlq_writer).to receive(:write).once.with(event, /Could not index/)
808809
mock_response = { 'index' => { 'error' => { 'type' => 'illegal_argument_exception' } } }
809-
subject.handle_dlq_status("Could not index event to Elasticsearch.",
810-
[:action, :params, :event], :some_status, mock_response)
810+
action = LogStash::Outputs::ElasticSearch::EventActionTuple.new(:action, :params, event)
811+
subject.handle_dlq_status("Could not index event to Elasticsearch.", action, 404, mock_response)
811812
end
812813
end
814+
815+
context 'with response status 400' do
816+
817+
let(:options) { super().merge 'document_id' => '%{foo}' }
818+
819+
let(:events) { [ LogStash::Event.new("foo" => "bar") ] }
820+
821+
let(:dlq_writer) { subject.instance_variable_get(:@dlq_writer) }
822+
823+
let(:bulk_response) do
824+
{
825+
"took"=>1, "ingest_took"=>11, "errors"=>true, "items"=>
826+
[{
827+
"index"=>{"_index"=>"bar", "_type"=>"_doc", "_id"=>'bar', "status"=>400,
828+
"error"=>{"type" => "illegal_argument_exception", "reason" => "TEST" }
829+
}
830+
}]
831+
}
832+
end
833+
834+
before(:each) do
835+
allow(subject.client).to receive(:bulk_send).and_return(bulk_response)
836+
end
837+
838+
it "should write event to DLQ" do
839+
expect(dlq_writer).to receive(:write).and_wrap_original do |method, *args|
840+
expect( args.size ).to eql 2
841+
842+
event, reason = *args
843+
expect( event ).to be_a LogStash::Event
844+
expect( event ).to be events.first
845+
expect( reason ).to start_with 'Could not index event to Elasticsearch. status: 400, action: ["index"'
846+
expect( reason ).to match /_id=>"bar".*"foo"=>"bar".*response:.*"reason"=>"TEST"/
847+
848+
method.call(*args) # won't hurt to call LogStash::Util::DummyDeadLetterQueueWriter
849+
end.once
850+
851+
event_action_tuples = subject.map_events(events)
852+
subject.send(:submit, event_action_tuples)
853+
end
854+
855+
end if LOGSTASH_VERSION > '7.0'
813856
end
814857

815858
describe "custom headers" do

0 commit comments

Comments
 (0)