@@ -78,7 +78,7 @@ def __init__(self, server_id, server_address=('', 0), buf_size=None):
78
78
socket .gethostname ()
79
79
), \
80
80
self .socket .getsockname ()[1 ]
81
- self .name = self .IP + ":" + str (self .port )
81
+ self .name = self .IP + ":" + str (self .port ) + ' (' + str ( os . getpid ()) + ')'
82
82
83
83
self .zookeeper_handler .set_data ('/cluster/servers/' + ':' .join (list (map (str , self .server_address ))) )
84
84
@@ -97,11 +97,11 @@ def __init__(self, server_id, server_address=('', 0), buf_size=None):
97
97
}
98
98
self .zookeeper_handler .set_node ('/cluster/meta' , data = data )
99
99
self .is_master = True
100
- print ("{server}: Setting status to INITIALIZING" .format (server = self .name ))
101
100
102
101
if self .is_master :
103
102
print ("{server}: Assigned role of MASTER" .format (server = self .name ))
104
103
self .name = '[MASTER] ' + self .name
104
+ print ("{server}: Set status to INITIALIZING" .format (server = self .name ))
105
105
self .slave_list = []
106
106
107
107
Server .__master_routine (self )
@@ -121,7 +121,11 @@ def __init__(self, server_id, server_address=('', 0), buf_size=None):
121
121
122
122
def __del__ (self ):
123
123
print ("{server}: Shutting down, thanks!" .format (server = self .name ))
124
+
125
+ self .keep_alive = False
124
126
self .socket .close ()
127
+ self .service_task .join (1 )
128
+
125
129
126
130
# -------------------------------------------------------------------------------------
127
131
@@ -151,7 +155,7 @@ def __master_routine(self):
151
155
pass
152
156
self .accept_reg = False
153
157
154
- print ("{server}: Setting status to READY" .format (server = self .name ))
158
+ print ("{server}: Set status to READY" .format (server = self .name ))
155
159
156
160
self .zookeeper_handler .set_data ('/cluster/meta' , data = self .zookeeper_handler .get ('/cluster/meta' )['data' ].update ({'status' : 'ready' }))
157
161
@@ -325,24 +329,26 @@ def backup_data(entries, server_address, dtype):
325
329
326
330
elif op_code == "put" :
327
331
""" Format: { 'op' : ..., 'key' : ..., 'value' : ... } """
328
- print ("{server}: PUT from {client}" .format (server = self .name , client = client_address ))
329
332
try :
330
333
# Assume one of the cluster servers sent this request
331
334
if data .has_key ('type' ):
335
+ print ("{server}: bkupPUT from {client}" .format (server = self .name , client = client_address ))
336
+
332
337
if data ['type' ] == 'secondary' : # data is meant to be backup
333
338
self .data_dict ['secondary' ].update (data ['data' ])
334
339
elif data ['type' ] == 'primary' : # happens if this server was down earlier, the secondary server is now sending this data
335
340
self .data_dict ['primary' ].update (data ['data' ])
336
341
337
342
else :
343
+ print ("{server}: PUT from {client}" .format (server = self .name , client = client_address ))
338
344
entry = copy .copy (data ['data' ])
339
345
hashed_key = get_hash_partition (data ['data' ].popitem ()[0 ], self .cluster_size )
340
346
341
347
if hashed_key == self .primary_key :
342
348
self .data_dict ['primary' ].update (entry ) # update it in the primary key list
343
349
self .backup_data_dict ['secondary' ].update (entry ) # mark it for backup later
344
350
elif hashed_key == self .secondary_key :
345
- print ("{server}: Secondary server servicing PUT request " .format (server = self .name , client = client_address ))
351
+ print ("{server}: Secondary server for this key " .format (server = self .name , client = client_address ))
346
352
self .data_dict ['secondary' ].update (entry )
347
353
self .backup_data_dict ['primary' ].update (entry )
348
354
else :
@@ -361,17 +367,18 @@ def backup_data(entries, server_address, dtype):
361
367
client_socket .sendall (json .dumps (response ).encode ('utf-8' ))
362
368
363
369
elif op_code == "get" :
364
- print self .data_dict
365
370
print ("{server}: GET from {client}" .format (server = self .name , client = client_address ))
371
+ pprint .pprint (self .data_dict )
366
372
try :
367
373
response = { 'status' : self .__SUCCESS ,
368
374
'data' : ''
369
375
}
370
- print data [ 'key' ]
376
+ pprint . pprint ( data )
371
377
if self .data_dict ['primary' ].has_key (data ['key' ]):
372
378
response ['data' ] = self .data_dict ['primary' ][data ['key' ]]
373
379
374
380
elif self .data_dict ['secondary' ].has_key (data ['key' ]):
381
+ print ("{server}: Secondary server for this key" .format (server = self .name , client = client_address ))
375
382
response ['data' ] = self .data_dict ['secondary' ][data ['key' ]]
376
383
377
384
else :
@@ -391,13 +398,4 @@ def backup_data(entries, server_address, dtype):
391
398
392
399
backup_service_thread .join (0 )
393
400
394
- # -------------------------------------------------------------------------------------
395
-
396
- def stop (self ):
397
- if self .keep_alive :
398
- self .keep_alive = False
399
-
400
- self .socket .close ()
401
- self .service_task .join (1 )
402
-
403
401
""" ------------------------------------------------------------------------------------------- """
0 commit comments