Skip to content

Commit 0952a6e

Browse files
authored
Add OnBeforeRequest callback (#8541)
1 parent 2499997 commit 0952a6e

File tree

8 files changed

+204
-86
lines changed

8 files changed

+204
-86
lines changed

docs/reference/transport.md

Lines changed: 86 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ mapped_pages:
33
- https://www.elastic.co/guide/en/elasticsearch/client/net-api/current/transport.html
44
---
55

6-
# Transport example [transport]
6+
# Low level Transport example [low-level-transport]
77

88
This page demonstrates how to use the low level transport to send requests.
99

@@ -16,8 +16,10 @@ public class MyRequestParameters : RequestParameters
1616
init => Q("pretty", value);
1717
}
1818
}
19+
```
1920

20-
// ...
21+
```csharp
22+
using Elastic.Transport;
2123

2224
var body = """
2325
{
@@ -49,3 +51,85 @@ var response = await client.Transport
4951
.ConfigureAwait(false);
5052
```
5153

54+
# `OnBeforeRequest` example [on-before-request]
55+
56+
The `OnBeforeRequest` callback in `IElasticsearchClientSettings` can be used to dynamically modify requests.
57+
58+
```csharp
59+
var settings = new ElasticsearchClientSettings(new Uri("http://localhost:9200))
60+
.OnBeforeRequest(OnBeforeRequest); <1>
61+
62+
RequestConfiguration? globalRequestConfiguration = null;
63+
ConditionalWeakTable<RequestConfiguration, RequestConfiguration>? globalRequestConfigurations = null;
64+
65+
void OnBeforeRequest(ElasticsearchClient client, Request request, EndpointPath endpointPath, ref PostData? postData, ref IRequestConfiguration? requestConfiguration)
66+
{
67+
// Each time a request is made, the transport creates a new `BoundConfiguration` for every `IRequestConfiguration`
68+
// that is not in the cache (based on reference equality).
69+
70+
// To prevent frequent allocations of our mutated request configurations (and the secondary allocations for
71+
// `BoundConfiguration`), we have to maintain a custom cache that maps every original request configuration to the
72+
// mutated one.
73+
74+
if (requestConfiguration is null)
75+
{
76+
globalRequestConfiguration = Interlocked.CompareExchange(
77+
ref globalRequestConfiguration,
78+
new RequestConfiguration
79+
{
80+
UserAgent = UserAgent.Create("my-custom-user-agent")
81+
},
82+
null) ?? globalRequestConfiguration;
83+
84+
requestConfiguration = globalRequestConfiguration;
85+
return;
86+
}
87+
88+
if (requestConfiguration is not RequestConfiguration rc)
89+
{
90+
// Only `RequestConfiguration` (not all implementations of `IRequestConfiguration`) gets cached in the
91+
// internal cache.
92+
requestConfiguration = MutateRequestConfiguration(requestConfiguration);
93+
return;
94+
}
95+
96+
// ReSharper disable InconsistentlySynchronizedField
97+
98+
var cache = (Interlocked.CompareExchange(
99+
ref globalRequestConfigurations,
100+
new ConditionalWeakTable<RequestConfiguration, RequestConfiguration>(),
101+
null
102+
) ?? globalRequestConfigurations);
103+
104+
if (cache.TryGetValue(rc, out var mutatedRequestConfiguration))
105+
{
106+
requestConfiguration = mutatedRequestConfiguration;
107+
return;
108+
}
109+
110+
mutatedRequestConfiguration = MutateRequestConfiguration(rc);
111+
112+
#if NET8_0_OR_GREATER
113+
cache.TryAdd(rc, mutatedRequestConfiguration);
114+
#else
115+
lock (cache)
116+
{
117+
cache.Add(rc, mutatedRequestConfiguration);
118+
}
119+
#endif
120+
121+
// ReSharper restore InconsistentlySynchronizedField
122+
123+
return;
124+
125+
RequestConfiguration MutateRequestConfiguration(IRequestConfiguration requestConfiguration)
126+
{
127+
return new RequestConfiguration(requestConfiguration)
128+
{
129+
UserAgent = UserAgent.Create("my-custom-user-agent")
130+
};
131+
}
132+
}
133+
```
134+
135+
1. Register the `OnBeforeRequest` callback.

src/Elastic.Clients.Elasticsearch/Elastic.Clients.Elasticsearch.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
</PropertyGroup>
3232

3333
<ItemGroup>
34-
<PackageReference Include="Elastic.Transport" Version="0.8.0" />
34+
<PackageReference Include="Elastic.Transport" Version="0.8.1" />
3535
<PackageReference Include="PolySharp" Version="1.15.0">
3636
<PrivateAssets>all</PrivateAssets>
3737
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>

src/Elastic.Clients.Elasticsearch/_Shared/Client/ElasticsearchClient.cs

Lines changed: 57 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,9 @@
66
using System.Collections.Generic;
77
using System.Diagnostics;
88
using System.Linq;
9-
using System.Runtime.CompilerServices;
10-
using System.Text.Json;
119
using System.Threading.Tasks;
1210
using System.Threading;
1311
using Elastic.Transport;
14-
using Elastic.Transport.Diagnostics;
1512

1613
using Elastic.Clients.Elasticsearch.Requests;
1714

@@ -28,18 +25,23 @@ public partial class ElasticsearchClient
2825
private const string OpenTelemetrySchemaVersion = "https://opentelemetry.io/schemas/1.21.0";
2926

3027
private readonly ITransport<IElasticsearchClientSettings> _transport;
31-
internal static ConditionalWeakTable<JsonSerializerOptions, IElasticsearchClientSettings> SettingsTable { get; } = new();
3228

3329
/// <summary>
3430
/// Creates a client configured to connect to http://localhost:9200.
3531
/// </summary>
36-
public ElasticsearchClient() : this(new ElasticsearchClientSettings(new Uri("http://localhost:9200"))) { }
32+
public ElasticsearchClient() :
33+
this(new ElasticsearchClientSettings(new Uri("http://localhost:9200")))
34+
{
35+
}
3736

3837
/// <summary>
3938
/// Creates a client configured to connect to a node reachable at the provided <paramref name="uri" />.
4039
/// </summary>
4140
/// <param name="uri">The <see cref="Uri" /> to connect to.</param>
42-
public ElasticsearchClient(Uri uri) : this(new ElasticsearchClientSettings(uri)) { }
41+
public ElasticsearchClient(Uri uri) :
42+
this(new ElasticsearchClientSettings(uri))
43+
{
44+
}
4345

4446
/// <summary>
4547
/// Creates a client configured to communicate with Elastic Cloud using the provided <paramref name="cloudId" />.
@@ -51,8 +53,8 @@ public ElasticsearchClient(Uri uri) : this(new ElasticsearchClientSettings(uri))
5153
/// </summary>
5254
/// <param name="cloudId">The Cloud ID of an Elastic Cloud deployment.</param>
5355
/// <param name="credentials">The credentials to use for the connection.</param>
54-
public ElasticsearchClient(string cloudId, AuthorizationHeader credentials) : this(
55-
new ElasticsearchClientSettings(cloudId, credentials))
56+
public ElasticsearchClient(string cloudId, AuthorizationHeader credentials) :
57+
this(new ElasticsearchClientSettings(cloudId, credentials))
5658
{
5759
}
5860

@@ -69,8 +71,7 @@ internal ElasticsearchClient(ITransport<IElasticsearchClientSettings> transport)
6971
{
7072
transport.ThrowIfNull(nameof(transport));
7173
transport.Configuration.ThrowIfNull(nameof(transport.Configuration));
72-
transport.Configuration.RequestResponseSerializer.ThrowIfNull(
73-
nameof(transport.Configuration.RequestResponseSerializer));
74+
transport.Configuration.RequestResponseSerializer.ThrowIfNull(nameof(transport.Configuration.RequestResponseSerializer));
7475
transport.Configuration.Inferrer.ThrowIfNull(nameof(transport.Configuration.Inferrer));
7576

7677
_transport = transport;
@@ -96,47 +97,38 @@ private enum ProductCheckStatus
9697

9798
private partial void SetupNamespaces();
9899

99-
internal TResponse DoRequest<TRequest, TResponse, TRequestParameters>(TRequest request)
100-
where TRequest : Request<TRequestParameters>
101-
where TResponse : TransportResponse, new()
102-
where TRequestParameters : RequestParameters, new() =>
103-
DoRequest<TRequest, TResponse, TRequestParameters>(request, null);
104-
105100
internal TResponse DoRequest<TRequest, TResponse, TRequestParameters>(
106-
TRequest request,
107-
Action<IRequestConfiguration>? forceConfiguration)
101+
TRequest request)
108102
where TRequest : Request<TRequestParameters>
109103
where TResponse : TransportResponse, new()
110104
where TRequestParameters : RequestParameters, new()
111-
=> DoRequestCoreAsync<TRequest, TResponse, TRequestParameters>(false, request, forceConfiguration).EnsureCompleted();
112-
113-
internal Task<TResponse> DoRequestAsync<TRequest, TResponse, TRequestParameters>(
114-
TRequest request,
115-
CancellationToken cancellationToken = default)
116-
where TRequest : Request<TRequestParameters>
117-
where TResponse : TransportResponse, new()
118-
where TRequestParameters : RequestParameters, new()
119-
=> DoRequestAsync<TRequest, TResponse, TRequestParameters>(request, null, cancellationToken);
105+
{
106+
return DoRequestCoreAsync<TRequest, TResponse, TRequestParameters>(false, request).EnsureCompleted();
107+
}
120108

121109
internal Task<TResponse> DoRequestAsync<TRequest, TResponse, TRequestParameters>(
122110
TRequest request,
123-
Action<IRequestConfiguration>? forceConfiguration,
124111
CancellationToken cancellationToken = default)
125112
where TRequest : Request<TRequestParameters>
126113
where TResponse : TransportResponse, new()
127114
where TRequestParameters : RequestParameters, new()
128-
=> DoRequestCoreAsync<TRequest, TResponse, TRequestParameters>(true, request, forceConfiguration, cancellationToken).AsTask();
115+
{
116+
return DoRequestCoreAsync<TRequest, TResponse, TRequestParameters>(true, request, cancellationToken).AsTask();
117+
}
129118

130119
private ValueTask<TResponse> DoRequestCoreAsync<TRequest, TResponse, TRequestParameters>(
131120
bool isAsync,
132121
TRequest request,
133-
Action<IRequestConfiguration>? forceConfiguration,
134122
CancellationToken cancellationToken = default)
135123
where TRequest : Request<TRequestParameters>
136124
where TResponse : TransportResponse, new()
137125
where TRequestParameters : RequestParameters, new()
138126
{
139-
// The product check modifies request parameters and therefore must not be executed concurrently.
127+
if (request is null)
128+
{
129+
throw new ArgumentNullException(nameof(request));
130+
}
131+
140132
// We use a lockless CAS approach to make sure that only a single product check request is executed at a time.
141133
// We do not guarantee that the product check is always performed on the first request.
142134

@@ -157,12 +149,12 @@ private ValueTask<TResponse> DoRequestCoreAsync<TRequest, TResponse, TRequestPar
157149

158150
ValueTask<TResponse> SendRequest()
159151
{
160-
var (endpointPath, resolvedRouteValues, postData) = PrepareRequest<TRequest, TRequestParameters>(request);
161-
var openTelemetryDataMutator = GetOpenTelemetryDataMutator<TRequest, TRequestParameters>(request, resolvedRouteValues);
152+
PrepareRequest<TRequest, TRequestParameters>(request, out var endpointPath, out var postData, out var requestConfiguration, out var routeValues);
153+
var openTelemetryDataMutator = GetOpenTelemetryDataMutator<TRequest, TRequestParameters>(request, routeValues);
162154

163155
return isAsync
164-
? new ValueTask<TResponse>(_transport.RequestAsync<TResponse>(endpointPath, postData, openTelemetryDataMutator, request.RequestConfiguration, cancellationToken))
165-
: new ValueTask<TResponse>(_transport.Request<TResponse>(endpointPath, postData, openTelemetryDataMutator, request.RequestConfiguration));
156+
? new ValueTask<TResponse>(_transport.RequestAsync<TResponse>(endpointPath, postData, openTelemetryDataMutator, requestConfiguration, cancellationToken))
157+
: new ValueTask<TResponse>(_transport.Request<TResponse>(endpointPath, postData, openTelemetryDataMutator, requestConfiguration));
166158
}
167159

168160
async ValueTask<TResponse> SendRequestWithProductCheck()
@@ -178,34 +170,35 @@ async ValueTask<TResponse> SendRequestWithProductCheck()
178170
// 32-bit read/write operations are atomic and due to the initial memory barrier, we can be sure that
179171
// no other thread executes the product check at the same time. Locked access is not required here.
180172
if (_productCheckStatus is (int)ProductCheckStatus.InProgress)
173+
{
181174
_productCheckStatus = (int)ProductCheckStatus.NotChecked;
175+
}
182176

183177
throw;
184178
}
185179
}
186180

187181
async ValueTask<TResponse> SendRequestWithProductCheckCore()
188182
{
183+
PrepareRequest<TRequest, TRequestParameters>(request, out var endpointPath, out var postData, out var requestConfiguration, out var routeValues);
184+
var openTelemetryDataMutator = GetOpenTelemetryDataMutator<TRequest, TRequestParameters>(request, routeValues);
185+
189186
// Attach product check header
190187

191-
// TODO: The copy constructor should accept null values
192-
var requestConfig = (request.RequestConfiguration is null)
193-
? new RequestConfiguration()
188+
var requestConfig = (requestConfiguration is null)
189+
? new RequestConfiguration
194190
{
195191
ResponseHeadersToParse = new HeadersList("x-elastic-product")
196192
}
197-
: new RequestConfiguration(request.RequestConfiguration)
193+
: new RequestConfiguration(requestConfiguration)
198194
{
199-
ResponseHeadersToParse = (request.RequestConfiguration.ResponseHeadersToParse is { Count: > 0 })
200-
? new HeadersList(request.RequestConfiguration.ResponseHeadersToParse, "x-elastic-product")
195+
ResponseHeadersToParse = (requestConfiguration.ResponseHeadersToParse is { Count: > 0 })
196+
? new HeadersList(requestConfiguration.ResponseHeadersToParse, "x-elastic-product")
201197
: new HeadersList("x-elastic-product")
202198
};
203199

204200
// Send request
205201

206-
var (endpointPath, resolvedRouteValues, postData) = PrepareRequest<TRequest, TRequestParameters>(request);
207-
var openTelemetryDataMutator = GetOpenTelemetryDataMutator<TRequest, TRequestParameters>(request, resolvedRouteValues);
208-
209202
TResponse response;
210203

211204
if (isAsync)
@@ -239,7 +232,9 @@ async ValueTask<TResponse> SendRequestWithProductCheckCore()
239232
: (int)ProductCheckStatus.Failed;
240233

241234
if (_productCheckStatus == (int)ProductCheckStatus.Failed)
235+
{
242236
throw new UnsupportedProductException(UnsupportedProductException.InvalidProductError);
237+
}
243238

244239
return response;
245240
}
@@ -249,15 +244,17 @@ async ValueTask<TResponse> SendRequestWithProductCheckCore()
249244
where TRequest : Request<TRequestParameters>
250245
where TRequestParameters : RequestParameters, new()
251246
{
252-
// If there are no subscribed listeners, we avoid some work and allocations
247+
// If there are no subscribed listeners, we avoid some work and allocations.
253248
if (!Elastic.Transport.Diagnostics.OpenTelemetry.ElasticTransportActivitySourceHasListeners)
249+
{
254250
return null;
251+
}
255252

256253
return OpenTelemetryDataMutator;
257254

258255
void OpenTelemetryDataMutator(Activity activity)
259256
{
260-
// We fall back to a general operation name in cases where the derived request fails to override the property
257+
// We fall back to a general operation name in cases where the derived request fails to override the property.
261258
var operationName = !string.IsNullOrEmpty(request.OperationName) ? request.OperationName : request.HttpMethod.GetStringValue();
262259

263260
// TODO: Optimisation: We should consider caching these, either for cases where resolvedRouteValues is null, or
@@ -267,7 +264,7 @@ void OpenTelemetryDataMutator(Activity activity)
267264
// The latter may bloat the cache as some combinations of path parts may rarely re-occur.
268265

269266
activity.DisplayName = operationName;
270-
267+
271268
activity.SetTag(OpenTelemetry.SemanticConventions.DbOperation, !string.IsNullOrEmpty(request.OperationName) ? request.OperationName : "unknown");
272269
activity.SetTag($"{OpenTelemetrySpanAttributePrefix}schema_url", OpenTelemetrySchemaVersion);
273270

@@ -282,21 +279,26 @@ void OpenTelemetryDataMutator(Activity activity)
282279
}
283280
}
284281

285-
private (EndpointPath endpointPath, Dictionary<string, string>? resolvedRouteValues, PostData data) PrepareRequest<TRequest, TRequestParameters>(TRequest request)
282+
private void PrepareRequest<TRequest, TRequestParameters>(
283+
TRequest request,
284+
out EndpointPath endpointPath,
285+
out PostData? postData,
286+
out IRequestConfiguration? requestConfiguration,
287+
out Dictionary<string, string>? routeValues)
286288
where TRequest : Request<TRequestParameters>
287289
where TRequestParameters : RequestParameters, new()
288290
{
289-
request.ThrowIfNull(nameof(request), "A request is required.");
290-
291-
var (resolvedUrl, _, routeValues) = request.GetUrl(ElasticsearchClientSettings);
291+
var (resolvedUrl, _, resolvedRouteValues) = request.GetUrl(ElasticsearchClientSettings);
292292
var pathAndQuery = request.RequestParameters.CreatePathWithQueryStrings(resolvedUrl, ElasticsearchClientSettings);
293293

294-
var postData =
295-
request.HttpMethod == HttpMethod.GET ||
296-
request.HttpMethod == HttpMethod.HEAD || !request.SupportsBody
294+
routeValues = resolvedRouteValues;
295+
endpointPath = new EndpointPath(request.HttpMethod, pathAndQuery);
296+
postData =
297+
request.HttpMethod is HttpMethod.GET or HttpMethod.HEAD || !request.SupportsBody
297298
? null
298299
: PostData.Serializable(request);
299300

300-
return (new EndpointPath(request.HttpMethod, pathAndQuery), routeValues, postData);
301+
requestConfiguration = request.RequestConfiguration;
302+
ElasticsearchClientSettings.OnBeforeRequest?.Invoke(this, request, endpointPath, ref postData, ref requestConfiguration);
301303
}
302304
}

0 commit comments

Comments
 (0)