Parallelize run_on_app.py

Bug: b/297302759
Change-Id: I454286c5d673c7cec41f14f0acad89f1842122cd
diff --git a/tools/thread_utils.py b/tools/thread_utils.py
new file mode 100755
index 0000000..28fd348
--- /dev/null
+++ b/tools/thread_utils.py
@@ -0,0 +1,116 @@
+#!/usr/bin/env python3
+# Copyright (c) 2023, the R8 project authors. Please see the AUTHORS file
+# for details. All rights reserved. Use of this source code is governed by a
+# BSD-style license that can be found in the LICENSE file.
+
+import sys
+import threading
+from threading import Thread
+import traceback
+
+# A thread that is given a list of jobs. The thread will repeatedly take a job
+# from the list of jobs (which is shared with other threads) and execute it
+# until there is no more jobs.
+#
+# If stop_on_first_failure is True, then the thread will exit upon the first
+# failing job. The thread will then clear the jobs to ensure that all other
+# workers will also terminate after completing there current job.
+#
+# Each job is a lambda that takes the worker_id as an argument. To guarantee
+# termination each job must itself terminate (i.e., each job is responsible for
+# setting an appropriate timeout).
+class WorkerThread(Thread):
+
+  # The initialization of a WorkerThread is never run concurrently with the
+  # initialization of other WorkerThreads.
+  def __init__(self, jobs, jobs_lock, stop_on_first_failure, worker_id):
+    Thread.__init__(self)
+    self.jobs = jobs
+    self.jobs_lock = jobs_lock
+    self.number_of_jobs = len(jobs)
+    self.stop_on_first_failure = stop_on_first_failure
+    self.success = True
+    self.worker_id = worker_id
+
+  def run(self):
+    print_thread("Starting worker", self.worker_id)
+    while True:
+      (job, job_id) = self.take_job(self.jobs, self.jobs_lock)
+      if job is None:
+        break
+      try:
+        print_thread(
+            "Starting job %s/%s" % (job_id, self.number_of_jobs),
+            self.worker_id)
+        exit_code = job(self.worker_id)
+        print_thread(
+            "Job %s finished with exit code %s"
+                % (job_id, exit_code),
+            self.worker_id)
+        if exit_code:
+          self.success = False
+          if self.stop_on_first_failure:
+            self.clear_jobs(jobs, jobs_lock)
+            break
+      except:
+        print_thread("Job %s crashed" % job_id, self.worker_id)
+        print_thread(traceback.format_exc(), self.worker_id)
+        self.success = False
+        if self.stop_on_first_failure:
+          self.clear_jobs(jobs, jobs_lock)
+          break
+    print_thread("Exiting", self.worker_id)
+
+  def take_job(self, jobs, jobs_lock):
+    jobs_lock.acquire()
+    job_id = self.number_of_jobs - len(jobs) + 1
+    job = jobs.pop(0) if jobs else None
+    jobs_lock.release()
+    return (job, job_id)
+
+  def clear_jobs(self, jobs, jobs_lock):
+    jobs_lock.acquire()
+    jobs.clear()
+    jobs_lock.release()
+
+def run_in_parallel(jobs, number_of_workers, stop_on_first_failure):
+  assert number_of_workers > 0
+  if number_of_workers > len(jobs):
+    number_of_workers = len(jobs)
+  if number_of_workers == 1:
+    return run_in_sequence(jobs, stop_on_first_failure)
+  jobs_lock = threading.Lock()
+  threads = []
+  for worker_id in range(1, number_of_workers + 1):
+    threads.append(
+        WorkerThread(jobs, jobs_lock, stop_on_first_failure, worker_id))
+  for thread in threads:
+    thread.start()
+  for thread in threads:
+    thread.join()
+  for thread in threads:
+    if not thread.success:
+      return 1
+  return 0
+
+def run_in_sequence(jobs, stop_on_first_failure):
+  combined_exit_code = 0
+  worker_id = None
+  for job in jobs:
+    try:
+      exit_code = job(worker_id)
+      if exit_code:
+        combined_exit_code = exit_code
+        if stop_on_first_failure:
+          break
+    except:
+      print(traceback.format_exc())
+      combined_exit_code = 1
+      if stop_on_first_failure:
+        break
+  return combined_exit_code
+
+def print_thread(msg, worker_id):
+  if worker_id is None:
+    print(msg)
+  print('WORKER %s: %s' % (worker_id, msg))
\ No newline at end of file