From e6278b82c3523872313dbfa2bea6a688ad39fd3a Mon Sep 17 00:00:00 2001
From: pemontto <pemontto@gmail.com>
Date: Tue, 21 Sep 2021 15:17:44 +0100
Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20Add=20use=5Fmetadata=20option?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 docs/index.asciidoc                           |  23 ++++
 lib/logstash/outputs/elasticsearch.rb         |  11 ++
 spec/integration/outputs/index_spec.rb        |   2 +-
 .../integration/outputs/index_version_spec.rb |  27 ++++
 .../outputs/ingest_pipeline_spec.rb           | 119 +++++++++++++++++-
 5 files changed, 180 insertions(+), 2 deletions(-)

diff --git a/docs/index.asciidoc b/docs/index.asciidoc
index 031f2224..0240196e 100644
--- a/docs/index.asciidoc
+++ b/docs/index.asciidoc
@@ -367,6 +367,7 @@ This plugin supports the following configuration options plus the
 | <<plugins-{type}s-{plugin}-truststore_password>> |<<password,password>>|No
 | <<plugins-{type}s-{plugin}-upsert>> |<<string,string>>|No
 | <<plugins-{type}s-{plugin}-user>> |<<string,string>>|No
+| <<plugins-{type}s-{plugin}-use_metadata>> |<<boolean,boolean>>|No
 | <<plugins-{type}s-{plugin}-validate_after_inactivity>> |<<number,number>>|No
 | <<plugins-{type}s-{plugin}-version>> |<<string,string>>|No
 | <<plugins-{type}s-{plugin}-version_type>> |<<string,string>>, one of `["internal", "external", "external_gt", "external_gte", "force"]`|No
@@ -1095,6 +1096,28 @@ Create a new document with this parameter as json string if `document_id` doesn'
 
 Username to authenticate to a secure Elasticsearch cluster
 
+[id="plugins-{type}s-{plugin}-use_metadata"]
+===== `use_metadata`
+
+  * Value type is <<boolean,boolean>>
+  * Default value is `false`
+
+Use and prefer the output parameters defined in the document metadata. The <<plugins-{type}s-{plugin}-index>> (`@metadata._index`), <<plugins-{type}s-{plugin}-document_id>> (`@metadata._id`), and <<plugins-{type}s-{plugin}-pipeline>> (`@metadata.pipeline`) options can be overridden by their respective `@metadata` fields.
+
+E.g. to index a document to index `myindex` with id `myid` with the ingest pipeline `mypipeline`:
+
+[source,json]
+-----
+{
+  "message": "foo",
+  "@metadata": {
+    "_index": "myindex",
+    "_id": "myid",
+    "pipeline": "mypipeline"
+  }
+}
+-----
+
 [id="plugins-{type}s-{plugin}-validate_after_inactivity"]
 ===== `validate_after_inactivity`
 
diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb
index 25dd866a..48addfb9 100644
--- a/lib/logstash/outputs/elasticsearch.rb
+++ b/lib/logstash/outputs/elasticsearch.rb
@@ -251,6 +251,9 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
   # ILM policy to use, if undefined the default policy will be used.
   config :ilm_policy, :validate => :string, :default => DEFAULT_POLICY
 
+  # When enabled, prefer values from the event's `@metadata` fields (`[@metadata][_index]`,
+  # `[@metadata][_id]`, `[@metadata][pipeline]`) over the `index`, `document_id`, and
+  # `pipeline` options when building the bulk-request parameters for each event.
+  config :use_metadata, :validate => :boolean, :default => false
+
   attr_reader :client
   attr_reader :default_index
   attr_reader :default_ilm_rollover_alias
@@ -428,6 +431,14 @@ def common_event_params(event)
       params[:pipeline] = value unless value.empty?
     end
 
+    if @use_metadata
+      params[:_id] = event.get("[@metadata][_id]") || params[:_id]
+      event_index = event.get("[@metadata][_index]")
+      params[:_index] = event.sprintf(event_index) if event_index && !event_index.empty?
+      event_pipeline = event.get("[@metadata][pipeline]")
+      params[:pipeline] = event.sprintf(event_pipeline) if event_pipeline && !event_pipeline.empty?
+    end
+
     params
   end
 
