Skip to content

Commit 102d3b1

Browse files
authored
Merge pull request danielgerlag#845 from Granjow/feature/restart-fixes
Fix lifecycle events when restarting Workflow Core
2 parents 68b98f0 + 2c594a0 commit 102d3b1

File tree

5 files changed

+76
-3
lines changed

5 files changed

+76
-3
lines changed

WorkflowCore.sln

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{EF47161E-E39
77
EndProject
88
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{F6AC9AEB-24EF-475A-B190-AA4D9E01270A}"
99
ProjectSection(SolutionItems) = preProject
10-
readme.md = readme.md
10+
README.md = README.md
1111
EndProjectSection
1212
EndProject
1313
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{5080DB09-CBE8-4C45-9957-C3BB7651755E}"

src/WorkflowCore/Interface/IDistributedLockProvider.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@ namespace WorkflowCore.Interface
99
/// </remarks>
1010
public interface IDistributedLockProvider
1111
{
12+
/// <summary>
13+
/// Acquire a lock on the specified resource.
14+
/// </summary>
15+
/// <param name="Id">Resource ID to lock.</param>
16+
/// <param name="cancellationToken"></param>
17+
/// <returns>`true`, if the lock was acquired.</returns>
1218
Task<bool> AcquireLock(string Id, CancellationToken cancellationToken);
1319

1420
Task ReleaseLock(string Id);

src/WorkflowCore/Services/LifeCycleEventPublisher.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ public class LifeCycleEventPublisher : ILifeCycleEventPublisher, IDisposable
1111
{
1212
private readonly ILifeCycleEventHub _eventHub;
1313
private readonly ILogger _logger;
14-
private readonly BlockingCollection<LifeCycleEvent> _outbox;
14+
private BlockingCollection<LifeCycleEvent> _outbox;
1515
private Task _dispatchTask;
1616

1717
public LifeCycleEventPublisher(ILifeCycleEventHub eventHub, ILoggerFactory loggerFactory)
@@ -36,6 +36,11 @@ public void Start()
3636
throw new InvalidOperationException();
3737
}
3838

39+
if (_outbox.IsAddingCompleted)
40+
{
41+
_outbox = new BlockingCollection<LifeCycleEvent>();
42+
}
43+
3944
_dispatchTask = new Task(Execute);
4045
_dispatchTask.Start();
4146
}

src/WorkflowCore/Services/WorkflowHost.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ public WorkflowHost(IPersistenceProvider persistenceStore, IQueueProvider queueP
4747
_searchIndex = searchIndex;
4848
_activityController = activityController;
4949
_lifeCycleEventHub = lifeCycleEventHub;
50-
_lifeCycleEventHub.Subscribe(HandleLifeCycleEvent);
5150
}
5251

5352
public Task<string> StartWorkflow(string workflowId, object data = null, string reference=null)
@@ -91,6 +90,10 @@ public async Task StartAsync(CancellationToken cancellationToken)
9190
await _lifeCycleEventHub.Start();
9291
await _searchIndex.Start();
9392

93+
// Event subscriptions are removed when stopping the event hub.
94+
// Add them when starting.
95+
AddEventSubscriptions();
96+
9497
Logger.LogInformation("Starting background tasks");
9598

9699
foreach (var task in _backgroundTasks)
@@ -181,5 +184,10 @@ public Task SubmitActivityFailure(string token, object result)
181184
{
182185
return _activityController.SubmitActivityFailure(token, result);
183186
}
187+
188+
private void AddEventSubscriptions()
189+
{
190+
_lifeCycleEventHub.Subscribe(HandleLifeCycleEvent);
191+
}
184192
}
185193
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using Microsoft.Extensions.Logging;
4+
using Moq;
5+
using WorkflowCore.Interface;
6+
using WorkflowCore.Models.LifeCycleEvents;
7+
using WorkflowCore.Services;
8+
using Xunit;
9+
10+
namespace WorkflowCore.UnitTests.Services
11+
{
12+
public class LifeCycleEventPublisherTests
13+
{
14+
[Fact(DisplayName = "Notifications should be published when the publisher is running")]
15+
public async Task PublishNotification_WhenStarted_PublishesNotification()
16+
{
17+
// Arrange
18+
var wasCalled = new TaskCompletionSource<bool>();
19+
var eventHubMock = new Mock<ILifeCycleEventHub>();
20+
eventHubMock
21+
.Setup(hub => hub.PublishNotification(It.IsAny<StepCompleted>()))
22+
.Callback(() => wasCalled.SetResult(true));
23+
LifeCycleEventPublisher publisher = new LifeCycleEventPublisher(eventHubMock.Object, new LoggerFactory());
24+
25+
// Act
26+
publisher.Start();
27+
publisher.PublishNotification(new StepCompleted());
28+
29+
// Assert
30+
await wasCalled.Task;
31+
}
32+
33+
[Fact(DisplayName = "Notifications should be published when the publisher is running")]
34+
public async Task PublishNotification_WhenRestarted_PublishesNotification()
35+
{
36+
// Arrange
37+
var wasCalled = new TaskCompletionSource<bool>();
38+
var eventHubMock = new Mock<ILifeCycleEventHub>();
39+
eventHubMock
40+
.Setup(hub => hub.PublishNotification(It.IsAny<StepCompleted>()))
41+
.Callback(() => wasCalled.SetResult(true));
42+
LifeCycleEventPublisher publisher = new LifeCycleEventPublisher(eventHubMock.Object, new LoggerFactory());
43+
44+
// Act
45+
publisher.Start();
46+
publisher.Stop();
47+
publisher.Start();
48+
publisher.PublishNotification(new StepCompleted());
49+
50+
// Assert
51+
await wasCalled.Task;
52+
}
53+
}
54+
}

0 commit comments

Comments
 (0)