/*************************************************************************
 * Licensed under Public Domain (CC0)                                    *
 *                                                                       *
 * To the extent possible under law, the person who associated CC0 with  *
 * this code has waived all copyright and related or neighboring         *
 * rights to this code.                                                  *
 *                                                                       *
 * You should have received a copy of the CC0 legalcode along with this  *
 * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.*
 ************************************************************************/

package org.reactivestreams;

import java.util.concurrent.Flow;
import static java.util.Objects.requireNonNull;

/**
 * Bridge between Reactive Streams API and the Java 9 {@link java.util.concurrent.Flow} API.
 * <p>
 * Every conversion method follows the same three-step strategy:
 * <ol>
 *   <li>if the argument is an adapter created by this class, the instance it wraps is returned
 *       (so converting back and forth does not stack adapters);</li>
 *   <li>if the argument already implements the target interface, it is returned as is;</li>
 *   <li>otherwise a new adapter wrapping the argument is returned.</li>
 * </ol>
 */
public final class FlowAdapters {
    /** Utility class. */
    private FlowAdapters() {
        throw new IllegalStateException("No instances!");
    }

    /**
     * Converts a Flow Publisher into a Reactive Streams Publisher.
     * @param <T> the element type
     * @param flowPublisher the source Flow Publisher to convert
     * @return the equivalent Reactive Streams Publisher
     * @throws NullPointerException if {@code flowPublisher} is {@code null}
     */
    @SuppressWarnings("unchecked")
    public static <T> org.reactivestreams.Publisher<T> toPublisher(
            Flow.Publisher<? extends T> flowPublisher) {
        requireNonNull(flowPublisher, "flowPublisher");
        final org.reactivestreams.Publisher<T> publisher;
        if (flowPublisher instanceof FlowPublisherFromReactive) {
            // Our own adapter: hand back the Reactive Streams Publisher it wraps.
            publisher = (org.reactivestreams.Publisher<T>)(((FlowPublisherFromReactive<T>)flowPublisher).reactiveStreams);
        } else if (flowPublisher instanceof org.reactivestreams.Publisher) {
            // Implements both interfaces: no adapter needed.
            publisher = (org.reactivestreams.Publisher<T>)flowPublisher;
        } else {
            publisher = new ReactivePublisherFromFlow<T>(flowPublisher);
        }
        return publisher;
    }

    /**
     * Converts a Reactive Streams Publisher into a Flow Publisher.
     * @param <T> the element type
     * @param reactiveStreamsPublisher the source Reactive Streams Publisher to convert
     * @return the equivalent Flow Publisher
     * @throws NullPointerException if {@code reactiveStreamsPublisher} is {@code null}
     */
    @SuppressWarnings("unchecked")
    public static <T> Flow.Publisher<T> toFlowPublisher(
            org.reactivestreams.Publisher<? extends T> reactiveStreamsPublisher
    ) {
        requireNonNull(reactiveStreamsPublisher, "reactiveStreamsPublisher");
        final Flow.Publisher<T> flowPublisher;
        if (reactiveStreamsPublisher instanceof ReactivePublisherFromFlow) {
            // Our own adapter: hand back the Flow Publisher it wraps.
            flowPublisher = (Flow.Publisher<T>)(((ReactivePublisherFromFlow<T>)reactiveStreamsPublisher).flow);
        } else if (reactiveStreamsPublisher instanceof Flow.Publisher) {
            // Implements both interfaces: no adapter needed.
            flowPublisher = (Flow.Publisher<T>)reactiveStreamsPublisher;
        } else {
            flowPublisher = new FlowPublisherFromReactive<T>(reactiveStreamsPublisher);
        }
        return flowPublisher;
    }

