Skip to content

Commit e6ea055

Browse files
committed
split internal classes into different files
1 parent a0d267a commit e6ea055

16 files changed

+548
-539
lines changed

Connector/AutoSendBuffer.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import threading
2+
import time
3+
4+
5+
class AutoSendBuffer:
6+
7+
def __init__(self, node, session_id, timeout=0.1, quiet=False):
8+
self._node = node
9+
self._session_id = session_id
10+
self._buffer_lock = threading.Lock()
11+
self._buffer = b''
12+
self._quiet = quiet
13+
self._timeout = timeout
14+
self._last_send_time = time.time()
15+
self._continue_send = True
16+
self._sending_thread = threading.Thread(target=self._timeout_send)
17+
self._sending_thread.start()
18+
19+
def __del__(self):
20+
try:
21+
self.stop()
22+
except BaseException:
23+
pass
24+
25+
def append(self, byte):
26+
with self._buffer_lock:
27+
self._buffer += byte
28+
29+
if byte == b"\n" or len(self._buffer) >= 512:
30+
self.send()
31+
32+
def send(self):
33+
with self._buffer_lock:
34+
if len(self._buffer) != 0:
35+
content = self._buffer.decode("utf-8")
36+
if not self._quiet:
37+
print(content, end="", flush=True)
38+
self._node._respond_ok(
39+
self._session_id, end=False, stdout=content
40+
)
41+
self._buffer = b''
42+
self._last_send_time = time.time()
43+
44+
def stop(self):
45+
self._continue_send = False
46+
self._sending_thread.join()
47+
self.send()
48+
49+
def _timeout_send(self):
50+
while self._continue_send:
51+
if time.time()-self._last_send_time >= self._timeout:
52+
self.send()
53+
time.sleep(1E-3)

Connector/Future.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
import threading
2+
3+
from .utils import eprint
4+
5+
6+
class Future:
7+
8+
def __init__(self, node, session_id):
9+
self._node = node
10+
self._session_id = session_id
11+
self._cancelled = False
12+
self._done = None
13+
self._result = None
14+
self._monitor_done_thread = None
15+
self._done_callbacks = []
16+
17+
def cancel(self):
18+
if self.done():
19+
return False
20+
21+
self._node._stop_send = True
22+
self._node._send_signal(self._session_id, cancel=True)
23+
result_queues = self._node._result_queues_for_future
24+
self._result = result_queues[self._session_id].get()
25+
self._node._close_session(self._session_id)
26+
self._cancelled = True
27+
self._done = True
28+
return True
29+
30+
def cancelled(self):
31+
return self._cancelled
32+
33+
def done(self):
34+
if self._done is not None:
35+
return self._done
36+
else:
37+
result_queues = self._node._result_queues_for_future
38+
return (len(result_queues[self._session_id]) > 0)
39+
40+
def result(self):
41+
if self._result is None:
42+
result_queues = self._node._result_queues_for_future
43+
self._result = result_queues[self._session_id].get()
44+
self._node._close_session(self._session_id)
45+
46+
self._done = True
47+
if self._result["success"]:
48+
return self._result["return_value"]
49+
else:
50+
eprint(self._result["traceback"])
51+
raise self._result["exception"]
52+
53+
def exception(self):
54+
if self._result is None:
55+
result_queues = self._node._result_queues_for_future
56+
self._result = result_queues[self._session_id].get()
57+
self._node._close_session(self._session_id)
58+
59+
self._done = True
60+
return self._result["exception"]
61+
62+
def _get_result(self):
63+
if self._result is None:
64+
result_queues = self._node._result_queues_for_future
65+
self._result = result_queues[self._session_id].get()
66+
self._node._close_session(self._session_id)
67+
68+
for func in self._done_callbacks:
69+
func(self)
70+
71+
def add_done_callback(self, func):
72+
self._done_callbacks.append(func)
73+
74+
if self._monitor_done_thread is None:
75+
self._monitor_done_thread = threading.Thread(
76+
target=self._get_result
77+
)
78+
self._monitor_done_thread.start()
79+
80+
def remove_done_callback(self, func):
81+
try:
82+
self._done_callbacks.remove(func)
83+
except BaseException:
84+
pass

