Update ThreadTaskUtils to new threading module

Bug: b/304992619
Change-Id: I3bd6aa8f741cb9558bf36fb7e2ce253404f73a5d
diff --git a/src/main/java/com/android/tools/r8/utils/threads/ThreadTaskUtils.java b/src/main/java/com/android/tools/r8/utils/threads/ThreadTaskUtils.java
index 50dfe03..e65e4d9 100644
--- a/src/main/java/com/android/tools/r8/utils/threads/ThreadTaskUtils.java
+++ b/src/main/java/com/android/tools/r8/utils/threads/ThreadTaskUtils.java
@@ -4,17 +4,15 @@
 
 package com.android.tools.r8.utils.threads;
 
+import com.android.tools.r8.threading.TaskCollection;
 import com.android.tools.r8.utils.ArrayUtils;
 import com.android.tools.r8.utils.InternalOptions;
-import com.android.tools.r8.utils.ThreadUtils;
 import com.android.tools.r8.utils.Timing;
 import com.android.tools.r8.utils.Timing.TimingMerger;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 
 public class ThreadTaskUtils {
 
@@ -25,24 +23,24 @@
       ThreadTask... tasks)
       throws ExecutionException {
     assert tasks.length > 0;
-    List<Future<Void>> futures = new ArrayList<>(tasks.length);
+    TaskCollection<?> taskCollection = new TaskCollection<>(options, executorService);
     if (timingMerger.isEmpty()) {
       for (ThreadTask task : tasks) {
         if (task.shouldRun()) {
-          processTask(executorService, task, futures);
+          processTask(task, taskCollection);
         }
       }
-      ThreadUtils.awaitFutures(futures);
+      taskCollection.await();
     } else {
       List<Timing> timings =
           Arrays.asList(ArrayUtils.filled(new Timing[tasks.length], Timing.empty()));
       int taskIndex = 0;
       for (ThreadTask task : tasks) {
         if (task.shouldRun()) {
-          processTaskWithTiming(executorService, options, task, taskIndex++, futures, timings);
+          processTaskWithTiming(options, task, taskIndex++, taskCollection, timings);
         }
       }
-      ThreadUtils.awaitFutures(futures);
+      taskCollection.await();
       timingMerger.add(timings);
       timingMerger.end();
     }
@@ -53,26 +51,24 @@
     }
   }
 
-  private static void processTask(
-      ExecutorService executorService, ThreadTask task, List<Future<Void>> futures) {
+  private static void processTask(ThreadTask task, TaskCollection<?> taskCollection)
+      throws ExecutionException {
     if (task.shouldRunOnThread()) {
-      ThreadUtils.processAsynchronously(
-          () -> task.runWithRuntimeException(Timing.empty()), executorService, futures);
+      taskCollection.submit(() -> task.runWithRuntimeException(Timing.empty()));
     } else {
       task.runWithRuntimeException(Timing.empty());
     }
   }
 
   private static void processTaskWithTiming(
-      ExecutorService executorService,
       InternalOptions options,
       ThreadTask task,
       int taskIndex,
-      List<Future<Void>> futures,
-      List<Timing> timings) {
+      TaskCollection<?> taskCollection,
+      List<Timing> timings)
+      throws ExecutionException {
     if (task.shouldRunOnThread()) {
-      ThreadUtils.processAsynchronously(
-          () -> executeTask(options, task, taskIndex, timings), executorService, futures);
+      taskCollection.submit(() -> executeTask(options, task, taskIndex, timings));
     } else {
       executeTask(options, task, taskIndex, timings);
     }