 import argparse
+from concurrent.futures import ThreadPoolExecutor, wait
 import sys
 import traceback
 from collections import defaultdict
@@ -73,7 +74,47 @@ class TravisJobStat:
     duration_s: str
 
 
+GITHUB_TO_TRAVIS_STATUS_MAP = {
+    "action_required": "created",
+    "cancelled": "failed",
+    "failure": "failed",
+    "neutral": "created",
+    "success": "passed",
+    "skipped": "failed",
+    "stale": "failed",
+    "timed_out": "failed",
+}
+
+
 def get_travis_status(commit_sha, cache_dir="travis_events") -> List[TravisJobStat]:
+    def get_gha_status(sha):
+        data = requests.get(
+            f"https://api.github.com/repos/ray-project/ray/commits/{sha}/check-suites",
+            headers=GH_HEADERS,
+        ).json()
+
+        if "check_suites" not in data:
+            return None
+
+        for check in data["check_suites"]:
+            slug = check["app"]["slug"]
+            if slug == "github-actions" and check["status"] == "completed":
+                data = requests.get(check["check_runs_url"], headers=GH_HEADERS).json()
+                if len(data["check_runs"]) == 0:
+                    return None
+                run = data["check_runs"][0]
+                return TravisJobStat(
+                    job_id=run["id"],
+                    os="windows",
+                    commit=sha,
+                    env="github action main job",
+                    state=GITHUB_TO_TRAVIS_STATUS_MAP[check["conclusion"]],
+                    url=run["html_url"],
+                    duration_s=_parse_duration(
+                        check.get("started_at"), check.get("completed_at")
+                    ),
+                )
+
     def find_travis_build_id(sha):
         data = requests.get(
             f"https://api.github.com/repos/ray-project/ray/commits/{sha}/check-suites",
@@ -121,15 +162,23 @@ def list_travis_job_status(build_id):
     status_file = dir_name / "status_complete"
     data_file = dir_name / "status.json"
     if not status_file.exists():
+        statuses = []
+        gha_status = get_gha_status(commit_sha)
+        if gha_status:
+            statuses.append(gha_status)
+
         build_id = find_travis_build_id(commit_sha)
-        if build_id is None:
+        if build_id is not None:
+            job_status = list_travis_job_status(build_id)
+            statuses.extend(job_status)
+
+        if len(statuses) == 0:
             return []
-        job_status = list_travis_job_status(build_id)
 
         with open(data_file, "w") as f:
-            f.write(TravisJobStat.schema().dumps(job_status, many=True))
+            f.write(TravisJobStat.schema().dumps(statuses, many=True))
 
-        job_states = {job.state for job in job_status}
+        job_states = {job.state for job in statuses}
         if len(job_states.intersection({"created", "started"})) == 0:
             status_file.touch()
 
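The caching contract in this hunk: each commit's merged GHA and Travis statuses are written to status.json, and the status_complete marker is touched only once no job is still pending, so unfinished commits are re-fetched on the next run. A minimal sketch of that completeness rule, assuming the same state names:

# Completeness rule sketched from the hunk above: a commit's cache is
# final only when no job remains in a pending ("created"/"started") state.
def is_complete(job_states: set) -> bool:
    return len(job_states.intersection({"created", "started"})) == 0

assert is_complete({"passed", "failed"})
assert not is_complete({"passed", "started"})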
@@ -147,28 +196,32 @@ class BuildkiteStatus:
     state: str
     url: str
     commit: str
-    created_at: Optional[str]
+    startedAt: Optional[str]
     finished_at: Optional[str]
 
     def get_duration_s(self) -> int:
-        return _parse_duration(self.created_at, self.finished_at)
+        return _parse_duration(self.startedAt, self.finished_at)
 
 
-def get_buildkite_status() -> List[BuildkiteStatus]:
+def get_buildkite_status() -> Tuple[List, List[BuildkiteStatus]]:
+    builds = []
     result = []
     page_token = None
     for offset in [0, 20, 40, 60, 80, 100]:
         if offset == 0:
-            chunks, page_token = get_buildkite_status_paginated(f"first: 20")
+            builds_chunk, result_chunks, page_token = get_buildkite_status_paginated(
+                f"first: 20"
+            )
         else:
-            chunks, page_token = get_buildkite_status_paginated(
+            builds_chunk, result_chunks, page_token = get_buildkite_status_paginated(
                 f'first: 20, after: "{page_token}"'
             )
-        result.extend(chunks)
-    return result
+        builds.extend(builds_chunk)
+        result.extend(result_chunks)
+    return builds, result
 
 
-def get_buildkite_status_paginated(page_command) -> List[BuildkiteStatus]:
+def get_buildkite_status_paginated(page_command):
     BUILDKITE_TOKEN = os.environ["BUILDKITE_TOKEN"]
     tries = 5
     for attempt in range(tries):
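get_buildkite_status now walks six fixed pages of 20 builds, threading Buildkite's endCursor into each follow-up request, and returns the raw build edges alongside the parsed statuses. For reference, the generic cursor idiom looks like the sketch below; PAGES is a hypothetical stand-in for the GraphQL call, and hasNextPage is the standard Relay pagination field rather than something this query necessarily requests:

# Two fake pages standing in for the Buildkite GraphQL response.
PAGES = {
    None: {"edges": [{"node": {"number": 2}}],
           "pageInfo": {"endCursor": "c1", "hasNextPage": True}},
    "c1": {"edges": [{"node": {"number": 1}}],
           "pageInfo": {"endCursor": None, "hasNextPage": False}},
}

edges, cursor = [], None
while True:
    data = PAGES[cursor]  # the real code issues a GraphQL request here
    edges.extend(data["edges"])
    if not data["pageInfo"]["hasNextPage"]:
        break
    cursor = data["pageInfo"]["endCursor"]

assert [e["node"]["number"] for e in edges] == [2, 1]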
@@ -185,6 +238,10 @@ def get_buildkite_status_paginated(page_command) -> List[BuildkiteStatus]:
                 }
                 edges {
                   node {
+                    createdAt
+                    startedAt
+                    finishedAt
+                    number
                     jobs(first: 100) {
                       edges {
                         node {
@@ -198,6 +255,8 @@ def get_buildkite_status_paginated(page_command) -> List[BuildkiteStatus]:
                             commit
                           }
                           createdAt
+                          runnableAt
+                          startedAt
                           finishedAt
                           artifacts(first: 100) {
                             edges {
@@ -232,8 +291,11 @@ def get_buildkite_status_paginated(page_command) -> List[BuildkiteStatus]:
     page_cursor = resp.json()["data"]["pipeline"]["builds"]["pageInfo"]["endCursor"]
     builds = resp.json()["data"]["pipeline"]["builds"]["edges"]
     results = []
-    exception_counter: int = 0
-    for build in builds:
+
+    thread_pool = ThreadPoolExecutor(100)
+    futures = []
+
+    for build in tqdm(builds):
         jobs = build["node"]["jobs"]["edges"]
         for job in jobs:
             actual_job = job["node"]
@@ -246,7 +308,7 @@ def get_buildkite_status_paginated(page_command) -> List[BuildkiteStatus]:
                 state=actual_job["state"],
                 url=actual_job["url"],
                 commit=sha,
-                created_at=actual_job["createdAt"],
+                startedAt=actual_job["startedAt"],
                 finished_at=actual_job["finishedAt"],
             )
             results.append(status)
@@ -260,8 +322,8 @@ def get_buildkite_status_paginated(page_command) -> List[BuildkiteStatus]:
             if not os.path.exists(on_disk_path):
                 # Use the artifact link to download the logs. This might not work well
                 # on a slow connection because the presigned S3 URL only lasts 10 minutes.
-                try:
-                    resp = requests.get(url)
+                def task(url, on_disk_path, actual_job):
+                    resp = requests.get(url, timeout=5)
                     assert (
                         resp.status_code == 200
                     ), f"failed to download {url} for {actual_job}: {resp.text}"
@@ -270,15 +332,17 @@ def get_buildkite_status_paginated(page_command) -> List[BuildkiteStatus]:
                     os.makedirs(os.path.split(on_disk_path)[0], exist_ok=True)
                     with open(on_disk_path, "wb") as f:
                         f.write(resp.content)
-                except Exception as e:
-                    traceback.print_exc()
-                    exception_counter += 1
 
-                    if exception_counter > 100:
-                        assert (
-                            False
-                        ), "More than 100 exception as occured when downloading Buldkite status."
-    return results, page_cursor
+                futures.append(
+                    thread_pool.submit(task, url, on_disk_path, actual_job)
+                )
+    wait(futures)
+    exceptions = [fut.exception() for fut in futures if fut.exception()]
+
+    if len(exceptions) > 100:
+        print("More than 100 exceptions occurred while downloading Buildkite status.")
+        raise exceptions[0]
+    return builds, results, page_cursor
 
 
 def download_files_given_prefix(prefix: str):
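The rewrite above replaces the serial try/except download loop with a submit-then-wait fan-out: each artifact download becomes a task on a 100-thread pool, and failures are collected afterwards through Future.exception() instead of propagating at call time. A self-contained sketch of the same pattern, with a hypothetical fetch task:

from concurrent.futures import ThreadPoolExecutor, wait

def fetch(n):  # hypothetical task; fails on odd inputs
    if n % 2:
        raise ValueError(n)
    return n

pool = ThreadPoolExecutor(4)
futures = [pool.submit(fetch, n) for n in range(8)]
wait(futures)  # block until every future has finished or raised

# Worker exceptions do not propagate on their own; collect them explicitly.
errors = [fut.exception() for fut in futures if fut.exception()]
assert len(errors) == 4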
@@ -704,7 +768,9 @@ def main():
     print("Downloading Travis Status")
     travis_data = [get_travis_status(commit.sha) for commit in tqdm(commits)]
     print("Downloading Buildkite Status")
-    buildkite_status = get_buildkite_status()
+    raw_builds_result, buildkite_status = get_buildkite_status()
+    with open("cached_buildkite.json", "w") as f:
+        json.dump(raw_builds_result, f)
 
     print("✍️ Writing Data")
     db = ResultsDB("./results.db")
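main() now also persists the raw build edges to cached_buildkite.json next to the SQLite database, so the data can be re-processed without re-querying Buildkite. A sketch of reading the cache back, assuming the list-of-edges shape produced by the query above:

import json

# Assumes cached_buildkite.json holds the raw builds list written in main().
with open("cached_buildkite.json") as f:
    raw_builds = json.load(f)

for edge in raw_builds:
    node = edge["node"]
    print(node.get("number"), node.get("startedAt"), node.get("finishedAt"))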