ES|QL support #235

Merged
merged 2 commits into from Jun 6, 2025

3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
## 4.23.0
- ES|QL support [#235](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/235)

## 4.22.0
- Add "cursor"-like index tracking [#205](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/205)

126 changes: 122 additions & 4 deletions docs/index.asciidoc
@@ -230,6 +230,110 @@ The next scheduled run:
* uses {ref}/point-in-time-api.html#point-in-time-api[Point in time (PIT)] + {ref}/paginate-search-results.html#search-after[Search after] to paginate through all the data, and
* updates the value of the field at the end of the pagination.

[id="plugins-{type}s-{plugin}-esql"]
==== {esql} support

.Technical Preview
****
The {esql} feature that allows using ES|QL queries with this plugin is in Technical Preview.
Configuration options and implementation details are subject to change in minor releases without being preceded by deprecation warnings.
****

{es} Query Language ({esql}) provides a SQL-like interface for querying your {es} data.

To use {esql}, this plugin needs to be installed on {ls} 8.17.4 or newer, and must be connected to {es} 8.11 or newer.

To configure an {esql} query in the plugin, set `query_type` to `esql` and provide your {esql} query in the `query` parameter.

IMPORTANT: {esql} is evolving and may still have limitations with regard to result size or supported field types. We recommend understanding https://www.elastic.co/guide/en/elasticsearch/reference/current/esql-limitations.html[ES|QL current limitations] before using it in production environments.

The following is a basic scheduled {esql} query that runs hourly:
[source, ruby]
input {
elasticsearch {
id => 'hourly_cron_job'
hosts => [ 'https://..']
api_key => '....'
query_type => 'esql'
query => '
FROM food-index
| WHERE spicy_level == "hot" AND @timestamp > NOW() - 1 hour
| LIMIT 500
'
schedule => '0 * * * *' # every hour at min 0
}
}

Set `config.support_escapes: true` in `logstash.yml` if you need to escape special characters in the query.
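
For example, with escape support enabled you can wrap the query in double quotes and use escaped characters inside it. The sketch below is not part of the original example set; the `logs-*` index and `level` field are assumptions:

[source, ruby]
input {
  elasticsearch {
    hosts => [ 'https://..' ]
    api_key => '....'
    query_type => 'esql'
    # \n and \" are unescaped to a newline and a literal quote because config.support_escapes is enabled
    query => "FROM logs-* \n| WHERE level == \"error\" AND @timestamp > NOW() - 1 hour \n| LIMIT 100"
  }
}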

NOTE: With an {esql} query, {ls} doesn't generate `event.original`.

[id="plugins-{type}s-{plugin}-esql-event-mapping"]
===== Mapping {esql} result to {ls} event
{esql} returns query results in a structured tabular format, where data is organized into _columns_ (fields) and _values_ (entries).
The plugin maps each value entry to an event, populating corresponding fields.
For example, a query might produce a table like:

[cols="2,1,1,1,2",options="header"]
|===
|`timestamp` |`user_id` | `action` | `status.code` | `status.desc`

|2025-04-10T12:00:00 |123 |login |200 | Success
|2025-04-10T12:05:00 |456 |purchase |403 | Forbidden (unauthorized user)
|===

For this case, the plugin emits two events that look like this:
[source, json]
[
{
"timestamp": "2025-04-10T12:00:00",
"user_id": 123,
"action": "login",
"status": {
"code": 200,
"desc": "Success"
}
},
{
"timestamp": "2025-04-10T12:05:00",
"user_id": 456,
"action": "purchase",
"status": {
"code": 403,
"desc": "Forbidden (unauthorized user)"
}
}
]

NOTE: If your index has a mapping with sub-objects where `status.code` and `status.desc` are actually dotted fields, they appear in {ls} events as a nested structure.
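
For illustration only, here is a minimal Ruby sketch of this column-to-event mapping. It is not the plugin's implementation; the `esql_rows_to_events` helper and the literal response hash are made up for this example, while the `columns`/`values` shape mirrors the tabular result shown above:

[source, ruby]
# Illustrative only: map an ES|QL tabular response into nested event hashes.
def esql_rows_to_events(response)
  column_names = response['columns'].map { |col| col['name'] }
  response['values'].map do |row|
    event = {}
    column_names.zip(row).each do |name, value|
      # dotted column names such as "status.code" become nested structures
      *parents, leaf = name.split('.')
      target = parents.inject(event) { |hash, key| hash[key] ||= {} }
      target[leaf] = value
    end
    event
  end
end
response = {
  'columns' => [{ 'name' => 'user_id' }, { 'name' => 'status.code' }, { 'name' => 'status.desc' }],
  'values'  => [[123, 200, 'Success'], [456, 403, 'Forbidden (unauthorized user)']]
}
esql_rows_to_events(response)
# => [{"user_id"=>123, "status"=>{"code"=>200, "desc"=>"Success"}},
#     {"user_id"=>456, "status"=>{"code"=>403, "desc"=>"Forbidden (unauthorized user)"}}]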

[id="plugins-{type}s-{plugin}-esql-multifields"]
===== Conflict on multi-fields

{esql} queries fetch all parent and sub-fields if your {es} index has https://www.elastic.co/docs/reference/elasticsearch/mapping-reference/multi-fields[multi-fields] or https://www.elastic.co/docs/reference/elasticsearch/mapping-reference/subobjects[subobjects].
Since {ls} events cannot contain a parent field's concrete value and sub-field values together, the plugin ignores the sub-fields with a warning and includes the parent.
We recommend using the `RENAME` (or `DROP`, to avoid warnings) keyword in your {esql} query to explicitly rename fields if you want sub-fields included in the event.

This is a common occurrence if your template or mapping follows the pattern of always indexing strings as a "text" (`field`) + "keyword" (`field.keyword`) multi-field.
In this case it's recommended to do `KEEP field` if the string is identical and there is only one sub-field, as the engine will optimize and retrieve the keyword; otherwise you can do `KEEP field.keyword | RENAME field.keyword AS field`.
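
For example, with a hypothetical `my-index` whose `message` text field has a `message.keyword` sub-field, keeping only the keyword variant and renaming it back to the parent name might look like:

[source, ruby]
query => '
  FROM my-index
  | KEEP message.keyword
  | RENAME message.keyword AS message
  | LIMIT 500
'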

To illustrate the situation with an example, assume your mapping has a `time` field with `time.min` and `time.max` sub-fields as follows:
[source, ruby]
"properties": {
"time": { "type": "long" },
"time.min": { "type": "long" },
"time.max": { "type": "long" }
}

The {esql} result will contain all three fields, but the plugin cannot map them into a {ls} event.
To avoid this, you can use the `RENAME` keyword to rename the `time` parent field so that all three fields have unique names:
[source, ruby]
...
query => 'FROM my-index | RENAME time AS time.current'
...
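
With that rename in place, every field in the result is a dotted sub-field of `time`, so each row maps to an event with a nested `time` object, roughly like the following (the values are illustrative):

[source, json]
{
  "time": {
    "current": 1747224000000,
    "min": 1747220000000,
    "max": 1747227600000
  }
}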

For comprehensive {esql} syntax reference and best practices, see the https://www.elastic.co/guide/en/elasticsearch/reference/current/esql-syntax.html[{esql} documentation].

[id="plugins-{type}s-{plugin}-options"]
==== Elasticsearch Input configuration options

@@ -254,6 +358,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ
| <<plugins-{type}s-{plugin}-password>> |<<password,password>>|No
| <<plugins-{type}s-{plugin}-proxy>> |<<uri,uri>>|No
| <<plugins-{type}s-{plugin}-query>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-query_type>> |<<string,string>>, one of `["dsl","esql"]`|No
| <<plugins-{type}s-{plugin}-response_type>> |<<string,string>>, one of `["hits","aggregations"]`|No
| <<plugins-{type}s-{plugin}-request_timeout_seconds>> | <<number,number>>|No
| <<plugins-{type}s-{plugin}-schedule>> |<<string,string>>|No
@@ -495,22 +600,35 @@ environment variables e.g. `proxy => '${LS_PROXY:}'`.
* Value type is <<string,string>>
* Default value is `'{ "sort": [ "_doc" ] }'`

The query to be executed. Read the {ref}/query-dsl.html[Elasticsearch query DSL
documentation] for more information.
The query to be executed.
The accepted query shape is DSL, or {esql} when `query_type => 'esql'`.
Read the {ref}/query-dsl.html[{es} query DSL documentation] or {ref}/esql.html[{esql} documentation] for more information.

When <<plugins-{type}s-{plugin}-search_api>> resolves to `search_after` and the query does not specify `sort`,
the default sort `'{ "sort": { "_shard_doc": "asc" } }'` will be added to the query. Please refer to the {ref}/paginate-search-results.html#search-after[Elasticsearch search_after] parameter to know more.

[id="plugins-{type}s-{plugin}-query_type"]
===== `query_type`

* Value can be `dsl` or `esql`
* Default value is `dsl`

Defines the <<plugins-{type}s-{plugin}-query>> shape.
When `dsl`, the query shape must be a valid {es} JSON-style string.
When `esql`, the query shape must be a valid {esql} string, and the `index`, `size`, `slices`, `search_api`, `docinfo`, `docinfo_target`, `docinfo_fields`, `response_type` and `tracking_field` parameters are not allowed.

[id="plugins-{type}s-{plugin}-response_type"]
===== `response_type`

* Value can be any of: `hits`, `aggregations`
* Value can be any of: `hits`, `aggregations`, `esql`
* Default value is `hits`

Which part of the result to transform into Logstash events when processing the
response from the query.

The default `hits` will generate one event per returned document (i.e. "hit").
When set to `aggregations`, a single Logstash event will be generated with the

When set to `aggregations`, a single {ls} event will be generated with the
contents of the `aggregations` object of the query's response. In this case the
`hits` object will be ignored. The parameter `size` will be always be set to
0 regardless of the default or user-defined value set in this plugin.
114 changes: 78 additions & 36 deletions lib/logstash/inputs/elasticsearch.rb
@@ -74,6 +74,7 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
require 'logstash/inputs/elasticsearch/paginated_search'
require 'logstash/inputs/elasticsearch/aggregation'
require 'logstash/inputs/elasticsearch/cursor_tracker'
require 'logstash/inputs/elasticsearch/esql'

include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)
include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck
@@ -96,15 +97,21 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
# The index or alias to search.
config :index, :validate => :string, :default => "logstash-*"

# The query to be executed. Read the Elasticsearch query DSL documentation
# for more info
# https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html
# The type of the Elasticsearch query provided in @query. This determines how the query shape and other params are validated.
config :query_type, :validate => %w[dsl esql], :default => 'dsl'

# The query to be executed. DSL or ES|QL (when `query_type => 'esql'`) query shape is accepted.
# Read the following documentations for more info
# Query DSL: https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html
# ES|QL: https://www.elastic.co/guide/en/elasticsearch/reference/current/esql.html
config :query, :validate => :string, :default => '{ "sort": [ "_doc" ] }'

# This allows you to speccify the response type: either hits or aggregations
# where hits: normal search request
# aggregations: aggregation request
config :response_type, :validate => ['hits', 'aggregations'], :default => 'hits'
# This allows you to specify the DSL response type: one of [hits, aggregations]
# where
# hits: normal search request
# aggregations: aggregation request
# Note that this param is invalid when `query_type => 'esql'`; the ES|QL response shape is always a tabular format
config :response_type, :validate => %w[hits aggregations], :default => 'hits'

# This allows you to set the maximum number of hits returned per scroll.
config :size, :validate => :number, :default => 1000
@@ -293,6 +300,9 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
DEFAULT_EAV_HEADER = { "Elastic-Api-Version" => "2023-10-31" }.freeze
INTERNAL_ORIGIN_HEADER = { 'x-elastic-product-origin' => 'logstash-input-elasticsearch'}.freeze

LS_ESQL_SUPPORT_VERSION = "8.17.4" # the version started using elasticsearch-ruby v8
ES_ESQL_SUPPORT_VERSION = "8.11.0"

def initialize(params={})
super(params)

@@ -309,10 +319,17 @@ def register
fill_hosts_from_cloud_id
setup_ssl_params!

@base_query = LogStash::Json.load(@query)
if @slices
@base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
@slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
if @query_type == 'esql'
validate_ls_version_for_esql_support!
validate_esql_query!
not_allowed_options = original_params.keys & %w(index size slices search_api docinfo docinfo_target docinfo_fields response_type tracking_field)
raise(LogStash::ConfigurationError, "Configured #{not_allowed_options} params are not allowed while using ES|QL query") if not_allowed_options&.size > 1
else
@base_query = LogStash::Json.load(@query)
if @slices
@base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
@slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
end
end

@retries < 0 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `retries` option must be equal or greater than zero, got `#{@retries}`")
@@ -348,11 +365,13 @@ def register

test_connection!

validate_es_for_esql_support!

setup_serverless

setup_search_api

setup_query_executor
@query_executor = create_query_executor

setup_cursor_tracker

Expand All @@ -370,16 +389,6 @@ def run(output_queue)
end
end

def get_query_object
if @cursor_tracker
query = @cursor_tracker.inject_cursor(@query)
@logger.debug("new query is #{query}")
else
query = @query
end
LogStash::Json.load(query)
end

##
# This can be called externally from the query_executor
public
@@ -390,6 +399,23 @@ def push_hit(hit, output_queue, root_field = '_source')
record_last_value(event)
end

def decorate_event(event)
decorate(event)
end

private

def get_query_object
return @query if @query_type == 'esql'
if @cursor_tracker
query = @cursor_tracker.inject_cursor(@query)
@logger.debug("new query is #{query}")
else
query = @query
end
LogStash::Json.load(query)
end

def record_last_value(event)
@cursor_tracker.record_last_value(event) if @tracking_field
end
@@ -421,8 +447,6 @@ def set_docinfo_fields(hit, event)
event.set(@docinfo_target, docinfo_target)
end

private

def hosts_default?(hosts)
hosts.nil? || ( hosts.is_a?(Array) && hosts.empty? )
end
@@ -700,18 +724,16 @@ def setup_search_api

end

def setup_query_executor
@query_executor = case @response_type
when 'hits'
if @resolved_search_api == "search_after"
LogStash::Inputs::Elasticsearch::SearchAfter.new(@client, self)
else
logger.warn("scroll API is no longer recommended for pagination. Consider using search_after instead.") if es_major_version >= 8
LogStash::Inputs::Elasticsearch::Scroll.new(@client, self)
end
when 'aggregations'
LogStash::Inputs::Elasticsearch::Aggregation.new(@client, self)
end
def create_query_executor
return LogStash::Inputs::Elasticsearch::Esql.new(@client, self) if @query_type == 'esql'

# DSL query executor
return LogStash::Inputs::Elasticsearch::Aggregation.new(@client, self) if @response_type == 'aggregations'
# response_type is hits, executor can be search_after or scroll type
return LogStash::Inputs::Elasticsearch::SearchAfter.new(@client, self) if @resolved_search_api == "search_after"

logger.warn("scroll API is no longer recommended for pagination. Consider using search_after instead.") if es_major_version >= 8
LogStash::Inputs::Elasticsearch::Scroll.new(@client, self)
end

def setup_cursor_tracker
Expand Down Expand Up @@ -750,6 +772,26 @@ def get_transport_client_class
::Elastic::Transport::Transport::HTTP::Manticore
end

def validate_ls_version_for_esql_support!
if Gem::Version.create(LOGSTASH_VERSION) < Gem::Version.create(LS_ESQL_SUPPORT_VERSION)
fail("Current version of Logstash does not include Elasticsearch client which supports ES|QL. Please upgrade Logstash to at least #{LS_ESQL_SUPPORT_VERSION}")
end
end

def validate_esql_query!
fail(LogStash::ConfigurationError, "`query` cannot be empty") if @query.strip.empty?
source_commands = %w[FROM ROW SHOW]
contains_source_command = source_commands.any? { |source_command| @query.strip.start_with?(source_command) }
fail(LogStash::ConfigurationError, "`query` needs to start with any of #{source_commands}") unless contains_source_command
end

def validate_es_for_esql_support!
return unless @query_type == 'esql'
# make sure connected ES supports ES|QL (8.11+)
es_supports_esql = Gem::Version.create(es_version) >= Gem::Version.create(ES_ESQL_SUPPORT_VERSION)
fail("Connected Elasticsearch #{es_version} version does not supports ES|QL. ES|QL feature requires at least Elasticsearch #{ES_ESQL_SUPPORT_VERSION} version.") unless es_supports_esql
end

module URIOrEmptyValidator
##
# @override to provide :uri_or_empty validator