Skip to content

Commit a0d267a

Browse files
committed
optimize close behavior
1 parent 096e973 commit a0d267a

File tree

7 files changed

+105
-36
lines changed

7 files changed

+105
-36
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@ __pycache__
22
*.egg-info
33
dist
44
build
5-
test.py
5+
test*

Connector/NodeBasicMethods.py

Lines changed: 47 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ def __init__(self, connection, server=None):
4242
self._queues = Queues(self)
4343
self._result_queues_for_future = Queues(self, private=True)
4444
self._recved_binary_queue = CloseableQueue()
45+
self._is_closing = False
46+
self._is_internal_closing = False
4547

4648
if self._parent is None:
4749
self._local_vars = {}
@@ -99,20 +101,21 @@ def hold_on(self):
99101
threading.Event().wait()
100102

101103

102-
def close(self):
103-
if self._parent is None:
104-
try:
105-
self._request(session_id=self._get_session_id())
106-
except BaseException:
107-
pass
108-
109-
try:
110-
self._connection.close()
111-
except BaseException:
112-
pass
104+
def _close(self):
105+
if self._is_internal_closing or self.is_closed:
106+
return
107+
108+
self._is_internal_closing = True
109+
110+
self_address = self.address
111+
try:
112+
self._connection.close()
113+
except BaseException:
114+
pass
113115

114-
self._connection = None
116+
self._connection = None
115117

118+
if self._parent is None:
116119
try:
117120
self._queue.close()
118121
except BaseException:
@@ -147,25 +150,18 @@ def close(self):
147150
self._actual_pipes.clear()
148151
except BaseException:
149152
pass
150-
151153
else:
152154
try:
153-
self._connection.close()
154-
except BaseException:
155-
pass
156-
157-
self._connection = None
158-
159-
try:
160-
if self.address is not None:
155+
if self_address is not None:
161156
result = self._parent._callback_threads_pool.submit(
162-
self._parent._on_disconnected, self.address)
163-
self._parent.disconnect_result[self.address] = result
157+
self._parent._on_disconnected, self_address
158+
)
159+
self._parent.disconnect_result[self_address] = result
164160
except BaseException:
165161
pass
166162

167163
try:
168-
self._parent._connected_nodes.pop(self.address)
164+
self._parent._connected_nodes.pop(self_address)
169165
except BaseException:
170166
pass
171167

@@ -193,6 +189,28 @@ def close(self):
193189
except BaseException:
194190
pass
195191

192+
self._is_internal_closing = False
193+
194+
195+
def close(self):
196+
if self._is_closing or self.is_closed:
197+
return
198+
199+
self._is_closing = True
200+
201+
try:
202+
self._request(session_id=self._get_session_id())
203+
except BaseException:
204+
pass
205+
206+
self._close()
207+
208+
self._is_closing = False
209+
210+
211+
def _process_close(self, session_id, request):
212+
self._close()
213+
196214

197215
def _get_session_id(self):
198216
return uuid.uuid4().bytes
@@ -228,10 +246,6 @@ def _traceback(self, remote=True):
228246
return exception_str
229247

230248

231-
def _process_close(self, request):
232-
self.close()
233-
234-
235249
def _send(self, session_id, type, value):
236250
binary = dill.dumps(value, 3)
237251
type_bytes = type.to_bytes(1, byteorder='little', signed=False)
@@ -392,10 +406,11 @@ def _recving_loop(self):
392406
while True:
393407
try:
394408
recved_binary = self._connection.recv(self.recv_buffer)
395-
self._recved_binary_queue.put(recved_binary)
396409
except BaseException:
397-
self.close()
410+
self._close()
398411
return
412+
413+
self._recved_binary_queue.put(recved_binary)
399414

400415

401416
def _decoding_loop(self):
@@ -449,12 +464,13 @@ def _decoding_loop(self):
449464
def init_basic_methods(cls):
450465
cls.__init__ = __init__
451466
cls.__del__ = __del__
467+
cls._close = _close
452468
cls.close = close
469+
cls._process_close = _process_close
453470
cls.hold_on = hold_on
454471
cls._start = _start
455472
cls._get_session_id = _get_session_id
456473
cls._traceback = _traceback
457-
cls._process_close = _process_close
458474
cls._send = _send
459475
cls._send_signal = _send_signal
460476
cls._request = _request

