Skip to content

Added upsert option to config of the plugin that allows to use replac… #63

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion lib/logstash/outputs/mongodb.rb
Original file line number Diff line number Diff line change
@@ -43,6 +43,10 @@ class LogStash::Outputs::Mongodb < LogStash::Outputs::Base
# whatever the bulk interval value (mongodb hard limit is 1000).
config :bulk_size, :validate => :number, :default => 900, :maximum => 999, :min => 2

# Upsert documents flag, set to true to use replace_one instead of insert_one. This setting is ignored when bulk
# insert is used
config :upsert, :validate => :boolean, :default => false

# Mutex used to synchronize access to 'documents'
@@mutex = Mutex.new

@@ -108,7 +112,11 @@ def receive(event)
end
end
else
@db[event.sprintf(@collection)].insert_one(document)
if @upsert && document.key?("_id")
@db[event.sprintf(@collection)].replace_one({ _id: document["_id"] }, document, { upsert: true })
else
@db[event.sprintf(@collection)].insert_one(document)
end
end
rescue => e
if e.message =~ /^E11000/
154 changes: 107 additions & 47 deletions spec/outputs/mongodb_spec.rb
Original file line number Diff line number Diff line change
@@ -21,65 +21,125 @@
end

describe "receive" do
subject! { LogStash::Outputs::Mongodb.new(config) }

let(:event) { LogStash::Event.new(properties) }
let(:connection) { double("connection") }
let(:client) { double("client") }
let(:collection) { double("collection") }

before(:each) do
allow(Mongo::Client).to receive(:new).and_return(connection)
allow(connection).to receive(:use).and_return(client)
allow(client).to receive(:[]).and_return(collection)
allow(collection).to receive(:insert_one)
subject.register
end
describe "insert to mongodb" do
subject! { LogStash::Outputs::Mongodb.new(config) }

after(:each) do
subject.close
end
let(:event) { LogStash::Event.new(properties) }
let(:connection) { double("connection") }
let(:client) { double("client") }
let(:collection) { double("collection") }

describe "#send" do
let(:properties) {{
"message" => "This is a message!",
"uuid" => SecureRandom.uuid,
"number" => BigDecimal.new("4321.1234"),
"utf8" => "żółć"
}}
before(:each) do
allow(Mongo::Client).to receive(:new).and_return(connection)
allow(connection).to receive(:use).and_return(client)
allow(client).to receive(:[]).and_return(collection)
allow(collection).to receive(:insert_one)
subject.register
end

it "should send the event to the database" do
expect(collection).to receive(:insert_one)
subject.receive(event)
after(:each) do
subject.close
end

describe "#send" do
let(:properties) {{
"message" => "This is a message!",
"uuid" => SecureRandom.uuid,
"number" => BigDecimal.new("4321.1234"),
"utf8" => "żółć"
}}

it "should send the event to the database" do
expect(collection).to receive(:insert_one)
subject.receive(event)
end
end

describe "no event @timestamp" do
let(:properties) { { "message" => "foo" } }

it "should not contain a @timestamp field in the mongo document" do
expect(event).to receive(:timestamp).and_return(nil)
expect(event).to receive(:to_hash).and_return(properties)
expect(collection).to receive(:insert_one).with(properties)
subject.receive(event)
end
end
end

describe "no event @timestamp" do
let(:properties) { { "message" => "foo" } }
describe "generateId" do
let(:properties) { { "message" => "foo" } }
let(:config) {{
"uri" => uri,
"database" => database,
"collection" => collection,
"generateId" => true
}}

it "should not contain a @timestamp field in the mongo document" do
expect(event).to receive(:timestamp).and_return(nil)
expect(event).to receive(:to_hash).and_return(properties)
expect(collection).to receive(:insert_one).with(properties)
subject.receive(event)
it "should contain a BSON::ObjectId as _id" do
expect(BSON::ObjectId).to receive(:new).and_return("BSON::ObjectId")
expect(event).to receive(:timestamp).and_return(nil)
expect(event).to receive(:to_hash).and_return(properties)
expect(collection).to receive(:insert_one).with(properties.merge("_id" => "BSON::ObjectId"))
subject.receive(event)
end
end
end

describe "generateId" do
let(:properties) { { "message" => "foo" } }
describe "upsert/insert to mondodb" do
let(:config) {{
"uri" => uri,
"database" => database,
"collection" => collection,
"generateId" => true
"uri" => uri,
"database" => database,
"collection" => collection,
"upsert" => true
}}

it "should contain a BSON::ObjectId as _id" do
expect(BSON::ObjectId).to receive(:new).and_return("BSON::ObjectId")
expect(event).to receive(:timestamp).and_return(nil)
expect(event).to receive(:to_hash).and_return(properties)
expect(collection).to receive(:insert_one).with(properties.merge("_id" => "BSON::ObjectId"))
subject.receive(event)
subject! { LogStash::Outputs::Mongodb.new(config) }

let(:event) { LogStash::Event.new(properties) }
let(:connection) { double("connection") }
let(:client) { double("client") }
let(:collection) { double("collection") }

before(:each) do
allow(Mongo::Client).to receive(:new).and_return(connection)
allow(connection).to receive(:use).and_return(client)
allow(client).to receive(:[]).and_return(collection)
allow(collection).to receive(:insert_one)
allow(collection).to receive(:replace_one)
subject.register
end

after(:each) do
subject.close
end

describe "#send with _id" do
let(:properties) {{
"_id" => "esisting_id",
"message" => "This is a message!",
"uuid" => SecureRandom.uuid,
"number" => BigDecimal.new("4321.1234"),
"utf8" => "żółć"
}}

it "should send the event to the database using replace_one" do
expect(collection).to receive(:replace_one)
subject.receive(event)
end
end

describe "#send without _id" do
let(:properties) {{
"message" => "This is a message!",
"uuid" => SecureRandom.uuid,
"number" => BigDecimal.new("4321.1234"),
"utf8" => "żółć"
}}

it "should send the event to the database using insert_one" do
expect(collection).to receive(:insert_one)
subject.receive(event)
end
end
end
end