Skip to content

Commit df27a3c

Browse files
authored
Merge pull request danielgerlag#868 from VKAlwaysWin/VKAlwaysWin/AzureWorkflowPurger
Adding Azure Cosmos DB Workflow Purger
2 parents f47ad73 + 5d890c0 commit df27a3c

File tree

6 files changed

+60
-12
lines changed

6 files changed

+60
-12
lines changed
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
using System;
2+
using System.Threading;
23
using System.Threading.Tasks;
34
using WorkflowCore.Models;
45

56
namespace WorkflowCore.Interface
67
{
78
public interface IWorkflowPurger
89
{
9-
Task PurgeWorkflows(WorkflowStatus status, DateTime olderThan);
10+
Task PurgeWorkflows(WorkflowStatus status, DateTime olderThan, CancellationToken cancellationToken = default);
1011
}
1112
}

src/providers/WorkflowCore.Persistence.EntityFramework/Services/WorkflowPurger.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Linq;
3+
using System.Threading;
34
using System.Threading.Tasks;
45
using Microsoft.EntityFrameworkCore;
56
using WorkflowCore.Interface;
@@ -18,12 +19,12 @@ public WorkflowPurger(IWorkflowDbContextFactory contextFactory)
1819
_contextFactory = contextFactory;
1920
}
2021

21-
public async Task PurgeWorkflows(WorkflowStatus status, DateTime olderThan)
22+
public async Task PurgeWorkflows(WorkflowStatus status, DateTime olderThan, CancellationToken cancellationToken = default)
2223
{
2324
var olderThanUtc = olderThan.ToUniversalTime();
2425
using (var db = ConstructDbContext())
2526
{
26-
var workflows = await db.Set<PersistedWorkflow>().Where(x => x.Status == status && x.CompleteTime < olderThanUtc).ToListAsync();
27+
var workflows = await db.Set<PersistedWorkflow>().Where(x => x.Status == status && x.CompleteTime < olderThanUtc).ToListAsync(cancellationToken);
2728
foreach (var wf in workflows)
2829
{
2930
foreach (var pointer in wf.ExecutionPointers)
@@ -38,7 +39,7 @@ public async Task PurgeWorkflows(WorkflowStatus status, DateTime olderThan)
3839
db.Remove(wf);
3940
}
4041

41-
await db.SaveChangesAsync();
42+
await db.SaveChangesAsync(cancellationToken);
4243
}
4344
}
4445

src/providers/WorkflowCore.Persistence.MongoDB/Services/WorkflowPurger.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,24 +3,26 @@
33
using MongoDB.Driver;
44
using WorkflowCore.Models;
55
using WorkflowCore.Interface;
6+
using System.Threading;
67

78
namespace WorkflowCore.Persistence.MongoDB.Services
89
{
910
public class WorkflowPurger : IWorkflowPurger
1011
{
1112
private readonly IMongoDatabase _database;
12-
private IMongoCollection<WorkflowInstance> WorkflowInstances => _database.GetCollection<WorkflowInstance>(MongoPersistenceProvider.WorkflowCollectionName);
1313

14+
private IMongoCollection<WorkflowInstance> WorkflowInstances => _database.GetCollection<WorkflowInstance>(MongoPersistenceProvider.WorkflowCollectionName);
1415

1516
public WorkflowPurger(IMongoDatabase database)
1617
{
1718
_database = database;
1819
}
1920

20-
public async Task PurgeWorkflows(WorkflowStatus status, DateTime olderThan)
21+
public async Task PurgeWorkflows(WorkflowStatus status, DateTime olderThan, CancellationToken cancellationToken = default)
2122
{
2223
var olderThanUtc = olderThan.ToUniversalTime();
23-
await WorkflowInstances.DeleteManyAsync(x => x.Status == status && x.CompleteTime < olderThanUtc);
24+
await WorkflowInstances.DeleteManyAsync(x => x.Status == status
25+
&& x.CompleteTime < olderThanUtc, cancellationToken);
2426
}
2527
}
2628
}

src/providers/WorkflowCore.Persistence.RavenDB/Services/WorkflowPurger.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
using WorkflowCore.Models;
44
using System.Threading.Tasks;
55
using Raven.Client.Documents;
6-
using System.Linq;
76
using Raven.Client.Documents.Operations;
87
using Raven.Client.Documents.Queries;
8+
using System.Threading;
99

1010
namespace WorkflowCore.Persistence.RavenDB.Services
1111
{
@@ -18,21 +18,21 @@ public WorkflowPurger(IDocumentStore database)
1818
_database = database;
1919
}
2020

