Skip to content

Commit db293fc

Browse files
committed
add foreach.RunParallel option to allow synchronous execution of child branches
1 parent 7df09a6 commit db293fc

File tree

17 files changed

+323
-9
lines changed

17 files changed

+323
-9
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,8 @@ These are also available as separate Nuget packages.
160160

161161
* [Parallel ForEach](src/samples/WorkflowCore.Sample09)
162162

163+
* [Sync ForEach](src/samples/WorkflowCore.Sample09s)
164+
163165
* [While Loop](src/samples/WorkflowCore.Sample10)
164166

165167
* [If Statement](src/samples/WorkflowCore.Sample11)

WorkflowCore.sln

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.DSL", "src\Wor
141141
EndProject
142142
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.Sample18", "src\samples\WorkflowCore.Sample18\WorkflowCore.Sample18.csproj", "{5BE6D628-B9DB-4C76-AAEB-8F3800509A84}"
143143
EndProject
144+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WorkflowCore.Sample09s", "src\samples\WorkflowCore.Sample09s\WorkflowCore.Sample09s.csproj", "{E32CF21A-29CC-46D1-8044-FCC327F2B281}"
145+
EndProject
144146
Global
145147
GlobalSection(SolutionConfigurationPlatforms) = preSolution
146148
Debug|Any CPU = Debug|Any CPU
@@ -343,6 +345,10 @@ Global
343345
{5BE6D628-B9DB-4C76-AAEB-8F3800509A84}.Debug|Any CPU.Build.0 = Debug|Any CPU
344346
{5BE6D628-B9DB-4C76-AAEB-8F3800509A84}.Release|Any CPU.ActiveCfg = Release|Any CPU
345347
{5BE6D628-B9DB-4C76-AAEB-8F3800509A84}.Release|Any CPU.Build.0 = Release|Any CPU
348+
{E32CF21A-29CC-46D1-8044-FCC327F2B281}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
349+
{E32CF21A-29CC-46D1-8044-FCC327F2B281}.Debug|Any CPU.Build.0 = Debug|Any CPU
350+
{E32CF21A-29CC-46D1-8044-FCC327F2B281}.Release|Any CPU.ActiveCfg = Release|Any CPU
351+
{E32CF21A-29CC-46D1-8044-FCC327F2B281}.Release|Any CPU.Build.0 = Release|Any CPU
346352
EndGlobalSection
347353
GlobalSection(SolutionProperties) = preSolution
348354
HideSolutionNode = FALSE
@@ -400,6 +406,7 @@ Global
400406
{78217204-B873-40B9-8875-E3925B2FBCEC} = {E6CEAD8D-F565-471E-A0DC-676F54EAEDEB}
401407
{20B98905-08CB-4854-8E2C-A31A078383E9} = {EF47161E-E399-451C-BDE8-E92AAD3BD761}
402408
{5BE6D628-B9DB-4C76-AAEB-8F3800509A84} = {5080DB09-CBE8-4C45-9957-C3BB7651755E}
409+
{E32CF21A-29CC-46D1-8044-FCC327F2B281} = {5080DB09-CBE8-4C45-9957-C3BB7651755E}
403410
EndGlobalSection
404411
GlobalSection(ExtensibilityGlobals) = postSolution
405412
SolutionGuid = {DC0FA8D3-6449-4FDA-BB46-ECF58FAD23B4}

src/WorkflowCore/Interface/IStepBuilder.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,13 @@ public interface IStepBuilder<TData, TStepBody>
186186
/// <returns></returns>
187187
IContainerStepBuilder<TData, Foreach, Foreach> ForEach(Expression<Func<TData, IEnumerable>> collection);
188188

189+
/// <summary>
190+
/// Execute a block of steps, once for each item in a collection in a RunParallel foreach
191+
/// </summary>
192+
/// <param name="collection">Resolves a collection for iterate over</param>
193+
/// <returns></returns>
194+
IContainerStepBuilder<TData, Foreach, Foreach> ForEach(Expression<Func<TData, IEnumerable>> collection, Expression<Func<TData, bool>> runParallel);
195+
189196
/// <summary>
190197
/// Repeat a block of steps until a condition becomes true
191198
/// </summary>
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
namespace WorkflowCore.Models
2+
{
3+
public class IteratorPersistenceData : ControlPersistenceData
4+
{
5+
public int Index { get; set; } = 0;
6+
}
7+
}

