diff --git a/lib/logstash/outputs/mongodb.rb b/lib/logstash/outputs/mongodb.rb index 0b88c68..e258029 100644 --- a/lib/logstash/outputs/mongodb.rb +++ b/lib/logstash/outputs/mongodb.rb @@ -43,6 +43,10 @@ class LogStash::Outputs::Mongodb < LogStash::Outputs::Base # whatever the bulk interval value (mongodb hard limit is 1000). config :bulk_size, :validate => :number, :default => 900, :maximum => 999, :min => 2 + # Upsert documents flag, set to true to use replace_one instead of insert_one. This setting is ignored when bulk + # insert is used + config :upsert, :validate => :boolean, :default => false + # Mutex used to synchronize access to 'documents' @@mutex = Mutex.new @@ -108,7 +112,11 @@ def receive(event) end end else - @db[event.sprintf(@collection)].insert_one(document) + if @upsert && document.key?("_id") + @db[event.sprintf(@collection)].replace_one({ _id: document["_id"] }, document, { upsert: true }) + else + @db[event.sprintf(@collection)].insert_one(document) + end end rescue => e if e.message =~ /^E11000/ diff --git a/spec/outputs/mongodb_spec.rb b/spec/outputs/mongodb_spec.rb index 8d3decb..bb58950 100644 --- a/spec/outputs/mongodb_spec.rb +++ b/spec/outputs/mongodb_spec.rb @@ -21,65 +21,125 @@ end describe "receive" do - subject! { LogStash::Outputs::Mongodb.new(config) } - - let(:event) { LogStash::Event.new(properties) } - let(:connection) { double("connection") } - let(:client) { double("client") } - let(:collection) { double("collection") } - - before(:each) do - allow(Mongo::Client).to receive(:new).and_return(connection) - allow(connection).to receive(:use).and_return(client) - allow(client).to receive(:[]).and_return(collection) - allow(collection).to receive(:insert_one) - subject.register - end + describe "insert to mongodb" do + subject! { LogStash::Outputs::Mongodb.new(config) } - after(:each) do - subject.close - end + let(:event) { LogStash::Event.new(properties) } + let(:connection) { double("connection") } + let(:client) { double("client") } + let(:collection) { double("collection") } - describe "#send" do - let(:properties) {{ - "message" => "This is a message!", - "uuid" => SecureRandom.uuid, - "number" => BigDecimal.new("4321.1234"), - "utf8" => "żółć" - }} + before(:each) do + allow(Mongo::Client).to receive(:new).and_return(connection) + allow(connection).to receive(:use).and_return(client) + allow(client).to receive(:[]).and_return(collection) + allow(collection).to receive(:insert_one) + subject.register + end - it "should send the event to the database" do - expect(collection).to receive(:insert_one) - subject.receive(event) + after(:each) do + subject.close + end + + describe "#send" do + let(:properties) {{ + "message" => "This is a message!", + "uuid" => SecureRandom.uuid, + "number" => BigDecimal.new("4321.1234"), + "utf8" => "żółć" + }} + + it "should send the event to the database" do + expect(collection).to receive(:insert_one) + subject.receive(event) + end + end + + describe "no event @timestamp" do + let(:properties) { { "message" => "foo" } } + + it "should not contain a @timestamp field in the mongo document" do + expect(event).to receive(:timestamp).and_return(nil) + expect(event).to receive(:to_hash).and_return(properties) + expect(collection).to receive(:insert_one).with(properties) + subject.receive(event) + end end - end - describe "no event @timestamp" do - let(:properties) { { "message" => "foo" } } + describe "generateId" do + let(:properties) { { "message" => "foo" } } + let(:config) {{ + "uri" => uri, + "database" => database, + "collection" => collection, + "generateId" => true + }} - it "should not contain a @timestamp field in the mongo document" do - expect(event).to receive(:timestamp).and_return(nil) - expect(event).to receive(:to_hash).and_return(properties) - expect(collection).to receive(:insert_one).with(properties) - subject.receive(event) + it "should contain a BSON::ObjectId as _id" do + expect(BSON::ObjectId).to receive(:new).and_return("BSON::ObjectId") + expect(event).to receive(:timestamp).and_return(nil) + expect(event).to receive(:to_hash).and_return(properties) + expect(collection).to receive(:insert_one).with(properties.merge("_id" => "BSON::ObjectId")) + subject.receive(event) + end end end - describe "generateId" do - let(:properties) { { "message" => "foo" } } + describe "upsert/insert to mondodb" do let(:config) {{ - "uri" => uri, - "database" => database, - "collection" => collection, - "generateId" => true + "uri" => uri, + "database" => database, + "collection" => collection, + "upsert" => true }} - it "should contain a BSON::ObjectId as _id" do - expect(BSON::ObjectId).to receive(:new).and_return("BSON::ObjectId") - expect(event).to receive(:timestamp).and_return(nil) - expect(event).to receive(:to_hash).and_return(properties) - expect(collection).to receive(:insert_one).with(properties.merge("_id" => "BSON::ObjectId")) - subject.receive(event) + subject! { LogStash::Outputs::Mongodb.new(config) } + + let(:event) { LogStash::Event.new(properties) } + let(:connection) { double("connection") } + let(:client) { double("client") } + let(:collection) { double("collection") } + + before(:each) do + allow(Mongo::Client).to receive(:new).and_return(connection) + allow(connection).to receive(:use).and_return(client) + allow(client).to receive(:[]).and_return(collection) + allow(collection).to receive(:insert_one) + allow(collection).to receive(:replace_one) + subject.register + end + + after(:each) do + subject.close + end + + describe "#send with _id" do + let(:properties) {{ + "_id" => "esisting_id", + "message" => "This is a message!", + "uuid" => SecureRandom.uuid, + "number" => BigDecimal.new("4321.1234"), + "utf8" => "żółć" + }} + + it "should send the event to the database using replace_one" do + expect(collection).to receive(:replace_one) + subject.receive(event) + end + end + + describe "#send without _id" do + let(:properties) {{ + "message" => "This is a message!", + "uuid" => SecureRandom.uuid, + "number" => BigDecimal.new("4321.1234"), + "utf8" => "żółć" + }} + + it "should send the event to the database using insert_one" do + expect(collection).to receive(:insert_one) + subject.receive(event) + end end end end