blob: caa0c24a37cdd6e404236dc2fc1491c21bdec3b4 [file] [log] [blame]
// Copyright (c) 2022, the R8 project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
package flow;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
public class FlowExample {
public static void main(String[] args) throws Exception {
OneShotPublisher oneShotPublisher = new OneShotPublisher();
SampleSubscriber<Object> subscriber = new SampleSubscriber<>(50L, System.out::println);
oneShotPublisher.subscribe(subscriber);
oneShotPublisher.awaitPublishing();
}
public static class OneShotPublisher implements Publisher<Boolean> {
private final ForkJoinPool executor = new ForkJoinPool(); // daemon-based
private boolean subscribed; // true after first subscribe
public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
if (subscribed) {
subscriber.onError(new IllegalStateException()); // only one allowed
} else {
subscribed = true;
subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
}
}
void awaitPublishing() throws InterruptedException {
// On Android 7 and higher, calling System.exit() terminates all threads without waiting for
// tasks on the fork join pool to be finished. For the test to be completed we need such
// tasks to finish.
int seconds = 60;
executor.awaitTermination(seconds, TimeUnit.SECONDS);
}
static class OneShotSubscription implements Subscription {
private final Subscriber<? super Boolean> subscriber;
private final ExecutorService executor;
private Future<?> future; // to allow cancellation
private boolean completed;
OneShotSubscription(Subscriber<? super Boolean> subscriber, ExecutorService executor) {
this.subscriber = subscriber;
this.executor = executor;
}
public synchronized void request(long n) {
if (n != 0 && !completed) {
completed = true;
if (n < 0) {
IllegalArgumentException ex = new IllegalArgumentException();
executor.execute(() -> subscriber.onError(ex));
} else {
future =
executor.submit(
() -> {
subscriber.onNext(Boolean.TRUE);
subscriber.onComplete();
});
}
}
}
public synchronized void cancel() {
completed = true;
if (future != null) {
future.cancel(false);
}
}
}
}
static class SampleSubscriber<T> implements Subscriber<T> {
final Consumer<? super T> consumer;
Subscription subscription;
final long bufferSize;
long count;
SampleSubscriber(long bufferSize, Consumer<? super T> consumer) {
this.bufferSize = bufferSize;
this.consumer = consumer;
}
public void onSubscribe(Subscription subscription) {
long initialRequestSize = bufferSize;
count = bufferSize - bufferSize / 2; // re-request when half consumed
(this.subscription = subscription).request(initialRequestSize);
}
public void onNext(T item) {
if (--count <= 0) {
subscription.request(count = bufferSize - bufferSize / 2);
}
consumer.accept(item);
}
public void onError(Throwable ex) {
ex.printStackTrace();
}
public void onComplete() {}
}
}