Skip to content

Add sniffing_attributes option #904

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lib/logstash/outputs/elasticsearch.rb
Original file line number Diff line number Diff line change
@@ -180,6 +180,10 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
# do not use full URL here, only paths, e.g. "/sniff/_nodes/http"
config :sniffing_path, :validate => :string

# Node attribute(s) to filter for. Those that possess and match the following hash of attributes
# will be selected.
config :sniffing_attributes, :validate => :hash, :default => {}

# Set the address of a forward HTTP proxy.
# This used to accept hashes as arguments but now only accepts
# arguments of the URI type to prevent leaking credentials.
1 change: 1 addition & 0 deletions lib/logstash/outputs/elasticsearch/http_client.rb
Original file line number Diff line number Diff line change
@@ -289,6 +289,7 @@ def build_pool(options)
:sniffing => sniffing,
:sniffer_delay => options[:sniffer_delay],
:sniffing_path => options[:sniffing_path],
:sniffing_attributes => options[:sniffing_attributes],
:healthcheck_path => options[:healthcheck_path],
:resurrect_delay => options[:resurrect_delay],
:url_normalizer => self.method(:host_to_url),
27 changes: 17 additions & 10 deletions lib/logstash/outputs/elasticsearch/http_client/pool.rb
Original file line number Diff line number Diff line change
@@ -28,7 +28,7 @@ def message
end
end

attr_reader :logger, :adapter, :sniffing, :sniffer_delay, :resurrect_delay, :healthcheck_path, :sniffing_path, :bulk_path
attr_reader :logger, :adapter, :sniffing, :sniffer_delay, :resurrect_delay, :healthcheck_path, :sniffing_path, :bulk_path, :sniffing_attributes

ROOT_URI_PATH = '/'.freeze
LICENSE_PATH = '/_license'.freeze
@@ -40,6 +40,7 @@ def message
:scheme => 'http',
:resurrect_delay => 5,
:sniffing => false,
:sniffing_attributes => {},
:sniffer_delay => 10,
}.freeze

@@ -48,7 +49,7 @@ def initialize(logger, adapter, initial_urls=[], options={})
@adapter = adapter
@metric = options[:metric]
@initial_urls = initial_urls

raise ArgumentError, "No URL Normalizer specified!" unless options[:url_normalizer]
@url_normalizer = options[:url_normalizer]
DEFAULT_OPTIONS.merge(options).tap do |merged|
@@ -58,6 +59,7 @@ def initialize(logger, adapter, initial_urls=[], options={})
@resurrect_delay = merged[:resurrect_delay]
@sniffing = merged[:sniffing]
@sniffer_delay = merged[:sniffer_delay]
@sniffing_attributes = merged[:sniffing_attributes]
end

# Used for all concurrent operations in this class
@@ -71,7 +73,7 @@ def initialize(logger, adapter, initial_urls=[], options={})
def oss?
LogStash::Outputs::ElasticSearch.oss?
end

def start
update_initial_urls
start_resurrectionist
@@ -189,15 +191,20 @@ def check_sniff
end
end
end

def major_version(version_string)
version_string.split('.').first.to_i
end

def sniff_5x_and_above(nodes)
nodes.map do |id,info|
# Skip master-only nodes
next if info["roles"] && info["roles"] == ["master"]
# if !@sniffing_attributes.nil? or !@sniffing_attributes.to_h.empty?
if !@sniffing_attributes.to_h.empty?
attributes = info["attributes"].clone.delete_if { |key, value| !@sniffing_attributes.key? key }
next if attributes != @sniffing_attributes
end
address_str_to_uri(info["http"]["publish_address"]) if info["http"]
end.compact
end
@@ -215,7 +222,7 @@ def sniff_2x_1x(nodes)
nodes.map do |id,info|
# TODO Make sure this works with shield. Does that listed
# stuff as 'https_address?'

addr_str = info['http_address'].to_s
next unless addr_str # Skip hosts with HTTP disabled

@@ -344,7 +351,7 @@ def normalize_url(uri)

def update_urls(new_urls)
return if new_urls.nil?

# Normalize URLs
new_urls = new_urls.map(&method(:normalize_url))

@@ -374,14 +381,14 @@ def update_urls(new_urls)
logger.info("Elasticsearch pool URLs updated", :changes => state_changes)
end
end

# Run an inline healthcheck anytime URLs are updated
# This guarantees that during startup / post-startup
# sniffing we don't have idle periods waiting for the
# periodic sniffer to allow new hosts to come online
healthcheck!
healthcheck!
end

def size
@state_mutex.synchronize { @url_info.size }
end
1 change: 1 addition & 0 deletions lib/logstash/outputs/elasticsearch/http_client_builder.rb
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@ def self.build(logger, hosts, params)
if params["sniffing"]
common_options[:sniffing] = true
common_options[:sniffer_delay] = params["sniffing_delay"]
common_options[:sniffing_attributes] = params["sniffing_attributes"]
end

common_options[:timeout] = params["timeout"] if params["timeout"]