Skip to content

Commit 89163d2

Browse files
authored
Sync execution of workflows (danielgerlag#381)
1 parent 1ff237f commit 89163d2

File tree

4 files changed

+116
-1
lines changed

4 files changed

+116
-1
lines changed
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using WorkflowCore.Models;
4+
5+
namespace WorkflowCore.Interface
6+
{
7+
public interface ISyncWorkflowRunner
8+
{
9+
Task<WorkflowInstance> RunWorkflowSync<TData>(string workflowId, int version, TData data, string reference, TimeSpan timeOut, bool persistSate = true)
10+
where TData : new();
11+
}
12+
}

src/WorkflowCore/ServiceCollectionExtensions.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ public static IServiceCollection AddWorkflow(this IServiceCollection services, A
5959
services.AddTransient<IPooledObjectPolicy<IPersistenceProvider>, InjectedObjectPoolPolicy<IPersistenceProvider>>();
6060
services.AddTransient<IPooledObjectPolicy<IWorkflowExecutor>, InjectedObjectPoolPolicy<IWorkflowExecutor>>();
6161

62+
services.AddTransient<ISyncWorkflowRunner, SyncWorkflowRunner>();
6263
services.AddTransient<IDefinitionLoader, DefinitionLoader>();
6364

6465
services.AddTransient<Foreach>();
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
using System;
2+
using System.Diagnostics;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
using WorkflowCore.Exceptions;
6+
using WorkflowCore.Interface;
7+
using WorkflowCore.Models;
8+
using WorkflowCore.Models.LifeCycleEvents;
9+
10+
namespace WorkflowCore.Services
11+
{
12+
public class SyncWorkflowRunner : ISyncWorkflowRunner
13+
{
14+
private readonly IWorkflowHost _host;
15+
private readonly IWorkflowExecutor _executor;
16+
private readonly IDistributedLockProvider _lockService;
17+
private readonly IWorkflowRegistry _registry;
18+
private readonly IPersistenceProvider _persistenceStore;
19+
private readonly IExecutionPointerFactory _pointerFactory;
20+
private readonly IQueueProvider _queueService;
21+
22+
public SyncWorkflowRunner(IWorkflowHost host, IWorkflowExecutor executor, IDistributedLockProvider lockService, IWorkflowRegistry registry, IPersistenceProvider persistenceStore, IExecutionPointerFactory pointerFactory, IQueueProvider queueService)
23+
{
24+
_host = host;
25+
_executor = executor;
26+
_lockService = lockService;
27+
_registry = registry;
28+
_persistenceStore = persistenceStore;
29+
_pointerFactory = pointerFactory;
30+
_queueService = queueService;
31+
}
32+
33+
public async Task<WorkflowInstance> RunWorkflowSync<TData>(string workflowId, int version, TData data, string reference, TimeSpan timeOut, bool persistSate = true)
34+
where TData : new()
35+
{
36+
var def = _registry.GetDefinition(workflowId, version);
37+
if (def == null)
38+
{
39+
throw new WorkflowNotRegisteredException(workflowId, version);
40+
}
41+
42+
var wf = new WorkflowInstance
43+
{
44+
WorkflowDefinitionId = workflowId,
45+
Version = def.Version,
46+
Data = data,
47+
Description = def.Description,
48+
NextExecution = 0,
49+
CreateTime = DateTime.Now.ToUniversalTime(),
50+
Status = WorkflowStatus.Suspended,
51+
Reference = reference
52+
};
53+
54+
if ((def.DataType != null) && (data == null))
55+
{
56+
if (typeof(TData) == def.DataType)
57+
wf.Data = new TData();
58+
else
59+
wf.Data = def.DataType.GetConstructor(new Type[0]).Invoke(new object[0]);
60+
}
61+
62+
wf.ExecutionPointers.Add(_pointerFactory.BuildGenesisPointer(def));
63+
64+
var stopWatch = new Stopwatch();
65+
66+
var id = Guid.NewGuid().ToString();
67+
68+
if (persistSate)
69+
id = await _persistenceStore.CreateNewWorkflow(wf);
70+
else
71+
wf.Id = id;
72+
73+
wf.Status = WorkflowStatus.Runnable;
74+
75+
if (!await _lockService.AcquireLock(id, CancellationToken.None))
76+
{
77+
throw new InvalidOperationException();
78+
}
79+
80+
try
81+
{
82+
stopWatch.Start();
83+
while ((wf.Status == WorkflowStatus.Runnable) && (timeOut.TotalMilliseconds > stopWatch.ElapsedMilliseconds))
84+
{
85+
await _executor.Execute(wf);
86+
if (persistSate)
87+
await _persistenceStore.PersistWorkflow(wf);
88+
}
89+
}
90+
finally
91+
{
92+
stopWatch.Stop();
93+
await _lockService.ReleaseLock(id);
94+
}
95+
96+
if (persistSate)
97+
await _queueService.QueueWork(id, QueueType.Index);
98+
99+
return wf;
100+
}
101+
}
102+
}

src/WorkflowCore/WorkflowCore.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute>
1616
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
1717
<Description>Workflow Core is a light weight workflow engine targeting .NET Standard.</Description>
18-
<Version>2.0.1</Version>
18+
<Version>2.1.0</Version>
1919
<AssemblyVersion>2.0.1.0</AssemblyVersion>
2020
<FileVersion>2.0.1.0</FileVersion>
2121
<PackageReleaseNotes></PackageReleaseNotes>

0 commit comments

Comments
 (0)