diff --git a/spec/integration/outputs/index_spec.rb b/spec/integration/outputs/index_spec.rb
index 16dac139..f805dfd1 100644
--- a/spec/integration/outputs/index_spec.rb
+++ b/spec/integration/outputs/index_spec.rb
@@ -46,7 +46,7 @@
 end
 
 describe "indexing" do
-  let(:event) { LogStash::Event.new("message" => "Hello World!", "type" => type) }
+  let(:event) { LogStash::Event.new("message" => "Hello World!", "type" => type, "@metadata" => { "_id" => "test-id", "_index" => "test-index", "pipeline" => "test-pipeline" }) }
   let(:index) { 10.times.collect { rand(10).to_s }.join("") }
   let(:type) { ESHelper.es_version_satisfies?("< 7") ? "doc" : "_doc" }
   let(:event_count) { 1 + rand(2) }
diff --git a/spec/integration/outputs/index_version_spec.rb b/spec/integration/outputs/index_version_spec.rb
index 0b1ecda9..62ea8fa4 100644
--- a/spec/integration/outputs/index_version_spec.rb
+++ b/spec/integration/outputs/index_version_spec.rb
@@ -94,5 +94,32 @@
         expect(r2["_source"]["message"]).to eq('foo')
       end
     end
+
+    describe "use metadata" do
+      let(:settings) do
+        {
+          "index" => "logstash-index",
+          "hosts" => get_host_port(),
+          "use_metadata" => true,
+        }
+      end
+
+      it "should use @metadata._id for document_id" do
+        id = "new_doc_id_1"
+        subject.multi_receive([LogStash::Event.new("@metadata" => { "_id" => id }, "message" => "foo")])
+        r = es.get(:index => "logstash-index", :type => doc_type, :id => id, :refresh => true)
+        expect(r["_id"]).to eq(id)
+        expect(r["_source"]["message"]).to eq("foo")
+      end
+      it "should use @metadata._index for index" do
+        id = "new_doc_id_2"
+        new_index = "logstash-index-new"
+        subject.multi_receive([LogStash::Event.new("@metadata" => { "_id" => id, "_index" => new_index }, "message" => "foo")])
+        r = es.get(:index => new_index, :type => doc_type, :id => id, :refresh => true)
+        expect(r["_id"]).to eq(id)
+        expect(r["_index"]).to eq(new_index)
+        expect(r["_source"]["message"]).to eq("foo")
+      end
+    end
   end
 end
diff --git a/spec/integration/outputs/ingest_pipeline_spec.rb b/spec/integration/outputs/ingest_pipeline_spec.rb
index c76fdaef..0073478d 100644
--- a/spec/integration/outputs/ingest_pipeline_spec.rb
+++ b/spec/integration/outputs/ingest_pipeline_spec.rb
@@ -60,7 +60,7 @@
   end
 
   it "indexes using the proper pipeline" do
-    results = @es.search(:index => 'logstash-*', :q => "message:\"netcat\"")
+    results = @es.search(:index => "logstash-*", :q => "message:\"netcat\"")
     expect(results).to have_hits(1)
     expect(results["hits"]["hits"][0]["_source"]["response"]).to eq("200")
     expect(results["hits"]["hits"][0]["_source"]["bytes"]).to eq("182")
@@ -72,3 +72,120 @@
     expect(results["hits"]["hits"][0]["_source"]["junkfieldaaaa"]).to eq(nil)
   end
 end
