11
11
require "logstash/plugin_mixins/scheduler"
12
12
require "logstash/plugin_mixins/normalize_config_support"
13
13
require "base64"
14
- require 'logstash/helpers/loggable_try'
15
14
16
15
require "elasticsearch"
17
16
require "elasticsearch/transport/transport/http/manticore"
74
73
#
75
74
class LogStash ::Inputs ::Elasticsearch < LogStash ::Inputs ::Base
76
75
76
+ require 'logstash/inputs/elasticsearch/paginated_search'
77
+
77
78
include LogStash ::PluginMixins ::ECSCompatibilitySupport ( :disabled , :v1 , :v8 => :v1 )
78
79
include LogStash ::PluginMixins ::ECSCompatibilitySupport ::TargetCheck
79
80
@@ -106,6 +107,10 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
106
107
# The number of retries to run the query. If the query fails after all retries, it logs an error message.
107
108
config :retries , :validate => :number , :default => 0
108
109
110
# With the default `auto`, the plugin uses the `search_after` API against
# Elasticsearch 8 and the `scroll` API against Elasticsearch 7.
# Set to `scroll` to fall back to the previous pagination behavior.
config :search_api, :validate => %w[auto search_after scroll], :default => "auto"
109
114
# This parameter controls the keepalive time in seconds of the scrolling
110
115
# request and initiates the scrolling process. The timeout applies per
111
116
# round trip (i.e. between the previous scroll request, to the next).
@@ -321,93 +326,21 @@ def register
321
326
322
327
setup_serverless
323
328
329
+ setup_search_api
330
+
324
331
@client
325
332
end
326
333
327
334
328
335
# Plugin entry point. With a `schedule` configured, registers a cron job
# that performs the paginated search on each tick and blocks on the
# scheduler; otherwise performs a single paginated search immediately.
# All pagination (scroll or search_after) is delegated to @paginated_search,
# which setup_search_api built during register.
def run(output_queue)
  unless @schedule
    @paginated_search.do_run(output_queue)
    return
  end
  scheduler.cron(@schedule) { @paginated_search.do_run(output_queue) }
  scheduler.join
end
411
344
def push_hit ( hit , output_queue )
412
345
event = targeted_event_factory . new_event hit [ '_source' ]
413
346
set_docinfo_fields ( hit , event ) if @docinfo
@@ -433,20 +366,7 @@ def set_docinfo_fields(hit, event)
433
366
event . set ( @docinfo_target , docinfo_target )
434
367
end
435
368
436
# Scroll bookkeeping (clear_scroll / scroll_request / search_request) now
# lives in the paginated-search classes (see paginated_search require at the
# top of the class); only the visibility marker for the helpers below remains.
private
450
370
451
371
# True when no hosts were explicitly configured: either nil or an empty Array.
# Any other value (including a bare String) counts as user-provided.
def hosts_default?(hosts)
  return true if hosts.nil?
  hosts.is_a?(Array) && hosts.empty?
end
@@ -677,6 +597,18 @@ def test_connection!
677
597
raise LogStash ::ConfigurationError , "Could not connect to a compatible version of Elasticsearch"
678
598
end
679
599
600
# Cluster info from GET /, fetched once and memoized for all callers
# (build_flavor, version resolution, search-api selection).
def es_info
  @es_info ||= @client.info
end

# Full version string from the cached info payload, e.g. "8.10.0".
# Nil-safe: returns nil when the payload is missing or lacks the field.
def es_version
  @es_version ||= es_info&.dig('version', 'number')
end

# Leading integer of the version string, e.g. 8 for "8.10.0".
# NOTE(review): raises NoMethodError if es_version is nil — presumably the
# info call always carries a version by this point; confirm for serverless.
def es_major_version
  @es_major_version ||= es_version.split('.').first.to_i
end
611
+
680
612
# recreate client with default header when it is serverless
681
613
# verify the header by sending GET /
682
614
def setup_serverless
@@ -691,13 +623,35 @@ def setup_serverless
691
623
end
692
624
693
625
# The cluster's build flavor from the cached info payload (e.g. "serverless").
# Uses the memoized es_info rather than issuing a fresh info request.
def build_flavor
  @build_flavor ||= es_info&.dig('version', 'build_flavor')
end
696
628
697
629
# Whether the connected cluster is a serverless deployment.
#
# Memoizes the computed answer. A plain `||=` never caches a +false+ result
# (it re-evaluates every call on non-serverless clusters), so guard on `nil?`
# instead. Cheap either way since build_flavor reads the memoized es_info,
# but this makes the memoization actually hold for both outcomes.
def serverless?
  @is_serverless = (build_flavor == BUILD_FLAVOR_SERVERLESS) if @is_serverless.nil?
  @is_serverless
end
700
632
633
# Decide which pagination strategy backs this input and build the matching
# search object that run() delegates to.
#
# `search_api => auto` resolves to `search_after` on ES 8+ and `scroll` on
# older clusters; an explicit setting is honoured as-is, with a warning when
# `scroll` is forced against ES 8+.
def setup_search_api
  if @search_api == "auto"
    @resolved_search_api = es_major_version >= 8 ? "search_after" : "scroll"
    logger.info("`search_api => auto` resolved to `#{@resolved_search_api}`", :elasticsearch => es_version)
  else
    @resolved_search_api = @search_api
  end

  if @resolved_search_api == "search_after"
    @paginated_search = LogStash::Inputs::Elasticsearch::SearchAfter.new(@client, self)
  else
    logger.warn("scroll API is no longer recommended for pagination. Consider using search_after instead.") if es_major_version >= 8
    @paginated_search = LogStash::Inputs::Elasticsearch::Scroll.new(@client, self)
  end
end
654
+
701
655
module URIOrEmptyValidator
702
656
##
703
657
# @override to provide :uri_or_empty validator
0 commit comments