21-
public async Task PurgeWorkflows(WorkflowStatus status, DateTime olderThan)
21+
public async Task PurgeWorkflows(WorkflowStatus status, DateTime olderThan, CancellationToken cancellationToken = default)
2222
{
23-
await DeleteWorkflowInstances(status, olderThan);
23+
await DeleteWorkflowInstances(status, olderThan, cancellationToken);
2424
}
2525

2626

2727
/// <summary>
2828
/// Delete all Workflow Documents
2929
/// </summary>
3030
/// <returns></returns>
31-
private Task<Operation> DeleteWorkflowInstances(WorkflowStatus status, DateTime olderThan)
31+
private Task<Operation> DeleteWorkflowInstances(WorkflowStatus status, DateTime olderThan, CancellationToken cancellationToken = default)
3232
{
3333
var utcTime = olderThan.ToUniversalTime();
3434
var queryToDelete = new IndexQuery { Query = $"FROM {nameof(WorkflowInstance)} where status = '{status}' and CompleteTime < '{olderThan}'" };
35-
return _database.Operations.SendAsync(new DeleteByQueryOperation(queryToDelete, new QueryOperationOptions { AllowStale = false }));
35+
return _database.Operations.SendAsync(new DeleteByQueryOperation(queryToDelete, new QueryOperationOptions { AllowStale = false }), token: cancellationToken);
3636
}
3737
}
3838
}

src/providers/WorkflowCore.Providers.Azure/ServiceCollectionExtensions.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using Microsoft.Extensions.Logging;
2+
using WorkflowCore.Interface;
23
using WorkflowCore.Models;
34
using WorkflowCore.Providers.Azure.Interface;
45
using WorkflowCore.Providers.Azure.Services;
@@ -39,6 +40,7 @@ public static WorkflowOptions UseCosmosDbPersistence(
3940

4041
options.Services.AddSingleton<ICosmosClientFactory>(sp => new CosmosClientFactory(connectionString));
4142
options.Services.AddTransient<ICosmosDbProvisioner>(sp => new CosmosDbProvisioner(sp.GetService<ICosmosClientFactory>(), cosmosDbStorageOptions));
43+
options.Services.AddSingleton<IWorkflowPurger>(sp => new WorkflowPurger(sp.GetService<ICosmosClientFactory>(), databaseId, cosmosDbStorageOptions));
4244
options.UsePersistence(sp => new CosmosDbPersistenceProvider(sp.GetService<ICosmosClientFactory>(), databaseId, sp.GetService<ICosmosDbProvisioner>(), cosmosDbStorageOptions));
4345
return options;
4446
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
using System;
2+
using System.Linq;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
using Microsoft.Azure.Cosmos;
6+
using Microsoft.Azure.Cosmos.Linq;
7+
using WorkflowCore.Interface;
8+
using WorkflowCore.Models;
9+
using WorkflowCore.Providers.Azure.Interface;
10+
using WorkflowCore.Providers.Azure.Models;
11+
12+
namespace WorkflowCore.Providers.Azure.Services
13+
{
14+
public class WorkflowPurger : IWorkflowPurger
15+
{
16+
private readonly Lazy<Container> _workflowContainer;
17+
18+
public WorkflowPurger(ICosmosClientFactory clientFactory, string dbId, CosmosDbStorageOptions cosmosDbStorageOptions)
19+
{
20+
_workflowContainer = new Lazy<Container>(() => clientFactory.GetCosmosClient()
21+
.GetDatabase(dbId)
22+
.GetContainer(cosmosDbStorageOptions.WorkflowContainerName));
23+
}
24+
25+
public async Task PurgeWorkflows(WorkflowStatus status, DateTime olderThan, CancellationToken cancellationToken = default)
26+
{
27+
var olderThanUtc = olderThan.ToUniversalTime();
28+
using (FeedIterator<PersistedWorkflow> feedIterator = _workflowContainer.Value.GetItemLinqQueryable<PersistedWorkflow>()
29+
.Where(x => x.Status == status && x.CompleteTime < olderThanUtc)
30+
.ToFeedIterator())
31+
{
32+
while (feedIterator.HasMoreResults)
33+
{
34+
foreach (var item in await feedIterator.ReadNextAsync(cancellationToken))
35+
{
36+
await _workflowContainer.Value.DeleteItemAsync<PersistedWorkflow>(item.id, new PartitionKey(item.id), cancellationToken: cancellationToken);
37+
}
38+
}
39+
}
40+
}
41+
}
42+
}

0 commit comments

Comments
 (0)