Use task collections in d8 method processor

Bug: b/304992619
Change-Id: I593ee12426e323802b2ec20dae666e5da287706b
diff --git a/src/main/java/com/android/tools/r8/ir/conversion/D8MethodProcessor.java b/src/main/java/com/android/tools/r8/ir/conversion/D8MethodProcessor.java
index 2229d90..50261b2 100644
--- a/src/main/java/com/android/tools/r8/ir/conversion/D8MethodProcessor.java
+++ b/src/main/java/com/android/tools/r8/ir/conversion/D8MethodProcessor.java
@@ -12,33 +12,27 @@
 import com.android.tools.r8.ir.desugar.CfInstructionDesugaringEventConsumer;
 import com.android.tools.r8.ir.optimize.info.OptimizationFeedbackIgnore;
 import com.android.tools.r8.profile.rewriting.ProfileCollectionAdditions;
-import com.android.tools.r8.utils.ThreadUtils;
+import com.android.tools.r8.threading.SynchronizedTaskCollection;
 import com.google.common.collect.Sets;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 
 public class D8MethodProcessor extends MethodProcessor {
 
   private final ProfileCollectionAdditions profileCollectionAdditions;
   private final PrimaryD8L8IRConverter converter;
   private final MethodProcessorEventConsumer eventConsumer;
-  private final ExecutorService executorService;
   private final Set<DexType> scheduled = Sets.newIdentityHashSet();
 
   // Asynchronous method processing actions. These are "terminal" method processing actions in the
   // sense that the method processing is known not to fork any other futures.
-  private final List<Future<?>> terminalFutures = Collections.synchronizedList(new ArrayList<>());
+  private final SynchronizedTaskCollection<?> terminalTasks;
 
   // Asynchronous method processing actions. This list includes both "terminal" and "non-terminal"
   // method processing actions. Thus, before the asynchronous method processing finishes, it may
   // fork the processing of another method.
-  private final List<Future<?>> nonTerminalFutures =
-      Collections.synchronizedList(new ArrayList<>());
+  private final SynchronizedTaskCollection<?> nonTerminalTasks;
 
   private ProcessorContext processorContext;
 
@@ -49,8 +43,9 @@
     this.profileCollectionAdditions = profileCollectionAdditions;
     this.converter = converter;
     this.eventConsumer = MethodProcessorEventConsumer.createForD8(profileCollectionAdditions);
-    this.executorService = executorService;
     this.processorContext = converter.appView.createProcessorContext();
+    this.terminalTasks = new SynchronizedTaskCollection<>(converter.options, executorService);
+    this.nonTerminalTasks = new SynchronizedTaskCollection<>(converter.options, executorService);
   }
 
   public void addScheduled(DexProgramClass clazz) {
@@ -91,16 +86,18 @@
       // The non-synthetic holder is not scheduled. It will be processed once holder is scheduled.
       return;
     }
