@@ -3,9 +3,12 @@
 #include <dirent.h>
 #include <fcntl.h>
 #include <lzma.h>
+#include <pthread.h>
+#include <sys/socket.h>
 #include <sys/stat.h>
 #include <unistd.h>
 
+#include "affinity-count.h"
 #include "castore.h"
 #include "def.h"
 #include "dirent-util.h"
@@ -19,6 +22,8 @@
 /* #undef EBADMSG */
 /* #define EBADMSG __LINE__ */
 
+#define WORKER_THREADS_MAX 64U
+
 struct CaStore {
         char *root;
         bool is_cache:1;
@@ -34,6 +39,10 @@ struct CaStore {
 
         uint64_t n_requests;
         uint64_t n_request_bytes;
+
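+        /* A pool of background threads that perform chunk compression/decompression and disk writes,
+         * fed one job at a time over a datagram socketpair. */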
+        pthread_t worker_threads[WORKER_THREADS_MAX];
+        size_t n_worker_threads, n_worker_threads_max;
+        int worker_thread_socket[2];
 };
 
 struct CaStoreIterator {
@@ -47,28 +56,36 @@ struct CaStoreIterator {
 CaStore *ca_store_new(void) {
         CaStore *store;
 
-        store = new0(CaStore, 1);
+        store = new(CaStore, 1);
         if (!store)
                 return NULL;
 
-        store->digest_type = _CA_DIGEST_TYPE_INVALID;
-
-        store->compression = CA_CHUNK_COMPRESSED;
-        store->compression_type = CA_COMPRESSION_DEFAULT;
+        *store = (CaStore) {
+                .digest_type = _CA_DIGEST_TYPE_INVALID,
+                .compression = CA_CHUNK_COMPRESSED,
+                .compression_type = CA_COMPRESSION_DEFAULT,
+                .worker_thread_socket = { -1, -1 },
+                .n_worker_threads_max = (size_t) -1,
+        };
 
         return store;
 }
 
 CaStore *ca_store_new_cache(void) {
         CaStore *s;
 
-        s = new0(CaStore, 1);
+        s = new(CaStore, 1);
         if (!s)
                 return NULL;
 
-        s->is_cache = true;
-        s->compression = CA_CHUNK_AS_IS;
-        s->compression_type = CA_COMPRESSION_DEFAULT;
+        *s = (CaStore) {
+                .is_cache = true,
+                .compression = CA_CHUNK_AS_IS,
+                .compression_type = CA_COMPRESSION_DEFAULT,
+
+                .worker_thread_socket = { -1, -1 },
+                .n_worker_threads_max = (size_t) -1,
+        };
 
         return s;
 }
@@ -77,6 +94,8 @@ CaStore* ca_store_unref(CaStore *store) {
         if (!store)
                 return NULL;
 
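+        /* Join the worker threads first, so that all queued chunks are fully written out before the
+         * store (and, for caches, its directory) goes away. */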
+        (void) ca_store_finalize(store);
+
         if (store->is_cache && store->root)
                 (void) rm_rf(store->root, REMOVE_ROOT|REMOVE_PHYSICAL);
 
@@ -240,6 +259,203 @@ int ca_store_has(CaStore *store, const CaChunkID *chunk_id) {
         return ca_chunk_file_test(AT_FDCWD, store->root, chunk_id);
 }
 
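+/* A fixed-size job descriptor handed to the worker threads. Ownership of the .data buffer passes to
+ * the receiving thread, which frees it once the chunk has been written out. */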
+struct queue_entry {
+        CaChunkID chunk_id;
+        CaChunkCompression effective_compression;
+        void *data;
+        size_t size;
+};
+
+static void *worker_thread(void *p) {
+        CaStore *store = p;
+        int ret = 0, r;
+
+        assert(store);
+        assert(store->worker_thread_socket[1] >= 0);
+
+        (void) pthread_setname_np(pthread_self(), "worker-thread");
+
+        for (;;) {
+                struct queue_entry e;
+                ssize_t n;
+
+                n = recv(store->worker_thread_socket[0], &e, sizeof(e), 0);
+                if (n < 0) {
+                        if (errno == EINTR)
+                                continue;
+
+                        log_debug_errno(errno, "Failed to read from thread pool socket: %m");
+                        return INT_TO_PTR(errno);
+                }
+                if (n == 0) /* Either EOF or a zero-sized datagram (Linux doesn't really allow us to
+                             * distinguish the two); we take both as an indication to exit the worker
+                             * thread. */
+                        break;
+
+                assert(n == sizeof(e));
+
+                r = ca_chunk_file_save(
+                                AT_FDCWD, store->root,
+                                &e.chunk_id,
+                                e.effective_compression, store->compression,
+                                store->compression_type,
+                                e.data, e.size);
+                free(e.data);
+
+                if (r < 0) {
+                        log_debug_errno(r, "Failed to store chunk in store: %m");
+
+                        if (r != -EEXIST)
+                                ret = r;
+                }
+        }
+
+        return INT_TO_PTR(ret);
+}
+
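+/* Pick how many worker threads to use: $CASYNC_WORKER_THREADS wins if it is set and parses to a value
+ * in range; otherwise we use the number of CPUs in our affinity mask, capped at WORKER_THREADS_MAX.
+ * E.g. something like "CASYNC_WORKER_THREADS=4 casync make …" should limit the pool to four threads. */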
+static int determine_worker_threads_max(CaStore *store) {
+        const char *e;
+        int r;
+
+        assert(store);
+
+        if (store->n_worker_threads_max != (size_t) -1)
+                return 0;
+
+        e = getenv("CASYNC_WORKER_THREADS");
+        if (e) {
+                unsigned u;
+
+                r = safe_atou(e, &u);
+                if (r < 0)
+                        log_debug_errno(r, "Failed to parse $CASYNC_WORKER_THREADS, ignoring: %s", e);
+                else if (u > WORKER_THREADS_MAX) {
+                        log_debug("$CASYNC_WORKER_THREADS out of range, clamping to %zu: %s", (size_t) WORKER_THREADS_MAX, e);
+                        store->n_worker_threads_max = WORKER_THREADS_MAX;
+                } else {
+                        store->n_worker_threads_max = u;
+                        return 0;
+                }
+        }
+
+        r = cpus_in_affinity_mask();
+        if (r < 0)
+                return log_debug_errno(r, "Failed to determine CPUs in affinity mask: %m");
+
+        store->n_worker_threads_max = MIN((size_t) r, WORKER_THREADS_MAX);
+        return 0;
+}
+
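+/* Start one more worker, up to the configured limit. The job socketpair is allocated lazily;
+ * SOCK_SEQPACKET preserves message boundaries, so each recv() in a worker returns exactly one
+ * struct queue_entry. */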
+static int start_worker_thread(CaStore *store) {
+        int r;
+
+        assert(store);
+
+        r = determine_worker_threads_max(store);
+        if (r < 0)
+                return r;
+
+        if (store->n_worker_threads >= (size_t) store->n_worker_threads_max)
+                return 0;
+
+        if (store->worker_thread_socket[0] < 0)
+                if (socketpair(AF_UNIX, SOCK_SEQPACKET|SOCK_CLOEXEC, 0, store->worker_thread_socket) < 0)
+                        return -errno;
+
+        r = pthread_create(store->worker_threads + store->n_worker_threads, NULL, worker_thread, store);
+        if (r != 0)
+                return -r;
+
+        store->n_worker_threads++;
+
+        log_debug("Started store worker thread %zu.", store->n_worker_threads);
+        return 0;
+}
+
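+/* Try to hand a chunk over to the thread pool. Any negative return (including the -ENOANO and
+ * -ENETDOWN cases below) tells the caller to fall back to storing the chunk synchronously in its own
+ * thread. */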
+static int submit_to_worker_thread(
+                CaStore *store,
+                const CaChunkID *chunkid,
+                CaChunkCompression effective_compression,
+                const void *p,
+                uint64_t l) {

+        struct queue_entry e;
+        void *copy = NULL;
+        ssize_t n;
+        int r;
+
+        assert(store);
+
+        /* If there's no need to compress/decompress, then let's do things client side, since the
+         * operation is likely IO bound, not CPU bound */
+        if (store->compression == CA_CHUNK_AS_IS ||
+            store->compression == effective_compression)
+                return -ENOANO;
+
+        /* Before we submit the chunk for compression, let's see if it exists already. If so, let's
+         * return -EEXIST right away, so that the caller can count reused chunks. Note that this is a
+         * bit racy currently, as submitted but not yet processed chunks are not considered. */
+        r = ca_store_has(store, chunkid);
+        if (r < 0)
+                return r;
+        if (r > 0)
+                return -EEXIST;
+
+        /* Let's start a new worker thread each time we have a new job to process, until we have
+         * reached the maximum number of worker threads */
+        (void) start_worker_thread(store);
+
+        /* If there are no worker threads, do things client side */
+        if (store->n_worker_threads <= 0 ||
+            store->worker_thread_socket[1] < 0)
+                return -ENETDOWN;
+
+        copy = memdup(p, l);
+        if (!copy)
+                return -ENOMEM;
+
+        e = (struct queue_entry) {
+                .chunk_id = *chunkid,
+                .effective_compression = effective_compression,
+                .data = copy,
+                .size = l,
+        };
+
+        n = send(store->worker_thread_socket[1], &e, sizeof(e), 0);
+        if (n < 0) {
+                free(copy);
+                return -errno;
+        }
+
+        assert(n == sizeof(e));
+        return 0;
+}
+
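+/* Flush the thread pool: closing the send side of the socketpair makes every worker's recv() return
+ * zero, which the workers take as EOF and exit on; we then join them and collect their return values.
+ * This is idempotent, so calling it again from ca_store_unref() is safe. */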
+int ca_store_finalize(CaStore *store) {
+        int ret = 0, r;
+        size_t i;
+
+        assert(store);
+
+        /* Trigger EOF in all worker threads */
+        store->worker_thread_socket[1] = safe_close(store->worker_thread_socket[1]);
+
+        for (i = 0; i < store->n_worker_threads; i++) {
+                void *p;
+                r = pthread_join(store->worker_threads[i], &p);
+                if (r != 0)
+                        ret = -r;
+                if (p != NULL)
+                        ret = -PTR_TO_INT(p);
+        }
+
+        store->n_worker_threads = 0;
+        store->worker_thread_socket[0] = safe_close(store->worker_thread_socket[0]);
+
+        /* Propagate errors we ran into while processing store requests. This is useful for callers to
+         * determine whether the worker threads ran into any problems. */
+        return ret;
+}
+
 int ca_store_put(
                 CaStore *store,
                 const CaChunkID *chunk_id,
@@ -273,6 +489,14 @@ int ca_store_put(
                 store->mkdir_done = true;
         }
 
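+        /* First try to process the chunk in a worker thread; if that is not possible for whatever
+         * reason, store it synchronously below. */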
+        r = submit_to_worker_thread(
+                        store,
+                        chunk_id,
+                        effective_compression,
+                        data, size);
+        if (r >= 0)
+                return 0;
+
         return ca_chunk_file_save(
                         AT_FDCWD, store->root,
                         chunk_id,