@@ -52,7 +52,7 @@ class Server:
52
52
__NOT_FOUND = "404 Not Found"
53
53
__INT_ERROR = "500 Internal Server Error"
54
54
55
- __timeout = 5 # registration timeout
55
+ __timeout = 8 # registration timeout
56
56
__backup_delay = 5 # delay between successive backups
57
57
58
58
@@ -69,8 +69,6 @@ def __init__(self, server_id, server_address=('', 0), buf_size=None):
69
69
self .zookeeper_handler = KazooServiceRegistry ()
70
70
self .buf_size = buf_size if buf_size else self .__buf_size
71
71
72
-
73
-
74
72
self .config_list = None
75
73
76
74
self .socket = get_socket (server_address )
@@ -99,6 +97,7 @@ def __init__(self, server_id, server_address=('', 0), buf_size=None):
99
97
}
100
98
self .zookeeper_handler .set_node ('/cluster/meta' , data = data )
101
99
self .is_master = True
100
+ print ("{server}: Setting status to INITIALIZING" .format (server = self .name ))
102
101
103
102
if self .is_master :
104
103
print ("{server}: Assigned role of MASTER" .format (server = self .name ))
@@ -108,18 +107,13 @@ def __init__(self, server_id, server_address=('', 0), buf_size=None):
108
107
Server .__master_routine (self )
109
108
110
109
else :
111
- print ("{server}: Assigned role of SLAVE" .format (server = self .name ))
112
110
self .name = '[SLAVE] ' + self .name
113
111
114
112
Server .__slave_routine (self , tuple (master_address ))
115
113
116
114
time .sleep (np .random .randint (low = 0 , high = 3 ))
117
115
118
- print ("{server}: Updating mapping..." .format (server = self .name ))
119
-
120
- data = {'address' : list (self .server_address ) }
121
- self .zookeeper_handler .set_data ('/cluster/mapping/' + str (self .primary_key ) + '/primary' , data = data )
122
- self .zookeeper_handler .set_data ('/cluster/mapping/' + str (self .secondary_key ) + '/secondary' , data = data )
116
+ print ("{server}: Updating mapping" .format (server = self .name ))
123
117
124
118
self .cluster_size = self .zookeeper_handler .get ('/cluster/meta' )['data' ]['size' ]
125
119
@@ -143,19 +137,21 @@ def __master_routine(self):
143
137
set the key to server mapping in zookeeper too
144
138
self.nd.set_node('/cluster/mapping/' + str(num), data={'primary': ..., 'secondary', ... })
145
139
"""
146
- self .op_list = ["__reg__" , "__unreg__ " , "map" , "get" , "put" ]
140
+ self .op_list = ["__reg__" , "__config__ " , "map" , "get" , "put" ]
147
141
148
- self .start () # deploy master and listen for registrations
142
+ self .keep_alive = True
143
+ self .service_task = threading .Thread (target = self .__listen_forever )
144
+ self .service_task .start () # deploy master and listen for registrations
149
145
150
- print ("{server}: Beginning registrations for slave nodes " .format (server = self .name ))
146
+ print ("{server}: Beginning registration " .format (server = self .name ))
151
147
152
148
self .accept_reg = True
153
149
start = time .time ()
154
150
while time .time () <= start + self .__timeout :
155
151
pass
156
152
self .accept_reg = False
157
153
158
- print ("{server}: Stopping registration " .format (server = self .name ))
154
+ print ("{server}: Setting status to READY " .format (server = self .name ))
159
155
160
156
self .zookeeper_handler .set_data ('/cluster/meta' , data = self .zookeeper_handler .get ('/cluster/meta' )['data' ].update ({'status' : 'ready' }))
161
157
@@ -175,17 +171,14 @@ def __master_routine(self):
175
171
176
172
key_list = zip (x , y )
177
173
178
- self .primary_key = key_list [0 ][0 ]
179
- self .secondary_key = key_list [0 ][1 ]
180
-
181
- for i , slave_address in enumerate (self .slave_list ):
174
+ for i , slave_address in enumerate ([self .server_address ] + self .slave_list ):
182
175
temp_socket = get_socket () # will shutdown the existing socket and recreate it with the same address
183
176
184
- print ("{server}: Configuring {client}" .format (server = self .name , client = ':' .join (list (map (str , self . server_address ))) ))
177
+ print ("{server}: Configuring {client}" .format (server = self .name , client = ':' .join (list (map (str , slave_address ))) ))
185
178
186
179
temp_socket .connect (slave_address )
187
180
188
- data = {'op' : '__config__' , 'primary' : key_list [i + 1 ][0 ], 'secondary' : key_list [i + 1 ][1 ] }
181
+ data = {'op' : '__config__' , 'primary' : key_list [i ][0 ], 'secondary' : key_list [i ][1 ] }
189
182
temp_socket .sendall (json .dumps (data ).encode ('utf-8' ))
190
183
_ = temp_socket .recv (self .buf_size ) # don't continue until the slave acknowledges the request and closes the connection
191
184
temp_socket .close () # just to be on the safe side
@@ -195,7 +188,7 @@ def __master_routine(self):
195
188
def __slave_routine (self , master_address ):
196
189
# get the master address, and send a signal to the master that the other server is ready
197
190
# wait for the ready signal, get server mappings from master and note it down
198
- self .op_list = ["__config__" , "get" , "put" ]
191
+ self .op_list = ["__notify__" , " __config__" , "get" , "put" ]
199
192
200
193
temp_socket = get_socket ()
201
194
temp_socket .connect (master_address )
@@ -206,9 +199,9 @@ def __slave_routine(self, master_address):
206
199
_ = temp_socket .recv (self .buf_size )
207
200
temp_socket .close ()
208
201
209
- self .start ()
210
-
211
- print ( "{server}: Waiting for config info from master..." . format ( server = self . name ))
202
+ self .keep_alive = True
203
+ self . service_task = threading . Thread ( target = self . __listen_forever )
204
+ self . service_task . start ()
212
205
213
206
self .configured = False
214
207
@@ -239,30 +232,23 @@ def backup_data(entries, server_address, dtype):
239
232
while True :
240
233
if time .time () > last_backup_timestamp + self .__backup_delay :
241
234
if len (self .backup_data_dict ['secondary' ]) > 0 :
242
- print ("{server}: Backing up data to secondary... " .format (server = self .name ))
235
+ print ("{server}: Backing up to secondary" .format (server = self .name ))
243
236
entries = copy .copy (self .backup_data_dict ['secondary' ])
244
237
server_address = get_secondary_server (get_hash_partition (list (entries .keys ())[0 ], self .cluster_size ))
245
238
self .backup_data_dict ['secondary' ] = {}
246
239
backup_data (entries , server_address , 'secondary' )
247
240
248
- if len (self .backup_data_dict ['primary' ]) > 0 :
249
- print ("{server}: Sending secondary data to primary..." .format (server = self .name ))
250
- server_address = get_primary_server (get_hash_partition (list (entries .keys ())[0 ], self .cluster_size ))
251
- entries = copy .copy (self .backup_data_dict ['primary' ])
252
- self .backup_data_dict ['primary' ] = {}
253
- backup_data (entries , server_address , 'primary' )
254
-
255
241
last_backup_timestamp = time .time ()
256
242
257
243
backup_service_thread = threading .Thread (target = run_backup_service )
258
244
backup_service_thread .start ()
259
245
260
246
261
247
""" The giant mainloop begins here """
262
- while self .__keep_alive :
248
+ while self .keep_alive :
263
249
(client_socket , client_address ) = self .socket .accept ()
264
- print ("{server}: Connected to {client}" .format (server = self .name , client = client_address ))
265
-
250
+ # print("{server}: Connected to {client}".format(server=self.name, client=client_address))
251
+
266
252
data = client_socket .recv (self .buf_size )
267
253
try :
268
254
data = json .loads (data .decode ('utf-8' ))
@@ -295,13 +281,8 @@ def backup_data(entries, server_address, dtype):
295
281
response = {'status' : self .__BAD_REQ }
296
282
client_socket .sendall (json .dumps (response ).encode ('utf-8' ))
297
283
298
- if op_code == "__unreg__" :
299
- # TODO: remove reg info for that server, remove mapping
300
- # self.accept_reg = True
301
- pass
302
-
303
284
elif op_code == "map" :
304
- print ("{server}: Received MAP request from {client}" .format (server = self .name , client = client_address ))
285
+ print ("{server}: MAP from {client}" .format (server = self .name , client = client_address ))
305
286
try :
306
287
if self .is_master :
307
288
response = { 'status' : self .__SUCCESS ,
@@ -325,12 +306,15 @@ def backup_data(entries, server_address, dtype):
325
306
response = {'status' : self .__BAD_REQ }
326
307
client_socket .sendall (json .dumps (response ).encode ('utf-8' ))
327
308
328
- # This code only applies to the slave
329
309
elif op_code == "__config__" :
330
310
try :
331
- print ("{server}: Received config info from master" .format (server = self .name ))
332
311
self .primary_key = data ['primary' ]
333
312
self .secondary_key = data ['secondary' ]
313
+
314
+ data = {'address' : list (self .server_address ) }
315
+ self .zookeeper_handler .set_data ('/cluster/mapping/' + str (self .primary_key ) + '/primary' , data = data )
316
+ self .zookeeper_handler .set_data ('/cluster/mapping/' + str (self .secondary_key ) + '/secondary' , data = data )
317
+
334
318
response = {'status' : self .__SUCCESS }
335
319
client_socket .sendall (json .dumps (response ).encode ('utf-8' ))
336
320
self .configured = True
@@ -341,7 +325,7 @@ def backup_data(entries, server_address, dtype):
341
325
342
326
elif op_code == "put" :
343
327
""" Format: { 'op' : ..., 'key' : ..., 'value' : ... } """
344
- print ("{server}: Received PUT request from {client}" .format (server = self .name , client = client_address ))
328
+ print ("{server}: PUT from {client}" .format (server = self .name , client = client_address ))
345
329
try :
346
330
# Assume one of the cluster servers sent this request
347
331
if data .has_key ('type' ):
@@ -378,7 +362,7 @@ def backup_data(entries, server_address, dtype):
378
362
379
363
elif op_code == "get" :
380
364
print self .data_dict
381
- print ("{server}: Received GET request from {client}" .format (server = self .name , client = client_address ))
365
+ print ("{server}: GET from {client}" .format (server = self .name , client = client_address ))
382
366
try :
383
367
response = { 'status' : self .__SUCCESS ,
384
368
'data' : ''
@@ -409,27 +393,11 @@ def backup_data(entries, server_address, dtype):
409
393
410
394
# -------------------------------------------------------------------------------------
411
395
412
- def start (self ):
413
- self .__keep_alive = True
414
- self .service_task = threading .Thread (target = self .__listen_forever )
415
- self .service_task .start ()
416
-
417
- # -------------------------------------------------------------------------------------
418
-
419
- def restart (self ):
420
- # TODO: re-register with the master
421
- self .__keep_alive = True
422
- self .service_task = threading .Thread (target = self .__listen_forever )
423
- self .service_task .start ()
424
-
425
- # -------------------------------------------------------------------------------------
426
-
427
396
def stop(self):
    """Stop the listening service of this server.

    Clears the ``keep_alive`` flag polled by the listener loop, closes the
    listening socket and waits up to one second for the service thread to
    finish.

    The call is idempotent: if the server is not currently marked as alive
    (``keep_alive`` is falsy), nothing is closed or joined again.
    """
    # Already stopped: do not close the socket or join the thread a second time.
    if not self.keep_alive:
        return

    # Flip the flag first so the listener loop does not start a new iteration.
    self.keep_alive = False

    self.socket.close()
    # Bounded wait so a listener that is still blocked cannot hang the caller.
    self.service_task.join(1)
434
402
435
403
""" ------------------------------------------------------------------------------------------- """
0 commit comments