blob: 5810332bc720b828e54406195ad9f7b62f891354 [file] [log] [blame]
#!/usr/bin/env python3
# Copyright (c) 2023, the R8 project authors. Please see the AUTHORS file
# for details. All rights reserved. Use of this source code is governed by a
# BSD-style license that can be found in the LICENSE file.
import sys
import threading
from threading import Thread
import traceback
# A thread that is given a list of jobs. The thread will repeatedly take a job
# from the list of jobs (which is shared with other threads) and execute it
# until there is no more jobs.
#
# If stop_on_first_failure is True, then the thread will exit upon the first
# failing job. The thread will then clear the jobs to ensure that all other
# workers will also terminate after completing there current job.
#
# Each job is a lambda that takes the worker_id as an argument. To guarantee
# termination each job must itself terminate (i.e., each job is responsible for
# setting an appropriate timeout).
class WorkerThread(Thread):
# The initialization of a WorkerThread is never run concurrently with the
# initialization of other WorkerThreads.
def __init__(self, jobs, jobs_lock, stop_on_first_failure, worker_id):
Thread.__init__(self)
self.completed = False
self.jobs = jobs
self.jobs_lock = jobs_lock
self.number_of_jobs = len(jobs)
self.stop_on_first_failure = stop_on_first_failure
self.success = True
self.worker_id = worker_id
def run(self):
print_thread("Starting worker", self.worker_id)
while True:
(job, job_id) = self.take_job(self.jobs, self.jobs_lock)
if job is None:
break
try:
print_thread(
"Starting job %s/%s" % (job_id, self.number_of_jobs),
self.worker_id)
exit_code = job(self.worker_id)
print_thread(
"Job %s finished with exit code %s"
% (job_id, exit_code),
self.worker_id)
if exit_code:
self.success = False
if self.stop_on_first_failure:
self.clear_jobs(self.jobs, self.jobs_lock)
break
except:
print_thread("Job %s crashed" % job_id, self.worker_id)
print_thread(traceback.format_exc(), self.worker_id)
self.success = False
if self.stop_on_first_failure:
self.clear_jobs(self.jobs, self.jobs_lock)
break
print_thread("Exiting", self.worker_id)
self.completed = True
def take_job(self, jobs, jobs_lock):
jobs_lock.acquire()
job_id = self.number_of_jobs - len(jobs) + 1
job = jobs.pop(0) if jobs else None
jobs_lock.release()
return (job, job_id)
def clear_jobs(self, jobs, jobs_lock):
jobs_lock.acquire()
jobs.clear()
jobs_lock.release()
def run_in_parallel(jobs, number_of_workers, stop_on_first_failure):
assert number_of_workers > 0
if number_of_workers > len(jobs):
number_of_workers = len(jobs)
if number_of_workers == 1:
return run_in_sequence(jobs, stop_on_first_failure)
jobs_lock = threading.Lock()
threads = []
for worker_id in range(1, number_of_workers + 1):
threads.append(
WorkerThread(jobs, jobs_lock, stop_on_first_failure, worker_id))
for thread in threads:
thread.start()
for thread in threads:
thread.join()
for thread in threads:
if not thread.completed or not thread.success:
return 1
return 0
def run_in_sequence(jobs, stop_on_first_failure):
combined_exit_code = 0
worker_id = None
for job in jobs:
try:
exit_code = job(worker_id)
if exit_code:
combined_exit_code = exit_code
if stop_on_first_failure:
break
except:
print(traceback.format_exc())
combined_exit_code = 1
if stop_on_first_failure:
break
return combined_exit_code
def print_thread(msg, worker_id):
if worker_id is None:
print(msg)
else:
print('WORKER %s: %s' % (worker_id, msg))