+
+describe "Ingest pipeline from metadata", :integration => true do
+  subject! do
+    require "logstash/outputs/elasticsearch"
+    settings = {
+      "hosts" => "#{get_host_port()}",
+      "pipeline" => "apache-logs",
+      "data_stream" => "false",
+      "use_metadata" => true,
+    }
+    next LogStash::Outputs::ElasticSearch.new(settings)
+  end
+
+  let(:http_client) { Manticore::Client.new }
+  let(:ingest_url) { "http://#{get_host_port()}/_ingest/pipeline/apache-logs" }
+  let(:apache_logs_pipeline) {
+    '
+  {
+    "description" : "Pipeline to parse Apache logs",
+    "processors" : [
+      {
+        "grok": {
+          "field": "message",
+          "patterns": ["%{COMBINEDAPACHELOG}"]
+        }
+      }
+    ]
+  }'
+  }
+
+  let(:add_field_ingest_url) { "http://#{get_host_port()}/_ingest/pipeline/add-field" }
+  let(:add_field_logs_pipeline) {
+    '
+  {
+    "description": "Add field foo with value bar",
+    "processors": [
+      {
+        "set": {
+          "field": "foo",
+          "value": "bar"
+        }
+      }
+    ]
+  }'
+  }
+
+  before :each do
+    # Delete all templates first.
+    require "elasticsearch"
+
+    # Clean ES of data before we start.
+    @es = get_client
+    @es.indices.delete_template(:name => "*")
+
+    # This can fail if there are no indexes, ignore failure.
+    @es.indices.delete(:index => "*") rescue nil
+
+    # delete existing ingest pipeline
+    http_client.delete(ingest_url).call
+
+    # register pipelines
+    http_client.put(ingest_url, :body => apache_logs_pipeline, :headers => { "Content-Type" => "application/json" }).call
+    http_client.put(add_field_ingest_url, :body => add_field_logs_pipeline, :headers => { "Content-Type" => "application/json" }).call
+
+    #TODO: Use esclient
+    #@es.ingest.put_pipeline :id => 'apache_pipeline', :body => pipeline_definition
+
+    subject.register
+    subject.multi_receive([
+      LogStash::Event.new("message" => '183.60.215.50 - - [01/Jun/2015:18:00:00 +0000] "GET /scripts/netcat-webserver HTTP/1.1" 200 182 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"'),
+      LogStash::Event.new("message" => '183.60.215.50 - - [01/Jun/2015:18:00:00 +0000] "GET /scripts/netcat-webserver HTTP/1.1" 200 182 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"', "@metadata" => { "_id" => "id1", "_index" => "index1", "pipeline" => "add-field" }),
+      LogStash::Event.new("message" => '183.60.215.50 - - [01/Jun/2015:18:00:00 +0000] "GET /scripts/netcat-webserver HTTP/1.1" 200 182 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"', "@metadata" => { "_id" => "id2", "_index" => "index2", "pipeline" => "" }),
+    ])
+    @es.indices.refresh
+
+    #Wait or fail until everything's indexed.
+    Stud::try(10.times) do
+      r = @es.search(index: "logstash-*")
+      expect(r).to have_hits(1)
+      r = @es.search(index: "index1")
+      expect(r).to have_hits(1)
+      r = @es.search(index: "index2")
+      expect(r).to have_hits(1)
+      sleep(0.1)
+    end
+  end
+
+  it "indexes using the correct pipeline when @metadata.pipeline not defined" do
+    results = @es.search(:index => "logstash-*", :q => "message:\"netcat\"")
+    expect(results).to have_hits(1)
+    expect(results["hits"]["hits"][0]["_source"]["response"]).to eq("200")
+    expect(results["hits"]["hits"][0]["_source"]["bytes"]).to eq("182")
+    expect(results["hits"]["hits"][0]["_source"]["verb"]).to eq("GET")
+    expect(results["hits"]["hits"][0]["_source"]["request"]).to eq("/scripts/netcat-webserver")
+    expect(results["hits"]["hits"][0]["_source"]["auth"]).to eq("-")
+    expect(results["hits"]["hits"][0]["_source"]["ident"]).to eq("-")
+    expect(results["hits"]["hits"][0]["_source"]["clientip"]).to eq("183.60.215.50")
+    expect(results["hits"]["hits"][0]["_source"]["junkfieldaaaa"]).to eq(nil)
+  end
+
+  it "indexes using the @metadata._index, @metadata._id, and @metadata.pipeline when defined" do
+    results = @es.search(:index => "index1", :q => "message:\"netcat\"")
+    expect(results).to have_hits(1)
+    expect(results["hits"]["hits"][0]["_id"]).to eq("id1")
+    expect(results["hits"]["hits"][0]["_index"]).to eq("index1")
+    expect(results["hits"]["hits"][0]["_source"]["foo"]).to eq("bar")
+  end
+
+  it "ignores empty @metadata.pipeline values when indexing" do
+    results = @es.search(:index => "index2", :q => "message:\"netcat\"")
+    expect(results).to have_hits(1)
+    expect(results["hits"]["hits"][0]["_id"]).to eq("id2")
+    expect(results["hits"]["hits"][0]["_index"]).to eq("index2")
+    expect(results["hits"]["hits"][0]["_source"]).not_to include("foo")
+  end
+
+end