Skip to content

Commit 37835cd

Browse files
authored
Merge pull request #65 from kuzzleio/2.0.5-proposal
# [2.0.5](https://github.com/kuzzleio/sdk-csharp/releases/tag/2.0.5) (2020-09-29) #### Bug fixes - [ [#64](#64) ] Fix network recovery ([scottinet](https://github.com/scottinet)) ---
2 parents b253d47 + c1a4ee7 commit 37835cd

File tree

9 files changed

+144
-75
lines changed

9 files changed

+144
-75
lines changed

Kuzzle.Tests/Offline/Query/QueryReplayerTest.cs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -147,12 +147,12 @@ public void SuccessRejectAllQueries() {
147147
testableOfflineManager.MaxQueueSize = -1;
148148
queryReplayer.Lock = false;
149149

150-
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'foo', action: 'bar'}")));
151-
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'bar', action: 'foor'}")));
152-
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'foobar', action: 'foobar'}")));
153-
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'barfoo', action: 'foobar'}")));
154-
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'foobar', action: 'barfoo'}")));
155-
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'barfoo', action: 'barfoo'}")));
150+
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '1', controller: 'foo', action: 'bar'}")));
151+
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '2', controller: 'bar', action: 'foor'}")));
152+
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '3', controller: 'foobar', action: 'foobar'}")));
153+
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '4', controller: 'barfoo', action: 'foobar'}")));
154+
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '5', controller: 'foobar', action: 'barfoo'}")));
155+
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '6', controller: 'barfoo', action: 'barfoo'}")));
156156

157157
Assert.Equal(6, queryReplayer.Count);
158158

@@ -168,12 +168,12 @@ public void SuccessRejectQueries() {
168168
testableOfflineManager.MaxQueueSize = -1;
169169
queryReplayer.Lock = false;
170170

171-
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'foo', action: 'bar'}")));
172-
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'bar', action: 'foor'}")));
173-
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'foobar', action: 'foobar'}")));
174-
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'barfoo', action: 'foobar'}")));
175-
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'foobar', action: 'barfoo'}")));
176-
Assert.True(queryReplayer.Enqueue(JObject.Parse("{controller: 'barfoo', action: 'barfoo'}")));
171+
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '1', controller: 'foo', action: 'bar'}")));
172+
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '2', controller: 'bar', action: 'foor'}")));
173+
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '3', controller: 'foobar', action: 'foobar'}")));
174+
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '4', controller: 'barfoo', action: 'foobar'}")));
175+
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '5', controller: 'foobar', action: 'barfoo'}")));
176+
Assert.True(queryReplayer.Enqueue(JObject.Parse("{requestId: '6', controller: 'barfoo', action: 'barfoo'}")));
177177

178178
Assert.Equal(6, queryReplayer.Count);
179179

Kuzzle/Kuzzle.cs

