2
2
3
3
import sys
4
4
5
- import memray
5
+
6
6
from flask import Response
7
7
8
8
print ("Initializing " , file = sys .stderr )
9
9
import flask
10
10
from util .gcp_utils import (
11
11
increment_invocation_count ,
12
12
count_invocations_by_path ,
13
- memray_filename ,
14
13
)
15
14
from util .detect_gae import detect_gae
16
15
20
19
21
20
app .wsgi_app = google .appengine .api .wrap_wsgi_app (app .wsgi_app )
22
21
23
- from util .utils import init_logging , memray_tempdir
22
+ from util .utils import init_logging
24
23
25
24
# Must init logging before any library code writes logs (which would then just override our config)
26
25
init_logging ()
54
53
is_in_test_or_dev_project ,
55
54
is_test_or_dev_configuration ,
56
55
iris_homepage_text ,
57
- use_memray ,
58
56
)
59
57
from util .utils import log_time , timing
60
58
76
74
PluginHolder .init ()
77
75
78
76
79
- @app .route ("/list_memray" )
80
- def list_memray ():
81
- s = ""
82
- for f in os .listdir (memray_tempdir ()):
83
- s += f"<a href='/memrayfiles/{ f } '>{ f } </a><br>\n "
84
- if not s :
85
- s = "NONE FOUND"
86
- return Response (s , mimetype = "text/html" , status = 200 )
87
-
88
-
89
- @app .route ("/memrayfiles/<filename>" )
90
- def memrayfiles (filename ):
91
-
92
- with open (f"{ memray_tempdir ()} /{ filename } " , "rb" ) as f :
93
- content = f .read ()
94
-
95
- return Response (content , mimetype = "application/octet-stream" , status = 200 )
96
-
97
-
98
77
@app .route ("/" )
99
78
def index ():
100
- def __index ():
101
- increment_invocation_count ("index" )
102
- with gae_memory_logging ("index" ):
103
- msg = iris_homepage_text ()
104
- if config_utils .is_test_or_dev_configuration ():
105
- msg += "\n I'm running in test or dev mode."
106
79
107
- logging .info (
108
- "index(); invocations of GAE instance : %s" , count_invocations_by_path ()
109
- )
110
- return Response (msg , mimetype = "text/plain" , status = 200 )
80
+ increment_invocation_count ("index" )
81
+ with gae_memory_logging ("index" ):
82
+ msg = iris_homepage_text ()
83
+ if config_utils .is_test_or_dev_configuration ():
84
+ msg += "\n I'm running in test or dev mode."
111
85
112
- if use_memray ():
113
- with memray .Tracker (memray_filename ("index" )):
114
- return __index ()
115
- else :
116
- return __index ()
86
+ logging .info (
87
+ "index(); invocations of GAE instance : %s" , count_invocations_by_path ()
88
+ )
89
+ return Response (msg , mimetype = "text/plain" , status = 200 )
117
90
118
91
119
92
@app .route ("/_ah/warmup" )
120
93
def warmup ():
121
- def __warmup ():
122
94
123
- increment_invocation_count ("warmup" )
124
- with gae_memory_logging ("warmup" ):
125
- logging .info ("warmup() called" )
95
+ increment_invocation_count ("warmup" )
96
+ with gae_memory_logging ("warmup" ):
97
+ logging .info ("warmup() called" )
126
98
127
- return "" , 200 , {}
128
-
129
- if use_memray ():
130
- with memray .Tracker (memray_filename ("warmup" )):
131
- return __warmup ()
132
- else :
133
- return __warmup ()
99
+ return "" , 200 , {}
134
100
135
101
136
102
@app .route ("/schedule" , methods = ["GET" ])
@@ -140,31 +106,23 @@ def schedule():
140
106
Send out a message per-plugin per-project to label all objects of that type and project.
141
107
"""
142
108
143
- def __schedule ():
144
-
145
- increment_invocation_count ("schedule" )
146
- with gae_memory_logging ("schedule" ):
147
- try :
148
- logging .info ("Schedule called" )
149
-
150
- is_cron = flask .request .headers .get ("X-Appengine-Cron" )
151
- if not is_cron :
152
- return "Access Denied: No Cron header found" , 403
153
-
154
- enabled_projects = __get_enabled_projects ()
155
- __send_pubsub_per_projectplugin (enabled_projects )
156
- # All errors are actually caught before this point,
157
- # since most errors are unrecoverable.
158
- return "OK" , 200
159
- except Exception :
160
- logging .exception ("In schedule()" )
161
- return "Error" , 500
162
-
163
- if use_memray ():
164
- with memray .Tracker (memray_filename ("schedule" )):
165
- return __schedule ()
166
- else :
167
- return __schedule ()
109
+ increment_invocation_count ("schedule" )
110
+ with gae_memory_logging ("schedule" ):
111
+ try :
112
+ logging .info ("Schedule called" )
113
+
114
+ is_cron = flask .request .headers .get ("X-Appengine-Cron" )
115
+ if not is_cron :
116
+ return "Access Denied: No Cron header found" , 403
117
+
118
+ enabled_projects = __get_enabled_projects ()
119
+ __send_pubsub_per_projectplugin (enabled_projects )
120
+ # All errors are actually caught before this point,
121
+ # since most errors are unrecoverable.
122
+ return "OK" , 200
123
+ except Exception :
124
+ logging .exception ("In schedule()" )
125
+ return "Error" , 500
168
126
169
127
170
128
@lru_cache (maxsize = 1 )
@@ -237,68 +195,59 @@ def __send_pubsub_per_projectplugin(configured_projects):
237
195
238
196
@app .route ("/label_one" , methods = ["POST" ])
239
197
def label_one ():
240
- def __label_one ():
241
198
242
- increment_invocation_count ("label_one" )
243
- with gae_memory_logging ("label_one" ):
199
+ increment_invocation_count ("label_one" )
200
+ with gae_memory_logging ("label_one" ):
244
201
245
- plugins_found = []
246
- data = {}
247
- try :
248
- """
249
- PubSub push endpoint for messages from the Log Sink
250
- """
251
- # Performance question: There are multiple log lines for each object-creation, for example,
252
- # one for request and one for response. So, we may be labeling each object multiple times,
253
- # which is a waste of resources.
254
- #
255
- # Or maybe not. Maybe the first PubSub-triggered action fails, because the resource is not initialized, and
256
- # then the second one succeeds; need to check that.
257
-
258
- data = __extract_pubsub_content ()
202
+ plugins_found = []
203
+ data = {}
204
+ try :
205
+ """
206
+ PubSub push endpoint for messages from the Log Sink
207
+ """
208
+ # Performance question: There are multiple log lines for each object-creation, for example,
209
+ # one for request and one for response. So, we may be labeling each object multiple times,
210
+ # which is a waste of resources.
211
+ #
212
+ # Or maybe not. Maybe the first PubSub-triggered action fails, because the resource is not initialized, and
213
+ # then the second one succeeds; need to check that.
259
214
260
- method_from_log = data [ "protoPayload" ][ "methodName" ]
215
+ data = __extract_pubsub_content ()
261
216
262
- for plugin_cls in PluginHolder .plugins .keys ():
263
- method_names = plugin_cls .method_names ()
217
+ method_from_log = data ["protoPayload" ]["methodName" ]
264
218
265
- for supported_method in method_names :
266
- if supported_method .lower () in method_from_log .lower ():
267
- if plugin_cls .is_labeled_on_creation ():
268
- __label_one_0 (data , plugin_cls )
219
+ for plugin_cls in PluginHolder .plugins .keys ():
220
+ method_names = plugin_cls .method_names ()
269
221
270
- plugins_found .append (
271
- plugin_cls .__name__
272
- ) # Append it even if not used due to is_labeled_on_creation False
222
+ for supported_method in method_names :
223
+ if supported_method .lower () in method_from_log .lower ():
224
+ if plugin_cls .is_labeled_on_creation ():
225
+ __label_one_0 (data , plugin_cls )
273
226
274
- if not plugins_found :
275
- logging .info (
276
- "(OK if plugin is disabled.) No plugins found for %s. Enabled plugins are %s" ,
277
- method_from_log ,
278
- config_utils .enabled_plugins (),
279
- )
227
+ plugins_found .append (
228
+ plugin_cls .__name__
229
+ ) # Append it even if not used due to is_labeled_on_creation False
280
230
281
- if len (plugins_found ) > 1 :
282
- raise Exception (
283
- "Error: Multiple plugins found %s for %s"
284
- % (plugins_found , method_from_log )
285
- )
286
- logging .info ("OK for label_one %s" , method_from_log )
287
- # All errors are actually caught before this point,
288
- # since most errors are unrecoverable.
289
- return "OK" , 200
290
- except Exception :
291
- project_id = (
292
- data .get ("resource" , {}).get ("labels" , {}).get ("project_id" )
231
+ if not plugins_found :
232
+ logging .info (
233
+ "(OK if plugin is disabled.) No plugins found for %s. Enabled plugins are %s" ,
234
+ method_from_log ,
235
+ config_utils .enabled_plugins (),
293
236
)
294
- logging .exception ("Error on label_one %s %s" , plugins_found , project_id )
295
- return "Error" , 500
296
237
297
- if use_memray ():
298
- with memray .Tracker (memray_filename ("label_one" )):
299
- return __label_one ()
300
- else :
301
- return __label_one ()
238
+ if len (plugins_found ) > 1 :
239
+ raise Exception (
240
+ "Error: Multiple plugins found %s for %s"
241
+ % (plugins_found , method_from_log )
242
+ )
243
+ logging .info ("OK for label_one %s" , method_from_log )
244
+ # All errors are actually caught before this point,
245
+ # since most errors are unrecoverable.
246
+ return "OK" , 200
247
+ except Exception :
248
+ project_id = data .get ("resource" , {}).get ("labels" , {}).get ("project_id" )
249
+ logging .exception ("Error on label_one %s %s" , plugins_found , project_id )
250
+ return "Error" , 500
302
251
303
252
304
253
def __label_one_0 (data , plugin_cls : Type [Plugin ]):
@@ -355,55 +304,43 @@ def __extract_pubsub_content() -> Dict:
355
304
356
305
@app .route ("/do_label" , methods = ["POST" ])
357
306
def do_label ():
358
- def __do_label ():
359
-
360
- increment_invocation_count ("do_label" )
361
- with gae_memory_logging ("do_label" ):
362
-
363
- """Receive a push message from PubSub, sent from schedule() above,
364
- with instructions to label all objects of a given plugin and project_id.
365
- """
366
- project_id = (
367
- "" # set up variables to allow logging in Exception block at end
368
- )
369
- plugin_class_name = ""
370
- try :
371
- data = __extract_pubsub_content ()
372
- plugin_class_name = data ["plugin" ]
373
-
374
- plugin = PluginHolder .get_plugin_instance_by_name (plugin_class_name )
375
- if not plugin :
307
+ increment_invocation_count ("do_label" )
308
+ with gae_memory_logging ("do_label" ):
309
+
310
+ """Receive a push message from PubSub, sent from schedule() above,
311
+ with instructions to label all objects of a given plugin and project_id.
312
+ """
313
+ project_id = "" # set up variables to allow logging in Exception block at end
314
+ plugin_class_name = ""
315
+ try :
316
+ data = __extract_pubsub_content ()
317
+ plugin_class_name = data ["plugin" ]
318
+
319
+ plugin = PluginHolder .get_plugin_instance_by_name (plugin_class_name )
320
+ if not plugin :
321
+ logging .info (
322
+ "(OK if plugin is disabled.) No plugins found for %s. Enabled plugins are %s" ,
323
+ plugin_class_name ,
324
+ config_utils .enabled_plugins (),
325
+ )
326
+ else :
327
+ project_id = data ["project_id" ]
328
+ with timing (f"do_label { plugin_class_name } { project_id } " ):
376
329
logging .info (
377
- "(OK if plugin is disabled.) No plugins found for %s. Enabled plugins are %s" ,
378
- plugin_class_name ,
379
- config_utils . enabled_plugins () ,
330
+ "do_label() for %s in %s" ,
331
+ plugin . __class__ . __name__ ,
332
+ project_id ,
380
333
)
381
- else :
382
- project_id = data ["project_id" ]
383
- with timing (f"do_label { plugin_class_name } { project_id } " ):
384
- logging .info (
385
- "do_label() for %s in %s" ,
386
- plugin .__class__ .__name__ ,
387
- project_id ,
388
- )
389
- plugin .label_all (project_id )
390
- logging .info ("OK on do_label %s %s" , plugin_class_name , project_id )
391
- # All errors are actually caught before this point, since most errors are unrecoverable.
392
- # However, Subscription gets "InternalServerError"" "InactiveRpcError" on occasion
393
- # so retry could be relevant. B
394
-
395
- return "OK" , 200
396
- except Exception :
397
- logging .exception (
398
- "Error on do_label %s %s" , plugin_class_name , project_id
399
- )
400
- return "Error" , 500
401
-
402
- if use_memray ():
403
- with memray .Tracker (memray_filename ("do_label" )):
404
- return __do_label ()
405
- else :
406
- return __do_label ()
334
+ plugin .label_all (project_id )
335
+ logging .info ("OK on do_label %s %s" , plugin_class_name , project_id )
336
+ # All errors are actually caught before this point, since most errors are unrecoverable.
337
+ # However, Subscription gets "InternalServerError"" "InactiveRpcError" on occasion
338
+ # so retry could be relevant. B
339
+
340
+ return "OK" , 200
341
+ except Exception :
342
+ logging .exception ("Error on do_label %s %s" , plugin_class_name , project_id )
343
+ return "Error" , 500
407
344
408
345
409
346
def __check_pubsub_verification_token ():
0 commit comments