Index: 3rdParty_sources/reactive-streams/org/reactivestreams/FlowAdapters.java =================================================================== diff -u --- 3rdParty_sources/reactive-streams/org/reactivestreams/FlowAdapters.java (revision 0) +++ 3rdParty_sources/reactive-streams/org/reactivestreams/FlowAdapters.java (revision 2d6722d97aad801e2f7db229945ae3dab6ec8576) @@ -0,0 +1,377 @@ +/************************************************************************ + * Licensed under Public Domain (CC0) * + * * + * To the extent possible under law, the person who associated CC0 with * + * this code has waived all copyright and related or neighboring * + * rights to this code. * + * * + * You should have received a copy of the CC0 legalcode along with this * + * work. If not, see .* + ************************************************************************/ + +package org.reactivestreams; + +import java.util.concurrent.Flow; +import static java.util.Objects.requireNonNull; + +/** + * Bridge between Reactive Streams API and the Java 9 {@link java.util.concurrent.Flow} API. + */ +public final class FlowAdapters { + /** Utility class. */ + private FlowAdapters() { + throw new IllegalStateException("No instances!"); + } + + /** + * Converts a Flow Publisher into a Reactive Streams Publisher. + * @param the element type + * @param flowPublisher the source Flow Publisher to convert + * @return the equivalent Reactive Streams Publisher + */ + @SuppressWarnings("unchecked") + public static org.reactivestreams.Publisher toPublisher( + Flow.Publisher flowPublisher) { + requireNonNull(flowPublisher, "flowPublisher"); + final org.reactivestreams.Publisher publisher; + if (flowPublisher instanceof FlowPublisherFromReactive) { + publisher = (org.reactivestreams.Publisher)(((FlowPublisherFromReactive)flowPublisher).reactiveStreams); + } else if (flowPublisher instanceof org.reactivestreams.Publisher) { + publisher = (org.reactivestreams.Publisher)flowPublisher; + } else { + publisher = new ReactivePublisherFromFlow(flowPublisher); + } + return publisher; + } + + /** + * Converts a Reactive Streams Publisher into a Flow Publisher. + * @param the element type + * @param reactiveStreamsPublisher the source Reactive Streams Publisher to convert + * @return the equivalent Flow Publisher + */ + @SuppressWarnings("unchecked") + public static Flow.Publisher toFlowPublisher( + org.reactivestreams.Publisher reactiveStreamsPublisher + ) { + requireNonNull(reactiveStreamsPublisher, "reactiveStreamsPublisher"); + final Flow.Publisher flowPublisher; + if (reactiveStreamsPublisher instanceof ReactivePublisherFromFlow) { + flowPublisher = (Flow.Publisher)(((ReactivePublisherFromFlow)reactiveStreamsPublisher).flow); + } else if (reactiveStreamsPublisher instanceof Flow.Publisher) { + flowPublisher = (Flow.Publisher)reactiveStreamsPublisher; + } else { + flowPublisher = new FlowPublisherFromReactive(reactiveStreamsPublisher); + } + return flowPublisher; + } + + /** + * Converts a Flow Processor into a Reactive Streams Processor. + * @param the input value type + * @param the output value type + * @param flowProcessor the source Flow Processor to convert + * @return the equivalent Reactive Streams Processor + */ + @SuppressWarnings("unchecked") + public static org.reactivestreams.Processor toProcessor( + Flow.Processor flowProcessor + ) { + requireNonNull(flowProcessor, "flowProcessor"); + final org.reactivestreams.Processor processor; + if (flowProcessor instanceof FlowToReactiveProcessor) { + processor = (org.reactivestreams.Processor)(((FlowToReactiveProcessor)flowProcessor).reactiveStreams); + } else if (flowProcessor instanceof org.reactivestreams.Processor) { + processor = (org.reactivestreams.Processor)flowProcessor; + } else { + processor = new ReactiveToFlowProcessor(flowProcessor); + } + return processor; + } + + /** + * Converts a Reactive Streams Processor into a Flow Processor. + * @param the input value type + * @param the output value type + * @param reactiveStreamsProcessor the source Reactive Streams Processor to convert + * @return the equivalent Flow Processor + */ + @SuppressWarnings("unchecked") + public static Flow.Processor toFlowProcessor( + org.reactivestreams.Processor reactiveStreamsProcessor + ) { + requireNonNull(reactiveStreamsProcessor, "reactiveStreamsProcessor"); + final Flow.Processor flowProcessor; + if (reactiveStreamsProcessor instanceof ReactiveToFlowProcessor) { + flowProcessor = (Flow.Processor)(((ReactiveToFlowProcessor)reactiveStreamsProcessor).flow); + } else if (reactiveStreamsProcessor instanceof Flow.Processor) { + flowProcessor = (Flow.Processor)reactiveStreamsProcessor; + } else { + flowProcessor = new FlowToReactiveProcessor(reactiveStreamsProcessor); + } + return flowProcessor; + } + + /** + * Converts a Reactive Streams Subscriber into a Flow Subscriber. + * @param the input and output value type + * @param reactiveStreamsSubscriber the Reactive Streams Subscriber instance to convert + * @return the equivalent Flow Subscriber + */ + @SuppressWarnings("unchecked") + public static Flow.Subscriber toFlowSubscriber(org.reactivestreams.Subscriber reactiveStreamsSubscriber) { + requireNonNull(reactiveStreamsSubscriber, "reactiveStreamsSubscriber"); + final Flow.Subscriber flowSubscriber; + if (reactiveStreamsSubscriber instanceof ReactiveToFlowSubscriber) { + flowSubscriber = (Flow.Subscriber)((ReactiveToFlowSubscriber)reactiveStreamsSubscriber).flow; + } else if (reactiveStreamsSubscriber instanceof Flow.Subscriber) { + flowSubscriber = (Flow.Subscriber)reactiveStreamsSubscriber; + } else { + flowSubscriber = new FlowToReactiveSubscriber(reactiveStreamsSubscriber); + } + return flowSubscriber; + } + + /** + * Converts a Flow Subscriber into a Reactive Streams Subscriber. + * @param the input and output value type + * @param flowSubscriber the Flow Subscriber instance to convert + * @return the equivalent Reactive Streams Subscriber + */ + @SuppressWarnings("unchecked") + public static org.reactivestreams.Subscriber toSubscriber(Flow.Subscriber flowSubscriber) { + requireNonNull(flowSubscriber, "flowSubscriber"); + final org.reactivestreams.Subscriber subscriber; + if (flowSubscriber instanceof FlowToReactiveSubscriber) { + subscriber = (org.reactivestreams.Subscriber)((FlowToReactiveSubscriber)flowSubscriber).reactiveStreams; + } else if (flowSubscriber instanceof org.reactivestreams.Subscriber) { + subscriber = (org.reactivestreams.Subscriber)flowSubscriber; + } else { + subscriber = new ReactiveToFlowSubscriber(flowSubscriber); + } + return subscriber; + } + + /** + * Wraps a Reactive Streams Subscription and converts the calls to a Flow Subscription. + */ + static final class FlowToReactiveSubscription implements Flow.Subscription { + final org.reactivestreams.Subscription reactiveStreams; + + public FlowToReactiveSubscription(org.reactivestreams.Subscription reactive) { + this.reactiveStreams = reactive; + } + + @Override + public void request(long n) { + reactiveStreams.request(n); + } + + @Override + public void cancel() { + reactiveStreams.cancel(); + } + + } + + /** + * Wraps a Flow Subscription and converts the calls to a Reactive Streams Subscription. + */ + static final class ReactiveToFlowSubscription implements org.reactivestreams.Subscription { + final Flow.Subscription flow; + + public ReactiveToFlowSubscription(Flow.Subscription flow) { + this.flow = flow; + } + + @Override + public void request(long n) { + flow.request(n); + } + + @Override + public void cancel() { + flow.cancel(); + } + + + } + + /** + * Wraps a Reactive Streams Subscriber and forwards methods of the Flow Subscriber to it. + * @param the element type + */ + static final class FlowToReactiveSubscriber implements Flow.Subscriber { + final org.reactivestreams.Subscriber reactiveStreams; + + public FlowToReactiveSubscriber(org.reactivestreams.Subscriber reactive) { + this.reactiveStreams = reactive; + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + reactiveStreams.onSubscribe((subscription == null) ? null : new ReactiveToFlowSubscription(subscription)); + } + + @Override + public void onNext(T item) { + reactiveStreams.onNext(item); + } + + @Override + public void onError(Throwable throwable) { + reactiveStreams.onError(throwable); + } + + @Override + public void onComplete() { + reactiveStreams.onComplete(); + } + + } + + /** + * Wraps a Flow Subscriber and forwards methods of the Reactive Streams Subscriber to it. + * @param the element type + */ + static final class ReactiveToFlowSubscriber implements org.reactivestreams.Subscriber { + final Flow.Subscriber flow; + + public ReactiveToFlowSubscriber(Flow.Subscriber flow) { + this.flow = flow; + } + + @Override + public void onSubscribe(org.reactivestreams.Subscription subscription) { + flow.onSubscribe((subscription == null) ? null : new FlowToReactiveSubscription(subscription)); + } + + @Override + public void onNext(T item) { + flow.onNext(item); + } + + @Override + public void onError(Throwable throwable) { + flow.onError(throwable); + } + + @Override + public void onComplete() { + flow.onComplete(); + } + + } + + /** + * Wraps a Flow Processor and forwards methods of the Reactive Streams Processor to it. + * @param the input type + * @param the output type + */ + static final class ReactiveToFlowProcessor implements org.reactivestreams.Processor { + final Flow.Processor flow; + + public ReactiveToFlowProcessor(Flow.Processor flow) { + this.flow = flow; + } + + @Override + public void onSubscribe(org.reactivestreams.Subscription subscription) { + flow.onSubscribe((subscription == null) ? null : new FlowToReactiveSubscription(subscription)); + } + + @Override + public void onNext(T t) { + flow.onNext(t); + } + + @Override + public void onError(Throwable t) { + flow.onError(t); + } + + @Override + public void onComplete() { + flow.onComplete(); + } + + @Override + public void subscribe(org.reactivestreams.Subscriber s) { + flow.subscribe((s == null) ? null : new FlowToReactiveSubscriber(s)); + } + } + + /** + * Wraps a Reactive Streams Processor and forwards methods of the Flow Processor to it. + * @param the input type + * @param the output type + */ + static final class FlowToReactiveProcessor implements Flow.Processor { + final org.reactivestreams.Processor reactiveStreams; + + public FlowToReactiveProcessor(org.reactivestreams.Processor reactive) { + this.reactiveStreams = reactive; + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + reactiveStreams.onSubscribe((subscription == null) ? null : new ReactiveToFlowSubscription(subscription)); + } + + @Override + public void onNext(T t) { + reactiveStreams.onNext(t); + } + + @Override + public void onError(Throwable t) { + reactiveStreams.onError(t); + } + + @Override + public void onComplete() { + reactiveStreams.onComplete(); + } + + @Override + public void subscribe(Flow.Subscriber s) { + reactiveStreams.subscribe((s == null) ? null : new ReactiveToFlowSubscriber(s)); + } + } + + /** + * Reactive Streams Publisher that wraps a Flow Publisher. + * @param the element type + */ + static final class ReactivePublisherFromFlow implements org.reactivestreams.Publisher { + final Flow.Publisher flow; + + public ReactivePublisherFromFlow(Flow.Publisher flowPublisher) { + this.flow = flowPublisher; + } + + @Override + public void subscribe(org.reactivestreams.Subscriber reactive) { + flow.subscribe((reactive == null) ? null : new FlowToReactiveSubscriber(reactive)); + } + } + + /** + * Flow Publisher that wraps a Reactive Streams Publisher. + * @param the element type + */ + static final class FlowPublisherFromReactive implements Flow.Publisher { + + final org.reactivestreams.Publisher reactiveStreams; + + public FlowPublisherFromReactive(org.reactivestreams.Publisher reactivePublisher) { + this.reactiveStreams = reactivePublisher; + } + + @Override + public void subscribe(Flow.Subscriber flow) { + reactiveStreams.subscribe((flow == null) ? null : new ReactiveToFlowSubscriber(flow)); + } + } + +} Index: 3rdParty_sources/reactive-streams/org/reactivestreams/Processor.java =================================================================== diff -u --- 3rdParty_sources/reactive-streams/org/reactivestreams/Processor.java (revision 0) +++ 3rdParty_sources/reactive-streams/org/reactivestreams/Processor.java (revision 2d6722d97aad801e2f7db229945ae3dab6ec8576) @@ -0,0 +1,22 @@ +/************************************************************************ + * Licensed under Public Domain (CC0) * + * * + * To the extent possible under law, the person who associated CC0 with * + * this code has waived all copyright and related or neighboring * + * rights to this code. * + * * + * You should have received a copy of the CC0 legalcode along with this * + * work. If not, see .* + ************************************************************************/ + +package org.reactivestreams; + +/** + * A Processor represents a processing stage—which is both a {@link Subscriber} + * and a {@link Publisher} and obeys the contracts of both. + * + * @param the type of element signaled to the {@link Subscriber} + * @param the type of element signaled by the {@link Publisher} + */ +public interface Processor extends Subscriber, Publisher { +} Index: 3rdParty_sources/reactive-streams/org/reactivestreams/Publisher.java =================================================================== diff -u --- 3rdParty_sources/reactive-streams/org/reactivestreams/Publisher.java (revision 0) +++ 3rdParty_sources/reactive-streams/org/reactivestreams/Publisher.java (revision 2d6722d97aad801e2f7db229945ae3dab6ec8576) @@ -0,0 +1,40 @@ +/************************************************************************ + * Licensed under Public Domain (CC0) * + * * + * To the extent possible under law, the person who associated CC0 with * + * this code has waived all copyright and related or neighboring * + * rights to this code. * + * * + * You should have received a copy of the CC0 legalcode along with this * + * work. If not, see .* + ************************************************************************/ + +package org.reactivestreams; + +/** + * A {@link Publisher} is a provider of a potentially unbounded number of sequenced elements, publishing them according to + * the demand received from its {@link Subscriber}(s). + *

+ * A {@link Publisher} can serve multiple {@link Subscriber}s subscribed {@link #subscribe(Subscriber)} dynamically + * at various points in time. + * + * @param the type of element signaled. + */ +public interface Publisher { + + /** + * Request {@link Publisher} to start streaming data. + *

+ * This is a "factory method" and can be called multiple times, each time starting a new {@link Subscription}. + *

+ * Each {@link Subscription} will work for only a single {@link Subscriber}. + *

+ * A {@link Subscriber} should only subscribe once to a single {@link Publisher}. + *

+ * If the {@link Publisher} rejects the subscription attempt or otherwise fails it will + * signal the error via {@link Subscriber#onError}. + * + * @param s the {@link Subscriber} that will consume signals from this {@link Publisher} + */ + public void subscribe(Subscriber s); +} Index: 3rdParty_sources/reactive-streams/org/reactivestreams/Subscriber.java =================================================================== diff -u --- 3rdParty_sources/reactive-streams/org/reactivestreams/Subscriber.java (revision 0) +++ 3rdParty_sources/reactive-streams/org/reactivestreams/Subscriber.java (revision 2d6722d97aad801e2f7db229945ae3dab6ec8576) @@ -0,0 +1,66 @@ +/************************************************************************ + * Licensed under Public Domain (CC0) * + * * + * To the extent possible under law, the person who associated CC0 with * + * this code has waived all copyright and related or neighboring * + * rights to this code. * + * * + * You should have received a copy of the CC0 legalcode along with this * + * work. If not, see .* + ************************************************************************/ + +package org.reactivestreams; + +/** + * Will receive call to {@link #onSubscribe(Subscription)} once after passing an instance of {@link Subscriber} to {@link Publisher#subscribe(Subscriber)}. + *

+ * No further notifications will be received until {@link Subscription#request(long)} is called. + *

+ * After signaling demand: + *

    + *
  • One or more invocations of {@link #onNext(Object)} up to the maximum number defined by {@link Subscription#request(long)}
  • + *
  • Single invocation of {@link #onError(Throwable)} or {@link Subscriber#onComplete()} which signals a terminal state after which no further events will be sent. + *
+ *

+ * Demand can be signaled via {@link Subscription#request(long)} whenever the {@link Subscriber} instance is capable of handling more. + * + * @param the type of element signaled. + */ +public interface Subscriber { + /** + * Invoked after calling {@link Publisher#subscribe(Subscriber)}. + *

+ * No data will start flowing until {@link Subscription#request(long)} is invoked. + *

+ * It is the responsibility of this {@link Subscriber} instance to call {@link Subscription#request(long)} whenever more data is wanted. + *

+ * The {@link Publisher} will send notifications only in response to {@link Subscription#request(long)}. + * + * @param s + * {@link Subscription} that allows requesting data via {@link Subscription#request(long)} + */ + public void onSubscribe(Subscription s); + + /** + * Data notification sent by the {@link Publisher} in response to requests to {@link Subscription#request(long)}. + * + * @param t the element signaled + */ + public void onNext(T t); + + /** + * Failed terminal state. + *

+ * No further events will be sent even if {@link Subscription#request(long)} is invoked again. + * + * @param t the throwable signaled + */ + public void onError(Throwable t); + + /** + * Successful terminal state. + *

+ * No further events will be sent even if {@link Subscription#request(long)} is invoked again. + */ + public void onComplete(); +} Index: 3rdParty_sources/reactive-streams/org/reactivestreams/Subscription.java =================================================================== diff -u --- 3rdParty_sources/reactive-streams/org/reactivestreams/Subscription.java (revision 0) +++ 3rdParty_sources/reactive-streams/org/reactivestreams/Subscription.java (revision 2d6722d97aad801e2f7db229945ae3dab6ec8576) @@ -0,0 +1,44 @@ +/************************************************************************ + * Licensed under Public Domain (CC0) * + * * + * To the extent possible under law, the person who associated CC0 with * + * this code has waived all copyright and related or neighboring * + * rights to this code. * + * * + * You should have received a copy of the CC0 legalcode along with this * + * work. If not, see .* + ************************************************************************/ + +package org.reactivestreams; + +/** + * A {@link Subscription} represents a one-to-one lifecycle of a {@link Subscriber} subscribing to a {@link Publisher}. + *

+ * It can only be used once by a single {@link Subscriber}. + *

+ * It is used to both signal desire for data and cancel demand (and allow resource cleanup). + * + */ +public interface Subscription { + /** + * No events will be sent by a {@link Publisher} until demand is signaled via this method. + *

+ * It can be called however often and whenever needed—but if the outstanding cumulative demand ever becomes Long.MAX_VALUE or more, + * it may be treated by the {@link Publisher} as "effectively unbounded". + *

+ * Whatever has been requested can be sent by the {@link Publisher} so only signal demand for what can be safely handled. + *

+ * A {@link Publisher} can send less than is requested if the stream ends but + * then must emit either {@link Subscriber#onError(Throwable)} or {@link Subscriber#onComplete()}. + * + * @param n the strictly positive number of elements to requests to the upstream {@link Publisher} + */ + public void request(long n); + + /** + * Request the {@link Publisher} to stop sending data and clean up resources. + *

+ * Data may still be sent to meet previously signalled demand after calling cancel. + */ + public void cancel(); +} Index: 3rdParty_sources/versions.txt =================================================================== diff -u -rd3b7bd0be7b298ffca42f0d33989d6fb4763cef6 -r2d6722d97aad801e2f7db229945ae3dab6ec8576 --- 3rdParty_sources/versions.txt (.../versions.txt) (revision d3b7bd0be7b298ffca42f0d33989d6fb4763cef6) +++ 3rdParty_sources/versions.txt (.../versions.txt) (revision 2d6722d97aad801e2f7db229945ae3dab6ec8576) @@ -59,6 +59,8 @@ picketbox 5.0.3 +Reactive Streams 1.0.3 + Servlet API 4.0.0 Spring 5.3.13