src/WorkflowCore/Primitives/Foreach.cs

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,28 +8,45 @@ namespace WorkflowCore.Primitives
88
{
99
public class Foreach : ContainerStepBody
1010
{
11-
public IEnumerable Collection { get; set; }
11+
public IEnumerable Collection { get; set; }
12+
public bool RunParallel { get; set; } = true;
1213

1314
public override ExecutionResult Run(IStepExecutionContext context)
1415
{
1516
if (context.PersistenceData == null)
1617
{
1718
var values = Collection.Cast<object>();
18-
return ExecutionResult.Branch(new List<object>(values), new ControlPersistenceData() { ChildrenActive = true });
19+
if (RunParallel)
20+
{
21+
return ExecutionResult.Branch(new List<object>(values), new IteratorPersistenceData() { ChildrenActive = true });
22+
}
23+
else
24+
{
25+
return ExecutionResult.Branch(new List<object>(new object[] { values.ElementAt(0) }), new IteratorPersistenceData() { ChildrenActive = true });
26+
}
1927
}
2028

21-
if (context.PersistenceData is ControlPersistenceData)
29+
if (context.PersistenceData is IteratorPersistenceData persistanceData && persistanceData?.ChildrenActive == true)
2230
{
23-
if ((context.PersistenceData as ControlPersistenceData).ChildrenActive)
31+
if (context.Workflow.IsBranchComplete(context.ExecutionPointer.Id))
2432
{
25-
if (context.Workflow.IsBranchComplete(context.ExecutionPointer.Id))
33+
if (!RunParallel)
2634
{
27-
return ExecutionResult.Next();
35+
var values = Collection.Cast<object>();
36+
persistanceData.Index++;
37+
if (persistanceData.Index < values.Count())
38+
{
39+
return ExecutionResult.Branch(new List<object>(new object[] { values.ElementAt(persistanceData.Index) }), persistanceData);
40+
}
2841
}
42+
43+
return ExecutionResult.Next();
2944
}
45+
46+
return ExecutionResult.Persist(persistanceData);
3047
}
3148

3249
return ExecutionResult.Persist(context.PersistenceData);
33-
}
50+
}
3451
}
3552
}

src/WorkflowCore/Services/FluentBuilders/StepBuilder.cs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,25 @@ public IContainerStepBuilder<TData, Foreach, Foreach> ForEach(Expression<Func<TD
306306
return stepBuilder;
307307
}
308308

309+
public IContainerStepBuilder<TData, Foreach, Foreach> ForEach(Expression<Func<TData, IEnumerable>> collection, Expression<Func<TData, bool>> runParallel)
310+
{
311+
var newStep = new WorkflowStep<Foreach>();
312+
313+
Expression<Func<Foreach, IEnumerable>> inputExpr = (x => x.Collection);
314+
newStep.Inputs.Add(new MemberMapParameter(collection, inputExpr));
315+
316+
Expression<Func<Foreach, bool>> pExpr = (x => x.RunParallel);
317+
newStep.Inputs.Add(new MemberMapParameter(runParallel, pExpr));
318+
319+
WorkflowBuilder.AddStep(newStep);
320+
var stepBuilder = new StepBuilder<TData, Foreach>(WorkflowBuilder, newStep);
321+
322+
Step.Outcomes.Add(new ValueOutcome() { NextStep = newStep.Id });
323+
324+
return stepBuilder;
325+
}
326+
327+
309328
public IContainerStepBuilder<TData, While, While> While(Expression<Func<TData, bool>> condition)
310329
{
311330
var newStep = new WorkflowStep<While>();

src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ static MongoPersistenceProvider()
7979

8080
BsonClassMap.RegisterClassMap<ControlPersistenceData>(x => x.AutoMap());
8181
BsonClassMap.RegisterClassMap<SchedulePersistenceData>(x => x.AutoMap());
82+
BsonClassMap.RegisterClassMap<IteratorPersistenceData>(x => x.AutoMap());
8283
}
8384

8485
static bool indexesCreated = false;

src/samples/WorkflowCore.Sample09/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Foreach sample
1+
# Foreach Parallel sample
22

33
Illustrates how to implement a parallel foreach within your workflow.
44

