Skip to content

Commit ae980ca

Browse files
authored
Only use the type field for the event for ES versions < 6.0 (logstash-plugins#712)
Only use the type field for the event for ES versions < 6.0. Since 6.0 only supports a single type this behavior can be problematic as multiple types will break ES. Fixes logstash-plugins#708 * fix tests to support new maximum_node_version feature and type computation * fix integration tests for new type decider
1 parent e733795 commit ae980ca

File tree

13 files changed

+165
-127
lines changed

13 files changed

+165
-127
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 9.0.2
2+
- Ignore event's type field for the purpose of setting document `_type` if cluster is es 6.x or above
3+
14
## 9.0.1
25
- Update gemspec summary
36

docs/index.asciidoc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -242,8 +242,10 @@ Note: This option is deprecated due to the https://www.elastic.co/guide/en/elast
242242
It will be removed in the next major version of Logstash.
243243
This sets the document type to write events to. Generally you should try to write only
244244
similar events to the same 'type'. String expansion `%{foo}` works here.
245-
Unless you set 'document_type', the event 'type' will be used if it exists
246-
otherwise the document type will be assigned the value of 'doc'.
245+
If you don't set a value for this option:
246+
247+
- for elasticsearch clusters 6.x and above: the value of 'doc' will be used;
248+
- for elasticsearch clusters 5.x and below: the event's 'type' field will be used, if the field is not present the value of 'doc' will be used.
247249

248250
[id="plugins-{type}s-{plugin}-failure_type_logging_whitelist"]
249251
===== `failure_type_logging_whitelist`

lib/logstash/outputs/elasticsearch/common.rb

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ def register
2222

2323
setup_hosts # properly sets @hosts
2424
build_client
25+
2526
install_template
2627
check_action_validity
2728

@@ -48,6 +49,10 @@ def setup_hosts
4849
end
4950
end
5051

52+
def maximum_seen_major_version
53+
client.maximum_seen_major_version
54+
end
55+
5156
def install_template
5257
TemplateManager.install_template(self)
5358
end
@@ -199,12 +204,17 @@ def event_action_params(event)
199204
end
200205

201206
# Determine the correct value for the 'type' field for the given event
207+
DEFAULT_EVENT_TYPE="doc".freeze
202208
def get_event_type(event)
203209
# Set the 'type' value for the index.
204210
type = if @document_type
205211
event.sprintf(@document_type)
206212
else
207-
event.get("type") || "doc"
213+
if client.maximum_seen_major_version < 6
214+
event.get("type") || DEFAULT_EVENT_TYPE
215+
else
216+
DEFAULT_EVENT_TYPE
217+
end
208218
end
209219

210220
if !(type.is_a?(String) || type.is_a?(Numeric))

lib/logstash/outputs/elasticsearch/http_client.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,8 @@ def template_install(name, template, force=false)
8282
template_put(name, template)
8383
end
8484

85-
def connected_es_versions
86-
@pool.connected_es_versions
85+
def maximum_seen_major_version
86+
@pool.maximum_seen_major_version
8787
end
8888

8989
def bulk(actions)

lib/logstash/outputs/elasticsearch/http_client/pool.rb

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,9 @@ def url_info
107107
@state_mutex.synchronize { @url_info }
108108
end
109109

110-
def connected_es_versions
110+
def maximum_seen_major_version
111111
@state_mutex.synchronize do
112-
@url_info.values.select {|v| v[:state] == :alive }.map {|v| v[:version] }
112+
@maximum_seen_major_version
113113
end
114114
end
115115

@@ -245,6 +245,14 @@ def healthcheck!
245245
es_version = get_es_version(url)
246246
@state_mutex.synchronize do
247247
meta[:version] = es_version
248+
major = major_version(es_version)
249+
if !@maximum_seen_major_version
250+
@logger.info("ES Output version determined", :es_version => @maximum_seen_major_version)
251+
set_new_major_version(major)
252+
elsif major > @maximum_seen_major_version
253+
@logger.warn("Detected a node with a higher major version than previously observed. This could be the result of an elasticsearch cluster upgrade.", :previous_major => @maximum_seen_major_version, :new_major => major, :node_url => url)
254+
set_new_major_version(major)
255+
end
248256
meta[:state] = :alive
249257
end
250258
rescue HostUnreachableError, BadResponseCodeError => e
@@ -434,5 +442,12 @@ def get_es_version(url)
434442
request = perform_request_to_url(url, :get, ROOT_URI_PATH)
435443
LogStash::Json.load(request.body)["version"]["number"]
436444
end
445+
446+
def set_new_major_version(version)
447+
@maximum_seen_major_version = version
448+
if @maximum_seen_major_version >= 6
449+
@logger.warn("Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type", :es_version => @maximum_seen_major_version)
450+
end
451+
end
437452
end
438453
end; end; end; end;

lib/logstash/outputs/elasticsearch/template_manager.rb

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,14 @@ class TemplateManager
44
def self.install_template(plugin)
55
return unless plugin.manage_template
66
plugin.logger.info("Using mapping template from", :path => plugin.template)
7-
template = get_template(plugin.template, get_es_major_version(plugin.client))
7+
template = get_template(plugin.template, plugin.maximum_seen_major_version)
88
plugin.logger.info("Attempting to install template", :manage_template => template)
99
install(plugin.client, plugin.template_name, template, plugin.template_overwrite)
1010
rescue => e
1111
plugin.logger.error("Failed to install template.", :message => e.message, :class => e.class.name, :backtrace => e.backtrace)
1212
end
1313

1414
private
15-
def self.get_es_major_version(client)
16-
# get the elasticsearch version of each node in the pool and
17-
# pick the biggest major version
18-
client.connected_es_versions.uniq.map {|version| version.split(".").first.to_i}.max
19-
end
20-
2115
def self.get_template(path, es_major_version)
2216
template_path = path || default_template_path(es_major_version)
2317
read_template_file(template_path)

logstash-output-elasticsearch.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-output-elasticsearch'
3-
s.version = '9.0.1'
3+
s.version = '9.0.2'
44
s.licenses = ['apache-2.0']
55
s.summary = "Stores logs in Elasticsearch"
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

spec/integration/outputs/compressed_indexing_spec.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,11 @@
5050
response = http_client.get("#{index_url}/_search?q=*&size=1000")
5151
result = LogStash::Json.load(response.body)
5252
result["hits"]["hits"].each do |doc|
53-
expect(doc["_type"]).to eq(type)
53+
if ESHelper.es_version_satisfies?(">= 6")
54+
expect(doc["_type"]).to eq("doc")
55+
else
56+
expect(doc["_type"]).to eq(type)
57+
end
5458
expect(doc["_index"]).to eq(index)
5559
end
5660
end

spec/integration/outputs/index_spec.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,11 @@
7878
response = http_client.get("#{index_url}/_search?q=*&size=1000")
7979
result = LogStash::Json.load(response.body)
8080
result["hits"]["hits"].each do |doc|
81-
expect(doc["_type"]).to eq(type)
81+
if ESHelper.es_version_satisfies?(">= 6")
82+
expect(doc["_type"]).to eq("doc")
83+
else
84+
expect(doc["_type"]).to eq(type)
85+
end
8286
expect(doc["_index"]).to eq(index)
8387
end
8488
end

spec/unit/outputs/elasticsearch/http_client/pool_spec.rb

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@
77
let(:adapter) { LogStash::Outputs::ElasticSearch::HttpClient::ManticoreAdapter.new(logger) }
88
let(:initial_urls) { [::LogStash::Util::SafeURI.new("http://localhost:9200")] }
99
let(:options) { {:resurrect_delay => 2, :url_normalizer => proc {|u| u}} } # Shorten the delay a bit to speed up tests
10+
let(:es_node_versions) { [ "0.0.0" ] }
1011

1112
subject { described_class.new(logger, adapter, initial_urls, options) }
1213

1314
let(:manticore_double) { double("manticore a") }
14-
before do
15+
before(:each) do
1516

1617
response_double = double("manticore response").as_null_object
1718
# Allow healtchecks
@@ -21,10 +22,7 @@
2122

2223
allow(::Manticore::Client).to receive(:new).and_return(manticore_double)
2324

24-
allow(subject).to receive(:connected_es_versions).with(any_args).and_return(["0.0.0"])
25-
allow(subject).to receive(:get_es_version).with(any_args).and_return("0.0.0")
26-
27-
subject.start
25+
allow(subject).to receive(:get_es_version).with(any_args).and_return(*es_node_versions)
2826
end
2927

3028
after do
@@ -34,10 +32,12 @@
3432
describe "initialization" do
3533
it "should be successful" do
3634
expect { subject }.not_to raise_error
35+
subject.start
3736
end
3837
end
3938

4039
describe "the resurrectionist" do
40+
before(:each) { subject.start }
4141
it "should start the resurrectionist when created" do
4242
expect(subject.resurrectionist_alive?).to eql(true)
4343
end
@@ -77,6 +77,7 @@
7777
end
7878

7979
describe "the sniffer" do
80+
before(:each) { subject.start }
8081
it "should not start the sniffer by default" do
8182
expect(subject.sniffer_alive?).to eql(nil)
8283
end
@@ -92,6 +93,7 @@
9293

9394
describe "closing" do
9495
before do
96+
subject.start
9597
# Simulate a single in use connection on the first check of this
9698
allow(adapter).to receive(:close).and_call_original
9799
allow(subject).to receive(:wait_for_in_use_connections).and_call_original
@@ -120,6 +122,7 @@
120122
end
121123

122124
describe "connection management" do
125+
before(:each) { subject.start }
123126
context "with only one URL in the list" do
124127
it "should use the only URL in 'with_connection'" do
125128
subject.with_connection do |c|
@@ -167,4 +170,27 @@
167170
end
168171
end
169172
end
173+
174+
describe "version tracking" do
175+
let(:initial_urls) { [
176+
::LogStash::Util::SafeURI.new("http://somehost:9200"),
177+
::LogStash::Util::SafeURI.new("http://otherhost:9201")
178+
] }
179+
180+
before(:each) do
181+
allow(subject).to receive(:perform_request_to_url).and_return(nil)
182+
subject.start
183+
end
184+
185+
it "picks the largest major version" do
186+
expect(subject.maximum_seen_major_version).to eq(0)
187+
end
188+
189+
context "if there are nodes with multiple major versions" do
190+
let(:es_node_versions) { [ "0.0.0", "6.0.0" ] }
191+
it "picks the largest major version" do
192+
expect(subject.maximum_seen_major_version).to eq(6)
193+
end
194+
end
195+
end
170196
end

spec/unit/outputs/elasticsearch/template_manager_spec.rb

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,26 +5,6 @@
55

66
describe LogStash::Outputs::ElasticSearch::TemplateManager do
77

8-
describe ".get_es_major_version" do
9-
let(:client) { double("client") }
10-
11-
before(:each) do
12-
allow(client).to receive(:connected_es_versions).and_return(["5.3.0"])
13-
end
14-
15-
it "picks the largest major version" do
16-
expect(described_class.get_es_major_version(client)).to eq(5)
17-
end
18-
context "if there are nodes with multiple major versions" do
19-
before(:each) do
20-
allow(client).to receive(:connected_es_versions).and_return(["5.3.0", "6.0.0"])
21-
end
22-
it "picks the largest major version" do
23-
expect(described_class.get_es_major_version(client)).to eq(6)
24-
end
25-
end
26-
end
27-
288
describe ".default_template_path" do
299
context "elasticsearch 1.x" do
3010
it "chooses the 2x template" do

0 commit comments

Comments
 (0)