diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb
index b7654506f..42fe92f65 100644
--- a/lib/logstash/outputs/elasticsearch.rb
+++ b/lib/logstash/outputs/elasticsearch.rb
@@ -180,6 +180,12 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
   # do not use full URL here, only paths, e.g. "/sniff/_nodes/http"
   config :sniffing_path, :validate => :string
 
+  # Node attributes to filter sniffed nodes by. Only nodes whose attributes contain
+  # every key/value pair in this hash will be added to the connection pool.
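+  # Example (attribute name and value are illustrative):
+  #     sniffing_attributes => { "rack" => "rack-1" }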
+  config :sniffing_attributes, :validate => :hash, :default => {}
+
   # Set the address of a forward HTTP proxy.
   # This used to accept hashes as arguments but now only accepts
   # arguments of the URI type to prevent leaking credentials.
diff --git a/lib/logstash/outputs/elasticsearch/http_client.rb b/lib/logstash/outputs/elasticsearch/http_client.rb
index 32a37e82a..4456a9074 100644
--- a/lib/logstash/outputs/elasticsearch/http_client.rb
+++ b/lib/logstash/outputs/elasticsearch/http_client.rb
@@ -289,6 +289,7 @@ def build_pool(options)
         :sniffing => sniffing,
         :sniffer_delay => options[:sniffer_delay],
         :sniffing_path => options[:sniffing_path],
+        :sniffing_attributes => options[:sniffing_attributes],
         :healthcheck_path => options[:healthcheck_path],
         :resurrect_delay => options[:resurrect_delay],
         :url_normalizer => self.method(:host_to_url),
diff --git a/lib/logstash/outputs/elasticsearch/http_client/pool.rb b/lib/logstash/outputs/elasticsearch/http_client/pool.rb
index 467141eaf..38a9ef747 100644
--- a/lib/logstash/outputs/elasticsearch/http_client/pool.rb
+++ b/lib/logstash/outputs/elasticsearch/http_client/pool.rb
@@ -28,7 +28,7 @@ def message
       end
     end
 
-    attr_reader :logger, :adapter, :sniffing, :sniffer_delay, :resurrect_delay, :healthcheck_path, :sniffing_path, :bulk_path
+    attr_reader :logger, :adapter, :sniffing, :sniffer_delay, :resurrect_delay, :healthcheck_path, :sniffing_path, :bulk_path, :sniffing_attributes
 
     ROOT_URI_PATH = '/'.freeze
     LICENSE_PATH = '/_license'.freeze
@@ -40,6 +40,7 @@ def message
       :scheme => 'http',
       :resurrect_delay => 5,
       :sniffing => false,
+      :sniffing_attributes => {},
       :sniffer_delay => 10,
     }.freeze
 
@@ -48,7 +49,7 @@ def initialize(logger, adapter, initial_urls=[], options={})
       @adapter = adapter
       @metric = options[:metric]
       @initial_urls = initial_urls
-      
+
       raise ArgumentError, "No URL Normalizer specified!" unless options[:url_normalizer]
       @url_normalizer = options[:url_normalizer]
       DEFAULT_OPTIONS.merge(options).tap do |merged|
@@ -58,6 +59,7 @@ def initialize(logger, adapter, initial_urls=[], options={})
         @resurrect_delay = merged[:resurrect_delay]
         @sniffing = merged[:sniffing]
         @sniffer_delay = merged[:sniffer_delay]
+        @sniffing_attributes = merged[:sniffing_attributes]
       end
 
       # Used for all concurrent operations in this class
@@ -71,7 +73,7 @@ def initialize(logger, adapter, initial_urls=[], options={})
     def oss?
      LogStash::Outputs::ElasticSearch.oss?
     end
-    
+
     def start
       update_initial_urls
       start_resurrectionist
@@ -189,15 +191,22 @@ def check_sniff
         end
       end
     end
-    
+
     def major_version(version_string)
       version_string.split('.').first.to_i
     end
-    
+
     def sniff_5x_and_above(nodes)
       nodes.map do |id,info|
         # Skip master-only nodes
         next if info["roles"] && info["roles"] == ["master"]
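+        # Nodes info reports custom node attributes (set via node.attr.*) under "attributes",
+        # e.g. info["attributes"] => { "rack" => "rack-1" } (attribute name is illustrative)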
+        # Keep only nodes whose attributes match every configured sniffing_attributes pair
+        unless @sniffing_attributes.to_h.empty?
+          attributes = (info["attributes"] || {}).select { |key, _| @sniffing_attributes.key?(key) }
+          next if attributes != @sniffing_attributes
+        end
         address_str_to_uri(info["http"]["publish_address"]) if info["http"]
       end.compact
     end
@@ -215,7 +222,7 @@ def sniff_2x_1x(nodes)
       nodes.map do |id,info|
         # TODO Make sure this works with shield. Does that listed
         # stuff as 'https_address?'
-        
+
         addr_str = info['http_address'].to_s
         next unless addr_str # Skip hosts with HTTP disabled
 
@@ -344,7 +351,7 @@ def normalize_url(uri)
 
     def update_urls(new_urls)
       return if new_urls.nil?
-      
+
       # Normalize URLs
       new_urls = new_urls.map(&method(:normalize_url))
 
@@ -374,14 +381,14 @@ def update_urls(new_urls)
           logger.info("Elasticsearch pool URLs updated", :changes => state_changes)
         end
       end
-      
+
       # Run an inline healthcheck anytime URLs are updated
       # This guarantees that during startup / post-startup
       # sniffing we don't have idle periods waiting for the
       # periodic sniffer to allow new hosts to come online
-      healthcheck! 
+      healthcheck!
     end
-    
+
     def size
       @state_mutex.synchronize { @url_info.size }
     end
diff --git a/lib/logstash/outputs/elasticsearch/http_client_builder.rb b/lib/logstash/outputs/elasticsearch/http_client_builder.rb
index 3cb60bc9b..fb857853d 100644
--- a/lib/logstash/outputs/elasticsearch/http_client_builder.rb
+++ b/lib/logstash/outputs/elasticsearch/http_client_builder.rb
@@ -22,6 +22,7 @@ def self.build(logger, hosts, params)
       if params["sniffing"]
         common_options[:sniffing] = true
         common_options[:sniffer_delay] = params["sniffing_delay"]
+        common_options[:sniffing_attributes] = params["sniffing_attributes"]
       end
 
       common_options[:timeout] = params["timeout"] if params["timeout"]