Connector/KillableThread.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import threading
2+
import ctypes
3+
4+
5+
class KillableThread(threading.Thread):
6+
7+
def __init__(self, target, args=(), kwargs={}):
8+
threading.Thread.__init__(
9+
self, target=target,
10+
args=args, kwargs=kwargs
11+
)
12+
13+
def __get_id(self):
14+
if hasattr(self, '_thread_id'):
15+
return self._thread_id
16+
17+
for id, thread in threading._active.items():
18+
if thread is self:
19+
return id
20+
21+
def kill(self):
22+
thread_id = self.__get_id()
23+
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
24+
thread_id, ctypes.py_object(SystemExit)
25+
)
26+
if res > 1:
27+
ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, 0)
28+
print('Exception raise failure')

Connector/NodeBasicMethods.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88

99
from .CloseableQueue import CloseableQueue
1010
from .CloseablePipe import CloseablePipe
11-
from .NodeInternalClasses import Pipes, Queues, PipeDict, QueueDict, ServerPeer
11+
from .Pipes import PipeDict, Pipes
12+
from .Queues import QueueDict, Queues
13+
from .ServerPeer import ServerPeer
1214

1315
complex_request = ["get_file", "put_file",
1416
"get_folder", "put_folder",
@@ -104,9 +106,9 @@ def hold_on(self):
104106
def _close(self):
105107
if self._is_internal_closing or self.is_closed:
106108
return
107-
109+
108110
self._is_internal_closing = True
109-
111+
110112
self_address = self.address
111113
try:
112114
self._connection.close()
@@ -195,7 +197,7 @@ def _close(self):
195197
def close(self):
196198
if self._is_closing or self.is_closed:
197199
return
198-
200+
199201
self._is_closing = True
200202

201203
try:
@@ -409,7 +411,7 @@ def _recving_loop(self):
409411
except BaseException:
410412
self._close()
411413
return
412-
414+
413415
self._recved_binary_queue.put(recved_binary)
414416

415417

Connector/NodeFileTransferMethods.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33

44
from .utils import file_size, md5, eprint
55
from .Config import Config
6-
from .NodeInternalClasses import Future, Thread
6+
from .Future import Future
7+
from .KillableThread import KillableThread
78

89

910
def _write_to_file(self, data, file_name):
@@ -148,7 +149,7 @@ def session():
148149

149150
self._make_signal(session_id)
150151

151-
thread = Thread(target=session)
152+
thread = KillableThread(target=session)
152153
thread.start()
153154
signal = self._recv_signal(session_id)
154155
if signal["cancel"] and thread.is_alive():
@@ -201,7 +202,7 @@ def session():
201202
if block:
202203
session()
203204
else:
204-
thread = Thread(target=session)
205+
thread = KillableThread(target=session)
205206
thread.start()
206207
return Future(self, session_id)
207208

@@ -251,7 +252,7 @@ def session():
251252

252253
self._make_signal(session_id)
253254

254-
thread = Thread(target=session)
255+
thread = KillableThread(target=session)
255256
thread.start()
256257
signal = self._recv_signal(session_id)
257258
if signal["cancel"] and thread.is_alive():
@@ -444,7 +445,7 @@ def session():
444445

445446
self._make_signal(session_id)
446447

447-
thread = Thread(target=session)
448+
thread = KillableThread(target=session)
448449
thread.start()
449450
signal = self._recv_signal(session_id)
450451
if signal["cancel"] and thread.is_alive():
@@ -678,7 +679,7 @@ def session():
678679

679680
self._make_signal(session_id)
680681

681-
thread = Thread(target=session)
682+
thread = KillableThread(target=session)
682683
thread.start()
683684
signal = self._recv_signal(session_id)
684685
if signal["cancel"] and thread.is_alive():

0 commit comments

Comments
 (0)