Skip to content

Commit 8a6d1ff

Browse files
authored
Search Index API & Elasticsearch support (danielgerlag#243)
1 parent d46399a commit 8a6d1ff

31 files changed

+1092
-96
lines changed

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,12 @@ There are several persistence providers available as separate Nuget packages.
120120
* [Sqlite](src/providers/WorkflowCore.Persistence.Sqlite)
121121
* Redis *(coming soon...)*
122122

123+
## Search
124+
125+
A search index provider can be plugged in to Workflow Core, enabling you to index your workflows and search against the data and state of them.
126+
These are also available as separate Nuget packages.
127+
* [Elasticsearch](src/providers/WorkflowCore.Providers.Elasticsearch)
128+
123129
## Extensions
124130

125131
* [User (human) workflows](src/extensions/WorkflowCore.Users)

WorkflowCore.sln

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.Tests.DynamoDB
126126
EndProject
127127
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.Providers.Redis", "src\providers\WorkflowCore.Providers.Redis\WorkflowCore.Providers.Redis.csproj", "{435C6263-C6F8-4E93-B417-D861E9C22E18}"
128128
EndProject
129+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.Providers.Elasticsearch", "src\providers\WorkflowCore.Providers.Elasticsearch\WorkflowCore.Providers.Elasticsearch.csproj", "{F6348170-B695-4D97-BAE6-4F0F643F3BEF}"
130+
EndProject
131+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WorkflowCore.Tests.Elasticsearch", "test\WorkflowCore.Tests.Elasticsearch\WorkflowCore.Tests.Elasticsearch.csproj", "{44644716-0CE8-4837-B189-AB65AE2106AA}"
132+
EndProject
129133
Global
130134
GlobalSection(SolutionConfigurationPlatforms) = preSolution
131135
Debug|Any CPU = Debug|Any CPU
@@ -316,6 +320,14 @@ Global
316320
{435C6263-C6F8-4E93-B417-D861E9C22E18}.Debug|Any CPU.Build.0 = Debug|Any CPU
317321
{435C6263-C6F8-4E93-B417-D861E9C22E18}.Release|Any CPU.ActiveCfg = Release|Any CPU
318322
{435C6263-C6F8-4E93-B417-D861E9C22E18}.Release|Any CPU.Build.0 = Release|Any CPU
323+
{F6348170-B695-4D97-BAE6-4F0F643F3BEF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
324+
{F6348170-B695-4D97-BAE6-4F0F643F3BEF}.Debug|Any CPU.Build.0 = Debug|Any CPU
325+
{F6348170-B695-4D97-BAE6-4F0F643F3BEF}.Release|Any CPU.ActiveCfg = Release|Any CPU
326+
{F6348170-B695-4D97-BAE6-4F0F643F3BEF}.Release|Any CPU.Build.0 = Release|Any CPU
327+
{44644716-0CE8-4837-B189-AB65AE2106AA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
328+
{44644716-0CE8-4837-B189-AB65AE2106AA}.Debug|Any CPU.Build.0 = Debug|Any CPU
329+
{44644716-0CE8-4837-B189-AB65AE2106AA}.Release|Any CPU.ActiveCfg = Release|Any CPU
330+
{44644716-0CE8-4837-B189-AB65AE2106AA}.Release|Any CPU.Build.0 = Release|Any CPU
319331
EndGlobalSection
320332
GlobalSection(SolutionProperties) = preSolution
321333
HideSolutionNode = FALSE
@@ -370,6 +382,8 @@ Global
370382
{DF7F7ECA-1771-40C9-9CD0-AFEFC44E60DE} = {E6CEAD8D-F565-471E-A0DC-676F54EAEDEB}
371383
{3ECEC028-7E2C-4983-B928-26C073B51BB7} = {E6CEAD8D-F565-471E-A0DC-676F54EAEDEB}
372384
{435C6263-C6F8-4E93-B417-D861E9C22E18} = {2EEE6ABD-EE9B-473F-AF2D-6DABB85D7BA2}
385+
{F6348170-B695-4D97-BAE6-4F0F643F3BEF} = {2EEE6ABD-EE9B-473F-AF2D-6DABB85D7BA2}
386+
{44644716-0CE8-4837-B189-AB65AE2106AA} = {E6CEAD8D-F565-471E-A0DC-676F54EAEDEB}
373387
EndGlobalSection
374388
GlobalSection(ExtensibilityGlobals) = postSolution
375389
SolutionGuid = {DC0FA8D3-6449-4FDA-BB46-ECF58FAD23B4}

src/WorkflowCore/Interface/IPersistenceProvider.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public interface IPersistenceProvider
1717

1818
Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt);
1919

20+
[Obsolete]
2021
Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip, int take);
2122

2223
Task<WorkflowInstance> GetWorkflowInstance(string Id);
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
using System.Threading.Tasks;
5+
using WorkflowCore.Models;
6+
using WorkflowCore.Models.Search;
7+
8+
namespace WorkflowCore.Interface
9+
{
10+
public interface ISearchIndex
11+
{
12+
Task IndexWorkflow(WorkflowInstance workflow);
13+
14+
Task<Page<WorkflowSearchResult>> Search(string terms, int skip, int take, params SearchFilter[] filters);
15+
16+
Task Start();
17+
18+
Task Stop();
19+
}
20+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
5+
namespace WorkflowCore.Interface
6+
{
7+
public interface ISearchable
8+
{
9+
IEnumerable<string> GetSearchTokens();
10+
}
11+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
5+
namespace WorkflowCore.Models.Search
6+
{
7+
public class Page<T>
8+
{
9+
public ICollection<T> Data { get; set; }
10+
public long Total { get; set; }
11+
}
12+
}
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq.Expressions;
4+
using System.Text;
5+
6+
namespace WorkflowCore.Models.Search
7+
{
8+
public abstract class SearchFilter
9+
{
10+
public bool IsData { get; set; }
11+
public Type DataType { get; set; }
12+
public Expression Property { get; set; }
13+
14+
static Func<T, V> Lambda<T, V>(Func<T, V> del)
15+
{
16+
return del;
17+
}
18+
}
19+
20+
public class ScalarFilter : SearchFilter
21+
{
22+
public object Value { get; set; }
23+
24+
public static SearchFilter Equals(Expression<Func<WorkflowSearchResult, object>> property, object value) => new ScalarFilter()
25+
{
26+
Property = property,
27+
Value = value
28+
};
29+
30+
public static SearchFilter Equals<T>(Expression<Func<T, object>> property, object value) => new ScalarFilter()
31+
{
32+
IsData = true,
33+
DataType = typeof(T),
34+
Property = property,
35+
Value = value
36+
};
37+
}
38+
39+
public class DateRangeFilter : SearchFilter
40+
{
41+
public DateTime? BeforeValue { get; set; }
42+
public DateTime? AfterValue { get; set; }
43+
44+
public static DateRangeFilter Before(Expression<Func<WorkflowSearchResult, object>> property, DateTime value) => new DateRangeFilter()
45+
{
46+
Property = property,
47+
BeforeValue = value
48+
};
49+
50+
public static DateRangeFilter After(Expression<Func<WorkflowSearchResult, object>> property, DateTime value) => new DateRangeFilter()
51+
{
52+
Property = property,
53+
AfterValue = value
54+
};
55+
56+
public static DateRangeFilter Between(Expression<Func<WorkflowSearchResult, object>> property, DateTime start, DateTime end) => new DateRangeFilter()
57+
{
58+
Property = property,
59+
BeforeValue = end,
60+
AfterValue = start
61+
};
62+
63+
public static DateRangeFilter Before<T>(Expression<Func<T, object>> property, DateTime value) => new DateRangeFilter()
64+
{
65+
IsData = true,
66+
DataType = typeof(T),
67+
Property = property,
68+
BeforeValue = value
69+
};
70+
71+
public static DateRangeFilter After<T>(Expression<Func<T, object>> property, DateTime value) => new DateRangeFilter()
72+
{
73+
IsData = true,
74+
DataType = typeof(T),
75+
Property = property,
76+
AfterValue = value
77+
};
78+
79+
public static DateRangeFilter Between<T>(Expression<Func<T, object>> property, DateTime start, DateTime end) => new DateRangeFilter()
80+
{
81+
IsData = true,
82+
DataType = typeof(T),
83+
Property = property,
84+
BeforeValue = end,
85+
AfterValue = start
86+
};
87+
}
88+
89+
public class NumericRangeFilter : SearchFilter
90+
{
91+
public double? LessValue { get; set; }
92+
public double? GreaterValue { get; set; }
93+
94+
public static NumericRangeFilter LessThan(Expression<Func<WorkflowSearchResult, object>> property, double value) => new NumericRangeFilter()
95+
{
96+
Property = property,
97+
LessValue = value
98+
};
99+
100+
public static NumericRangeFilter GreaterThan(Expression<Func<WorkflowSearchResult, object>> property, double value) => new NumericRangeFilter()
101+
{
102+
Property = property,
103+
GreaterValue = value
104+
};
105+
106+
public static NumericRangeFilter Between(Expression<Func<WorkflowSearchResult, object>> property, double start, double end) => new NumericRangeFilter()
107+
{
108+
Property = property,
109+
LessValue = end,
110+
GreaterValue = start
111+
};
112+
113+
public static NumericRangeFilter LessThan<T>(Expression<Func<T, object>> property, double value) => new NumericRangeFilter()
114+
{
115+
IsData = true,
116+
DataType = typeof(T),
117+
Property = property,
118+
LessValue = value
119+
};
120+
121+
public static NumericRangeFilter GreaterThan<T>(Expression<Func<T, object>> property, double value) => new NumericRangeFilter()
122+
{
123+
IsData = true,
124+
DataType = typeof(T),
125+
Property = property,
126+
GreaterValue = value
127+
};
128+
129+
public static NumericRangeFilter Between<T>(Expression<Func<T, object>> property, double start, double end) => new NumericRangeFilter()
130+
{
131+
IsData = true,
132+
DataType = typeof(T),
133+
Property = property,
134+
LessValue = end,
135+
GreaterValue = start
136+
};
137+
}
138+
139+
public class StatusFilter : ScalarFilter
140+
{
141+
protected StatusFilter()
142+
{
143+
Expression<Func<WorkflowSearchResult, object>> lambda = (WorkflowSearchResult x) => x.Status;
144+
Property = lambda;
145+
}
146+
147+
public static StatusFilter Equals(WorkflowStatus value) => new StatusFilter()
148+
{
149+
Value = value.ToString()
150+
};
151+
}
152+
153+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
5+
namespace WorkflowCore.Models.Search
6+
{
7+
public class StepInfo
8+
{
9+
public int StepId { get; set; }
10+
11+
public string Name { get; set; }
12+
}
13+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Reflection;
4+
using WorkflowCore.Interface;
5+
6+
namespace WorkflowCore.Models.Search
7+
{
8+
public class WorkflowSearchResult
9+
{
10+
public string Id { get; set; }
11+
12+
public string WorkflowDefinitionId { get; set; }
13+
14+
public int Version { get; set; }
15+
16+
public string Description { get; set; }
17+
18+
public string Reference { get; set; }
19+
20+
public DateTime? NextExecutionUtc { get; set; }
21+
22+
public WorkflowStatus Status { get; set; }
23+
24+
public object Data { get; set; }
25+
26+
public DateTime CreateTime { get; set; }
27+
28+
public DateTime? CompleteTime { get; set; }
29+
30+
public ICollection<StepInfo> WaitingSteps { get; set; } = new HashSet<StepInfo>();
31+
32+
public ICollection<StepInfo> SleepingSteps { get; set; } = new HashSet<StepInfo>();
33+
34+
public ICollection<StepInfo> FailedSteps { get; set; } = new HashSet<StepInfo>();
35+
36+
37+
}
38+
39+
public class WorkflowSearchResult<TData> : WorkflowSearchResult
40+
{
41+
public new TData Data { get; set; }
42+
}
43+
44+
}

src/WorkflowCore/Models/WorkflowOptions.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ public class WorkflowOptions
1212
internal Func<IServiceProvider, IQueueProvider> QueueFactory;
1313
internal Func<IServiceProvider, IDistributedLockProvider> LockFactory;
1414
internal Func<IServiceProvider, ILifeCycleEventHub> EventHubFactory;
15+
internal Func<IServiceProvider, ISearchIndex> SearchIndexFactory;
1516
internal TimeSpan PollInterval;
1617
internal TimeSpan IdleTime;
1718
internal TimeSpan ErrorRetryInterval;
@@ -28,6 +29,7 @@ public WorkflowOptions(IServiceCollection services)
2829
QueueFactory = new Func<IServiceProvider, IQueueProvider>(sp => new SingleNodeQueueProvider());
2930
LockFactory = new Func<IServiceProvider, IDistributedLockProvider>(sp => new SingleNodeLockProvider());
3031
PersistanceFactory = new Func<IServiceProvider, IPersistenceProvider>(sp => new MemoryPersistenceProvider());
32+
SearchIndexFactory = new Func<IServiceProvider, ISearchIndex>(sp => new NullSearchIndex());
3133
EventHubFactory = new Func<IServiceProvider, ILifeCycleEventHub>(sp => new SingleNodeEventHub(sp.GetService<ILoggerFactory>()));
3234
}
3335

@@ -51,6 +53,11 @@ public void UseEventHub(Func<IServiceProvider, ILifeCycleEventHub> factory)
5153
EventHubFactory = factory;
5254
}
5355

56+
public void UseSearchIndex(Func<IServiceProvider, ISearchIndex> factory)
57+
{
58+
SearchIndexFactory = factory;
59+
}
60+
5461
public void UsePollInterval(TimeSpan interval)
5562
{
5663
PollInterval = interval;

src/WorkflowCore/ServiceCollectionExtensions.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,11 @@ public static IServiceCollection AddWorkflow(this IServiceCollection services, A
2828
services.AddSingleton<IQueueProvider>(options.QueueFactory);
2929
services.AddSingleton<IDistributedLockProvider>(options.LockFactory);
3030
services.AddSingleton<ILifeCycleEventHub>(options.EventHubFactory);
31+
services.AddSingleton<ISearchIndex>(options.SearchIndexFactory);
32+
3133
services.AddSingleton<IWorkflowRegistry, WorkflowRegistry>();
3234
services.AddSingleton<WorkflowOptions>(options);
33-
services.AddSingleton<ILifeCycleEventPublisher, LifeCycleEventPublisher>();
35+
services.AddSingleton<ILifeCycleEventPublisher, LifeCycleEventPublisher>();
3436

3537
services.AddTransient<IBackgroundTask, WorkflowConsumer>();
3638
services.AddTransient<IBackgroundTask, EventConsumer>();

src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,19 @@ internal class WorkflowConsumer : QueueConsumer, IBackgroundTask
1212
{
1313
private readonly IDistributedLockProvider _lockProvider;
1414
private readonly IDateTimeProvider _datetimeProvider;
15+
private readonly ISearchIndex _searchIndex;
1516
private readonly ObjectPool<IPersistenceProvider> _persistenceStorePool;
1617
private readonly ObjectPool<IWorkflowExecutor> _executorPool;
1718

1819
protected override QueueType Queue => QueueType.Workflow;
1920

20-
public WorkflowConsumer(IPooledObjectPolicy<IPersistenceProvider> persistencePoolPolicy, IQueueProvider queueProvider, ILoggerFactory loggerFactory, IServiceProvider serviceProvider, IWorkflowRegistry registry, IDistributedLockProvider lockProvider, IPooledObjectPolicy<IWorkflowExecutor> executorPoolPolicy, IDateTimeProvider datetimeProvider, WorkflowOptions options)
21+
public WorkflowConsumer(IPooledObjectPolicy<IPersistenceProvider> persistencePoolPolicy, IQueueProvider queueProvider, ILoggerFactory loggerFactory, IServiceProvider serviceProvider, IWorkflowRegistry registry, IDistributedLockProvider lockProvider, IPooledObjectPolicy<IWorkflowExecutor> executorPoolPolicy, IDateTimeProvider datetimeProvider, ISearchIndex searchIndex, WorkflowOptions options)
2122
: base(queueProvider, loggerFactory, options)
2223
{
2324
_persistenceStorePool = new DefaultObjectPool<IPersistenceProvider>(persistencePoolPolicy);
2425
_executorPool = new DefaultObjectPool<IWorkflowExecutor>(executorPoolPolicy);
2526
_lockProvider = lockProvider;
27+
_searchIndex = searchIndex;
2628
_datetimeProvider = datetimeProvider;
2729
}
2830

@@ -50,6 +52,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
5052
{
5153
_executorPool.Return(executor);
5254
await persistenceStore.PersistWorkflow(workflow);
55+
await _searchIndex.IndexWorkflow(workflow);
5356
}
5457
}
5558
}

0 commit comments

Comments
 (0)