Skip to content

Commit 238e766

Browse files
authored
Merge pull request danielgerlag#222 from Kahbazi/LifeCycleEventPublisher
Use BlockingCollection for LifeCycleEventPublisher
2 parents 3717923 + ce037c0 commit 238e766

File tree

1 file changed

+22
-29
lines changed

1 file changed

+22
-29
lines changed

src/WorkflowCore/Services/LifeCycleEventPublisher.cs

Lines changed: 22 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -9,24 +9,26 @@
99

1010
namespace WorkflowCore.Services
1111
{
12-
public class LifeCycleEventPublisher : ILifeCycleEventPublisher
12+
public class LifeCycleEventPublisher : ILifeCycleEventPublisher, IDisposable
1313
{
1414
private readonly ILifeCycleEventHub _eventHub;
1515
private readonly ILogger _logger;
16-
private readonly ConcurrentQueue<LifeCycleEvent> _outbox;
16+
private readonly BlockingCollection<LifeCycleEvent> _outbox;
1717
protected Task DispatchTask;
18-
private CancellationTokenSource _cancellationTokenSource;
1918

2019
public LifeCycleEventPublisher(ILifeCycleEventHub eventHub, ILoggerFactory loggerFactory)
2120
{
2221
_eventHub = eventHub;
23-
_outbox = new ConcurrentQueue<LifeCycleEvent>();
22+
_outbox = new BlockingCollection<LifeCycleEvent>();
2423
_logger = loggerFactory.CreateLogger(GetType());
2524
}
2625

2726
public void PublishNotification(LifeCycleEvent evt)
2827
{
29-
_outbox.Enqueue(evt);
28+
if (_outbox.IsAddingCompleted)
29+
return;
30+
31+
_outbox.Add(evt);
3032
}
3133

3234
public void Start()
@@ -36,45 +38,36 @@ public void Start()
3638
throw new InvalidOperationException();
3739
}
3840

39-
_cancellationTokenSource = new CancellationTokenSource();
40-
4141
DispatchTask = new Task(Execute);
4242
DispatchTask.Start();
4343
}
4444

4545
public void Stop()
4646
{
47-
_cancellationTokenSource.Cancel();
47+
_outbox.CompleteAdding();
48+
4849
DispatchTask.Wait();
4950
DispatchTask = null;
5051
}
5152

53+
public void Dispose()
54+
{
55+
_outbox.Dispose();
56+
}
57+
5258
private async void Execute()
5359
{
54-
var cancelToken = _cancellationTokenSource.Token;
55-
56-
while (!cancelToken.IsCancellationRequested)
60+
try
5761
{
58-
try
62+
foreach (var evt in _outbox.GetConsumingEnumerable())
5963
{
60-
if (!SpinWait.SpinUntil(() => _outbox.Count > 0, 1000))
61-
{
62-
continue;
63-
}
64-
65-
if (_outbox.TryDequeue(out LifeCycleEvent evt))
66-
{
67-
await _eventHub.PublishNotification(evt);
68-
}
69-
}
70-
catch (OperationCanceledException)
71-
{
72-
}
73-
catch (Exception ex)
74-
{
75-
_logger.LogError(ex.Message);
64+
await _eventHub.PublishNotification(evt);
7665
}
7766
}
67+
catch (Exception ex)
68+
{
69+
_logger.LogError(default(EventId), ex, ex.Message);
70+
}
7871
}
7972
}
80-
}
73+
}

0 commit comments

Comments
 (0)