-    nonTerminalFutures.add(
-        ThreadUtils.processAsynchronously(
-            () ->
-                converter.rewriteNonDesugaredCode(
-                    method,
-                    eventConsumer,
-                    OptimizationFeedbackIgnore.getInstance(),
-                    this,
-                    processorContext.createMethodProcessingContext(method)),
-            executorService));
+    try {
+      nonTerminalTasks.submit(
+          () ->
+              converter.rewriteNonDesugaredCode(
+                  method,
+                  eventConsumer,
+                  OptimizationFeedbackIgnore.getInstance(),
+                  this,
+                  processorContext.createMethodProcessingContext(method)));
+    } catch (ExecutionException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
@@ -115,16 +112,18 @@
     if (method.getDefinition().isAbstract()) {
       return;
     }
-    terminalFutures.add(
-        ThreadUtils.processAsynchronously(
-            () ->
-                converter.rewriteDesugaredCode(
-                    method,
-                    OptimizationFeedbackIgnore.getInstance(),
-                    this,
-                    processorContext.createMethodProcessingContext(method),
-                    MethodConversionOptions.forD8(converter.appView)),
-            executorService));
+    try {
+      terminalTasks.submit(
+          () ->
+              converter.rewriteDesugaredCode(
+                  method,
+                  OptimizationFeedbackIgnore.getInstance(),
+                  this,
+                  processorContext.createMethodProcessingContext(method),
+                  MethodConversionOptions.forD8(converter.appView)));
+    } catch (ExecutionException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   public void scheduleDesugaredMethodsForProcessing(Iterable<ProgramMethod> methods) {
@@ -137,21 +136,8 @@
   }
 
   public void awaitMethodProcessing() throws ExecutionException {
-    // Await the non-terminal futures until there are only terminal futures left.
-    while (!nonTerminalFutures.isEmpty()) {
-      List<Future<?>> futuresToAwait;
-      synchronized (nonTerminalFutures) {
-        futuresToAwait = new ArrayList<>(nonTerminalFutures);
-        nonTerminalFutures.clear();
-      }
-      ThreadUtils.awaitFutures(futuresToAwait);
-    }
-
-    // Await the terminal futures. There futures will by design not to fork new method processing.
-    int numberOfTerminalFutures = terminalFutures.size();
-    ThreadUtils.awaitFutures(terminalFutures);
-    assert terminalFutures.size() == numberOfTerminalFutures;
-    terminalFutures.clear();
+    nonTerminalTasks.await();
+    terminalTasks.await();
   }
 
   public void processMethod(
@@ -164,8 +150,8 @@
   }
 
   public boolean verifyNoPendingMethodProcessing() {
-    assert terminalFutures.isEmpty();
-    assert nonTerminalFutures.isEmpty();
+    assert terminalTasks.isEmpty();
+    assert nonTerminalTasks.isEmpty();
     return true;
   }
 }
diff --git a/src/main/java/com/android/tools/r8/threading/SynchronizedTaskCollection.java b/src/main/java/com/android/tools/r8/threading/SynchronizedTaskCollection.java
new file mode 100644
index 0000000..d82886e
--- /dev/null
+++ b/src/main/java/com/android/tools/r8/threading/SynchronizedTaskCollection.java
@@ -0,0 +1,41 @@
+// Copyright (c) 2023, the R8 project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+package com.android.tools.r8.threading;
+
+import com.android.tools.r8.utils.InternalOptions;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+public class SynchronizedTaskCollection<T> extends TaskCollection<T> {
+
+  public SynchronizedTaskCollection(InternalOptions options, ExecutorService executorService) {
+    super(options, executorService);
+  }
+
+  @Override
+  public synchronized void submit(Callable<T> task) throws ExecutionException {
+    super.submit(task);
+  }
+
+  @Override
+  public void await() throws ExecutionException {
+    // Assuming tasks may add new tasks, awaiting all pending tasks must be run in a loop.
+    // The identification of futures is synchronized with submit so that we don't have concurrent
+    // modification of the task list.
+    List<Future<T>> futures;
+    synchronized (this) {
+      futures = getAndClearFutures();
+    }
+    do {
+      getThreadingModule().awaitFutures(futures);
+      synchronized (this) {
+        futures = getAndClearFutures();
+      }
+    } while (!futures.isEmpty());
+  }
+}
diff --git a/src/main/java/com/android/tools/r8/threading/TaskCollection.java b/src/main/java/com/android/tools/r8/threading/TaskCollection.java
index 4fcaaaf..3f7752c 100644
--- a/src/main/java/com/android/tools/r8/threading/TaskCollection.java
+++ b/src/main/java/com/android/tools/r8/threading/TaskCollection.java
@@ -25,7 +25,25 @@
     this.executorService = executorService;
   }
 
-  public <E extends Exception> void submit(ThrowingAction<E> task) throws ExecutionException {
+  public int size() {
+    return futures.size();
+  }
+
+  public boolean isEmpty() {
+    return size() == 0;
+  }
+
+  List<Future<T>> getAndClearFutures() {
+    List<Future<T>> copy = new ArrayList<>(futures);
+    futures.clear();
+    return copy;
+  }
+
+  ThreadingModule getThreadingModule() {
+    return threadingModule;
+  }
+
+  public final <E extends Exception> void submit(ThrowingAction<E> task) throws ExecutionException {
     submit(
         () -> {
           task.execute();
@@ -39,5 +57,6 @@
 
   public void await() throws ExecutionException {
     threadingModule.awaitFutures(futures);
+    futures.clear();
   }
 }