Christoffer Quist Adamsen | 65ef298 | 2023-08-24 08:45:39 +0200 | [diff] [blame] | 1 | #!/usr/bin/env python3 |
| 2 | # Copyright (c) 2023, the R8 project authors. Please see the AUTHORS file |
| 3 | # for details. All rights reserved. Use of this source code is governed by a |
| 4 | # BSD-style license that can be found in the LICENSE file. |
| 5 | |
| 6 | import sys |
| 7 | import threading |
| 8 | from threading import Thread |
| 9 | import traceback |
| 10 | |
Christoffer Quist Adamsen | 2434a4d | 2023-10-16 11:29:03 +0200 | [diff] [blame] | 11 | |
Christoffer Quist Adamsen | 65ef298 | 2023-08-24 08:45:39 +0200 | [diff] [blame] | 12 | # A thread that is given a list of jobs. The thread will repeatedly take a job |
| 13 | # from the list of jobs (which is shared with other threads) and execute it |
| 14 | # until there is no more jobs. |
| 15 | # |
| 16 | # If stop_on_first_failure is True, then the thread will exit upon the first |
| 17 | # failing job. The thread will then clear the jobs to ensure that all other |
| 18 | # workers will also terminate after completing there current job. |
| 19 | # |
| 20 | # Each job is a lambda that takes the worker_id as an argument. To guarantee |
| 21 | # termination each job must itself terminate (i.e., each job is responsible for |
| 22 | # setting an appropriate timeout). |
| 23 | class WorkerThread(Thread): |
| 24 | |
Christoffer Quist Adamsen | 2434a4d | 2023-10-16 11:29:03 +0200 | [diff] [blame] | 25 | # The initialization of a WorkerThread is never run concurrently with the |
| 26 | # initialization of other WorkerThreads. |
| 27 | def __init__(self, jobs, jobs_lock, stop_on_first_failure, worker_id): |
| 28 | Thread.__init__(self) |
| 29 | self.completed = False |
| 30 | self.jobs = jobs |
| 31 | self.jobs_lock = jobs_lock |
| 32 | self.number_of_jobs = len(jobs) |
| 33 | self.stop_on_first_failure = stop_on_first_failure |
| 34 | self.success = True |
| 35 | self.worker_id = worker_id |
Christoffer Quist Adamsen | 65ef298 | 2023-08-24 08:45:39 +0200 | [diff] [blame] | 36 | |
Christoffer Quist Adamsen | 2434a4d | 2023-10-16 11:29:03 +0200 | [diff] [blame] | 37 | def run(self): |
| 38 | print_thread("Starting worker", self.worker_id) |
| 39 | while True: |
| 40 | (job, job_id) = self.take_job(self.jobs, self.jobs_lock) |
| 41 | if job is None: |
| 42 | break |
| 43 | try: |
| 44 | print_thread( |
| 45 | "Starting job %s/%s" % (job_id, self.number_of_jobs), |
| 46 | self.worker_id) |
| 47 | exit_code = job(self.worker_id) |
| 48 | print_thread( |
| 49 | "Job %s finished with exit code %s" % (job_id, exit_code), |
| 50 | self.worker_id) |
| 51 | if exit_code: |
| 52 | self.success = False |
| 53 | if self.stop_on_first_failure: |
| 54 | self.clear_jobs(self.jobs, self.jobs_lock) |
| 55 | break |
| 56 | except: |
| 57 | print_thread("Job %s crashed" % job_id, self.worker_id) |
| 58 | print_thread(traceback.format_exc(), self.worker_id) |
| 59 | self.success = False |
| 60 | if self.stop_on_first_failure: |
| 61 | self.clear_jobs(self.jobs, self.jobs_lock) |
| 62 | break |
| 63 | print_thread("Exiting", self.worker_id) |
| 64 | self.completed = True |
Christoffer Quist Adamsen | 65ef298 | 2023-08-24 08:45:39 +0200 | [diff] [blame] | 65 | |
Christoffer Quist Adamsen | 2434a4d | 2023-10-16 11:29:03 +0200 | [diff] [blame] | 66 | def take_job(self, jobs, jobs_lock): |
| 67 | jobs_lock.acquire() |
| 68 | job_id = self.number_of_jobs - len(jobs) + 1 |
| 69 | job = jobs.pop(0) if jobs else None |
| 70 | jobs_lock.release() |
| 71 | return (job, job_id) |
Christoffer Quist Adamsen | 65ef298 | 2023-08-24 08:45:39 +0200 | [diff] [blame] | 72 | |
Christoffer Quist Adamsen | 2434a4d | 2023-10-16 11:29:03 +0200 | [diff] [blame] | 73 | def clear_jobs(self, jobs, jobs_lock): |
| 74 | jobs_lock.acquire() |
| 75 | jobs.clear() |
| 76 | jobs_lock.release() |
| 77 | |
Christoffer Quist Adamsen | 65ef298 | 2023-08-24 08:45:39 +0200 | [diff] [blame] | 78 | |
| 79 | def run_in_parallel(jobs, number_of_workers, stop_on_first_failure): |
Christoffer Quist Adamsen | 2434a4d | 2023-10-16 11:29:03 +0200 | [diff] [blame] | 80 | assert number_of_workers > 0 |
| 81 | if number_of_workers > len(jobs): |
| 82 | number_of_workers = len(jobs) |
| 83 | if number_of_workers == 1: |
| 84 | return run_in_sequence(jobs, stop_on_first_failure) |
| 85 | jobs_lock = threading.Lock() |
| 86 | threads = [] |
| 87 | for worker_id in range(1, number_of_workers + 1): |
| 88 | threads.append( |
| 89 | WorkerThread(jobs, jobs_lock, stop_on_first_failure, worker_id)) |
| 90 | for thread in threads: |
| 91 | thread.start() |
| 92 | for thread in threads: |
| 93 | thread.join() |
| 94 | for thread in threads: |
| 95 | if not thread.completed or not thread.success: |
| 96 | return 1 |
| 97 | return 0 |
| 98 | |
Christoffer Quist Adamsen | 65ef298 | 2023-08-24 08:45:39 +0200 | [diff] [blame] | 99 | |
| 100 | def run_in_sequence(jobs, stop_on_first_failure): |
Christoffer Quist Adamsen | 2434a4d | 2023-10-16 11:29:03 +0200 | [diff] [blame] | 101 | combined_exit_code = 0 |
| 102 | worker_id = None |
| 103 | for job in jobs: |
| 104 | try: |
| 105 | exit_code = job(worker_id) |
| 106 | if exit_code: |
| 107 | combined_exit_code = exit_code |
| 108 | if stop_on_first_failure: |
| 109 | break |
| 110 | except: |
| 111 | print(traceback.format_exc()) |
| 112 | combined_exit_code = 1 |
| 113 | if stop_on_first_failure: |
| 114 | break |
| 115 | return combined_exit_code |
| 116 | |
Christoffer Quist Adamsen | 65ef298 | 2023-08-24 08:45:39 +0200 | [diff] [blame] | 117 | |
| 118 | def print_thread(msg, worker_id): |
Christoffer Quist Adamsen | 2434a4d | 2023-10-16 11:29:03 +0200 | [diff] [blame] | 119 | if worker_id is None: |
| 120 | print(msg) |
| 121 | else: |
Christoffer Adamsen | 4c77afa | 2024-04-11 07:05:28 +0000 | [diff] [blame] | 122 | for line in msg.splitlines(): |
| 123 | print('WORKER %s: %s' % (worker_id, line)) |