    /**
     * Converts a Flow Processor into a Reactive Streams Processor.
     * @param <T> the input value type
     * @param <U> the output value type
     * @param flowProcessor the source Flow Processor to convert
     * @return the equivalent Reactive Streams Processor
     * @throws NullPointerException if {@code flowProcessor} is {@code null}
     */
    @SuppressWarnings("unchecked")
    public static <T, U> org.reactivestreams.Processor<T, U> toProcessor(
            Flow.Processor<? super T, ? extends U> flowProcessor
    ) {
        requireNonNull(flowProcessor, "flowProcessor");
        final org.reactivestreams.Processor<T, U> processor;
        if (flowProcessor instanceof FlowToReactiveProcessor) {
            // Our own adapter: hand back the Reactive Streams Processor it wraps.
            processor = (org.reactivestreams.Processor<T, U>)(((FlowToReactiveProcessor<T, U>)flowProcessor).reactiveStreams);
        } else if (flowProcessor instanceof org.reactivestreams.Processor) {
            // Implements both interfaces: no adapter needed.
            processor = (org.reactivestreams.Processor<T, U>)flowProcessor;
        } else {
            processor = new ReactiveToFlowProcessor<T, U>(flowProcessor);
        }
        return processor;
    }

    /**
     * Converts a Reactive Streams Processor into a Flow Processor.
     * @param <T> the input value type
     * @param <U> the output value type
     * @param reactiveStreamsProcessor the source Reactive Streams Processor to convert
     * @return the equivalent Flow Processor
     * @throws NullPointerException if {@code reactiveStreamsProcessor} is {@code null}
     */
    @SuppressWarnings("unchecked")
    public static <T, U> Flow.Processor<T, U> toFlowProcessor(
            org.reactivestreams.Processor<? super T, ? extends U> reactiveStreamsProcessor
    ) {
        requireNonNull(reactiveStreamsProcessor, "reactiveStreamsProcessor");
        final Flow.Processor<T, U> flowProcessor;
        if (reactiveStreamsProcessor instanceof ReactiveToFlowProcessor) {
            // Our own adapter: hand back the Flow Processor it wraps.
            flowProcessor = (Flow.Processor<T, U>)(((ReactiveToFlowProcessor<T, U>)reactiveStreamsProcessor).flow);
        } else if (reactiveStreamsProcessor instanceof Flow.Processor) {
            // Implements both interfaces: no adapter needed.
            flowProcessor = (Flow.Processor<T, U>)reactiveStreamsProcessor;
        } else {
            flowProcessor = new FlowToReactiveProcessor<T, U>(reactiveStreamsProcessor);
        }
        return flowProcessor;
    }

    /**
     * Converts a Reactive Streams Subscriber into a Flow Subscriber.
     * @param <T> the input and output value type
     * @param reactiveStreamsSubscriber the Reactive Streams Subscriber instance to convert
     * @return the equivalent Flow Subscriber
     * @throws NullPointerException if {@code reactiveStreamsSubscriber} is {@code null}
     */
    @SuppressWarnings("unchecked")
    public static <T> Flow.Subscriber<T> toFlowSubscriber(org.reactivestreams.Subscriber<T> reactiveStreamsSubscriber) {
        requireNonNull(reactiveStreamsSubscriber, "reactiveStreamsSubscriber");
        final Flow.Subscriber<T> flowSubscriber;
        if (reactiveStreamsSubscriber instanceof ReactiveToFlowSubscriber) {
            // Our own adapter: hand back the Flow Subscriber it wraps.
            flowSubscriber = (Flow.Subscriber<T>)((ReactiveToFlowSubscriber<T>)reactiveStreamsSubscriber).flow;
        } else if (reactiveStreamsSubscriber instanceof Flow.Subscriber) {
            // Implements both interfaces: no adapter needed.
            flowSubscriber = (Flow.Subscriber<T>)reactiveStreamsSubscriber;
        } else {
            flowSubscriber = new FlowToReactiveSubscriber<T>(reactiveStreamsSubscriber);
        }
        return flowSubscriber;
    }

