blob: 397ebdab0fdffcb69fb1fd8ea61db183b13d9404 [file] [log] [blame]
Christoffer Quist Adamsen65ef2982023-08-24 08:45:39 +02001#!/usr/bin/env python3
2# Copyright (c) 2023, the R8 project authors. Please see the AUTHORS file
3# for details. All rights reserved. Use of this source code is governed by a
4# BSD-style license that can be found in the LICENSE file.
5
6import sys
7import threading
8from threading import Thread
9import traceback
10
Christoffer Quist Adamsen2434a4d2023-10-16 11:29:03 +020011
# A thread that is given a list of jobs. The thread will repeatedly take a job
# from the list of jobs (which is shared with other threads) and execute it
# until there are no more jobs.
#
# If stop_on_first_failure is True, then the thread will exit upon the first
# failing job. The thread will then clear the jobs to ensure that all other
# workers will also terminate after completing their current job.
#
# Each job is a lambda that takes the worker_id as an argument. To guarantee
# termination each job must itself terminate (i.e., each job is responsible for
# setting an appropriate timeout).
class WorkerThread(Thread):

    # The initialization of a WorkerThread is never run concurrently with the
    # initialization of other WorkerThreads.
    #
    # jobs: list of callables shared between workers; each callable is invoked
    #     with the worker_id and returns an exit code (falsy means success).
    # jobs_lock: lock that guards every access to the shared jobs list.
    # stop_on_first_failure: if True, the first failing job makes this worker
    #     clear the shared jobs list and exit.
    # worker_id: passed to every job and used to prefix this worker's output.
    def __init__(self, jobs, jobs_lock, stop_on_first_failure, worker_id):
        Thread.__init__(self)
        # Only set to True when run() reaches its end.
        self.completed = False
        self.jobs = jobs
        self.jobs_lock = jobs_lock
        # Size of the job list at construction time; used to derive job ids.
        self.number_of_jobs = len(jobs)
        self.stop_on_first_failure = stop_on_first_failure
        # Becomes False as soon as one job fails or crashes.
        self.success = True
        self.worker_id = worker_id

    # Takes jobs from the shared list and executes them until the list is
    # empty, or until a job fails and stop_on_first_failure is set.
    def run(self):
        print_thread("Starting worker", self.worker_id)
        while True:
            (job, job_id) = self.take_job(self.jobs, self.jobs_lock)
            if job is None:
                break
            try:
                print_thread(
                    "Starting job %s/%s" % (job_id, self.number_of_jobs),
                    self.worker_id)
                exit_code = job(self.worker_id)
                print_thread(
                    "Job %s finished with exit code %s" % (job_id, exit_code),
                    self.worker_id)
                failed = bool(exit_code)
            except BaseException:
                # Deliberately catch everything (including SystemExit raised
                # by a job): a crashing job must be recorded as a failure and
                # must not kill the worker before the other workers have been
                # told to stop.
                print_thread("Job %s crashed" % job_id, self.worker_id)
                print_thread(traceback.format_exc(), self.worker_id)
                failed = True
            if failed and self._record_failure():
                break
        print_thread("Exiting", self.worker_id)
        self.completed = True

    # Marks this worker as failed. Returns True if the worker should stop, in
    # which case the shared jobs list has been cleared so that the other
    # workers run out of jobs as well.
    def _record_failure(self):
        self.success = False
        if not self.stop_on_first_failure:
            return False
        self.clear_jobs(self.jobs, self.jobs_lock)
        return True

    # Removes and returns the first job together with its 1-based job id.
    # Returns None as the job when the list is empty.
    def take_job(self, jobs, jobs_lock):
        # The with-statement guarantees that the lock is released even if
        # accessing the job list raises.
        with jobs_lock:
            job_id = self.number_of_jobs - len(jobs) + 1
            job = jobs.pop(0) if jobs else None
        return (job, job_id)

    # Empties the shared jobs list so that no worker can take another job.
    def clear_jobs(self, jobs, jobs_lock):
        with jobs_lock:
            jobs.clear()
77
Christoffer Quist Adamsen65ef2982023-08-24 08:45:39 +020078
# Runs the given jobs on at most number_of_workers threads. Never uses more
# workers than there are jobs, and falls back to run_in_sequence when a single
# worker remains.
#
# Returns the result of run_in_sequence in the single worker case. Otherwise
# returns 0 if every worker ran to completion without a failing job, and 1 if
# not.
def run_in_parallel(jobs, number_of_workers, stop_on_first_failure):
    assert number_of_workers > 0
    worker_count = min(number_of_workers, len(jobs))
    if worker_count == 1:
        return run_in_sequence(jobs, stop_on_first_failure)
    # A single lock guards the job list that all workers share.
    shared_lock = threading.Lock()
    workers = [
        WorkerThread(jobs, shared_lock, stop_on_first_failure, worker_id)
        for worker_id in range(1, worker_count + 1)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    all_ok = all(worker.completed and worker.success for worker in workers)
    return 0 if all_ok else 1
98
Christoffer Quist Adamsen65ef2982023-08-24 08:45:39 +020099
# Runs the given jobs one after another on the calling thread, passing None as
# the worker_id to each job.
#
# Returns 0 if no job failed. Otherwise returns the exit code of the last
# failing job, where a job that crashes counts as exit code 1. If
# stop_on_first_failure is True, no further jobs are run after the first
# failure.
def run_in_sequence(jobs, stop_on_first_failure):
    combined_exit_code = 0
    worker_id = None
    for job in jobs:
        try:
            exit_code = job(worker_id)
        except (Exception, SystemExit):
            # A crashing job (or one that calls sys.exit) is reported and
            # treated as a failure. KeyboardInterrupt is intentionally not
            # caught, so that the user can still abort the whole run.
            print(traceback.format_exc())
            exit_code = 1
        if exit_code:
            combined_exit_code = exit_code
            if stop_on_first_failure:
                break
    return combined_exit_code
116
Christoffer Quist Adamsen65ef2982023-08-24 08:45:39 +0200117
# Prints msg. When a worker_id is given, every line of msg is printed on its
# own line with a 'WORKER <worker_id>: ' prefix, so that output from different
# workers can be told apart.
def print_thread(msg, worker_id):
    if worker_id is None:
        print(msg)
        return
    for text in msg.splitlines():
        print('WORKER %s: %s' % (worker_id, text))