Skip to content

Commit 3377b8b

Browse files
authored
Fix data corruption due to race condition (#89)
* Stop the callback thread reliably, even when an exception occurs during the archive process * Lock the write and flush-buffer operations to avoid a race condition
1 parent 313d508 commit 3377b8b

File tree

1 file changed

+26
-18
lines changed

1 file changed

+26
-18
lines changed

jupyter_archive/handlers.py

Lines changed: 26 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@
33
import tarfile
44
import time
55
import zipfile
6+
import threading
67

78
from jupyter_server.base.handlers import JupyterHandler
89
from jupyter_server.utils import url2path, url_path_join
@@ -41,7 +42,8 @@ def write(self, data):
4142
if time_out_cnt <= 0:
4243
raise ValueError("Time out for writing into tornado buffer")
4344
self.position += len(data)
44-
self.handler.write(data)
45+
with self.handler.lock:
46+
self.handler.write(data)
4547
del data
4648

4749
def tell(self):
@@ -89,6 +91,7 @@ def make_reader(archive_path):
8991

9092

9193
class DownloadArchiveHandler(JupyterHandler):
94+
lock = threading.Lock()
9295

9396
@property
9497
def stream_max_buffer_size(self):
@@ -107,7 +110,8 @@ def flush(self, include_footers=False, force=False):
107110
stream_buffer = self.request.connection.stream._write_buffer
108111
if not force and stream_buffer and len(stream_buffer) > self.stream_max_buffer_size:
109112
return
110-
return super(DownloadArchiveHandler, self).flush(include_footers)
113+
with self.lock:
114+
return super(DownloadArchiveHandler, self).flush(include_footers)
111115

112116
@web.authenticated
113117
async def get(self, archive_path, include_body=False):
@@ -154,24 +158,28 @@ async def get(self, archive_path, include_body=False):
154158
self.flush_cb = ioloop.PeriodicCallback(self.flush, self.archive_download_flush_delay)
155159
self.flush_cb.start()
156160

157-
args = (
158-
archive_path,
159-
archive_format,
160-
archive_token,
161-
follow_symlinks,
162-
download_hidden,
163-
)
164-
await ioloop.IOLoop.current().run_in_executor(None, self.archive_and_download, *args)
165-
166-
if self.canceled:
167-
self.log.info("Download canceled.")
168-
else:
169-
# Here, we need to flush forcibly to move all data from _write_buffer to stream._write_buffer
170-
self.flush(force=True)
171-
self.log.info("Finished downloading {}.".format(archive_filename))
161+
try:
162+
args = (
163+
archive_path,
164+
archive_format,
165+
archive_token,
166+
follow_symlinks,
167+
download_hidden,
168+
)
169+
await ioloop.IOLoop.current().run_in_executor(None, self.archive_and_download, *args)
170+
171+
if self.canceled:
172+
self.log.info("Download canceled.")
173+
else:
174+
# Here, we need to flush forcibly to move all data from _write_buffer to stream._write_buffer
175+
self.flush(force=True)
176+
self.log.info("Finished downloading {}.".format(archive_filename))
177+
except Exception:
178+
raise
179+
finally:
180+
self.flush_cb.stop()
172181

173182
self.set_cookie("archiveToken", archive_token)
174-
self.flush_cb.stop()
175183
self.finish()
176184

177185
def archive_and_download(

0 commit comments

Comments
 (0)