blob: 89f259bc7aed4e8b386d974cafae50e7fdf26d64 [file] [log] [blame]
// Copyright (c) 2016, the R8 project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
package com.android.tools.r8.utils;
import static com.google.common.base.Predicates.alwaysTrue;
import com.android.tools.r8.graph.AppView;
import com.android.tools.r8.graph.ProgramMethod;
import com.android.tools.r8.threading.TaskCollection;
import com.android.tools.r8.threading.ThreadingModule;
import com.android.tools.r8.utils.ListUtils.ReferenceAndIntConsumer;
import com.android.tools.r8.utils.collections.DexClassAndMemberMap;
import com.android.tools.r8.utils.timing.Timing;
import com.android.tools.r8.utils.timing.TimingMerger;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Consumer;
import java.util.function.Predicate;
public class ThreadUtils {
public enum WorkLoad {
// The threshold for HEAVY is basically just a fan-out when we have two items to process.
HEAVY(2),
// The threshold for LIGHT has been found by running TiviIncremental benchmark in different
// configurations. For partitioning inputs in buckets of 3 and use threading on 4 or more was
// slightly better than threading on 3:
// Buckets of 3 with threshold 4:
// TiviIncrementalLibrary(RunTimeRaw): 28076 ms
// TiviIncrementalMerge(RunTimeRaw): 1429 ms
// TiviIncrementalProgram(RunTimeRaw): 26374 ms
// Buckets of 3 with threshold 3:
// TiviIncrementalLibrary(RunTimeRaw): 30347 ms
// TiviIncrementalMerge(RunTimeRaw): 1558 ms
// TiviIncrementalProgram(RunTimeRaw): 26638 ms
LIGHT(4);
private final int threshold;
WorkLoad(int threshold) {
this.threshold = threshold;
}
public int getThreshold() {
return threshold;
}
}
public static final int NOT_SPECIFIED = -1;
public static <T, R, E extends Exception> Collection<R> processItemsWithResults(
Iterable<T> items,
ThrowingFunction<T, R, E> consumer,
ThreadingModule threadingModule,
ExecutorService executorService)
throws ExecutionException {
return processItemsWithResults(
items, (item, i) -> consumer.apply(item), threadingModule, executorService);
}
public static <T, R, E extends Exception> Collection<R> processItemsWithResultsThatMatches(
Iterable<T> items,
ThrowingFunction<T, R, E> consumer,
Predicate<R> predicate,
ThreadingModule threadingModule,
ExecutorService executorService)
throws ExecutionException {
return processItemsWithResultsThatMatches(
items, (item, i) -> consumer.apply(item), predicate, threadingModule, executorService);
}
public static <T, R, E extends Exception> Collection<R> processItemsWithResults(
Iterable<T> items,
ThrowingReferenceIntFunction<T, R, E> consumer,
ThreadingModule threadingModule,
ExecutorService executorService)
throws ExecutionException {
return processItemsWithResults(items::forEach, consumer, threadingModule, executorService);
}
public static <T, R, E extends Exception> Collection<R> processItemsWithResultsThatMatches(
Iterable<T> items,
ThrowingReferenceIntFunction<T, R, E> consumer,
Predicate<R> predicate,
ThreadingModule threadingModule,
ExecutorService executorService)
throws ExecutionException {
return processItemsWithResultsThatMatches(
items::forEach, consumer, predicate, threadingModule, executorService);
}
public static <T, R, E extends Exception> Collection<R> processItemsWithResults(
ForEachable<T> items,
ThrowingReferenceIntFunction<T, R, E> consumer,
ThreadingModule threadingModule,
ExecutorService executorService)
throws ExecutionException {
return processItemsWithResultsThatMatches(
items, consumer, null, threadingModule, executorService);
}
public static <T, R, E extends Exception> Collection<R> processItemsWithResultsThatMatches(
ForEachable<T> items,
ThrowingReferenceIntFunction<T, R, E> consumer,
Predicate<R> predicate,
ThreadingModule threadingModule,
ExecutorService executorService)
throws ExecutionException {
TaskCollection<R> tasks = new TaskCollection<>(threadingModule, executorService);
try {
items.forEachWithIndex(
(index, item) -> tasks.submitUnchecked(() -> consumer.apply(item, index)));
} catch (UncheckedExecutionException e) {
throw e.rethrow();
}
return tasks.awaitWithResults(predicate);
}
public static <T, E extends Exception> void processItemsThatMatches(
Collection<T> items,
Predicate<T> itemPredicate,
ThrowingBiConsumer<T, Timing, E> consumer,
InternalOptions options,
ExecutorService executorService,
Timing timing,
TimingMerger timingMerger)
throws ExecutionException {
processItemsThatMatches(
items,
itemPredicate,
consumer,
options,
executorService,
timing,
timingMerger,
(i, item) -> "Task " + i);
}
public static <T, E extends Exception> void processItemsThatMatches(
Collection<T> items,
Predicate<T> itemPredicate,
ThrowingBiConsumer<T, Timing, E> consumer,
InternalOptions options,
ExecutorService executorService,
Timing timing,
TimingMerger timingMerger,
IntObjToObjFunction<T, String> taskNameFunction)
throws ExecutionException {
TaskCollection<Timing> tasks =
new TaskCollection<>(options.getThreadingModule(), executorService);
int i = 0;
for (T item : items) {
if (itemPredicate.test(item)) {
String taskName = taskNameFunction.apply(i, item);
tasks.submitUnchecked(
() -> {
Timing threadTiming = timing.createThreadTiming(taskName, options);
consumer.accept(item, threadTiming);
return threadTiming.end();
});
}
i++;
}
Collection<Timing> timings = tasks.awaitWithResults(alwaysTrue());
timingMerger.add(timings);
timingMerger.end();
}
public static <T> void processItems(
Collection<T> items,
Consumer<T> consumer,
ThreadingModule threadingModule,
ExecutorService executorService)
throws ExecutionException {
processItems(
items,
(item, i) -> consumer.accept(item),
threadingModule,
executorService,
WorkLoad.LIGHT);
}
public static <T> void processItems(
Collection<T> items,
ReferenceAndIntConsumer<T> consumer,
ThreadingModule threadingModule,
ExecutorService executorService,
WorkLoad workLoad)
throws ExecutionException {
if (items.size() >= workLoad.getThreshold()) {
processItems(items::forEach, consumer::accept, threadingModule, executorService);
} else {
int counter = 0;
for (T item : items) {
consumer.accept(item, counter++);
}
}
}
public static <T, E extends Exception> void processItems(
ForEachable<T> items,
ThrowingConsumer<T, E> consumer,
ThreadingModule threadingModule,
ExecutorService executorService)
throws ExecutionException {
processItems(items, (item, i) -> consumer.accept(item), threadingModule, executorService);
}
public static <T, E extends Exception> void processItems(
ForEachable<T> items,
ThrowingReferenceIntConsumer<T, E> consumer,
ThreadingModule threadingModule,
ExecutorService executorService)
throws ExecutionException {
processItemsWithResults(
items,
(item, i) -> {
consumer.accept(item, i);
return null;
},
threadingModule,
executorService);
}
public static <T, U, E extends Exception> void processMap(
Map<T, U> items,
ThrowingBiConsumer<T, U, E> consumer,
ThreadingModule threadingModule,
ExecutorService executorService)
throws ExecutionException {
processMapWithResults(
items,
(key, value) -> {
consumer.accept(key, value);
return null;
},
threadingModule,
executorService);
}
public static <T, U, R, E extends Exception> Collection<R> processMapWithResults(
Map<T, U> items,
ThrowingBiFunction<T, U, R, E> consumer,
ThreadingModule threadingModule,
ExecutorService executorService)
throws ExecutionException {
if (items instanceof DexClassAndMemberMap) {
return processItemsWithResults(
(Consumer<T> keyConsumer) -> items.forEach((k, v) -> keyConsumer.accept(k)),
(key, index) -> consumer.apply(key, items.get(key)),
threadingModule,
executorService);
}
return processItemsWithResults(
items.entrySet(),
entry -> consumer.apply(entry.getKey(), entry.getValue()),
threadingModule,
executorService);
}
public static <E extends Exception> void processMethods(
AppView<?> appView,
ThrowingConsumer<ProgramMethod, E> consumer,
ThreadingModule threadingModule,
ExecutorService executorService)
throws ExecutionException {
processItems(
appView.appInfo().classes(),
clazz -> clazz.forEachProgramMethod(consumer::acceptWithRuntimeException),
threadingModule,
executorService);
}
static ExecutorService getExecutorServiceForProcessors(
int processors, ThreadingModule threadingModule) {
return getExecutorServiceForThreads(getThreadPoolSize(processors), threadingModule);
}
/**
* Returns the thread pool size to use. A bit simplified, we use #cpus as thread pool size for
* machines with <=16 cpus, and #cpus/2 as thread pool size for machines with more cpus. We at
* most use a thread pool size of 48.
*
* <p>Using #cpus as thread pool size for machines with >16 cpus showed improved performance on
* small sample apps, but worse performance on non-trivial apps.
*
* <p>Using a thread pool size >48 generally seemed to regress build speed, most likely due to
* increased synchronization/lock contention in R8.
*/
private static int getThreadPoolSize(int processors) {
if (processors <= 16) {
return processors;
} else {
// Half of the cpus may be <= 16.
// Don't allocate a smaller thread pool than we would have for a machine with fewer cpus.
int threadPoolSize = 16 + (int) Math.round((processors - 16) / 2.0);
// We do not use more than 48 threads since this appears to lead to higher contention.
return Math.min(threadPoolSize, 48);
}
}
static ExecutorService getExecutorServiceForThreads(
int threads, ThreadingModule threadingModule) {
return threadingModule.createThreadedExecutorService(threads);
}
public static ExecutorService getExecutorService(int threads, ThreadingModule threadingModule) {
return threads == NOT_SPECIFIED
? getExecutorServiceForProcessors(
Runtime.getRuntime().availableProcessors(), threadingModule)
: getExecutorServiceForThreads(threads, threadingModule);
}
public static ExecutorService getExecutorService(InternalOptions options) {
return getExecutorService(options.threadCount, options.getThreadingModule());
}
public static int getNumberOfThreads(ExecutorService service) {
return getNumberOfThreadsOrDefault(service, -1);
}
public static int getNumberOfThreadsOrDefault(ExecutorService service, int defaultValue) {
if (service instanceof ForkJoinPool) {
return ((ForkJoinPool) service).getParallelism();
}
return defaultValue;
}
}