Lines changed: 43 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ internal interface IKuzzle {
7474
public sealed class Kuzzle : IKuzzleApi, IKuzzle {
7575
private AbstractProtocol networkProtocol;
7676

77+
private SemaphoreSlim requestsSemaphore = new SemaphoreSlim(1, 1);
78+
7779
internal readonly Dictionary<string, TaskCompletionSource<Response>>
7880
requests = new Dictionary<string, TaskCompletionSource<Response>>();
7981

@@ -220,7 +222,6 @@ public AbstractProtocol NetworkProtocol {
220222
}
221223
}
222224

223-
224225
/// <summary>
225226
/// Handles the ResponseEvent event from the network protocol
226227
/// </summary>
@@ -229,39 +230,55 @@ public AbstractProtocol NetworkProtocol {
229230
internal void ResponsesListener(object sender, string payload) {
230231
Response response = Response.FromString(payload);
231232

232-
if (requests.ContainsKey(response.Room)) {
233-
if (response.Error != null) {
234-
if (response.Error.Message == "Token expired") {
235-
EventHandler.DispatchTokenExpired();
236-
}
233+
if (!requests.ContainsKey(response.Room)) {
234+
EventHandler.DispatchUnhandledResponse(response);
235+
return;
236+
}
237237

238-
requests[response.RequestId].SetException(
239-
new Exceptions.ApiErrorException(response));
240-
} else {
241-
requests[response.RequestId].SetResult(response);
242-
}
238+
TaskCompletionSource<Response> task = requests[response.RequestId];
243239

244-
lock (requests) {
245-
requests.Remove(response.RequestId);
240+
if (response.Error != null) {
241+
if (response.Error.Message == "Token expired") {
242+
EventHandler.DispatchTokenExpired();
246243
}
247244

248-
Offline?.QueryReplayer?.Remove((obj) => obj["requestId"].ToString() == response.RequestId);
245+
task.SetException(new Exceptions.ApiErrorException(response));
246+
}
247+
else {
248+
task.SetResult(response);
249+
}
249250

250-
} else {
251-
EventHandler.DispatchUnhandledResponse(response);
251+
requestsSemaphore.Wait();
252+
try {
253+
requests.Remove(response.RequestId);
254+
}
255+
finally {
256+
requestsSemaphore.Release();
252257
}
258+
259+
Offline?.QueryReplayer?.Remove(
260+
(obj) => obj["requestId"].ToString() == response.RequestId);
253261
}
254262

255263
internal void StateChangeListener(object sender, ProtocolState state) {
256-
// If not connected anymore: close tasks and clean up the requests buffer
257-
if (state == ProtocolState.Closed) {
258-
lock (requests) {
264+
// If not connected anymore: close pending tasks and clean up the requests
265+
// buffer.
266+
// If reconnecting, only requests submitted AFTER the disconnection event
267+
// can be queued: we have no information about requests submitted before
268+
// that event. For all we know, Kuzzle could have received & processed
269+
// those requests, but couldn't forward the response to us
270+
if (state == ProtocolState.Closed || state == ProtocolState.Reconnecting) {
271+
requestsSemaphore.Wait();
272+
try {
259273
foreach (var task in requests.Values) {
260274
task.SetException(new Exceptions.ConnectionLostException());
261275
}
262276

263277
requests.Clear();
264278
}
279+
finally {
280+
requestsSemaphore.Release();
281+
}
265282
}
266283
}
267284

@@ -382,14 +399,16 @@ public ConfiguredTaskAwaitable<Response> QueryAsync(JObject query) {
382399
query["volatile"]["sdkName"] = SdkName;
383400
query["volatile"]["sdkInstanceId"] = InstanceId;
384401

402+
requestsSemaphore.Wait();
403+
requests[requestId] = new TaskCompletionSource<Response>(
404+
TaskCreationOptions.RunContinuationsAsynchronously);
405+
requestsSemaphore.Release();
406+
385407
if (NetworkProtocol.State == ProtocolState.Open) {
386408
NetworkProtocol.Send(query);
387-
} else if (NetworkProtocol.State == ProtocolState.Reconnecting) {
388-
Offline.QueryReplayer.Enqueue(query);
389409
}
390-
391-
lock (requests) {
392-
requests[requestId] = new TaskCompletionSource<Response>();
410+
else if (NetworkProtocol.State == ProtocolState.Reconnecting) {
411+
Offline.QueryReplayer.Enqueue(query);
393412
}
394413

395414
return requests[requestId].Task.ConfigureAwait(false);

Kuzzle/Kuzzle.csproj

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@
4646
<MonoDevelop>
4747
<Properties>
4848
<Deployment.LinuxDeployData generatePcFile="False" />
49+
<Policies>
50+
<TextStylePolicy RemoveTrailingWhitespace="True" NoTabsAfterNonTabs="False" EolMarker="Native" FileWidth="80" TabWidth="2" TabsToSpaces="True" IndentWidth="2" scope="text/x-csharp" />
51+
<CSharpFormattingPolicy IndentBlock="True" IndentBraces="False" IndentSwitchSection="True" IndentSwitchCaseSection="True" LabelPositioning="OneLess" NewLineForElse="True" NewLineForCatch="True" NewLineForFinally="True" NewLineForMembersInObjectInit="True" NewLineForMembersInAnonymousTypes="True" NewLineForClausesInQuery="True" SpaceWithinMethodDeclarationParenthesis="False" SpaceBetweenEmptyMethodDeclarationParentheses="False" SpaceAfterMethodCallName="False" SpaceWithinMethodCallParentheses="False" SpaceBetweenEmptyMethodCallParentheses="False" SpaceAfterControlFlowStatementKeyword="True" SpaceWithinExpressionParentheses="False" SpaceWithinCastParentheses="False" SpaceWithinOtherParentheses="False" SpaceAfterCast="False" SpacesIgnoreAroundVariableDeclaration="False" SpaceBeforeOpenSquareBracket="False" SpaceBetweenEmptySquareBrackets="False" SpaceWithinSquareBrackets="False" SpaceAfterColonInBaseTypeDeclaration="True" SpaceAfterComma="True" SpaceAfterDot="False" SpaceAfterSemicolonsInForStatement="True" SpaceBeforeColonInBaseTypeDeclaration="True" SpaceBeforeComma="False" SpaceBeforeDot="False" SpaceBeforeSemicolonsInForStatement="False" SpacingAroundBinaryOperator="Single" WrappingPreserveSingleLine="True" WrappingKeepStatementsOnSingleLine="True" PlaceSystemDirectiveFirst="True" NewLinesForBracesInTypes="False" NewLinesForBracesInMethods="False" NewLinesForBracesInProperties="False" NewLinesForBracesInAccessors="False" NewLinesForBracesInAnonymousMethods="False" NewLinesForBracesInControlBlocks="False" NewLinesForBracesInAnonymousTypes="False" NewLinesForBracesInObjectCollectionArrayInitializers="False" NewLinesForBracesInLambdaExpressionBody="False" SpacingAfterMethodDeclarationName="True" scope="text/x-csharp" />
52+
</Policies>
4953
</Properties>
5054
</MonoDevelop>
5155
</ProjectExtensions>

Kuzzle/Kuzzle.nuspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<metadata>
44
<id>kuzzlesdk</id>
55
<title>Kuzzle SDK</title>
6-
<version>2.0.4</version>
6+
<version>2.0.5</version>
77
<authors>Kuzzle Team</authors>
88
<owners>Kuzzle Team</owners>
99
<license type="expression">Apache-2.0</license>

Kuzzle/Offline/OfflineManager.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,6 @@ internal override void OnUserLoggedIn(object sender, UserLoggedInEvent e) {
226226

227227
internal void StateChangeListener(object sender, ProtocolState state) {
228228
if (state == ProtocolState.Open && previousState == ProtocolState.Reconnecting) {
229-
230229
kuzzle.GetEventHandler().DispatchReconnected();
231230

232231
Task.Run(async () => {
@@ -235,6 +234,7 @@ internal void StateChangeListener(object sender, ProtocolState state) {
235234
});
236235

237236
}
237+
238238
previousState = state;
239239
}
240240

Kuzzle/Offline/Query/QueryReplayer.cs

Lines changed: 54 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System.Diagnostics;
44
using System.Threading;
55
using System.Threading.Tasks;
6+
using KuzzleSdk.API;
67
using KuzzleSdk.API.Offline;
78
using Newtonsoft.Json.Linq;
89

@@ -73,6 +74,7 @@ internal sealed class QueryReplayer : IQueryReplayer {
7374
private CancellationTokenSource cancellationTokenSource;
7475
private bool currentlyReplaying = false;
7576
private Stopwatch stopWatch = new Stopwatch();
77+
private SemaphoreSlim queueSemaphore = new SemaphoreSlim(1, 1);
7678

7779
/// <summary>
7880
/// Tells if the QueryReplayer is locked (i.e. it doesn't accept new queries).
@@ -103,28 +105,35 @@ internal QueryReplayer(IOfflineManager offlineManager, IKuzzle kuzzle) {
103105
public bool Enqueue(JObject query) {
104106
if (Lock || WaitLoginToReplay) return false;
105107

106-
lock (queue) {
108+
queueSemaphore.Wait();
109+
try {
107110
if (queue.Count < offlineManager.MaxQueueSize || offlineManager.MaxQueueSize < 0) {
108111
if (queue.Count == 0) {
109112
stopWatch.Reset();
110113
stopWatch.Start();
111114
queue.Add(new TimedQuery(query, 0));
112-
} else {
115+
}
116+
else {
113117
TimedQuery previous = queue[queue.Count - 1];
114118
Int64 elapsedTime = stopWatch.ElapsedMilliseconds - previous.Time;
115119
elapsedTime = Math.Min(elapsedTime, offlineManager.MaxRequestDelay);
116120
queue.Add(new TimedQuery(query, previous.Time + elapsedTime));
117121
}
118-
if (query["controller"]?.ToString() == "auth"
119-
&& (query["action"]?.ToString() == "login"
120-
|| query["action"]?.ToString() == "logout")
121-
) {
122-
Lock = true;
123-
}
122+
123+
String controller = query["controller"]?.ToString();
124+
String action = query["action"]?.ToString();
125+
126+
if (controller == "auth" && (action == "login" || action == "logout")) {
127+
Lock = true;
128+
}
124129

125130
return true;
126131
}
127132
}
133+
finally {
134+
queueSemaphore.Release();
135+
}
136+
128137
return false;
129138
}
130139

@@ -139,7 +148,8 @@ public int Count {
139148
/// Remove and return the first query that has been added to the queue.
140149
/// </summary>
141150
public JObject Dequeue() {
142-
lock (queue) {
151+
queueSemaphore.Wait();
152+
try {
143153
if (queue.Count == 0) {
144154
return null;
145155
}
@@ -149,6 +159,9 @@ public JObject Dequeue() {
149159

150160
return query;
151161
}
162+
finally {
163+
queueSemaphore.Release();
164+
}
152165
}
153166

154167
/// <summary>
@@ -163,27 +176,42 @@ public void RejectAllQueries(Exception exception) {
163176
/// it is set with an exception and removed from the replayable queue.
164177
/// </summary>
165178
public void RejectQueries(Predicate<JObject> predicate, Exception exception) {
166-
lock (queue) {
179+
queueSemaphore.Wait();
180+
try {
167181
foreach (TimedQuery timedQuery in queue) {
168182
if (predicate(timedQuery.Query)) {
169-
kuzzle.GetRequestById(timedQuery.Query["requestId"]?.ToString())?.SetException(exception);
183+
String requestId = timedQuery.Query["requestId"]?.ToString();
184+
185+
if (requestId != null) {
186+
TaskCompletionSource<Response> task = kuzzle.GetRequestById(requestId);
187+
188+
if (task != null) {
189+
task.SetException(exception);
190+
}
191+
}
170192
}
171193
}
194+
172195
queue.RemoveAll((obj) => predicate(obj.Query));
196+
173197
if (queue.Count == 0) {
174198
Lock = false;
175199
currentlyReplaying = false;
176200
WaitLoginToReplay = false;
177201
}
178202
}
203+
finally {
204+
queueSemaphore.Release();
205+
}
179206
}
180207

181208
/// <summary>
182209
/// Remove every query that satisfies the predicate
183210
/// </summary>
184211
/// <returns>How many items where removed.</returns>
185212
public int Remove(Predicate<JObject> predicate) {
186-
lock (queue) {
213+
queueSemaphore.Wait();
214+
try {
187215
if (queue.Count > 0) {
188216
Predicate<TimedQuery> timedQueryPredicate = timedQuery => predicate(timedQuery.Query);
189217
int itemsRemoved = queue.RemoveAll(timedQueryPredicate);
@@ -196,19 +224,22 @@ public int Remove(Predicate<JObject> predicate) {
196224
return itemsRemoved;
197225
}
198226
}
227+
finally {
228+
queueSemaphore.Release();
229+
}
199230
return 0;
200231
}
201232

202233
/// <summary>
203234
/// Clear the queue.
204235
/// </summary>
205236
public void Clear() {
206-
lock (queue) {
207-
queue.Clear();
208-
Lock = false;
209-
currentlyReplaying = false;
210-
WaitLoginToReplay = false;
211-
}
237+
queueSemaphore.Wait();
238+
queue.Clear();
239+
Lock = false;
240+
currentlyReplaying = false;
241+
WaitLoginToReplay = false;
242+
queueSemaphore.Release();
212243
}
213244

214245
internal delegate Task ReplayQueryFunc(TimedQuery timedQuery, CancellationToken cancellationToken);
@@ -255,7 +286,8 @@ public CancellationTokenSource ReplayQueries(Predicate<JObject> predicate, bool
255286

256287
if (resetWaitLogin) WaitLoginToReplay = false;
257288

258-
lock (queue) {
289+
queueSemaphore.Wait();
290+
try {
259291
if (queue.Count > 0) {
260292
currentlyReplaying = true;
261293

@@ -267,6 +299,9 @@ public CancellationTokenSource ReplayQueries(Predicate<JObject> predicate, bool
267299

268300
}
269301
}
302+
finally {
303+
queueSemaphore.Release();
304+
}
270305
return cancellationTokenSource;
271306
}
272307

0 commit comments

Comments
 (0)