18
18
import argparse
19
19
import fcntl
20
20
import tempfile
21
+ import threading , subprocess
21
22
from multiprocessing .connection import Listener , Client
22
23
23
24
# Multiprocess communication auth key
24
25
AUTHKEY = 'vkidSQkgAHc='
26
+ DIR_LOCK_FILE = os .sep + '.dirlock'
25
27
26
-
27
- def acquire_lock (lock_file , sock_file , block ):
28
+ def acquire_lock (lock_file , sock_file , block , heartbeat ):
28
29
"""
29
30
Acquire a lock on the passed file, block if needed
30
31
:param lock_file:
31
32
:param sock_file:
32
33
:param block:
33
34
:return:
34
35
"""
35
- print ('[%s]: Acquiring lock on %s' % (time .strftime ('%Y:%m:%d %H:%M:%S' ), lock_file ))
36
- lock_handle = open (lock_file , 'w' )
36
+
37
+ # get dir lock first to check lock file existence
38
+ with open (os .path .dirname (lock_file ) + DIR_LOCK_FILE , 'w' ) as dir_lh :
39
+ fcntl .flock (dir_lh , fcntl .LOCK_EX )
40
+ if not os .path .exists (lock_file ):
41
+ print ('[%s]: Creating %s' % (time .strftime ('%Y:%m:%d %H:%M:%S' ), os .path .basename (lock_file )))
42
+ open (lock_file , 'w' ).close ()
43
+
44
+ lock_handle = open (lock_file )
45
+ print ('[%s]: Acquiring lock %s with heartbeat %s secs' %
46
+ (time .strftime ('%Y:%m:%d %H:%M:%S' ), os .path .basename (lock_file ), heartbeat ))
37
47
while True :
38
48
try :
39
49
fcntl .flock (lock_handle , fcntl .LOCK_EX | fcntl .LOCK_NB )
40
- print ('[%s]: Lock acquired on %s' % (time .strftime ('%Y:%m:%d %H:%M:%S' ), lock_file ))
50
+ print ('[%s]: Lock acquired' % (time .strftime ('%Y:%m:%d %H:%M:%S' )))
51
+ with open (os .path .dirname (lock_file ) + DIR_LOCK_FILE , 'w' ) as dir_lh :
52
+ fcntl .flock (dir_lh , fcntl .LOCK_EX )
53
+ print ('[%s]: Starting heartbeat' % (time .strftime ('%Y:%m:%d %H:%M:%S' )))
54
+ os .utime (lock_file , None )
41
55
break
42
56
except IOError as e :
43
57
if not block :
44
58
print (e )
45
59
return 1
60
+
46
61
time .sleep (0.1 )
47
62
63
+ # to handle stale NFS locks
64
+ pulse = int (time .time () - os .path .getmtime (lock_file ))
65
+ if heartbeat < pulse :
66
+ # something is wrong
67
+ print ('[%s]: Lost heartbeat by %s secs' % (time .strftime ('%Y:%m:%d %H:%M:%S' ), pulse ))
68
+ lock_handle .close ()
69
+ # get dir lock
70
+ with open (os .path .dirname (lock_file ) + DIR_LOCK_FILE , 'w' ) as dir_lh :
71
+ fcntl .flock (dir_lh , fcntl .LOCK_EX )
72
+ # pulse check again after acquring dir lock
73
+ if heartbeat < int (time .time () - os .path .getmtime (lock_file )):
74
+ print ('[%s]: Recreating %s' % (time .strftime ('%Y:%m:%d %H:%M:%S' ), os .path .basename (lock_file )))
75
+ os .remove (lock_file )
76
+ open (lock_file , 'w' ).close ()
77
+
78
+ lock_handle = open (lock_file )
79
+ print ('[%s]: Reacquiring lock %s' %
80
+ (time .strftime ('%Y:%m:%d %H:%M:%S' ), os .path .basename (lock_file )))
81
+
82
+
48
83
if os .fork ():
49
84
return 0
50
85
else :
51
86
# Spawn a child process to hold on to the lock
52
87
if os .path .exists (sock_file ):
53
88
os .remove (sock_file )
54
- print ('[%s]: Holding on to the lock using %s' % (time .strftime ('%Y:%m:%d %H:%M:%S' ), sock_file ))
89
+ print ('[%s]: Lock held %s' % (time .strftime ('%Y:%m:%d %H:%M:%S' ), os . path . basename ( lock_file ) ))
55
90
listener = Listener (address = sock_file , authkey = AUTHKEY )
56
91
92
+ def listen ():
93
+ while True :
94
+ conn = listener .accept ()
95
+ if conn .recv ():
96
+ break
97
+ release ()
98
+
57
99
def release (sig = None , frame = None ):
58
100
"""
59
101
Release if the process is stopped/terminated
@@ -67,15 +109,15 @@ def release(sig=None, frame=None):
67
109
time .sleep (30 )
68
110
lock_handle .close ()
69
111
listener .close ()
70
- print ('[%s]: Lock released on %s' % (time .strftime ('%Y:%m:%d %H:%M:%S' ), lock_file ))
112
+ print ('[%s]: Lock released %s' % (time .strftime ('%Y:%m:%d %H:%M:%S' ), os . path . basename ( lock_file ) ))
71
113
72
114
signal .signal (signal .SIGTERM , release )
73
115
signal .signal (signal .SIGINT , release )
74
- while True :
75
- conn = listener . accept ()
76
- if conn . recv () :
77
- break
78
- release ( )
116
+ threading . Thread ( target = listen ). start ()
117
+
118
+ while not lock_handle . closed :
119
+ os . utime ( lock_file , None )
120
+ time . sleep ( 5 )
79
121
80
122
81
123
def check_lock (sock_file ):
@@ -90,7 +132,7 @@ def check_lock(sock_file):
90
132
cl = Client (address = sock_file , authkey = AUTHKEY )
91
133
cl .send (False )
92
134
cl .close ()
93
- print ('[%s]: Lock held' % (time .strftime ('%Y:%m:%d %H:%M:%S' )))
135
+ print ('[%s]: Lock held %s ' % (time .strftime ('%Y:%m:%d %H:%M:%S' ), os . path . basename ( sock_file )))
94
136
return 0
95
137
96
138
@@ -102,7 +144,7 @@ def release_lock(sock_file):
102
144
"""
103
145
if not os .path .exists (sock_file ):
104
146
return 1
105
- print ('[%s]: Connecting to the lock process %s' % (time .strftime ('%Y:%m:%d %H:%M:%S' ), sock_file ))
147
+ print ('[%s]: Releasing lock %s' % (time .strftime ('%Y:%m:%d %H:%M:%S' ), os . path . basename ( sock_file ) ))
106
148
cl = Client (address = sock_file , authkey = AUTHKEY )
107
149
cl .send (True )
108
150
cl .close ()
@@ -120,14 +162,16 @@ def main():
120
162
parser .add_argument ('--release' , action = 'store_true' , dest = 'release' )
121
163
parser .add_argument ('--file' , dest = 'lock_file' )
122
164
parser .add_argument ('--block' , action = 'store_true' , dest = 'block' )
165
+ # heartbeat in secs
166
+ parser .add_argument ('--heartbeat' , type = int , dest = 'heartbeat' , default = 30 )
123
167
args = parser .parse_args ()
124
168
if not args .lock_file :
125
169
parser .print_help ()
126
170
sys .exit ()
127
171
# Derive sock_file name from lock_file
128
172
sock_file = os .path .join (tempfile .gettempdir (), os .path .basename (args .lock_file ))
129
173
if args .acquire :
130
- sys .exit (acquire_lock (args .lock_file , sock_file , args .block ))
174
+ sys .exit (acquire_lock (args .lock_file , sock_file , args .block , args . heartbeat ))
131
175
elif args .check :
132
176
sys .exit (check_lock (sock_file ))
133
177
elif args .release :
0 commit comments