- * A {@link Publisher} can serve multiple {@link Subscriber}s subscribed {@link #subscribe(Subscriber)} dynamically
+ * A {@link Publisher} can serve multiple {@link Subscriber}s subscribed {@link Publisher#subscribe(Subscriber)} dynamically
* at various points in time.
*
- * @param
* If the {@link Publisher} rejects the subscription attempt or otherwise fails it will
- * signal the error via {@link Subscriber#onError}.
+ * signal the error via {@link Subscriber#onError(Throwable)}.
*
* @param s the {@link Subscriber} that will consume signals from this {@link Publisher}
*/
Index: 3rdParty_sources/reactive-streams/org/reactivestreams/Subscriber.java
===================================================================
diff -u -r2d6722d97aad801e2f7db229945ae3dab6ec8576 -rc4ce08dc0aae7d9da822088a3d5710484f6b0402
--- 3rdParty_sources/reactive-streams/org/reactivestreams/Subscriber.java (.../Subscriber.java) (revision 2d6722d97aad801e2f7db229945ae3dab6ec8576)
+++ 3rdParty_sources/reactive-streams/org/reactivestreams/Subscriber.java (.../Subscriber.java) (revision c4ce08dc0aae7d9da822088a3d5710484f6b0402)
@@ -1,13 +1,6 @@
-/************************************************************************
- * Licensed under Public Domain (CC0) *
- * *
- * To the extent possible under law, the person who associated CC0 with *
- * this code has waived all copyright and related or neighboring *
- * rights to this code. *
- * *
- * You should have received a copy of the CC0 legalcode along with this *
- * work. If not, see
* Demand can be signaled via {@link Subscription#request(long)} whenever the {@link Subscriber} instance is capable of handling more.
*
- * @param
@@ -36,8 +30,7 @@
*
* The {@link Publisher} will send notifications only in response to {@link Subscription#request(long)}.
*
- * @param s
- * {@link Subscription} that allows requesting data via {@link Subscription#request(long)}
+ * @param s the {@link Subscription} that allows requesting data via {@link Subscription#request(long)}
*/
public void onSubscribe(Subscription s);
Index: 3rdParty_sources/reactive-streams/org/reactivestreams/Subscription.java
===================================================================
diff -u -r2d6722d97aad801e2f7db229945ae3dab6ec8576 -rc4ce08dc0aae7d9da822088a3d5710484f6b0402
--- 3rdParty_sources/reactive-streams/org/reactivestreams/Subscription.java (.../Subscription.java) (revision 2d6722d97aad801e2f7db229945ae3dab6ec8576)
+++ 3rdParty_sources/reactive-streams/org/reactivestreams/Subscription.java (.../Subscription.java) (revision c4ce08dc0aae7d9da822088a3d5710484f6b0402)
@@ -1,13 +1,6 @@
-/************************************************************************
- * Licensed under Public Domain (CC0) *
- * *
- * To the extent possible under law, the person who associated CC0 with *
- * this code has waived all copyright and related or neighboring *
- * rights to this code. *
- * *
- * You should have received a copy of the CC0 legalcode along with this *
- * work. If not, see
* It is used to both signal desire for data and cancel demand (and allow resource cleanup).
- *
*/
public interface Subscription {
+
/**
* No events will be sent by a {@link Publisher} until demand is signaled via this method.
*
Index: 3rdParty_sources/reactor/reactor/adapter/JdkFlowAdapter.java
===================================================================
diff -u -r03ee7b3a8cdecd210e54619377d06f9c7cb4014b -rc4ce08dc0aae7d9da822088a3d5710484f6b0402
--- 3rdParty_sources/reactor/reactor/adapter/JdkFlowAdapter.java (.../JdkFlowAdapter.java) (revision 03ee7b3a8cdecd210e54619377d06f9c7cb4014b)
+++ 3rdParty_sources/reactor/reactor/adapter/JdkFlowAdapter.java (.../JdkFlowAdapter.java) (revision c4ce08dc0aae7d9da822088a3d5710484f6b0402)
@@ -57,9 +57,9 @@
}
private static class FlowPublisherAsFlux
+ * Unless wrapped explicitly, such exceptions would always be thrown by operators instead of
+ * propagation through onError, potentially interrupting progress of Flux/Mono sequences.
+ * When they occur, the JVM itself is assumed to be in an unrecoverable state, and so is Reactor.
+ *
+ * @see #throwIfFatal(Throwable)
+ * @see #throwIfJvmFatal(Throwable)
+ * @see #isFatal(Throwable)
+ * @param t the {@link Throwable} to check
+ * @return true if the throwable is considered Jvm Fatal
+ */
+ public static boolean isJvmFatal(@Nullable Throwable t) { // a null t yields false: instanceof never matches null
+ return t instanceof VirtualMachineError ||
+ t instanceof ThreadDeath ||
+ t instanceof LinkageError;
+ }
+
+ /**
+ * Check if a {@link Throwable} is considered by Reactor as Fatal and would be thrown by
+ * {@link #throwIfFatal(Throwable)}.
+ *
+ * A throwable is reported as fatal when either the internal {@code isFatalButNotJvmFatal} test
+ * or {@link #isJvmFatal(Throwable)} accepts it.
+ *
+ * Unless wrapped explicitly, such exceptions would always be thrown by operators instead of
+ * propagation through onError, potentially interrupting progress of Flux/Mono sequences.
+ * When they occur, the assumption is that Reactor is in an unrecoverable state (notably
+ * because the JVM itself might be in an unrecoverable state).
+ *
+ * @param t the {@link Throwable} to check, may be {@code null}
+ * @return {@code true} if the throwable is considered fatal, {@code false} otherwise
+ * (including when {@code t} is {@code null}, since both underlying tests rely on {@code instanceof})
+ * @see #throwIfFatal(Throwable)
+ * @see #isJvmFatal(Throwable)
+ */
+ public static boolean isFatal(@Nullable Throwable t) {
+ return isFatalButNotJvmFatal(t) || isJvmFatal(t);
+ }
+
+ /**
+ * Internal intermediate test that only detects Fatal but not Jvm Fatal exceptions,
+ * that is instances of {@code BubblingException} or {@code ErrorCallbackNotImplemented}.
+ *
+ * @param t the {@link Throwable} to check, may be {@code null}
+ * @return {@code true} if {@code t} is one of the two types above, {@code false} otherwise
+ * (including when {@code t} is {@code null})
+ */
+ static boolean isFatalButNotJvmFatal(@Nullable Throwable t) {
+ return t instanceof BubblingException || t instanceof ErrorCallbackNotImplemented;
+ }
+
+ /**
* Throws a particular {@code Throwable} only if it belongs to a set of "fatal" error
* varieties. These varieties are as follows:
+ * Tags can only be discovered until no parent can be inspected, which happens either
+ * when the source publisher has been reached or when a non-reactor intermediate operator
+ * is present in the parent chain (i.e. a stage that is not {@link Scannable} for {@link Attr#PARENT}).
*
- * @return the stream of tags for this {@link Scannable} and its parents
+ * @return the stream of tags for this {@link Scannable} and its reachable parents, including duplicates
+ * @see #tagsDeduplicated()
*/
default Stream
+ * Tags can only be discovered until no parent can be inspected, which happens either
+ * when the source publisher has been reached or when a non-reactor intermediate operator
+ * is present in the parent chain (i.e. a stage that is not {@link Scannable} for {@link Attr#PARENT}).
+ *
+ * @return a {@link Map} of deduplicated tags from this {@link Scannable} and its reachable parents
+ * @see #tags()
+ */
+ default Map
+ * Both publisher-to-subscriber events and subscription events are handled. Methods are closer to the side-effect doOnXxx operators
+ * than to {@link Subscriber} and {@link Subscription} methods, in order to avoid misconstruing this for an actual Reactive Streams
+ * implementation. The actual downstream {@link Subscriber} and upstream {@link Subscription} are intentionally not exposed
+ * to avoid any influence on the observed sequence.
+ *
+ * @author Simon Baslé
+ */
+public interface SignalListener
+ * Once the {@link Publisher} has acknowledged with a {@link Subscription}, the {@link #doOnSubscription()}
+ * handler will be invoked before that {@link Subscription} is passed down.
+ *
+ * @see #doOnSubscription()
+ */
+ void doFirst() throws Throwable;
+
+ /**
+ * Handle terminal signals after the signals have been propagated, as the final step.
+ * Only {@link SignalType#ON_COMPLETE}, {@link SignalType#ON_ERROR} or {@link SignalType#CANCEL} can be passed.
+ * This handler is invoked AFTER the terminal signal has been propagated, and if relevant AFTER the {@link #doAfterComplete()}
+ * or {@link #doAfterError(Throwable)} events. If any doOnXxx handler throws, this handler is NOT invoked (see {@link #handleListenerError(Throwable)}
+ * instead).
+ *
+ * @param terminationType the {@link SignalType} that terminated the sequence: one of
+ * {@link SignalType#ON_COMPLETE}, {@link SignalType#ON_ERROR} or {@link SignalType#CANCEL}
+ * @see #handleListenerError(Throwable)
+ */
+ void doFinally(SignalType terminationType) throws Throwable;
+
+ /**
+ * Handle the fact that the upstream {@link Publisher} acknowledged {@link Subscription}.
+ * The {@link Subscription} is intentionally not exposed in order to avoid manipulation by the observer.
+ *
+ * While {@link #doFirst} is invoked right as the downstream {@link Subscriber} is registered,
+ * this method is invoked as the upstream answers back with a {@link Subscription} (and before that
+ * same {@link Subscription} is passed downstream).
+ *
+ * @see #doFirst()
+ */
+ void doOnSubscription() throws Throwable;
+
+ /**
+ * Handle the negotiation of fusion between two {@link reactor.core.Fuseable} operators. As the downstream operator
+ * requests fusion, the upstream answers back with the compatible level of fusion it can handle. This {@code negotiatedFusion}
+ * code is passed to this handler right before it is propagated downstream.
+ *
+ * @param negotiatedFusion the final fusion mode negotiated by the upstream operator in response to a fusion request
+ * from downstream
+ */
+ void doOnFusion(int negotiatedFusion) throws Throwable;
+
+ /**
+ * Handle a new request made by the downstream, exposing the demand.
+ *
+ * This is invoked before the request is propagated upstream.
+ *
+ * @param requested the downstream demand
+ */
+ void doOnRequest(long requested) throws Throwable;
+
+ /**
+ * Handle the downstream cancelling its currently observed {@link Subscription}.
+ *
+ * This handler is invoked before propagating the cancellation upstream, while {@link #doFinally(SignalType)}
+ * is invoked right after the cancellation has been propagated upstream.
+ *
+ * @see #doFinally(SignalType)
+ */
+ void doOnCancel() throws Throwable;
+
+ /**
+ * Handle a new value emission from the source.
+ *
+ * This handler is invoked before propagating the value downstream.
+ *
+ * @param value the emitted value
+ */
+ void doOnNext(T value) throws Throwable;
+
+ /**
+ * Handle graceful onComplete sequence termination.
+ *
+ * This handler is invoked before propagating the completion downstream, while both
+ * {@link #doAfterComplete()} and {@link #doFinally(SignalType)} are invoked after.
+ *
+ * @see #doAfterComplete()
+ * @see #doFinally(SignalType)
+ */
+ void doOnComplete() throws Throwable;
+
+ /**
+ * Handle onError sequence termination.
+ *
+ * This handler is invoked before propagating the error downstream, while both
+ * {@link #doAfterError(Throwable)} and {@link #doFinally(SignalType)} are invoked after.
+ *
+ * @param error the exception that terminated the sequence
+ * @see #doAfterError(Throwable)
+ * @see #doFinally(SignalType)
+ */
+ void doOnError(Throwable error) throws Throwable;
+
+ /**
+ * Handle graceful onComplete sequence termination, after onComplete has been propagated downstream.
+ *
+ * This handler is invoked after propagating the completion downstream, similar to {@link #doFinally(SignalType)}
+ * and unlike {@link #doOnComplete()}.
+ */
+ void doAfterComplete() throws Throwable;
+
+ /**
+ * Handle onError sequence termination after onError has been propagated downstream.
+ *
+ * This handler is invoked after propagating the error downstream, similar to {@link #doFinally(SignalType)}
+ * and unlike {@link #doOnError(Throwable)}.
+ *
+ * @param error the exception that terminated the sequence
+ */
+ void doAfterError(Throwable error) throws Throwable;
+
+ /**
+ * Handle malformed {@link Subscriber#onNext(Object)}, which are onNext happening after the sequence has already terminated
+ * via {@link Subscriber#onComplete()} or {@link Subscriber#onError(Throwable)}.
+ * Note that after this handler is invoked, the value is automatically {@link Operators#onNextDropped(Object, Context) dropped}.
+ *
+ * If this handler fails with an exception, that exception is {@link Operators#onErrorDropped(Throwable, Context) dropped} before the
+ * value is also dropped.
+ *
+ * @param value the value for which an emission was attempted (which will be automatically dropped afterwards)
+ */
+ void doOnMalformedOnNext(T value) throws Throwable;
+
+ /**
+ * Handle malformed {@link Subscriber#onError(Throwable)}, which means the sequence has already terminated
+ * via {@link Subscriber#onComplete()} or {@link Subscriber#onError(Throwable)}.
+ * Note that after this handler is invoked, the exception is automatically {@link Operators#onErrorDropped(Throwable, Context) dropped}.
+ *
+ * If this handler fails with an exception, that exception is {@link Operators#onErrorDropped(Throwable, Context) dropped} before the
+ * original onError exception is also dropped.
+ *
+ * @param error the extraneous exception (which will be automatically dropped afterwards)
+ */
+ void doOnMalformedOnError(Throwable error) throws Throwable;
+
+ /**
+ * Handle malformed {@link Subscriber#onComplete()}, which means the sequence has already terminated
+ * via {@link Subscriber#onComplete()} or {@link Subscriber#onError(Throwable)}.
+ *
+ * If this handler fails with an exception, that exception is {@link Operators#onErrorDropped(Throwable, Context) dropped}.
+ */
+ void doOnMalformedOnComplete() throws Throwable;
+
+ /**
+ * A special handler for exceptions thrown from all the other handlers.
+ * This method MUST return normally, i.e. it MUST NOT throw.
+ * When a {@link SignalListener} handler fails, callers are expected to first invoke this method then to propagate
+ * the {@code listenerError} downstream if that is possible, terminating the original sequence with the listenerError.
+ *
+ * Typically, this special handler is intended for a last chance at processing the error despite the fact that
+ * {@link #doFinally(SignalType)} is not triggered on handler errors. For example, recording the error in a
+ * metrics backend or cleaning up state that would otherwise be cleaned up by {@link #doFinally(SignalType)}.
+ *
+ * @param listenerError the exception thrown from a {@link SignalListener} handler method
+ */
+ void handleListenerError(Throwable listenerError);
+
+ /**
+ * In some cases, the tap operation should alter the {@link Context} exposed by the operator in order to store additional
+ * data. This method is invoked when the tap subscriber is created, which is between the invocation of {@link #doFirst()}
+ * and the invocation of {@link #doOnSubscription()}. Generally, only addition of new keys should be performed on
+ * the downstream original {@link Context}. Extra care should be exercised if any pre-existing key is to be removed
+ * or replaced.
+ *
+ * The default implementation returns {@code originalContext} unchanged.
+ *
+ * @param originalContext the original downstream operator's {@link Context}
+ * @return the {@link Context} to use and expose upstream
+ */
+ default Context addToContext(Context originalContext) {
+ return originalContext;
+ }
+}
Index: 3rdParty_sources/reactor/reactor/core/observability/SignalListenerFactory.java
===================================================================
diff -u
--- 3rdParty_sources/reactor/reactor/core/observability/SignalListenerFactory.java (revision 0)
+++ 3rdParty_sources/reactor/reactor/core/observability/SignalListenerFactory.java (revision c4ce08dc0aae7d9da822088a3d5710484f6b0402)
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2022 VMware Inc. or its affiliates, All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package reactor.core.observability;
+
+import org.reactivestreams.Publisher;
+
+import reactor.util.context.ContextView;
+
+/**
+ * A factory for per-subscription {@link SignalListener}, exposing the ability to generate common state at publisher level
+ * from the source {@link Publisher}.
+ *
+ * Examples of such state include:
+ *
+ * The {@code source} {@link Publisher} is the same as the one that triggered common state creation at assembly time in
+ * {@link #initializePublisherState(Publisher)}. Said common state is passed to this method as well, and so is the
+ * {@link ContextView} for the newly registered {@link reactor.core.CoreSubscriber}.
+ *
+ * @param source the source {@link Publisher} that is being subscribed to
+ * @param listenerContext the {@link ContextView} associated with the new subscriber
+ * @param publisherContext the common state initialized in {@link #initializePublisherState(Publisher)}
+ * @return a stateful {@link SignalListener} observing signals to and from the new subscriber
+ */
+ SignalListener
+ *
+ *
+ *
+ *
+ *
+ *
*
+ *
+ *
+ * @param