From 688751f8a70ec3913fa78ea3b9d48ece529b1f20 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Tue, 25 Mar 2025 15:51:22 -0700 Subject: [PATCH] ES|QL PoC for es-input. --- lib/logstash/inputs/elasticsearch.rb | 31 +++++++++-- lib/logstash/inputs/elasticsearch/esql.rb | 67 +++++++++++++++++++++++ 2 files changed, 94 insertions(+), 4 deletions(-) create mode 100644 lib/logstash/inputs/elasticsearch/esql.rb diff --git a/lib/logstash/inputs/elasticsearch.rb b/lib/logstash/inputs/elasticsearch.rb index c9d8b552..043bab88 100644 --- a/lib/logstash/inputs/elasticsearch.rb +++ b/lib/logstash/inputs/elasticsearch.rb @@ -73,6 +73,7 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base require 'logstash/inputs/elasticsearch/paginated_search' require 'logstash/inputs/elasticsearch/aggregation' + require 'logstash/inputs/elasticsearch/esql' include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1) include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck @@ -253,6 +254,11 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base # If set, the _source of each hit will be added nested under the target instead of at the top-level config :target, :validate => :field_reference + # A mode to switch between DSL and ES|QL, defaults to DSL + config :query_mode, :validate => %w[dsl esql], :default => 'dsl' + + config :esql, :validate => :hash + # Obsolete Settings config :ssl, :obsolete => "Set 'ssl_enabled' instead." config :ca_file, :obsolete => "Set 'ssl_certificate_authorities' instead." @@ -283,10 +289,13 @@ def register fill_hosts_from_cloud_id setup_ssl_params! 
- @base_query = LogStash::Json.load(@query) - if @slices - @base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option") - @slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`") + puts "Query mode: #{@query_mode}" + if @query_mode == 'dsl' + @base_query = LogStash::Json.load(@query) + if @slices + @base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option") + @slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`") + end end @retries < 0 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `retries` option must be equal or greater than zero, got `#{@retries}`") @@ -360,6 +369,15 @@ def event_from_hit(hit, root_field) return event_factory.new_event('event' => { 'original' => serialized_hit }, 'tags' => ['_elasticsearch_input_failure']) end + # hit: {columns: [], values: []} + def decorate_and_push_to_queue(output_queue, mapped_entry) + puts "mapped_entry class: #{mapped_entry.class}" + puts "mapped_entry value: #{mapped_entry.inspect}" + event = targeted_event_factory.new_event mapped_entry + decorate(event) + output_queue << event + end + def set_docinfo_fields(hit, event) # do not assume event[@docinfo_target] to be in-place updatable. first get it, update it, then at the end set it in the event. 
docinfo_target = event.get(@docinfo_target) || {} @@ -627,6 +645,11 @@ def setup_search_api end def setup_query_executor + if @query_mode == 'esql' + @query_executor = LogStash::Inputs::Elasticsearch::Esql.new(@client, self) + return @query_executor + end + @query_executor = case @response_type when 'hits' if @resolved_search_api == "search_after" diff --git a/lib/logstash/inputs/elasticsearch/esql.rb b/lib/logstash/inputs/elasticsearch/esql.rb new file mode 100644 index 00000000..903bdb48 --- /dev/null +++ b/lib/logstash/inputs/elasticsearch/esql.rb @@ -0,0 +1,67 @@ +require 'logstash/helpers/loggable_try' + +module LogStash + module Inputs + class Elasticsearch + class Esql + include LogStash::Util::Loggable + + ESQL_JOB = "ES|QL job" + + def initialize(client, plugin) + @client = client + @plugin_params = plugin.params + @plugin = plugin + @retries = @plugin_params["retries"] + + @query = @plugin_params["query"] + esql_options = @plugin_params["esql"] || {} + @esql_params = esql_options["params"] || {} + # TODO: add filter as well + # @esql_params = esql_options["filter"] | [] + end + + def retryable(job_name, &block) + stud_try = ::LogStash::Helpers::LoggableTry.new(logger, job_name) + stud_try.try((@retries + 1).times) { yield } + rescue => e + error_details = {:message => e.message, :cause => e.cause} + error_details[:backtrace] = e.backtrace if logger.debug? + logger.error("#{job_name} failed with", error_details) + false + end + + def do_run(output_queue) + logger.info("ES|QL executor starting") + response = retryable(ESQL_JOB) do + @client.esql.query({ body: { query: @query }, format: 'json' }) + # TODO: make sure to add params, filters, etc... 
+ # @client.esql.query({ body: { query: @query }, format: 'json' }.merge(@esql_params)) + + end + puts "Response: #{response.inspect}" + if response && response['values'] + response['values'].each do |value| + mapped_data = map_column_and_values(response['columns'], value) + puts "Mapped Data: #{mapped_data}" + @plugin.decorate_and_push_to_queue(output_queue, mapped_data) + end + end + end + + def map_column_and_values(columns, values) + puts "columns class: #{columns.class}" + puts "values class: #{values.class}" + puts "columns: #{columns.inspect}" + puts "values: #{values.inspect}" + mapped_data = {} + columns.each_with_index do |column, index| + mapped_data[column["name"]] = values[index] + end + puts "values: #{mapped_data.inspect}" + mapped_data + end + end + end + end +end \ No newline at end of file