9
9
from notebook .utils import url2path
10
10
11
11
12
+ # The delay in ms at which we send the chunk of data
13
+ # to the client.
14
+ ARCHIVE_DOWNLOAD_FLUSH_DELAY = 100
15
+
16
+
12
17
class ArchiveStream ():
13
18
def __init__ (self , handler ):
14
19
self .handler = handler
@@ -23,7 +28,10 @@ def tell(self):
23
28
return self .position
24
29
25
30
def flush (self ):
26
- self .handler .flush ()
31
+ # Note: Flushing is done elsewhere, in the main thread
32
+ # because `write()` is called in a background thread.
33
+ # self.handler.flush()
34
+ pass
27
35
28
36
29
37
def make_writer (handler , archive_format = "zip" ):
@@ -89,30 +97,40 @@ def get(self, archive_path, include_body=False):
89
97
self .set_header ('content-disposition' ,
90
98
'attachment; filename={}' .format (archive_filename ))
91
99
92
- task = asyncio .ensure_future (self .archive_and_download (archive_path , archive_format , archive_token ))
100
+ self .canceled = False
101
+ self .flush_cb = ioloop .PeriodicCallback (self .flush , ARCHIVE_DOWNLOAD_FLUSH_DELAY )
102
+ self .flush_cb .start ()
103
+
104
+ args = (archive_path , archive_format , archive_token )
105
+ yield ioloop .IOLoop .current ().run_in_executor (None , self .archive_and_download , * args )
93
106
94
- try :
95
- yield task
96
- except (asyncio .CancelledError , iostream .StreamClosedError ):
97
- task .cancel ()
107
+ if self .canceled :
98
108
self .log .info ('Download canceled.' )
99
109
else :
110
+ self .flush ()
100
111
self .log .info ('Finished downloading {}.' .format (archive_filename ))
101
112
102
- @gen .coroutine
113
+ self .set_cookie ("archiveToken" , archive_token )
114
+ self .flush_cb .stop ()
115
+ self .finish ()
116
+
103
117
def archive_and_download (self , archive_path , archive_format , archive_token ):
104
118
105
119
with make_writer (self , archive_format ) as archive :
106
120
prefix = len (str (archive_path .parent )) + len (os .path .sep )
107
121
for root , _ , files in os .walk (archive_path ):
108
122
for file_ in files :
109
123
file_name = os .path .join (root , file_ )
110
- self .log .debug ("{}\n " .format (file_name ))
111
- archive .add (file_name , os .path .join (root [prefix :], file_ ))
112
- yield self .flush ()
113
-
114
- self .set_cookie ("archiveToken" , archive_token )
115
- self .finish ()
124
+ if not self .canceled :
125
+ self .log .debug ("{}\n " .format (file_name ))
126
+ archive .add (file_name , os .path .join (root [prefix :], file_ ))
127
+ else :
128
+ break
129
+
130
+ def on_connection_close (self ):
131
+ super ().on_connection_close ()
132
+ self .canceled = True
133
+ self .flush_cb .stop ()
116
134
117
135
118
136
class ExtractArchiveHandler (IPythonHandler ):
0 commit comments