Connector/NodeFileTransferMethods.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ def _write_to_file(self, data, file_name):
1010
self._request(self._get_session_id(), data=data, file_name=file_name)
1111

1212

13-
def _process__write_to_file(self, request):
13+
def _process__write_to_file(self, session_id, request):
1414
try:
1515
file_name = os.path.abspath(request["data"]["file_name"])
1616
if self._out_file is not None and self._out_file.name != file_name:
@@ -34,7 +34,7 @@ def _close_file(self):
3434
self._request(self._get_session_id())
3535

3636

37-
def _process__close_file(self, request):
37+
def _process__close_file(self, session_id, request):
3838
try:
3939
if self._out_file is not None:
4040
self._out_file.close()

Connector/NodeInternalClasses.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,18 @@ def __delitem__(self, name):
3131

3232
collections.OrderedDict.__delitem__(self, name)
3333

34+
def __repr__(self):
35+
result = "[\n"
36+
i = 0
37+
for value in self.values():
38+
address = value.address
39+
result += f" Client('{address[0]}', {address[1]})"
40+
if i != len(self)-1:
41+
result += ","
42+
result += "\n"
43+
result += "]"
44+
return result
45+
3446

3547
class QueueDict(dict):
3648

Connector/Server.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ def __init__(self, ip=None, port=None):
2323
self._queue = CloseableQueue()
2424
self._queues = QueueDict()
2525

26+
self._is_closing = False
2627
self._continue_accept = False
2728
self._accept_thread = None
2829

@@ -39,6 +40,23 @@ def start(self, ip=None, port=None):
3940
return
4041

4142
self._connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
43+
if hasattr(self, "_send_buffer"):
44+
self._connection.setsockopt(
45+
socket.SOL_SOCKET, socket.SO_SNDBUF, self._send_buffer
46+
)
47+
else:
48+
self._send_buffer = self._connection.getsockopt(
49+
socket.SOL_SOCKET, socket.SO_SNDBUF
50+
)
51+
52+
if hasattr(self, "_recv_buffer"):
53+
self._connection.setsockopt(
54+
socket.SOL_SOCKET, socket.SO_RCVBUF, self._recv_buffer
55+
)
56+
else:
57+
self._recv_buffer = self._connection.getsockopt(
58+
socket.SOL_SOCKET, socket.SO_RCVBUF
59+
)
4260

4361
if ip is None:
4462
ip = get_ip()
@@ -56,6 +74,10 @@ def start(self, ip=None, port=None):
5674
self._accept_thread.start()
5775

5876
def close(self):
77+
if self._is_closing:
78+
return
79+
80+
self._is_closing = True
5981
self._continue_accept = False
6082

6183
try:
@@ -92,6 +114,8 @@ def close(self):
92114
self._accept_thread.join()
93115
self._accept_thread = None
94116

117+
self._is_closing = False
118+
95119
@property
96120
def address(self):
97121
try:
@@ -234,6 +258,7 @@ def _acceptting_loop(self):
234258
self.connect_results[address] = result
235259
node._respond_ok(session_id=b'\x00'*16)
236260
except BaseException:
261+
self.close()
237262
break
238263

239264
@property

Connector/__init__.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,17 @@
11
from .Client import Client
22
from .Server import Server
33
from .Config import Config
4+
5+
import gc
6+
import atexit
7+
8+
9+
@atexit.register
10+
def close_all():
11+
for obj in gc.get_objects():
12+
if isinstance(obj, Client):
13+
obj.close()
14+
15+
for obj in gc.get_objects():
16+
if isinstance(obj, Server):
17+
obj.close()

static_check.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,20 @@
11
import os
2+
import subprocess
3+
import sys
24

35

46
def check_syntax(folder_name):
57
for home, dirs, files in os.walk(folder_name):
68
for file_name in files:
79
if file_name.endswith(".py") and file_name != "__init__.py":
8-
os.system("pyflakes " + home + "/" + file_name)
10+
subprocess.call([sys.executable, "-m", "pyflakes", home + "/" + file_name])
911

1012

1113
def check_style(folder_name):
1214
for home, dirs, files in os.walk(folder_name):
1315
for file_name in files:
1416
if file_name.endswith(".py"):
15-
os.system("pycodestyle " + home + "/" + file_name)
17+
subprocess.call([sys.executable, "-m", "pycodestyle", home + "/" + file_name])
1618

1719

1820
check_syntax("Connector")

0 commit comments

Comments
 (0)