@@ -14,7 +14,7 @@ builder
1414
.Then<SayGoodbye>();
1515
```
1616

17-
or get the collectioin from workflow data.
17+
or get the collection from workflow data.
1818

1919
```c#
2020
builder
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
using WorkflowCore.Interface;
5+
using WorkflowCore.Models;
6+
7+
namespace WorkflowCore.Sample09s
8+
{
9+
public class ForEachSyncWorkflow : IWorkflow
10+
{
11+
public string Id => "ForeachSync";
12+
public int Version => 1;
13+
14+
public void Build(IWorkflowBuilder<object> builder)
15+
{
16+
builder
17+
.StartWith<SayHello>()
18+
.ForEach(data => new List<int>() { 1, 2, 3, 4 }, data => false)
19+
.Do(x => x
20+
.StartWith<DisplayContext>()
21+
.Input(step => step.Item, (data, context) => context.Item)
22+
.Then<DoSomething>())
23+
.Then<SayGoodbye>();
24+
}
25+
}
26+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
using Microsoft.Extensions.DependencyInjection;
2+
using Microsoft.Extensions.Logging;
3+
using System;
4+
using WorkflowCore.Interface;
5+
6+
namespace WorkflowCore.Sample09s
7+
{
8+
class Program
9+
{
10+
public static void Main(string[] args)
11+
{
12+
IServiceProvider serviceProvider = ConfigureServices();
13+
14+
//start the workflow host
15+
var host = serviceProvider.GetService<IWorkflowHost>();
16+
host.RegisterWorkflow<ForEachSyncWorkflow>();
17+
host.Start();
18+
19+
Console.WriteLine("Starting workflow...");
20+
string workflowId = host.StartWorkflow("Foreach").Result;
21+
22+
23+
Console.ReadLine();
24+
host.Stop();
25+
}
26+
27+
private static IServiceProvider ConfigureServices()
28+
{
29+
//setup dependency injection
30+
IServiceCollection services = new ServiceCollection();
31+
services.AddLogging();
32+
services.AddWorkflow();
33+
//services.AddWorkflow(x => x.UseMongoDB(@"mongodb://localhost:27017", "workflow-test002"));
34+
//services.AddWorkflow(x => x.UseSqlServer(@"Server=.;Database=WorkflowCore3;Trusted_Connection=True;", true, true));
35+
//services.AddWorkflow(x => x.UseSqlite(@"Data Source=database2.db;", true));
36+
//services.AddWorkflow(x => x.UsePostgreSQL(@"Server=127.0.0.1;Port=32768;Database=workflow_test002;User Id=postgres;", true, true));
37+
38+
39+
var serviceProvider = services.BuildServiceProvider();
40+
41+
return serviceProvider;
42+
}
43+
44+
}
45+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Foreach Sync sample
2+
3+
Illustrates how to implement a synchronous foreach within your workflow.
4+
5+
6+
```c#
7+
builder
8+
.StartWith<SayHello>()
9+
.ForEach(data => new List<int>() { 1, 2, 3, 4 }, data => false)
10+
.Do(x => x
11+
.StartWith<DisplayContext>()
12+
.Input(step => step.Item, (data, context) => context.Item)
13+
.Then<DoSomething>())
14+
.Then<SayGoodbye>();
15+
```
16+
17+
or get the collection from workflow data.
18+
19+
```c#
20+
builder
21+
.StartWith<SayHello>()
22+
.ForEach(data => data.MyCollection, data => false)
23+
.Do(x => x
24+
.StartWith<DisplayContext>()
25+
.Input(step => step.Item, (data, context) => context.Item)
26+
.Then<DoSomething>())
27+
.Then<SayGoodbye>();
28+
29+
```
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
using WorkflowCore.Interface;
5+
using WorkflowCore.Models;
6+
7+
namespace WorkflowCore.Sample09s
8+
{
9+
public class DisplayContext : StepBody
10+
{
11+
12+
public object Item { get; set; }
13+
14+
public override ExecutionResult Run(IStepExecutionContext context)
15+
{
16+
Console.WriteLine($"Working on item {Item}");
17+
return ExecutionResult.Next();
18+
}
19+
}
20+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
using WorkflowCore.Interface;
5+
using WorkflowCore.Models;
6+
7+
namespace WorkflowCore.Sample09s
8+
{
9+
public class DoSomething : StepBody
10+
{
11+
public override ExecutionResult Run(IStepExecutionContext context)
12+
{
13+
Console.WriteLine("Doing something...");
14+
return ExecutionResult.Next();
15+
}
16+
}
17+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
using WorkflowCore.Interface;
5+
using WorkflowCore.Models;
6+
7+
namespace WorkflowCore.Sample09s
8+
{
9+
public class SayGoodbye : StepBody
10+
{
11+
public override ExecutionResult Run(IStepExecutionContext context)
12+
{
13+
Console.WriteLine("Goodbye");
14+
return ExecutionResult.Next();
15+
}
16+
}
17+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
using WorkflowCore.Interface;
5+
using WorkflowCore.Models;
6+
7+
namespace WorkflowCore.Sample09s
8+
{
9+
public class SayHello : StepBody
10+
{
11+
public override ExecutionResult Run(IStepExecutionContext context)
12+
{
13+
Console.WriteLine("Hello");
14+
return ExecutionResult.Next();
15+
}
16+
}
17+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>netcoreapp3.0</TargetFramework>
6+
</PropertyGroup>
7+
8+
<ItemGroup>
9+
<PackageReference Include="Microsoft.Extensions.Logging.Debug" Version="3.1.0" />
10+
</ItemGroup>
11+
12+
<ItemGroup>
13+
<ProjectReference Include="..\..\providers\WorkflowCore.Persistence.MongoDB\WorkflowCore.Persistence.MongoDB.csproj" />
14+
<ProjectReference Include="..\..\providers\WorkflowCore.Persistence.PostgreSQL\WorkflowCore.Persistence.PostgreSQL.csproj" />
15+
<ProjectReference Include="..\..\providers\WorkflowCore.Persistence.SqlServer\WorkflowCore.Persistence.SqlServer.csproj" />
16+
<ProjectReference Include="..\..\WorkflowCore\WorkflowCore.csproj" />
17+
</ItemGroup>
18+
19+
</Project>

0 commit comments

Comments
 (0)