1
+ using Amazon . DynamoDBv2 ;
2
+ using Amazon . DynamoDBv2 . Model ;
3
+ using Amazon . Runtime ;
4
+ using Microsoft . Extensions . Logging ;
5
+ using System ;
6
+ using System . Collections . Generic ;
7
+ using System . Threading ;
8
+ using System . Threading . Tasks ;
9
+ using WorkflowCore . Interface ;
10
+
11
+ namespace WorkflowCore . Providers . AWS . Services
12
+ {
13
+ public class DynamoLockProvider : IDistributedLockProvider
14
+ {
15
+ private readonly ILogger _logger ;
16
+ private readonly AmazonDynamoDBClient _client ;
17
+ private readonly string _tableName ;
18
+ private readonly string _nodeId ;
19
+ private readonly long _ttl = 30000 ;
20
+ private readonly int _heartbeat = 10000 ;
21
+ private readonly long _jitter = 1000 ;
22
+ private readonly List < string > _localLocks ;
23
+ private Task _heartbeatTask ;
24
+ private CancellationTokenSource _cancellationTokenSource ;
25
+
26
+ public DynamoLockProvider ( AWSCredentials credentials , AmazonDynamoDBConfig config , string tableName , ILoggerFactory logFactory )
27
+ {
28
+ _logger = logFactory . CreateLogger < DynamoLockProvider > ( ) ;
29
+ _client = new AmazonDynamoDBClient ( credentials , config ) ;
30
+ _localLocks = new List < string > ( ) ;
31
+ _tableName = tableName ;
32
+ _nodeId = Guid . NewGuid ( ) . ToString ( ) ;
33
+ }
34
+
35
+ public async Task < bool > AcquireLock ( string Id , CancellationToken cancellationToken )
36
+ {
37
+ try
38
+ {
39
+ var req = new PutItemRequest ( )
40
+ {
41
+ TableName = _tableName ,
42
+ Item = new Dictionary < string , AttributeValue >
43
+ {
44
+ { "id" , new AttributeValue ( Id ) } ,
45
+ { "lockOwner" , new AttributeValue ( _nodeId ) } ,
46
+ { "expires" , new AttributeValue ( )
47
+ {
48
+ N = Convert . ToString ( new DateTimeOffset ( DateTime . UtcNow ) . ToUnixTimeMilliseconds ( ) + _ttl )
49
+ }
50
+ }
51
+ } ,
52
+ ConditionExpression = "attribute_not_exists(id) OR (expires < :expired)" ,
53
+ ExpressionAttributeValues = new Dictionary < string , AttributeValue >
54
+ {
55
+ { ":expired" , new AttributeValue ( )
56
+ {
57
+ N = Convert . ToString ( new DateTimeOffset ( DateTime . UtcNow ) . ToUnixTimeMilliseconds ( ) + _jitter )
58
+ }
59
+ }
60
+ }
61
+ } ;
62
+
63
+ var response = await _client . PutItemAsync ( req , _cancellationTokenSource . Token ) ;
64
+
65
+ if ( response . HttpStatusCode == System . Net . HttpStatusCode . OK )
66
+ {
67
+ _localLocks . Add ( Id ) ;
68
+ return true ;
69
+ }
70
+ }
71
+ catch ( ConditionalCheckFailedException )
72
+ {
73
+ }
74
+ return false ;
75
+ }
76
+
77
+ public async Task ReleaseLock ( string Id )
78
+ {
79
+ _localLocks . Remove ( Id ) ;
80
+ try
81
+ {
82
+ var req = new DeleteItemRequest ( )
83
+ {
84
+ TableName = _tableName ,
85
+ Key = new Dictionary < string , AttributeValue >
86
+ {
87
+ { "id" , new AttributeValue ( Id ) }
88
+ } ,
89
+ ConditionExpression = "lockOwner = :nodeId" ,
90
+ ExpressionAttributeValues = new Dictionary < string , AttributeValue >
91
+ {
92
+ { ":nodeId" , new AttributeValue ( _nodeId ) }
93
+ }
94
+
95
+ } ;
96
+ await _client . DeleteItemAsync ( req ) ;
97
+ }
98
+ catch ( ConditionalCheckFailedException )
99
+ {
100
+ }
101
+ }
102
+
103
+ public async Task Start ( )
104
+ {
105
+ await EnsureTable ( ) ;
106
+ if ( _heartbeatTask != null )
107
+ {
108
+ throw new InvalidOperationException ( ) ;
109
+ }
110
+
111
+ _cancellationTokenSource = new CancellationTokenSource ( ) ;
112
+
113
+ _heartbeatTask = new Task ( SendHeartbeat ) ;
114
+ _heartbeatTask . Start ( ) ;
115
+ }
116
+
117
+ public Task Stop ( )
118
+ {
119
+ _cancellationTokenSource . Cancel ( ) ;
120
+ _heartbeatTask . Wait ( ) ;
121
+ _heartbeatTask = null ;
122
+ return Task . CompletedTask ;
123
+ }
124
+
125
+ private async void SendHeartbeat ( )
126
+ {
127
+ while ( ! _cancellationTokenSource . IsCancellationRequested )
128
+ {
129
+ try
130
+ {
131
+ await Task . Delay ( _heartbeat , _cancellationTokenSource . Token ) ;
132
+ foreach ( var item in _localLocks )
133
+ {
134
+ var req = new PutItemRequest
135
+ {
136
+ TableName = _tableName ,
137
+ Item = new Dictionary < string , AttributeValue >
138
+ {
139
+ { "id" , new AttributeValue ( item ) } ,
140
+ { "lockOwner" , new AttributeValue ( _nodeId ) } ,
141
+ { "expires" , new AttributeValue ( )
142
+ {
143
+ N = Convert . ToString ( new DateTimeOffset ( DateTime . UtcNow ) . ToUnixTimeMilliseconds ( ) + _ttl )
144
+ }
145
+ }
146
+ } ,
147
+ ConditionExpression = "lockOwner = :nodeId" ,
148
+ ExpressionAttributeValues = new Dictionary < string , AttributeValue >
149
+ {
150
+ { ":nodeId" , new AttributeValue ( _nodeId ) }
151
+ }
152
+ } ;
153
+
154
+ await _client . PutItemAsync ( req , _cancellationTokenSource . Token ) ;
155
+ }
156
+ }
157
+ catch ( Exception ex )
158
+ {
159
+ _logger . LogError ( default ( EventId ) , ex , ex . Message ) ;
160
+ }
161
+ }
162
+ }
163
+
164
+ private async Task EnsureTable ( )
165
+ {
166
+ try
167
+ {
168
+ var poll = await _client . DescribeTableAsync ( _tableName ) ;
169
+ }
170
+ catch ( ResourceNotFoundException )
171
+ {
172
+ await CreateTable ( ) ;
173
+ }
174
+ }
175
+
176
+ private async Task CreateTable ( )
177
+ {
178
+ var createRequest = new CreateTableRequest ( _tableName , new List < KeySchemaElement > ( )
179
+ {
180
+ new KeySchemaElement ( "id" , KeyType . HASH )
181
+ } )
182
+ {
183
+ AttributeDefinitions = new List < AttributeDefinition > ( )
184
+ {
185
+ new AttributeDefinition ( "id" , ScalarAttributeType . S )
186
+ } ,
187
+ BillingMode = BillingMode . PAY_PER_REQUEST
188
+ } ;
189
+
190
+ var createResponse = await _client . CreateTableAsync ( createRequest ) ;
191
+
192
+ int i = 0 ;
193
+ bool created = false ;
194
+ while ( ( i < 10 ) && ( ! created ) )
195
+ {
196
+ try
197
+ {
198
+ await Task . Delay ( 1000 ) ;
199
+ var poll = await _client . DescribeTableAsync ( _tableName ) ;
200
+ created = ( poll . Table . TableStatus == TableStatus . ACTIVE ) ;
201
+ i ++ ;
202
+ }
203
+ catch ( ResourceNotFoundException )
204
+ {
205
+ }
206
+ }
207
+ }
208
+ }
209
+ }
0 commit comments