diff --git a/docs/reference/transport.md b/docs/reference/transport.md index 8af799958a..ddb09df679 100644 --- a/docs/reference/transport.md +++ b/docs/reference/transport.md @@ -3,7 +3,7 @@ mapped_pages: - https://www.elastic.co/guide/en/elasticsearch/client/net-api/current/transport.html --- -# Transport example [transport] +# Low level Transport example [low-level-transport] This page demonstrates how to use the low level transport to send requests. @@ -16,8 +16,10 @@ public class MyRequestParameters : RequestParameters init => Q("pretty", value); } } +``` -// ... +```csharp +using Elastic.Transport; var body = """ { @@ -49,3 +51,85 @@ var response = await client.Transport .ConfigureAwait(false); ``` +# `OnBeforeRequest` example [on-before-request] + +The `OnBeforeRequest` callback in `IElasticsearchClientSettings` can be used to dynamically modify requests. + +```csharp +var settings = new ElasticsearchClientSettings(new Uri("http://localhost:9200)) + .OnBeforeRequest(OnBeforeRequest); <1> + +RequestConfiguration? globalRequestConfiguration = null; +ConditionalWeakTable? globalRequestConfigurations = null; + +void OnBeforeRequest(ElasticsearchClient client, Request request, EndpointPath endpointPath, ref PostData? postData, ref IRequestConfiguration? requestConfiguration) +{ + // Each time a request is made, the transport creates a new `BoundConfiguration` for every `IRequestConfiguration` + // that is not in the cache (based on reference equality). + + // To prevent frequent allocations of our mutated request configurations (and the secondary allocations for + // `BoundConfiguration`), we have to maintain a custom cache that maps every original request configuration to the + // mutated one. + + if (requestConfiguration is null) + { + globalRequestConfiguration = Interlocked.CompareExchange( + ref globalRequestConfiguration, + new RequestConfiguration + { + UserAgent = UserAgent.Create("my-custom-user-agent") + }, + null) ?? globalRequestConfiguration; + + requestConfiguration = globalRequestConfiguration; + return; + } + + if (requestConfiguration is not RequestConfiguration rc) + { + // Only `RequestConfiguration` (not all implementations of `IRequestConfiguration`) gets cached in the + // internal cache. + requestConfiguration = MutateRequestConfiguration(requestConfiguration); + return; + } + + // ReSharper disable InconsistentlySynchronizedField + + var cache = (Interlocked.CompareExchange( + ref globalRequestConfigurations, + new ConditionalWeakTable(), + null + ) ?? globalRequestConfigurations); + + if (cache.TryGetValue(rc, out var mutatedRequestConfiguration)) + { + requestConfiguration = mutatedRequestConfiguration; + return; + } + + mutatedRequestConfiguration = MutateRequestConfiguration(rc); + +#if NET8_0_OR_GREATER + cache.TryAdd(rc, mutatedRequestConfiguration); +#else + lock (cache) + { + cache.Add(rc, mutatedRequestConfiguration); + } +#endif + + // ReSharper restore InconsistentlySynchronizedField + + return; + + RequestConfiguration MutateRequestConfiguration(IRequestConfiguration requestConfiguration) + { + return new RequestConfiguration(requestConfiguration) + { + UserAgent = UserAgent.Create("my-custom-user-agent") + }; + } +} +``` + +1. Register the `OnBeforeRequest` callback. diff --git a/src/Elastic.Clients.Elasticsearch/Elastic.Clients.Elasticsearch.csproj b/src/Elastic.Clients.Elasticsearch/Elastic.Clients.Elasticsearch.csproj index 9945a675d3..0b6d8e66e0 100644 --- a/src/Elastic.Clients.Elasticsearch/Elastic.Clients.Elasticsearch.csproj +++ b/src/Elastic.Clients.Elasticsearch/Elastic.Clients.Elasticsearch.csproj @@ -31,7 +31,7 @@ - + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/src/Elastic.Clients.Elasticsearch/_Shared/Client/ElasticsearchClient.cs b/src/Elastic.Clients.Elasticsearch/_Shared/Client/ElasticsearchClient.cs index afda07d6c1..5399abd131 100644 --- a/src/Elastic.Clients.Elasticsearch/_Shared/Client/ElasticsearchClient.cs +++ b/src/Elastic.Clients.Elasticsearch/_Shared/Client/ElasticsearchClient.cs @@ -6,12 +6,9 @@ using System.Collections.Generic; using System.Diagnostics; using System.Linq; -using System.Runtime.CompilerServices; -using System.Text.Json; using System.Threading.Tasks; using System.Threading; using Elastic.Transport; -using Elastic.Transport.Diagnostics; using Elastic.Clients.Elasticsearch.Requests; @@ -28,18 +25,23 @@ public partial class ElasticsearchClient private const string OpenTelemetrySchemaVersion = "https://opentelemetry.io/schemas/1.21.0"; private readonly ITransport _transport; - internal static ConditionalWeakTable SettingsTable { get; } = new(); /// /// Creates a client configured to connect to http://localhost:9200. /// - public ElasticsearchClient() : this(new ElasticsearchClientSettings(new Uri("http://localhost:9200"))) { } + public ElasticsearchClient() : + this(new ElasticsearchClientSettings(new Uri("http://localhost:9200"))) + { + } /// /// Creates a client configured to connect to a node reachable at the provided . /// /// The to connect to. - public ElasticsearchClient(Uri uri) : this(new ElasticsearchClientSettings(uri)) { } + public ElasticsearchClient(Uri uri) : + this(new ElasticsearchClientSettings(uri)) + { + } /// /// Creates a client configured to communicate with Elastic Cloud using the provided . @@ -51,8 +53,8 @@ public ElasticsearchClient(Uri uri) : this(new ElasticsearchClientSettings(uri)) /// /// The Cloud ID of an Elastic Cloud deployment. /// The credentials to use for the connection. - public ElasticsearchClient(string cloudId, AuthorizationHeader credentials) : this( - new ElasticsearchClientSettings(cloudId, credentials)) + public ElasticsearchClient(string cloudId, AuthorizationHeader credentials) : + this(new ElasticsearchClientSettings(cloudId, credentials)) { } @@ -69,8 +71,7 @@ internal ElasticsearchClient(ITransport transport) { transport.ThrowIfNull(nameof(transport)); transport.Configuration.ThrowIfNull(nameof(transport.Configuration)); - transport.Configuration.RequestResponseSerializer.ThrowIfNull( - nameof(transport.Configuration.RequestResponseSerializer)); + transport.Configuration.RequestResponseSerializer.ThrowIfNull(nameof(transport.Configuration.RequestResponseSerializer)); transport.Configuration.Inferrer.ThrowIfNull(nameof(transport.Configuration.Inferrer)); _transport = transport; @@ -96,47 +97,38 @@ private enum ProductCheckStatus private partial void SetupNamespaces(); - internal TResponse DoRequest(TRequest request) - where TRequest : Request - where TResponse : TransportResponse, new() - where TRequestParameters : RequestParameters, new() => - DoRequest(request, null); - internal TResponse DoRequest( - TRequest request, - Action? forceConfiguration) + TRequest request) where TRequest : Request where TResponse : TransportResponse, new() where TRequestParameters : RequestParameters, new() - => DoRequestCoreAsync(false, request, forceConfiguration).EnsureCompleted(); - - internal Task DoRequestAsync( - TRequest request, - CancellationToken cancellationToken = default) - where TRequest : Request - where TResponse : TransportResponse, new() - where TRequestParameters : RequestParameters, new() - => DoRequestAsync(request, null, cancellationToken); + { + return DoRequestCoreAsync(false, request).EnsureCompleted(); + } internal Task DoRequestAsync( TRequest request, - Action? forceConfiguration, CancellationToken cancellationToken = default) where TRequest : Request where TResponse : TransportResponse, new() where TRequestParameters : RequestParameters, new() - => DoRequestCoreAsync(true, request, forceConfiguration, cancellationToken).AsTask(); + { + return DoRequestCoreAsync(true, request, cancellationToken).AsTask(); + } private ValueTask DoRequestCoreAsync( bool isAsync, TRequest request, - Action? forceConfiguration, CancellationToken cancellationToken = default) where TRequest : Request where TResponse : TransportResponse, new() where TRequestParameters : RequestParameters, new() { - // The product check modifies request parameters and therefore must not be executed concurrently. + if (request is null) + { + throw new ArgumentNullException(nameof(request)); + } + // We use a lockless CAS approach to make sure that only a single product check request is executed at a time. // We do not guarantee that the product check is always performed on the first request. @@ -157,12 +149,12 @@ private ValueTask DoRequestCoreAsync SendRequest() { - var (endpointPath, resolvedRouteValues, postData) = PrepareRequest(request); - var openTelemetryDataMutator = GetOpenTelemetryDataMutator(request, resolvedRouteValues); + PrepareRequest(request, out var endpointPath, out var postData, out var requestConfiguration, out var routeValues); + var openTelemetryDataMutator = GetOpenTelemetryDataMutator(request, routeValues); return isAsync - ? new ValueTask(_transport.RequestAsync(endpointPath, postData, openTelemetryDataMutator, request.RequestConfiguration, cancellationToken)) - : new ValueTask(_transport.Request(endpointPath, postData, openTelemetryDataMutator, request.RequestConfiguration)); + ? new ValueTask(_transport.RequestAsync(endpointPath, postData, openTelemetryDataMutator, requestConfiguration, cancellationToken)) + : new ValueTask(_transport.Request(endpointPath, postData, openTelemetryDataMutator, requestConfiguration)); } async ValueTask SendRequestWithProductCheck() @@ -178,7 +170,9 @@ async ValueTask SendRequestWithProductCheck() // 32-bit read/write operations are atomic and due to the initial memory barrier, we can be sure that // no other thread executes the product check at the same time. Locked access is not required here. if (_productCheckStatus is (int)ProductCheckStatus.InProgress) + { _productCheckStatus = (int)ProductCheckStatus.NotChecked; + } throw; } @@ -186,26 +180,25 @@ async ValueTask SendRequestWithProductCheck() async ValueTask SendRequestWithProductCheckCore() { + PrepareRequest(request, out var endpointPath, out var postData, out var requestConfiguration, out var routeValues); + var openTelemetryDataMutator = GetOpenTelemetryDataMutator(request, routeValues); + // Attach product check header - // TODO: The copy constructor should accept null values - var requestConfig = (request.RequestConfiguration is null) - ? new RequestConfiguration() + var requestConfig = (requestConfiguration is null) + ? new RequestConfiguration { ResponseHeadersToParse = new HeadersList("x-elastic-product") } - : new RequestConfiguration(request.RequestConfiguration) + : new RequestConfiguration(requestConfiguration) { - ResponseHeadersToParse = (request.RequestConfiguration.ResponseHeadersToParse is { Count: > 0 }) - ? new HeadersList(request.RequestConfiguration.ResponseHeadersToParse, "x-elastic-product") + ResponseHeadersToParse = (requestConfiguration.ResponseHeadersToParse is { Count: > 0 }) + ? new HeadersList(requestConfiguration.ResponseHeadersToParse, "x-elastic-product") : new HeadersList("x-elastic-product") }; // Send request - var (endpointPath, resolvedRouteValues, postData) = PrepareRequest(request); - var openTelemetryDataMutator = GetOpenTelemetryDataMutator(request, resolvedRouteValues); - TResponse response; if (isAsync) @@ -239,7 +232,9 @@ async ValueTask SendRequestWithProductCheckCore() : (int)ProductCheckStatus.Failed; if (_productCheckStatus == (int)ProductCheckStatus.Failed) + { throw new UnsupportedProductException(UnsupportedProductException.InvalidProductError); + } return response; } @@ -249,15 +244,17 @@ async ValueTask SendRequestWithProductCheckCore() where TRequest : Request where TRequestParameters : RequestParameters, new() { - // If there are no subscribed listeners, we avoid some work and allocations + // If there are no subscribed listeners, we avoid some work and allocations. if (!Elastic.Transport.Diagnostics.OpenTelemetry.ElasticTransportActivitySourceHasListeners) + { return null; + } return OpenTelemetryDataMutator; void OpenTelemetryDataMutator(Activity activity) { - // We fall back to a general operation name in cases where the derived request fails to override the property + // We fall back to a general operation name in cases where the derived request fails to override the property. var operationName = !string.IsNullOrEmpty(request.OperationName) ? request.OperationName : request.HttpMethod.GetStringValue(); // TODO: Optimisation: We should consider caching these, either for cases where resolvedRouteValues is null, or @@ -267,7 +264,7 @@ void OpenTelemetryDataMutator(Activity activity) // The latter may bloat the cache as some combinations of path parts may rarely re-occur. activity.DisplayName = operationName; - + activity.SetTag(OpenTelemetry.SemanticConventions.DbOperation, !string.IsNullOrEmpty(request.OperationName) ? request.OperationName : "unknown"); activity.SetTag($"{OpenTelemetrySpanAttributePrefix}schema_url", OpenTelemetrySchemaVersion); @@ -282,21 +279,26 @@ void OpenTelemetryDataMutator(Activity activity) } } - private (EndpointPath endpointPath, Dictionary? resolvedRouteValues, PostData data) PrepareRequest(TRequest request) + private void PrepareRequest( + TRequest request, + out EndpointPath endpointPath, + out PostData? postData, + out IRequestConfiguration? requestConfiguration, + out Dictionary? routeValues) where TRequest : Request where TRequestParameters : RequestParameters, new() { - request.ThrowIfNull(nameof(request), "A request is required."); - - var (resolvedUrl, _, routeValues) = request.GetUrl(ElasticsearchClientSettings); + var (resolvedUrl, _, resolvedRouteValues) = request.GetUrl(ElasticsearchClientSettings); var pathAndQuery = request.RequestParameters.CreatePathWithQueryStrings(resolvedUrl, ElasticsearchClientSettings); - var postData = - request.HttpMethod == HttpMethod.GET || - request.HttpMethod == HttpMethod.HEAD || !request.SupportsBody + routeValues = resolvedRouteValues; + endpointPath = new EndpointPath(request.HttpMethod, pathAndQuery); + postData = + request.HttpMethod is HttpMethod.GET or HttpMethod.HEAD || !request.SupportsBody ? null : PostData.Serializable(request); - return (new EndpointPath(request.HttpMethod, pathAndQuery), routeValues, postData); + requestConfiguration = request.RequestConfiguration; + ElasticsearchClientSettings.OnBeforeRequest?.Invoke(this, request, endpointPath, ref postData, ref requestConfiguration); } } diff --git a/src/Elastic.Clients.Elasticsearch/_Shared/Client/NamespacedClientProxy.cs b/src/Elastic.Clients.Elasticsearch/_Shared/Client/NamespacedClientProxy.cs index 65b32e91a8..b7aeacb4fc 100644 --- a/src/Elastic.Clients.Elasticsearch/_Shared/Client/NamespacedClientProxy.cs +++ b/src/Elastic.Clients.Elasticsearch/_Shared/Client/NamespacedClientProxy.cs @@ -5,15 +5,16 @@ using System; using System.Threading; using System.Threading.Tasks; + using Elastic.Clients.Elasticsearch.Requests; using Elastic.Transport; -using Elastic.Transport.Products.Elasticsearch; namespace Elastic.Clients.Elasticsearch; public abstract class NamespacedClientProxy { - private const string InvalidOperation = "The client has not been initialised for proper usage as may have been partially mocked. Ensure you are using a " + + private const string InvalidOperation = + "The client has not been initialised for proper usage as may have been partially mocked. Ensure you are using a " + "new instance of ElasticsearchClient to perform requests over a network to Elasticsearch."; protected ElasticsearchClient Client { get; } @@ -21,27 +22,24 @@ public abstract class NamespacedClientProxy /// /// Initializes a new instance for mocking. /// - protected NamespacedClientProxy() { } + protected NamespacedClientProxy() + { + } internal NamespacedClientProxy(ElasticsearchClient client) => Client = client; - internal TResponse DoRequest(TRequest request) - where TRequest : Request - where TResponse : TransportResponse, new() - where TRequestParameters : RequestParameters, new() - => DoRequest(request, null); - internal TResponse DoRequest( - TRequest request, - Action? forceConfiguration) + TRequest request) where TRequest : Request where TResponse : TransportResponse, new() where TRequestParameters : RequestParameters, new() { if (Client is null) - ThrowHelper.ThrowInvalidOperationException(InvalidOperation); + { + throw new InvalidOperationException(InvalidOperation); + } - return Client.DoRequest(request, forceConfiguration); + return Client.DoRequest(request); } internal Task DoRequestAsync( @@ -49,20 +47,13 @@ internal Task DoRequestAsync CancellationToken cancellationToken = default) where TRequest : Request where TResponse : TransportResponse, new() - where TRequestParameters : RequestParameters, new() - => DoRequestAsync(request, null, cancellationToken); - - internal Task DoRequestAsync( - TRequest request, - Action? forceConfiguration, - CancellationToken cancellationToken = default) - where TRequest : Request - where TResponse : TransportResponse, new() where TRequestParameters : RequestParameters, new() { if (Client is null) - ThrowHelper.ThrowInvalidOperationException(InvalidOperation); + { + throw new InvalidOperationException(InvalidOperation); + } - return Client.DoRequestAsync(request, forceConfiguration, cancellationToken); + return Client.DoRequestAsync(request, cancellationToken); } } diff --git a/src/Elastic.Clients.Elasticsearch/_Shared/Core/Configuration/ElasticsearchClientSettings.cs b/src/Elastic.Clients.Elasticsearch/_Shared/Core/Configuration/ElasticsearchClientSettings.cs index 4c655162fd..9b5197a4b9 100644 --- a/src/Elastic.Clients.Elasticsearch/_Shared/Core/Configuration/ElasticsearchClientSettings.cs +++ b/src/Elastic.Clients.Elasticsearch/_Shared/Core/Configuration/ElasticsearchClientSettings.cs @@ -8,9 +8,10 @@ using System.Linq; using System.Linq.Expressions; using System.Reflection; +using System.Runtime.InteropServices; using Elastic.Clients.Elasticsearch.Esql; - +using Elastic.Clients.Elasticsearch.Requests; using Elastic.Clients.Elasticsearch.Serialization; using Elastic.Transport; @@ -110,6 +111,7 @@ public abstract class ElasticsearchClientSettingsBase : private readonly FluentDictionary _propertyMappings = new(); private readonly FluentDictionary _routeProperties = new(); private readonly Serializer _sourceSerializer; + private BeforeRequestEvent? _onBeforeRequest; private bool _experimentalEnableSerializeNullInferredValues; private ExperimentalSettings _experimentalSettings = new(); @@ -158,7 +160,7 @@ protected ElasticsearchClientSettingsBase( FluentDictionary IElasticsearchClientSettings.RouteProperties => _routeProperties; Serializer IElasticsearchClientSettings.SourceSerializer => _sourceSerializer; - + BeforeRequestEvent? IElasticsearchClientSettings.OnBeforeRequest => _onBeforeRequest; ExperimentalSettings IElasticsearchClientSettings.Experimental => _experimentalSettings; bool IElasticsearchClientSettings.ExperimentalEnableSerializeNullInferredValues => _experimentalEnableSerializeNullInferredValues; @@ -322,6 +324,20 @@ public TConnectionSettings DefaultMappingFor(IEnumerable typeMap return (TConnectionSettings)this; } + + /// + public TConnectionSettings OnBeforeRequest(BeforeRequestEvent handler) + { + return Assign(handler, static (a, v) => a._onBeforeRequest += v ?? DefaultBeforeRequestHandler); + } + + private static void DefaultBeforeRequestHandler(ElasticsearchClient client, + Request request, + EndpointPath endpointPath, + ref PostData? postData, + ref IRequestConfiguration? requestConfiguration) + { + } } /// diff --git a/src/Elastic.Clients.Elasticsearch/_Shared/Core/Configuration/IElasticsearchClientSettings.cs b/src/Elastic.Clients.Elasticsearch/_Shared/Core/Configuration/IElasticsearchClientSettings.cs index 37bf49198a..ffda6461b8 100644 --- a/src/Elastic.Clients.Elasticsearch/_Shared/Core/Configuration/IElasticsearchClientSettings.cs +++ b/src/Elastic.Clients.Elasticsearch/_Shared/Core/Configuration/IElasticsearchClientSettings.cs @@ -5,10 +5,26 @@ using System; using System.Collections.Generic; using System.Reflection; +using Elastic.Clients.Elasticsearch.Requests; using Elastic.Transport; namespace Elastic.Clients.Elasticsearch; +/// +/// An event that is fired before a request is sent. +/// +/// The instance used to send the request. +/// The request. +/// The endpoint path. +/// The post data. +/// The request configuration. +public delegate void BeforeRequestEvent( + ElasticsearchClient client, + Request request, + EndpointPath endpointPath, + ref PostData? postData, + ref IRequestConfiguration? requestConfiguration); + /// /// Provides the connection settings for Elastic.Clients.Elasticsearch's high level /// @@ -91,6 +107,14 @@ public interface IElasticsearchClientSettings : ITransportConfiguration /// Serializer SourceSerializer { get; } + /// + /// A callback that is invoked immediately before a request is sent. + /// + /// Allows to dynamically update the and . + /// + /// + BeforeRequestEvent? OnBeforeRequest { get; } + /// /// This is an advanced setting which controls serialization behaviour for inferred properies such as ID, routing and index name. /// When enabled, it may reduce allocations on serialisation paths where the cost can be more significant, such as in bulk operations. diff --git a/src/Elastic.Clients.Elasticsearch/_Shared/Core/Request/Request.cs b/src/Elastic.Clients.Elasticsearch/_Shared/Core/Request/Request.cs index f748358cf9..cb269041d2 100644 --- a/src/Elastic.Clients.Elasticsearch/_Shared/Core/Request/Request.cs +++ b/src/Elastic.Clients.Elasticsearch/_Shared/Core/Request/Request.cs @@ -57,7 +57,8 @@ protected virtual (string ResolvedUrl, string UrlTemplate, Dictionary? resolvedRouteValues) GetUrl(IElasticsearchClientSettings settings) => ResolveUrl(RouteValues, settings); diff --git a/src/Playground/Playground.csproj b/src/Playground/Playground.csproj index 7028e3859b..61254173cb 100644 --- a/src/Playground/Playground.csproj +++ b/src/Playground/Playground.csproj @@ -13,7 +13,7 @@ - +