    /**
     * Converts a Flow Subscriber into a Reactive Streams Subscriber.
     * @param <T> the input and output value type
     * @param flowSubscriber the Flow Subscriber instance to convert
     * @return the equivalent Reactive Streams Subscriber
     * @throws NullPointerException if {@code flowSubscriber} is {@code null}
     */
    @SuppressWarnings("unchecked")
    public static <T> org.reactivestreams.Subscriber<T> toSubscriber(Flow.Subscriber<T> flowSubscriber) {
        requireNonNull(flowSubscriber, "flowSubscriber");
        final org.reactivestreams.Subscriber<T> subscriber;
        if (flowSubscriber instanceof FlowToReactiveSubscriber) {
            // Our own adapter: hand back the Reactive Streams Subscriber it wraps.
            subscriber = (org.reactivestreams.Subscriber<T>)((FlowToReactiveSubscriber<T>)flowSubscriber).reactiveStreams;
        } else if (flowSubscriber instanceof org.reactivestreams.Subscriber) {
            // Implements both interfaces: no adapter needed.
            subscriber = (org.reactivestreams.Subscriber<T>)flowSubscriber;
        } else {
            subscriber = new ReactiveToFlowSubscriber<T>(flowSubscriber);
        }
        return subscriber;
    }

    /**
     * Wraps a Reactive Streams Subscription and converts the calls to a Flow Subscription.
     */
    static final class FlowToReactiveSubscription implements Flow.Subscription {
        final org.reactivestreams.Subscription reactiveStreams;

        public FlowToReactiveSubscription(org.reactivestreams.Subscription reactive) {
            this.reactiveStreams = reactive;
        }

        @Override
        public void request(long n) {
            reactiveStreams.request(n);
        }

        @Override
        public void cancel() {
            reactiveStreams.cancel();
        }
    }

    /**
     * Wraps a Flow Subscription and converts the calls to a Reactive Streams Subscription.
     */
    static final class ReactiveToFlowSubscription implements org.reactivestreams.Subscription {
        final Flow.Subscription flow;

        public ReactiveToFlowSubscription(Flow.Subscription flow) {
            this.flow = flow;
        }

        @Override
        public void request(long n) {
            flow.request(n);
        }

        @Override
        public void cancel() {
            flow.cancel();
        }
    }

    /**
     * Wraps a Reactive Streams Subscriber and forwards methods of the Flow Subscriber to it.
     * @param <T> the element type
     */
    static final class FlowToReactiveSubscriber<T> implements Flow.Subscriber<T> {
        final org.reactivestreams.Subscriber<? super T> reactiveStreams;

        public FlowToReactiveSubscriber(org.reactivestreams.Subscriber<? super T> reactive) {
            this.reactiveStreams = reactive;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            // A null subscription is forwarded as null rather than wrapped, so the
            // wrapped Subscriber observes the same argument the caller passed.
            reactiveStreams.onSubscribe((subscription == null) ? null : new ReactiveToFlowSubscription(subscription));
        }

        @Override
        public void onNext(T item) {
            reactiveStreams.onNext(item);
        }

        @Override
        public void onError(Throwable throwable) {
            reactiveStreams.onError(throwable);
        }

        @Override
        public void onComplete() {
            reactiveStreams.onComplete();
        }
    }

    /**
     * Wraps a Flow Subscriber and forwards methods of the Reactive Streams Subscriber to it.
     * @param <T> the element type
     */
    static final class ReactiveToFlowSubscriber<T> implements org.reactivestreams.Subscriber<T> {
        final Flow.Subscriber<? super T> flow;

        public ReactiveToFlowSubscriber(Flow.Subscriber<? super T> flow) {
            this.flow = flow;
        }

        @Override
        public void onSubscribe(org.reactivestreams.Subscription subscription) {
            // A null subscription is forwarded as null rather than wrapped, so the
            // wrapped Subscriber observes the same argument the caller passed.
            flow.onSubscribe((subscription == null) ? null : new FlowToReactiveSubscription(subscription));
        }

