| // Copyright (c) 2022, the R8 project authors. Please see the AUTHORS file |
| // for details. All rights reserved. Use of this source code is governed by a |
| // BSD-style license that can be found in the LICENSE file. |
| |
| package flow; |
| |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Flow.Publisher; |
| import java.util.concurrent.Flow.Subscriber; |
| import java.util.concurrent.Flow.Subscription; |
| import java.util.concurrent.ForkJoinPool; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.Consumer; |
| |
| public class FlowExample { |
| public static void main(String[] args) throws Exception { |
| OneShotPublisher oneShotPublisher = new OneShotPublisher(); |
| SampleSubscriber<Object> subscriber = new SampleSubscriber<>(50L, System.out::println); |
| oneShotPublisher.subscribe(subscriber); |
| oneShotPublisher.awaitPublishing(); |
| } |
| |
| static class OneShotPublisher implements Publisher<Boolean> { |
| |
| private final ForkJoinPool executor = new ForkJoinPool(); // daemon-based |
| |
| private boolean subscribed; // true after first subscribe |
| |
| public synchronized void subscribe(Subscriber<? super Boolean> subscriber) { |
| if (subscribed) { |
| subscriber.onError(new IllegalStateException()); // only one allowed |
| } else { |
| subscribed = true; |
| subscriber.onSubscribe(new OneShotSubscription(subscriber, executor)); |
| } |
| } |
| |
| void awaitPublishing() throws InterruptedException { |
| // On Android 7 and higher, calling System.exit() terminates all threads without waiting for |
| // tasks on the fork join pool to be finished. For the test to be completed we need such |
| // tasks to finish. |
| int seconds = 60; |
| executor.awaitTermination(seconds, TimeUnit.SECONDS); |
| } |
| |
| static class OneShotSubscription implements Subscription { |
| |
| private final Subscriber<? super Boolean> subscriber; |
| private final ExecutorService executor; |
| private Future<?> future; // to allow cancellation |
| private boolean completed; |
| |
| OneShotSubscription(Subscriber<? super Boolean> subscriber, ExecutorService executor) { |
| this.subscriber = subscriber; |
| this.executor = executor; |
| } |
| |
| public synchronized void request(long n) { |
| if (n != 0 && !completed) { |
| completed = true; |
| if (n < 0) { |
| IllegalArgumentException ex = new IllegalArgumentException(); |
| executor.execute(() -> subscriber.onError(ex)); |
| } else { |
| future = |
| executor.submit( |
| () -> { |
| subscriber.onNext(Boolean.TRUE); |
| subscriber.onComplete(); |
| }); |
| } |
| } |
| } |
| |
| public synchronized void cancel() { |
| completed = true; |
| if (future != null) { |
| future.cancel(false); |
| } |
| } |
| } |
| } |
| |
| static class SampleSubscriber<T> implements Subscriber<T> { |
| |
| final Consumer<? super T> consumer; |
| Subscription subscription; |
| final long bufferSize; |
| long count; |
| |
| SampleSubscriber(long bufferSize, Consumer<? super T> consumer) { |
| this.bufferSize = bufferSize; |
| this.consumer = consumer; |
| } |
| |
| public void onSubscribe(Subscription subscription) { |
| long initialRequestSize = bufferSize; |
| count = bufferSize - bufferSize / 2; // re-request when half consumed |
| (this.subscription = subscription).request(initialRequestSize); |
| } |
| |
| public void onNext(T item) { |
| if (--count <= 0) { |
| subscription.request(count = bufferSize - bufferSize / 2); |
| } |
| consumer.accept(item); |
| } |
| |
| public void onError(Throwable ex) { |
| ex.printStackTrace(); |
| } |
| |
| public void onComplete() {} |
| } |
| } |