        @Override
        public void onNext(T item) {
            flow.onNext(item);
        }

        @Override
        public void onError(Throwable throwable) {
            flow.onError(throwable);
        }

        @Override
        public void onComplete() {
            flow.onComplete();
        }
    }

    /**
     * Wraps a Flow Processor and forwards methods of the Reactive Streams Processor to it.
     * @param <T> the input type
     * @param <U> the output type
     */
    static final class ReactiveToFlowProcessor<T, U> implements org.reactivestreams.Processor<T, U> {
        final Flow.Processor<? super T, ? extends U> flow;

        public ReactiveToFlowProcessor(Flow.Processor<? super T, ? extends U> flow) {
            this.flow = flow;
        }

        @Override
        public void onSubscribe(org.reactivestreams.Subscription subscription) {
            // A null subscription is forwarded as null rather than wrapped.
            flow.onSubscribe((subscription == null) ? null : new FlowToReactiveSubscription(subscription));
        }

        @Override
        public void onNext(T t) {
            flow.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            flow.onError(t);
        }

        @Override
        public void onComplete() {
            flow.onComplete();
        }

        @Override
        public void subscribe(org.reactivestreams.Subscriber<? super U> s) {
            // A null subscriber is forwarded as null rather than wrapped.
            flow.subscribe((s == null) ? null : new FlowToReactiveSubscriber<U>(s));
        }
    }

    /**
     * Wraps a Reactive Streams Processor and forwards methods of the Flow Processor to it.
     * @param <T> the input type
     * @param <U> the output type
     */
    static final class FlowToReactiveProcessor<T, U> implements Flow.Processor<T, U> {
        final org.reactivestreams.Processor<? super T, ? extends U> reactiveStreams;

        public FlowToReactiveProcessor(org.reactivestreams.Processor<? super T, ? extends U> reactive) {
            this.reactiveStreams = reactive;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            // A null subscription is forwarded as null rather than wrapped.
            reactiveStreams.onSubscribe((subscription == null) ? null : new ReactiveToFlowSubscription(subscription));
        }

        @Override
        public void onNext(T t) {
            reactiveStreams.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            reactiveStreams.onError(t);
        }

        @Override
        public void onComplete() {
            reactiveStreams.onComplete();
        }

        @Override
        public void subscribe(Flow.Subscriber<? super U> s) {
            // A null subscriber is forwarded as null rather than wrapped.
            reactiveStreams.subscribe((s == null) ? null : new ReactiveToFlowSubscriber<U>(s));
        }
    }

    /**
     * Reactive Streams Publisher that wraps a Flow Publisher.
     * @param <T> the element type
     */
    static final class ReactivePublisherFromFlow<T> implements org.reactivestreams.Publisher<T> {
        final Flow.Publisher<? extends T> flow;

        public ReactivePublisherFromFlow(Flow.Publisher<? extends T> flowPublisher) {
            this.flow = flowPublisher;
        }

        @Override
        public void subscribe(org.reactivestreams.Subscriber<? super T> reactive) {
            // A null subscriber is forwarded as null rather than wrapped.
            flow.subscribe((reactive == null) ? null : new FlowToReactiveSubscriber<T>(reactive));
        }
    }

    /**
     * Flow Publisher that wraps a Reactive Streams Publisher.
     * @param <T> the element type
     */
    static final class FlowPublisherFromReactive<T> implements Flow.Publisher<T> {
        final org.reactivestreams.Publisher<? extends T> reactiveStreams;

        public FlowPublisherFromReactive(org.reactivestreams.Publisher<? extends T> reactivePublisher) {
            this.reactiveStreams = reactivePublisher;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super T> flow) {
            // A null subscriber is forwarded as null rather than wrapped.
            reactiveStreams.subscribe((flow == null) ? null : new ReactiveToFlowSubscriber<T>(flow));
        }
    }
}