@@ -780,7 +784,7 @@
* available backpressure modes
* @param emitter Consume the {@link FluxSink} provided per-subscriber by Reactor to generate signals.
* @return a {@link Flux}
- * @see #create(Consumer, OverflowStrategy)
+ * @see #create(Consumer, reactor.core.publisher.FluxSink.OverflowStrategy)
*/
public static Flux push(Consumer super FluxSink> emitter, OverflowStrategy backpressure) {
return onAssembly(new FluxCreate<>(emitter, backpressure, FluxCreate.CreateMode.PUSH_ONLY));
@@ -1060,7 +1064,7 @@
*/
public static Flux from(Publisher extends T> source) {
//duplicated in wrap, but necessary to detect early and thus avoid applying assembly
- if (source instanceof Flux) {
+ if (source instanceof Flux && !ContextPropagationSupport.shouldWrapPublisher(source)) {
@SuppressWarnings("unchecked")
Flux casted = (Flux) source;
return casted;
@@ -2130,6 +2134,64 @@
}
/**
+ * Uses an {@link AutoCloseable} resource, generated by a supplier for each individual Subscriber,
+ * while streaming the values from a Publisher derived from the same resource and makes sure
+ * the resource is released if the sequence terminates or the Subscriber cancels.
+ *
+ * Eager {@link AutoCloseable} resource cleanup happens just before the source termination and exceptions raised
+ * by the cleanup Consumer may override the terminal event.
+ *
+ *
+ *
+ * For an asynchronous version of the cleanup, with distinct path for onComplete, onError
+ * and cancel terminations, see {@link #usingWhen(Publisher, Function, Function, BiFunction, Function)}.
+ *
+ * @param resourceSupplier a {@link Callable} that is called on subscribe to generate the resource
+ * @param sourceSupplier a factory to derive a {@link Publisher} from the supplied resource
+ * @param emitted type
+ * @param resource type
+ *
+ * @return a new {@link Flux} built around a disposable resource
+ * @see #usingWhen(Publisher, Function, Function, BiFunction, Function)
+ * @see #usingWhen(Publisher, Function, Function)
+ */
+ public static Flux using(Callable extends D> resourceSupplier,
+ Function super D, ? extends Publisher extends T>> sourceSupplier) {
+ return using(resourceSupplier, sourceSupplier, true);
+ }
+
+ /**
+ * Uses an {@link AutoCloseable} resource, generated by a supplier for each individual Subscriber,
+ * while streaming the values from a Publisher derived from the same resource and makes sure
+ * the resource is released if the sequence terminates or the Subscriber cancels.
+ *
+ *
+ *
Eager {@link AutoCloseable} resource cleanup happens just before the source termination and exceptions raised
+ * by the cleanup Consumer may override the terminal event.
+ *
Non-eager cleanup will drop any exception.
+ *
+ *
+ *
+ *
+ * For an asynchronous version of the cleanup, with distinct path for onComplete, onError
+ * and cancel terminations, see {@link #usingWhen(Publisher, Function, Function, BiFunction, Function)}.
+ *
+ * @param resourceSupplier a {@link Callable} that is called on subscribe to generate the resource
+ * @param sourceSupplier a factory to derive a {@link Publisher} from the supplied resource
+ * @param eager true to clean before terminating downstream subscribers
+ * @param emitted type
+ * @param resource type
+ *
+ * @return a new {@link Flux} built around a disposable resource
+ * @see #usingWhen(Publisher, Function, Function, BiFunction, Function)
+ * @see #usingWhen(Publisher, Function, Function)
+ */
+ public static Flux using(Callable extends D> resourceSupplier,
+ Function super D, ? extends Publisher extends T>> sourceSupplier, boolean eager) {
+ return using(resourceSupplier, sourceSupplier, Exceptions.AUTO_CLOSE, eager);
+ }
+
+ /**
* Uses a resource, generated by a {@link Publisher} for each individual {@link Subscriber},
* while streaming the values from a {@link Publisher} derived from the same resource.
* Whenever the resulting sequence terminates, a provided {@link Function} generates
@@ -2709,7 +2771,8 @@
* signals its first value, completes or a timeout expires. Returns that value,
* or null if the Flux completes empty. In case the Flux errors, the original
* exception is thrown (wrapped in a {@link RuntimeException} if it was a checked
- * exception). If the provided timeout expires, a {@link RuntimeException} is thrown.
+ * exception). If the provided timeout expires, a {@link RuntimeException} is thrown
+ * with a {@link TimeoutException} as the cause.
*
* Note that each blockFirst() will trigger a new subscription: in other words,
* the result might miss signal from hot publishers.
@@ -2718,6 +2781,7 @@
*
*
* @param timeout maximum time period to wait for before raising a {@link RuntimeException}
+ * with a {@link TimeoutException} as the cause
* @return the first value or null
*/
@Nullable
@@ -2759,7 +2823,8 @@
* signals its last value, completes or a timeout expires. Returns that value,
* or null if the Flux completes empty. In case the Flux errors, the original
* exception is thrown (wrapped in a {@link RuntimeException} if it was a checked
- * exception). If the provided timeout expires, a {@link RuntimeException} is thrown.
+ * exception). If the provided timeout expires, a {@link RuntimeException} is thrown
+ * with a {@link TimeoutException} as the cause.
*
* Note that each blockLast() will trigger a new subscription: in other words,
* the result might miss signal from hot publishers.
@@ -2768,6 +2833,7 @@
*
*
* @param timeout maximum time period to wait for before raising a {@link RuntimeException}
+ * with a {@link TimeoutException} as the cause
* @return the last value or null
*/
@Nullable
@@ -5054,7 +5120,7 @@
}
/**
- * Map this {@link Flux} into {@link Tuple2 Tuple2<Long, T>}
+ * Map this {@link Flux} into {@link reactor.util.function.Tuple2 Tuple2<Long, T>}
* of timemillis and source data. The timemillis corresponds to the elapsed time
* between each signal as measured by the {@link Schedulers#parallel() parallel} scheduler.
* First duration is measured between the subscription and the first element.
@@ -5071,7 +5137,7 @@
}
/**
- * Map this {@link Flux} into {@link Tuple2 Tuple2<Long, T>}
+ * Map this {@link Flux} into {@link reactor.util.function.Tuple2 Tuple2<Long, T>}
* of timemillis and source data. The timemillis corresponds to the elapsed time
* between each signal as measured by the provided {@link Scheduler}.
* First duration is measured between the subscription and the first element.
@@ -6670,7 +6736,7 @@
* The name serves as a prefix in the reported metrics names. In case no name has been provided, the default name "reactor" will be applied.
*
* The {@link MeterRegistry} used by reactor can be configured via
- * {@link Metrics.MicrometerConfiguration#useRegistry(MeterRegistry)}
+ * {@link reactor.util.Metrics.MicrometerConfiguration#useRegistry(MeterRegistry)}
* prior to using this operator, the default being
* {@link io.micrometer.core.instrument.Metrics#globalRegistry}.
*
@@ -7950,7 +8016,7 @@
* The companion is generated by the provided {@link Retry} instance, see {@link Retry#max(long)}, {@link Retry#maxInARow(long)}
* and {@link Retry#backoff(long, Duration)} for readily available strategy builders.
*
- * The operator generates a base for the companion, a {@link Flux} of {@link Retry.RetrySignal}
+ * The operator generates a base for the companion, a {@link Flux} of {@link reactor.util.retry.Retry.RetrySignal}
* which each give metadata about each retryable failure whenever this {@link Flux} signals an error. The final companion
* should be derived from that base companion and emit data in response to incoming onNext (although it can emit less
* elements, or delay the emissions).
@@ -7960,12 +8026,12 @@
*
*
*
- * Note that the {@link Retry.RetrySignal} state can be
+ * Note that the {@link reactor.util.retry.Retry.RetrySignal} state can be
* transient and change between each source
* {@link org.reactivestreams.Subscriber#onError(Throwable) onError} or
* {@link org.reactivestreams.Subscriber#onNext(Object) onNext}. If processed with a delay,
* this could lead to the represented state being out of sync with the state at which the retry
- * was evaluated. Map it to {@link Retry.RetrySignal#copy()}
+ * was evaluated. Map it to {@link reactor.util.retry.Retry.RetrySignal#copy()}
* right away to mediate this.
*
* Note that if the companion {@link Publisher} created by the {@code whenFactory}
@@ -7990,12 +8056,12 @@
*
*
* @param retrySpec the {@link Retry} strategy that will generate the companion {@link Publisher},
- * given a {@link Flux} that signals each onError as a {@link Retry.RetrySignal}.
+ * given a {@link Flux} that signals each onError as a {@link reactor.util.retry.Retry.RetrySignal}.
*
* @return a {@link Flux} that retries on onError when a companion {@link Publisher} produces an onNext signal
- * @see Retry#max(long)
- * @see Retry#maxInARow(long)
- * @see Retry#backoff(long, Duration)
+ * @see reactor.util.retry.Retry#max(long)
+ * @see reactor.util.retry.Retry#maxInARow(long)
+ * @see reactor.util.retry.Retry#backoff(long, Duration)
*/
public final Flux retryWhen(Retry retrySpec) {
return onAssembly(new FluxRetryWhen<>(this, retrySpec));
@@ -8259,7 +8325,7 @@
/**
* Expect and emit a single item from this {@link Flux} source or signal
- * {@link NoSuchElementException} for an empty source, or
+ * {@link java.util.NoSuchElementException} for an empty source, or
* {@link IndexOutOfBoundsException} for a source with more than one element.
*
*
@@ -8585,7 +8651,7 @@
* Subscribe a {@link Consumer} to this {@link Flux} that will consume all the
* elements in the sequence. It will request an unbounded demand ({@code Long.MAX_VALUE}).
*
- * For a passive version that observe and forward incoming data see {@link #doOnNext(Consumer)}.
+ * For a passive version that observe and forward incoming data see {@link #doOnNext(java.util.function.Consumer)}.
*
* For a version that gives you more control over backpressure and the request, see
* {@link #subscribe(Subscriber)} with a {@link BaseSubscriber}.
@@ -8612,7 +8678,7 @@
* The subscription will request an unbounded demand ({@code Long.MAX_VALUE}).
*
* For a passive version that observe and forward incoming data see
- * {@link #doOnNext(Consumer)} and {@link #doOnError(Consumer)}.
+ * {@link #doOnNext(java.util.function.Consumer)} and {@link #doOnError(java.util.function.Consumer)}.
*
For a version that gives you more control over backpressure and the request, see
* {@link #subscribe(Subscriber)} with a {@link BaseSubscriber}.
*
@@ -8638,8 +8704,8 @@
* elements in the sequence, handle errors and react to completion. The subscription
* will request unbounded demand ({@code Long.MAX_VALUE}).
*
- * For a passive version that observe and forward incoming data see {@link #doOnNext(Consumer)},
- * {@link #doOnError(Consumer)} and {@link #doOnComplete(Runnable)}.
+ * For a passive version that observe and forward incoming data see {@link #doOnNext(java.util.function.Consumer)},
+ * {@link #doOnError(java.util.function.Consumer)} and {@link #doOnComplete(Runnable)}.
*
For a version that gives you more control over backpressure and the request, see
* {@link #subscribe(Subscriber)} with a {@link BaseSubscriber}.
*
@@ -8670,8 +8736,8 @@
* request the adequate amount of data, or request unbounded demand
* {@code Long.MAX_VALUE} if no such consumer is provided.
*
- * For a passive version that observe and forward incoming data see {@link #doOnNext(Consumer)},
- * {@link #doOnError(Consumer)}, {@link #doOnComplete(Runnable)}
+ * For a passive version that observe and forward incoming data see {@link #doOnNext(java.util.function.Consumer)},
+ * {@link #doOnError(java.util.function.Consumer)}, {@link #doOnComplete(Runnable)}
* and {@link #doOnSubscribe(Consumer)}.
*
For a version that gives you more control over backpressure and the request, see
* {@link #subscribe(Subscriber)} with a {@link BaseSubscriber}.
@@ -8710,8 +8776,8 @@
* elements in the sequence, handle errors and react to completion. Additionally, a {@link Context}
* is tied to the subscription. At subscription, an unbounded request is implicitly made.
*
- * For a passive version that observe and forward incoming data see {@link #doOnNext(Consumer)},
- * {@link #doOnError(Consumer)}, {@link #doOnComplete(Runnable)}
+ * For a passive version that observe and forward incoming data see {@link #doOnNext(java.util.function.Consumer)},
+ * {@link #doOnError(java.util.function.Consumer)}, {@link #doOnComplete(Runnable)}
* and {@link #doOnSubscribe(Consumer)}.
*
For a version that gives you more control over backpressure and the request, see
* {@link #subscribe(Subscriber)} with a {@link BaseSubscriber}.
@@ -8770,6 +8836,7 @@
}
}
+ subscriber = Operators.restoreContextOnSubscriberIfPublisherNonInternal(publisher, subscriber);
publisher.subscribe(subscriber);
}
catch (Throwable e) {
@@ -8797,7 +8864,7 @@
* the next occurrence of a {@link #publishOn(Scheduler) publishOn}.
*
* Note that if you are using an eager or blocking
- * {@link #create(Consumer, OverflowStrategy)}
+ * {@link #create(Consumer, FluxSink.OverflowStrategy)}
* as the source, it can lead to deadlocks due to requests piling up behind the emitter.
* In such case, you should call {@link #subscribeOn(Scheduler, boolean) subscribeOn(scheduler, false)}
* instead.
@@ -8834,7 +8901,7 @@
* the next occurrence of a {@link #publishOn(Scheduler) publishOn}.
*
* Note that if you are using an eager or blocking
- * {@link Flux#create(Consumer, OverflowStrategy)}
+ * {@link Flux#create(Consumer, FluxSink.OverflowStrategy)}
* as the source, it can lead to deadlocks due to requests piling up behind the emitter.
* Thus this operator has a {@code requestOnSeparateThread} parameter, which should be
* set to {@code false} in this case.
@@ -9658,7 +9725,7 @@
}
/**
- * Emit a {@link Tuple2} pair of T1 the current clock time in
+ * Emit a {@link reactor.util.function.Tuple2} pair of T1 the current clock time in
* millis (as a {@link Long} measured by the {@link Schedulers#parallel() parallel}
* Scheduler) and T2 the emitted data (as a {@code T}), for each item from this {@link Flux}.
*
@@ -9674,7 +9741,7 @@
}
/**
- * Emit a {@link Tuple2} pair of T1 the current clock time in
+ * Emit a {@link reactor.util.function.Tuple2} pair of T1 the current clock time in
* millis (as a {@link Long} measured by the provided {@link Scheduler}) and T2
* the emitted data (as a {@code T}), for each item from this {@link Flux}.
*
@@ -9916,7 +9983,7 @@
*
* To distinguish errors emitted by the processing of individual windows, source
* sequence errors delivered to the window {@link Flux} are wrapped in
- * {@link Exceptions.SourceException}.
+ * {@link reactor.core.Exceptions.SourceException}.
*
*
Discard Support: This operator discards elements it internally queued for backpressure
* upon cancellation or error triggered by a data signal.
@@ -9956,7 +10023,7 @@
*
* To distinguish errors emitted by the processing of individual windows, source
* sequence errors delivered to the window {@link Flux} are wrapped in
- * {@link Exceptions.SourceException}.
+ * {@link reactor.core.Exceptions.SourceException}.
*
*
Discard Support: The overlapping variant DOES NOT discard elements, as they might be part of another still valid window.
* The exact window and dropping window variants bot discard elements they internally queued for backpressure
@@ -9992,7 +10059,7 @@
*
* To distinguish errors emitted by the processing of individual windows, source
* sequence errors and those emitted by the {@code boundary} delivered to the window
- * {@link Flux} are wrapped in {@link Exceptions.SourceException}.
+ * {@link Flux} are wrapped in {@link reactor.core.Exceptions.SourceException}.
*
*
Discard Support: This operator discards elements it internally queued for backpressure
* upon cancellation or error triggered by a data signal.
@@ -10024,7 +10091,7 @@
*
* To distinguish errors emitted by the processing of individual windows, source
* sequence errors delivered to the window {@link Flux} are wrapped in
- * {@link Exceptions.SourceException}.
+ * {@link reactor.core.Exceptions.SourceException}.
*
*
Discard Support: This operator discards elements it internally queued for backpressure
* upon cancellation or error triggered by a data signal.
@@ -10067,7 +10134,7 @@
*
* To distinguish errors emitted by the processing of individual windows, source
* sequence errors delivered to the window {@link Flux} are wrapped in
- * {@link Exceptions.SourceException}.
+ * {@link reactor.core.Exceptions.SourceException}.
*
*
Discard Support: The overlapping variant DOES NOT discard elements, as they might be part of another still valid window.
* The exact window and dropping window variants bot discard elements they internally queued for backpressure
@@ -10101,7 +10168,7 @@
*
* To distinguish errors emitted by the processing of individual windows, source
* sequence errors delivered to the window {@link Flux} are wrapped in
- * {@link Exceptions.SourceException}.
+ * {@link reactor.core.Exceptions.SourceException}.
*
*
Discard Support: This operator discards elements it internally queued for backpressure
* upon cancellation or error triggered by a data signal.
@@ -10145,7 +10212,7 @@
*
* To distinguish errors emitted by the processing of individual windows, source
* sequence errors delivered to the window {@link Flux} are wrapped in
- * {@link Exceptions.SourceException}.
+ * {@link reactor.core.Exceptions.SourceException}.
*
*
Discard Support: The overlapping variant DOES NOT discard elements, as they might be part of another still valid window.
* The exact window and dropping window variants bot discard elements they internally queued for backpressure
@@ -10185,7 +10252,7 @@
*
* To distinguish errors emitted by the processing of individual windows, source
* sequence errors delivered to the window {@link Flux} are wrapped in
- * {@link Exceptions.SourceException}.
+ * {@link reactor.core.Exceptions.SourceException}.
*
*
Discard Support: This operator discards elements it internally queued for backpressure
* upon cancellation or error triggered by a data signal.
@@ -10219,7 +10286,7 @@
*
* To distinguish errors emitted by the processing of individual windows, source
* sequence errors delivered to the window {@link Flux} are wrapped in
- * {@link Exceptions.SourceException}.
+ * {@link reactor.core.Exceptions.SourceException}.
*
*
Discard Support: This operator discards elements it internally queued for backpressure
* upon cancellation or error triggered by a data signal.
@@ -10255,7 +10322,7 @@
*
* To distinguish errors emitted by the processing of individual windows, source
* sequence errors delivered to the window {@link Flux} are wrapped in
- * {@link Exceptions.SourceException}.
+ * {@link reactor.core.Exceptions.SourceException}.
*
*
Discard Support: This operator discards elements it internally queued for backpressure
* upon cancellation or error triggered by a data signal.
@@ -10290,7 +10357,7 @@
*
* To distinguish errors emitted by the processing of individual windows, source
* sequence errors delivered to the window {@link Flux} are wrapped in
- * {@link Exceptions.SourceException}.
+ * {@link reactor.core.Exceptions.SourceException}.
*
*
Discard Support: This operator discards elements it internally queued for backpressure
* upon cancellation or error triggered by a data signal.
@@ -10330,7 +10397,7 @@
*
* To distinguish errors emitted by the processing of individual windows, source
* sequence errors delivered to the window {@link Flux} are wrapped in
- * {@link Exceptions.SourceException}.
+ * {@link reactor.core.Exceptions.SourceException}.
*
*
Discard Support: This operator discards elements it internally queued for backpressure
* upon cancellation or error triggered by a data signal. Upon cancellation of the current window,
@@ -10372,7 +10439,7 @@
*
* To distinguish errors emitted by the processing of individual windows, source
* sequence errors delivered to the window {@link Flux} are wrapped in
- * {@link Exceptions.SourceException}.
+ * {@link reactor.core.Exceptions.SourceException}.
*
*
Discard Support: This operator discards elements it internally queued for backpressure
* upon cancellation or error triggered by a data signal. Upon cancellation of the current window,
@@ -10416,7 +10483,7 @@
*
* To distinguish errors emitted by the processing of individual windows, source
* sequence errors delivered to the window {@link Flux} are wrapped in
- * {@link Exceptions.SourceException}.
+ * {@link reactor.core.Exceptions.SourceException}.
*
*
Discard Support: This operator discards elements it internally queued for backpressure
* upon cancellation or error triggered by a data signal. Upon cancellation of the current window,
@@ -10455,7 +10522,7 @@
*
* To distinguish errors emitted by the processing of individual windows, source
* sequence errors delivered to the window {@link Flux} are wrapped in
- * {@link Exceptions.SourceException}.
+ * {@link reactor.core.Exceptions.SourceException}.
*
*
Discard Support: This operator discards elements it internally queued for backpressure
* upon cancellation or error triggered by a data signal. Upon cancellation of the current window,
@@ -10486,7 +10553,7 @@
*
* To distinguish errors emitted by the processing of individual windows, source
* sequence errors delivered to the window {@link Flux} are wrapped in
- * {@link Exceptions.SourceException}.
+ * {@link reactor.core.Exceptions.SourceException}.
*
*
Discard Support: This operator discards elements it internally queued for backpressure
* upon cancellation or error triggered by a data signal. Upon cancellation of the current window,
@@ -10519,7 +10586,7 @@
*
* To distinguish errors emitted by the processing of individual windows, source
* sequence errors delivered to the window {@link Flux} are wrapped in
- * {@link Exceptions.SourceException}.
+ * {@link reactor.core.Exceptions.SourceException}.
*
*
Discard Support: This operator discards elements it internally queued for backpressure
* upon cancellation or error triggered by a data signal. Upon cancellation of the current window,
@@ -10559,7 +10626,7 @@
*
* To distinguish errors emitted by the processing of individual windows, source
* sequence errors delivered to the window {@link Flux} are wrapped in
- * {@link Exceptions.SourceException}.
+ * {@link reactor.core.Exceptions.SourceException}.
*
*
Discard Support: This operator discards elements it internally queued for backpressure
* upon cancellation or error triggered by a data signal, as well as the triggering element(s) (that doesn't match
@@ -10597,7 +10664,7 @@
*
* To distinguish errors emitted by the processing of individual windows, source
* sequence errors delivered to the window {@link Flux} are wrapped in
- * {@link Exceptions.SourceException}.
+ * {@link reactor.core.Exceptions.SourceException}.
*
*
Discard Support: This operator discards elements it internally queued for backpressure
* upon cancellation or error triggered by a data signal, as well as the triggering element(s) (that doesn't match
@@ -10641,7 +10708,7 @@
*
* To distinguish errors emitted by the processing of individual windows, source
* sequence errors delivered to the window {@link Flux} are wrapped in
- * {@link Exceptions.SourceException}.
+ * {@link reactor.core.Exceptions.SourceException}.
*
*
Discard Support: This operator DOES NOT discard elements.
*
@@ -11065,8 +11132,12 @@
*/
@SuppressWarnings("unchecked")
static Flux wrap(Publisher extends I> source) {
- if (source instanceof Flux) {
- return (Flux) source;
+ boolean shouldWrap = ContextPropagationSupport.shouldWrapPublisher(source);
+ if (source instanceof Flux) {
+ if (!shouldWrap) {
+ return (Flux) source;
+ }
+ return ContextPropagation.fluxRestoreThreadLocals((Flux extends I>) source);
}
//for scalars we'll instantiate the operators directly to avoid onAssembly
@@ -11084,16 +11155,22 @@
}
}
- if(source instanceof Mono){
- if(source instanceof Fuseable){
- return new FluxSourceMonoFuseable<>((Mono)source);
+ Flux target;
+ if (source instanceof Mono) {
+ if (source instanceof Fuseable) {
+ target = new FluxSourceMonoFuseable<>((Mono) source);
+ } else {
+ target = new FluxSourceMono<>((Mono) source);
}
- return new FluxSourceMono<>((Mono)source);
+ } else if (source instanceof Fuseable) {
+ target = new FluxSourceFuseable<>(source);
+ } else {
+ target = new FluxSource<>(source);
}
- if(source instanceof Fuseable){
- return new FluxSourceFuseable<>(source);
+ if (shouldWrap) {
+ return ContextPropagation.fluxRestoreThreadLocals(target);
}
- return new FluxSource<>(source);
+ return target;
}
@SuppressWarnings("rawtypes")
@@ -11108,4 +11185,4 @@
@SuppressWarnings("rawtypes")
static final Function IDENTITY_FUNCTION = Function.identity();
-}
\ No newline at end of file
+}
Index: 3rdParty_sources/reactor/reactor/core/publisher/FluxArray.java
===================================================================
diff -u -r03ee7b3a8cdecd210e54619377d06f9c7cb4014b -r683b1a0cbc595fe30e200900d081c88105aae63c
--- 3rdParty_sources/reactor/reactor/core/publisher/FluxArray.java (.../FluxArray.java) (revision 03ee7b3a8cdecd210e54619377d06f9c7cb4014b)
+++ 3rdParty_sources/reactor/reactor/core/publisher/FluxArray.java (.../FluxArray.java) (revision 683b1a0cbc595fe30e200900d081c88105aae63c)
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
+ * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -64,8 +64,7 @@
public Object scanUnsafe(Attr key) {
if (key == Attr.BUFFERED) return array.length;
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
-
- return null;
+ return SourceProducer.super.scanUnsafe(key);
}
static final class ArraySubscription
Index: 3rdParty_sources/reactor/reactor/core/publisher/FluxBufferBoundary.java
===================================================================
diff -u -r03ee7b3a8cdecd210e54619377d06f9c7cb4014b -r683b1a0cbc595fe30e200900d081c88105aae63c
--- 3rdParty_sources/reactor/reactor/core/publisher/FluxBufferBoundary.java (.../FluxBufferBoundary.java) (revision 03ee7b3a8cdecd210e54619377d06f9c7cb4014b)
+++ 3rdParty_sources/reactor/reactor/core/publisher/FluxBufferBoundary.java (.../FluxBufferBoundary.java) (revision 683b1a0cbc595fe30e200900d081c88105aae63c)
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
+ * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -53,7 +53,7 @@
Publisher other,
Supplier bufferSupplier) {
super(source);
- this.other = Objects.requireNonNull(other, "other");
+ this.other = Operators.toFluxOrMono(Objects.requireNonNull(other, "other"));
this.bufferSupplier = Objects.requireNonNull(bufferSupplier, "bufferSupplier");
}
Index: 3rdParty_sources/reactor/reactor/core/publisher/FluxBufferPredicate.java
===================================================================
diff -u -rc4ce08dc0aae7d9da822088a3d5710484f6b0402 -r683b1a0cbc595fe30e200900d081c88105aae63c
--- 3rdParty_sources/reactor/reactor/core/publisher/FluxBufferPredicate.java (.../FluxBufferPredicate.java) (revision c4ce08dc0aae7d9da822088a3d5710484f6b0402)
+++ 3rdParty_sources/reactor/reactor/core/publisher/FluxBufferPredicate.java (.../FluxBufferPredicate.java) (revision 683b1a0cbc595fe30e200900d081c88105aae63c)
@@ -40,7 +40,7 @@
/**
* Buffers elements into custom collections where the buffer boundary is determined by
- * a {@link Predicate} on the values. The predicate can be used in
+ * a {@link java.util.function.Predicate} on the values. The predicate can be used in
* several modes:
*
*
{@code Until}: A new buffer starts when the predicate returns true. The
@@ -461,4 +461,4 @@
}
}
-}
\ No newline at end of file
+}
Index: 3rdParty_sources/reactor/reactor/core/publisher/FluxBufferWhen.java
===================================================================
diff -u -r03ee7b3a8cdecd210e54619377d06f9c7cb4014b -r683b1a0cbc595fe30e200900d081c88105aae63c
--- 3rdParty_sources/reactor/reactor/core/publisher/FluxBufferWhen.java (.../FluxBufferWhen.java) (revision 03ee7b3a8cdecd210e54619377d06f9c7cb4014b)
+++ 3rdParty_sources/reactor/reactor/core/publisher/FluxBufferWhen.java (.../FluxBufferWhen.java) (revision 683b1a0cbc595fe30e200900d081c88105aae63c)
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
+ * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -65,7 +65,7 @@
Supplier bufferSupplier,
Supplier extends Queue> queueSupplier) {
super(source);
- this.start = Objects.requireNonNull(start, "start");
+ this.start = Operators.toFluxOrMono(Objects.requireNonNull(start, "start"));
this.end = Objects.requireNonNull(end, "end");
this.bufferSupplier = Objects.requireNonNull(bufferSupplier, "bufferSupplier");
this.queueSupplier = Objects.requireNonNull(queueSupplier, "queueSupplier");
@@ -360,6 +360,7 @@
BufferWhenCloseSubscriber bc = new BufferWhenCloseSubscriber<>(this, idx);
subscribers.add(bc);
+ p = Operators.toFluxOrMono(p);
p.subscribe(bc);
}
@@ -431,7 +432,7 @@
if (key == Attr.ERROR) return errors;
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
- return null;
+ return InnerOperator.super.scanUnsafe(key);
}
}
Index: 3rdParty_sources/reactor/reactor/core/publisher/FluxCallable.java
===================================================================
diff -u -rc4ce08dc0aae7d9da822088a3d5710484f6b0402 -r683b1a0cbc595fe30e200900d081c88105aae63c
--- 3rdParty_sources/reactor/reactor/core/publisher/FluxCallable.java (.../FluxCallable.java) (revision c4ce08dc0aae7d9da822088a3d5710484f6b0402)
+++ 3rdParty_sources/reactor/reactor/core/publisher/FluxCallable.java (.../FluxCallable.java) (revision 683b1a0cbc595fe30e200900d081c88105aae63c)
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
+ * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -49,6 +49,6 @@
@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
- return null;
+ return SourceProducer.super.scanUnsafe(key);
}
}
Index: 3rdParty_sources/reactor/reactor/core/publisher/FluxCombineLatest.java
===================================================================
diff -u -r03ee7b3a8cdecd210e54619377d06f9c7cb4014b -r683b1a0cbc595fe30e200900d081c88105aae63c
--- 3rdParty_sources/reactor/reactor/core/publisher/FluxCombineLatest.java (.../FluxCombineLatest.java) (revision 03ee7b3a8cdecd210e54619377d06f9c7cb4014b)
+++ 3rdParty_sources/reactor/reactor/core/publisher/FluxCombineLatest.java (.../FluxCombineLatest.java) (revision 683b1a0cbc595fe30e200900d081c88105aae63c)
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
+ * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -166,6 +166,8 @@
}
}
+ Operators.toFluxOrMono(a);
+
Queue queue = queueSupplier.get();
CombineLatestCoordinator coordinator =
@@ -180,7 +182,7 @@
public Object scanUnsafe(Attr key) {
if (key == Attr.PREFETCH) return prefetch;
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
- return null;
+ return SourceProducer.super.scanUnsafe(key);
}
static final class CombineLatestCoordinator
Index: 3rdParty_sources/reactor/reactor/core/publisher/FluxConcatArray.java
===================================================================
diff -u -r03ee7b3a8cdecd210e54619377d06f9c7cb4014b -r683b1a0cbc595fe30e200900d081c88105aae63c
--- 3rdParty_sources/reactor/reactor/core/publisher/FluxConcatArray.java (.../FluxConcatArray.java) (revision 03ee7b3a8cdecd210e54619377d06f9c7cb4014b)
+++ 3rdParty_sources/reactor/reactor/core/publisher/FluxConcatArray.java (.../FluxConcatArray.java) (revision 683b1a0cbc595fe30e200900d081c88105aae63c)
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
+ * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -61,6 +61,7 @@
if (p == null) {
Operators.error(actual, new NullPointerException("The single source Publisher is null"));
} else {
+ p = Operators.toFluxOrMono(p);
p.subscribe(actual);
}
return;
@@ -83,7 +84,7 @@
public Object scanUnsafe(Attr key) {
if (key == Attr.DELAY_ERROR) return delayError;
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
- return null;
+ return SourceProducer.super.scanUnsafe(key);
}
/**
@@ -255,6 +256,7 @@
if (this.cancelled) {
return;
}
+ p = Operators.toFluxOrMono(p);
p.subscribe(this);
final Object state = this.get();
@@ -404,7 +406,7 @@
return;
}
- final Publisher extends T> p = a[i];
+ Publisher extends T> p = a[i];
if (p == null) {
this.remove();
@@ -440,6 +442,7 @@
return;
}
+ p = Operators.toFluxOrMono(p);
p.subscribe(this);
final Object state = this.get();
Index: 3rdParty_sources/reactor/reactor/core/publisher/FluxConcatIterable.java
===================================================================
diff -u -r03ee7b3a8cdecd210e54619377d06f9c7cb4014b -r683b1a0cbc595fe30e200900d081c88105aae63c
--- 3rdParty_sources/reactor/reactor/core/publisher/FluxConcatIterable.java (.../FluxConcatIterable.java) (revision 03ee7b3a8cdecd210e54619377d06f9c7cb4014b)
+++ 3rdParty_sources/reactor/reactor/core/publisher/FluxConcatIterable.java (.../FluxConcatIterable.java) (revision 683b1a0cbc595fe30e200900d081c88105aae63c)
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
+ * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -64,7 +64,7 @@
@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
- return null;
+ return SourceProducer.super.scanUnsafe(key);
}
static final class ConcatIterableSubscriber
@@ -144,6 +144,7 @@
produced(c);
}
+ p = Operators.toFluxOrMono(p);
p.subscribe(this);
if (isCancelled()) {
Index: 3rdParty_sources/reactor/reactor/core/publisher/FluxConcatMap.java
===================================================================
diff -u -rc4ce08dc0aae7d9da822088a3d5710484f6b0402 -r683b1a0cbc595fe30e200900d081c88105aae63c
--- 3rdParty_sources/reactor/reactor/core/publisher/FluxConcatMap.java (.../FluxConcatMap.java) (revision c4ce08dc0aae7d9da822088a3d5710484f6b0402)
+++ 3rdParty_sources/reactor/reactor/core/publisher/FluxConcatMap.java (.../FluxConcatMap.java) (revision 683b1a0cbc595fe30e200900d081c88105aae63c)
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
+ * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -448,6 +448,7 @@
}
else {
active = true;
+ p = Operators.toFluxOrMono(p);
p.subscribe(inner);
}
}
@@ -805,6 +806,7 @@
}
else {
active = true;
+ p = Operators.toFluxOrMono(p);
p.subscribe(inner);
}
}
Index: 3rdParty_sources/reactor/reactor/core/publisher/FluxConcatMapNoPrefetch.java
===================================================================
diff -u -r03ee7b3a8cdecd210e54619377d06f9c7cb4014b -r683b1a0cbc595fe30e200900d081c88105aae63c
--- 3rdParty_sources/reactor/reactor/core/publisher/FluxConcatMapNoPrefetch.java (.../FluxConcatMapNoPrefetch.java) (revision 03ee7b3a8cdecd210e54619377d06f9c7cb4014b)
+++ 3rdParty_sources/reactor/reactor/core/publisher/FluxConcatMapNoPrefetch.java (.../FluxConcatMapNoPrefetch.java) (revision 683b1a0cbc595fe30e200900d081c88105aae63c)
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2020-2021 VMware Inc. or its affiliates, All Rights Reserved.
+ * Copyright (c) 2020-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -203,6 +203,7 @@
return;
}
+ p = Operators.toFluxOrMono(p);
p.subscribe(inner);
}
catch (Throwable e) {
Index: 3rdParty_sources/reactor/reactor/core/publisher/FluxContextWrite.java
===================================================================
diff -u -rc4ce08dc0aae7d9da822088a3d5710484f6b0402 -r683b1a0cbc595fe30e200900d081c88105aae63c
--- 3rdParty_sources/reactor/reactor/core/publisher/FluxContextWrite.java (.../FluxContextWrite.java) (revision c4ce08dc0aae7d9da822088a3d5710484f6b0402)
+++ 3rdParty_sources/reactor/reactor/core/publisher/FluxContextWrite.java (.../FluxContextWrite.java) (revision 683b1a0cbc595fe30e200900d081c88105aae63c)
@@ -46,7 +46,7 @@
@Override
public Object scanUnsafe(Attr key) {
- if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
+ if (key == Scannable.Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
return super.scanUnsafe(key);
}
@@ -79,7 +79,7 @@
if (key == Attr.PARENT) {
return s;
}
- if (key == Attr.RUN_STYLE) {
+ if (key == Scannable.Attr.RUN_STYLE) {
return Attr.RunStyle.SYNC;
}
return InnerOperator.super.scanUnsafe(key);
@@ -175,4 +175,4 @@
return qs != null ? qs.size() : 0;
}
}
-}
\ No newline at end of file
+}
Index: 3rdParty_sources/reactor/reactor/core/publisher/FluxContextWriteRestoringThreadLocals.java
===================================================================
diff -u -rc4ce08dc0aae7d9da822088a3d5710484f6b0402 -r683b1a0cbc595fe30e200900d081c88105aae63c
--- 3rdParty_sources/reactor/reactor/core/publisher/FluxContextWriteRestoringThreadLocals.java (.../FluxContextWriteRestoringThreadLocals.java) (revision c4ce08dc0aae7d9da822088a3d5710484f6b0402)
+++ 3rdParty_sources/reactor/reactor/core/publisher/FluxContextWriteRestoringThreadLocals.java (.../FluxContextWriteRestoringThreadLocals.java) (revision 683b1a0cbc595fe30e200900d081c88105aae63c)
@@ -22,6 +22,7 @@
import io.micrometer.context.ContextSnapshot;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
+import reactor.core.Fuseable;
import reactor.core.Fuseable.ConditionalSubscriber;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;
@@ -49,10 +50,11 @@
@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
+ if (key == InternalProducerAttr.INSTANCE) return true;
return super.scanUnsafe(key);
}
- static final class ContextWriteRestoringThreadLocalsSubscriber
+ static class ContextWriteRestoringThreadLocalsSubscriber
implements ConditionalSubscriber, InnerOperator {
final CoreSubscriber super T> actual;
@@ -171,4 +173,46 @@
}
}
}
+
+ static final class FuseableContextWriteRestoringThreadLocalsSubscriber
+ extends ContextWriteRestoringThreadLocalsSubscriber
+ implements Fuseable.QueueSubscription {
+
+ FuseableContextWriteRestoringThreadLocalsSubscriber(
+ CoreSubscriber super T> actual, Context context) {
+ super(actual, context);
+ }
+
+ // Required for
+ // FuseableBestPracticesTest.coreFuseableSubscribersShouldNotExtendNonFuseableOnNext
+ @Override
+ public void onNext(T t) {
+ super.onNext(t);
+ }
+
+ @Override
+ public T poll() {
+ throw new UnsupportedOperationException("Nope");
+ }
+
+ @Override
+ public int requestFusion(int requestedMode) {
+ return Fuseable.NONE;
+ }
+
+ @Override
+ public int size() {
+ throw new UnsupportedOperationException("Nope");
+ }
+
+ @Override
+ public boolean isEmpty() {
+ throw new UnsupportedOperationException("Nope");
+ }
+
+ @Override
+ public void clear() {
+ throw new UnsupportedOperationException("Nope");
+ }
+ }
}
Index: 3rdParty_sources/reactor/reactor/core/publisher/FluxCreate.java
===================================================================
diff -u -rc4ce08dc0aae7d9da822088a3d5710484f6b0402 -r683b1a0cbc595fe30e200900d081c88105aae63c
--- 3rdParty_sources/reactor/reactor/core/publisher/FluxCreate.java (.../FluxCreate.java) (revision c4ce08dc0aae7d9da822088a3d5710484f6b0402)
+++ 3rdParty_sources/reactor/reactor/core/publisher/FluxCreate.java (.../FluxCreate.java) (revision 683b1a0cbc595fe30e200900d081c88105aae63c)
@@ -58,7 +58,7 @@
final CreateMode createMode;
FluxCreate(Consumer super FluxSink> source,
- OverflowStrategy backpressure,
+ FluxSink.OverflowStrategy backpressure,
CreateMode createMode) {
this.source = Objects.requireNonNull(source, "source");
this.backpressure = Objects.requireNonNull(backpressure, "backpressure");
@@ -88,24 +88,26 @@
@Override
public void subscribe(CoreSubscriber super T> actual) {
- BaseSink sink = createSink(actual, backpressure);
+ CoreSubscriber super T> wrapped =
+ Operators.restoreContextOnSubscriberIfAutoCPEnabled(this, actual);
+ BaseSink sink = createSink(wrapped, backpressure);
- actual.onSubscribe(sink);
+ wrapped.onSubscribe(sink);
try {
source.accept(
createMode == CreateMode.PUSH_PULL ? new SerializedFluxSink<>(sink) :
sink);
}
catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
- sink.error(Operators.onOperatorError(ex, actual.currentContext()));
+ sink.error(Operators.onOperatorError(ex, wrapped.currentContext()));
}
}
@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_STYLE) return Attr.RunStyle.ASYNC;
- return null;
+ return SourceProducer.super.scanUnsafe(key);
}
/**
@@ -1125,4 +1127,4 @@
}
}
}
-}
\ No newline at end of file
+}
Index: 3rdParty_sources/reactor/reactor/core/publisher/FluxDefer.java
===================================================================
diff -u -r03ee7b3a8cdecd210e54619377d06f9c7cb4014b -r683b1a0cbc595fe30e200900d081c88105aae63c
--- 3rdParty_sources/reactor/reactor/core/publisher/FluxDefer.java (.../FluxDefer.java) (revision 03ee7b3a8cdecd210e54619377d06f9c7cb4014b)
+++ 3rdParty_sources/reactor/reactor/core/publisher/FluxDefer.java (.../FluxDefer.java) (revision 683b1a0cbc595fe30e200900d081c88105aae63c)
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
+ * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -58,6 +58,6 @@
@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
- return null;
+ return SourceProducer.super.scanUnsafe(key);
}
}
Index: 3rdParty_sources/reactor/reactor/core/publisher/FluxDeferContextual.java
===================================================================
diff -u -r03ee7b3a8cdecd210e54619377d06f9c7cb4014b -r683b1a0cbc595fe30e200900d081c88105aae63c
--- 3rdParty_sources/reactor/reactor/core/publisher/FluxDeferContextual.java (.../FluxDeferContextual.java) (revision 03ee7b3a8cdecd210e54619377d06f9c7cb4014b)
+++ 3rdParty_sources/reactor/reactor/core/publisher/FluxDeferContextual.java (.../FluxDeferContextual.java) (revision 683b1a0cbc595fe30e200900d081c88105aae63c)
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
+ * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -54,13 +54,13 @@
return;
}
- from(p).subscribe(actual);
+ Operators.toFluxOrMono(p).subscribe(actual);
}
@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
- return null;
+ return SourceProducer.super.scanUnsafe(key);
}
}
Index: 3rdParty_sources/reactor/reactor/core/publisher/FluxDelaySubscription.java
===================================================================
diff -u -rc4ce08dc0aae7d9da822088a3d5710484f6b0402 -r683b1a0cbc595fe30e200900d081c88105aae63c
--- 3rdParty_sources/reactor/reactor/core/publisher/FluxDelaySubscription.java (.../FluxDelaySubscription.java) (revision c4ce08dc0aae7d9da822088a3d5710484f6b0402)
+++ 3rdParty_sources/reactor/reactor/core/publisher/FluxDelaySubscription.java (.../FluxDelaySubscription.java) (revision 683b1a0cbc595fe30e200900d081c88105aae63c)
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
+ * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -40,7 +40,7 @@
FluxDelaySubscription(Flux extends T> source, Publisher other) {
super(source);
- this.other = Objects.requireNonNull(other, "other");
+ this.other = Operators.toFluxOrMono(Objects.requireNonNull(other, "other"));
}
@Override
@@ -68,7 +68,7 @@
static final class DelaySubscriptionOtherSubscriber
extends Operators.DeferredSubscription implements InnerOperator {
- final Consumer> source;
+ final Consumer> source;
final CoreSubscriber super T> actual;
@@ -77,7 +77,7 @@
boolean done;
DelaySubscriptionOtherSubscriber(CoreSubscriber super T> actual,
- Consumer> source) {
+ Consumer> source) {
this.actual = actual;
this.source = source;
}
@@ -199,4 +199,4 @@
actual.onComplete();
}
}
-}
\ No newline at end of file
+}
Index: 3rdParty_sources/reactor/reactor/core/publisher/FluxEmpty.java
===================================================================
diff -u -r03ee7b3a8cdecd210e54619377d06f9c7cb4014b -r683b1a0cbc595fe30e200900d081c88105aae63c
--- 3rdParty_sources/reactor/reactor/core/publisher/FluxEmpty.java (.../FluxEmpty.java) (revision 03ee7b3a8cdecd210e54619377d06f9c7cb4014b)
+++ 3rdParty_sources/reactor/reactor/core/publisher/FluxEmpty.java (.../FluxEmpty.java) (revision 683b1a0cbc595fe30e200900d081c88105aae63c)
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
+ * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -45,7 +45,7 @@
@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
- return null;
+ return SourceProducer.super.scanUnsafe(key);
}
/**
Index: 3rdParty_sources/reactor/reactor/core/publisher/FluxError.java
===================================================================
diff -u -r03ee7b3a8cdecd210e54619377d06f9c7cb4014b -r683b1a0cbc595fe30e200900d081c88105aae63c
--- 3rdParty_sources/reactor/reactor/core/publisher/FluxError.java (.../FluxError.java) (revision 03ee7b3a8cdecd210e54619377d06f9c7cb4014b)
+++ 3rdParty_sources/reactor/reactor/core/publisher/FluxError.java (.../FluxError.java) (revision 683b1a0cbc595fe30e200900d081c88105aae63c)
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
+ * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -54,6 +54,6 @@
@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
- return null;
+ return SourceProducer.super.scanUnsafe(key);
}
}
Index: 3rdParty_sources/reactor/reactor/core/publisher/FluxErrorOnRequest.java
===================================================================
diff -u -r03ee7b3a8cdecd210e54619377d06f9c7cb4014b -r683b1a0cbc595fe30e200900d081c88105aae63c
--- 3rdParty_sources/reactor/reactor/core/publisher/FluxErrorOnRequest.java (.../FluxErrorOnRequest.java) (revision 03ee7b3a8cdecd210e54619377d06f9c7cb4014b)
+++ 3rdParty_sources/reactor/reactor/core/publisher/FluxErrorOnRequest.java (.../FluxErrorOnRequest.java) (revision 683b1a0cbc595fe30e200900d081c88105aae63c)
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
+ * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -46,7 +46,7 @@
@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
- return null;
+ return SourceProducer.super.scanUnsafe(key);
}
static final class ErrorSubscription implements InnerProducer {
Index: 3rdParty_sources/reactor/reactor/core/publisher/FluxErrorSupplied.java
===================================================================
diff -u -rc4ce08dc0aae7d9da822088a3d5710484f6b0402 -r683b1a0cbc595fe30e200900d081c88105aae63c
--- 3rdParty_sources/reactor/reactor/core/publisher/FluxErrorSupplied.java (.../FluxErrorSupplied.java) (revision c4ce08dc0aae7d9da822088a3d5710484f6b0402)
+++ 3rdParty_sources/reactor/reactor/core/publisher/FluxErrorSupplied.java (.../FluxErrorSupplied.java) (revision 683b1a0cbc595fe30e200900d081c88105aae63c)
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
+ * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,7 +25,7 @@
/**
* Emits a generated {@link Throwable} instance to Subscribers, lazily generated via a
- * provided {@link Supplier}.
+ * provided {@link java.util.function.Supplier}.
*
* @param the value type
*
@@ -57,6 +57,6 @@
@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
- return null;
+ return SourceProducer.super.scanUnsafe(key);
}
-}
\ No newline at end of file
+}
Index: 3rdParty_sources/reactor/reactor/core/publisher/FluxExpand.java
===================================================================
diff -u -r03ee7b3a8cdecd210e54619377d06f9c7cb4014b -r683b1a0cbc595fe30e200900d081c88105aae63c
--- 3rdParty_sources/reactor/reactor/core/publisher/FluxExpand.java (.../FluxExpand.java) (revision 03ee7b3a8cdecd210e54619377d06f9c7cb4014b)
+++ 3rdParty_sources/reactor/reactor/core/publisher/FluxExpand.java (.../FluxExpand.java) (revision 683b1a0cbc595fe30e200900d081c88105aae63c)
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2021 VMware Inc. or its affiliates, All Rights Reserved.
+ * Copyright (c) 2017-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -119,8 +119,8 @@
Publisher extends T> p;
try {
- p = Objects.requireNonNull(expander.apply(t),
- "The expander returned a null Publisher");
+ p = Operators.toFluxOrMono(Objects.requireNonNull(expander.apply(t),
+ "The expander returned a null Publisher"));
}
catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
Index: 3rdParty_sources/reactor/reactor/core/publisher/FluxFilter.java
===================================================================
diff -u -rc4ce08dc0aae7d9da822088a3d5710484f6b0402 -r683b1a0cbc595fe30e200900d081c88105aae63c
--- 3rdParty_sources/reactor/reactor/core/publisher/FluxFilter.java (.../FluxFilter.java) (revision c4ce08dc0aae7d9da822088a3d5710484f6b0402)
+++ 3rdParty_sources/reactor/reactor/core/publisher/FluxFilter.java (.../FluxFilter.java) (revision 683b1a0cbc595fe30e200900d081c88105aae63c)
@@ -61,7 +61,7 @@
static final class FilterSubscriber
implements InnerOperator,
- ConditionalSubscriber {
+ Fuseable.ConditionalSubscriber {
final CoreSubscriber super T> actual;
final Context ctx;
@@ -194,9 +194,9 @@
static final class FilterConditionalSubscriber
implements InnerOperator,
- ConditionalSubscriber {
+ Fuseable.ConditionalSubscriber {
- final ConditionalSubscriber super T> actual;
+ final Fuseable.ConditionalSubscriber super T> actual;
final Context ctx;
final Predicate super T> predicate;
@@ -205,7 +205,7 @@
boolean done;
- FilterConditionalSubscriber(ConditionalSubscriber super T> actual,
+ FilterConditionalSubscriber(Fuseable.ConditionalSubscriber super T> actual,
Predicate super T> predicate) {
this.actual = actual;
this.ctx = actual.currentContext();
@@ -327,4 +327,4 @@
}
}
-}
\ No newline at end of file
+}
Index: 3rdParty_sources/reactor/reactor/core/publisher/FluxFilterWhen.java
===================================================================
diff -u -r03ee7b3a8cdecd210e54619377d06f9c7cb4014b -r683b1a0cbc595fe30e200900d081c88105aae63c
--- 3rdParty_sources/reactor/reactor/core/publisher/FluxFilterWhen.java (.../FluxFilterWhen.java) (revision 03ee7b3a8cdecd210e54619377d06f9c7cb4014b)
+++ 3rdParty_sources/reactor/reactor/core/publisher/FluxFilterWhen.java (.../FluxFilterWhen.java) (revision 683b1a0cbc595fe30e200900d081c88105aae63c)
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2017-2021 VMware Inc. or its affiliates, All Rights Reserved.
+ * Copyright (c) 2017-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -279,6 +279,7 @@
FilterWhenInner inner = new FilterWhenInner(this, !(p instanceof Mono));
if (CURRENT.compareAndSet(this,null, inner)) {
state = STATE_RUNNING;
+ p = Operators.toFluxOrMono(p);
p.subscribe(inner);
break;
}
Index: 3rdParty_sources/reactor/reactor/core/publisher/FluxFirstWithSignal.java
===================================================================
diff -u -r03ee7b3a8cdecd210e54619377d06f9c7cb4014b -r683b1a0cbc595fe30e200900d081c88105aae63c
--- 3rdParty_sources/reactor/reactor/core/publisher/FluxFirstWithSignal.java (.../FluxFirstWithSignal.java) (revision 03ee7b3a8cdecd210e54619377d06f9c7cb4014b)
+++ 3rdParty_sources/reactor/reactor/core/publisher/FluxFirstWithSignal.java (.../FluxFirstWithSignal.java) (revision 683b1a0cbc595fe30e200900d081c88105aae63c)
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
+ * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -127,11 +127,14 @@
new NullPointerException("The single source Publisher is null"));
}
else {
+ p = Operators.toFluxOrMono(p);
p.subscribe(actual);
}
return;
}
+ Operators.toFluxOrMono(a);
+
RaceCoordinator coordinator = new RaceCoordinator<>(n);
coordinator.subscribe(a, n, actual);
@@ -164,7 +167,7 @@
@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
- return null;
+ return SourceProducer.super.scanUnsafe(key);
}
static final class RaceCoordinator
Index: 3rdParty_sources/reactor/reactor/core/publisher/FluxFirstWithValue.java
===================================================================
diff -u -r03ee7b3a8cdecd210e54619377d06f9c7cb4014b -r683b1a0cbc595fe30e200900d081c88105aae63c
--- 3rdParty_sources/reactor/reactor/core/publisher/FluxFirstWithValue.java (.../FluxFirstWithValue.java) (revision 03ee7b3a8cdecd210e54619377d06f9c7cb4014b)
+++ 3rdParty_sources/reactor/reactor/core/publisher/FluxFirstWithValue.java (.../FluxFirstWithValue.java) (revision 683b1a0cbc595fe30e200900d081c88105aae63c)
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
+ * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -159,7 +159,7 @@
return;
}
if (n == 1) {
- Publisher extends T> p = a[0];
+ Publisher extends T> p = Flux.from(a[0]);
if (p == null) {
Operators.error(actual,
@@ -178,7 +178,7 @@
@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
- return null;
+ return SourceProducer.super.scanUnsafe(key);
}
static final class RaceValuesCoordinator
@@ -227,6 +227,8 @@
actual.onSubscribe(this);
+ Operators.toFluxOrMono(sources);
+
for (int i = 0; i < n; i++) {
if (cancelled || winner != Integer.MIN_VALUE) {
return;
Index: 3rdParty_sources/reactor/reactor/core/publisher/FluxFlatMap.java
===================================================================
diff -u -rc4ce08dc0aae7d9da822088a3d5710484f6b0402 -r683b1a0cbc595fe30e200900d081c88105aae63c
--- 3rdParty_sources/reactor/reactor/core/publisher/FluxFlatMap.java (.../FluxFlatMap.java) (revision c4ce08dc0aae7d9da822088a3d5710484f6b0402)
+++ 3rdParty_sources/reactor/reactor/core/publisher/FluxFlatMap.java (.../FluxFlatMap.java) (revision 683b1a0cbc595fe30e200900d081c88105aae63c)
@@ -30,6 +30,7 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
+import reactor.core.CorePublisher;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.Fuseable;
@@ -196,6 +197,7 @@
}
}
else {
+ p = Operators.toFluxOrMono(p);
if (!fuseableExpected || p instanceof Fuseable) {
p.subscribe(s);
}
@@ -424,6 +426,7 @@
else {
FlatMapInner inner = new FlatMapInner<>(this, prefetch);
if (add(inner)) {
+ p = Operators.toFluxOrMono(p);
p.subscribe(inner);
} else {
Operators.onDiscard(t, actual.currentContext());
Index: 3rdParty_sources/reactor/reactor/core/publisher/FluxFromMonoOperator.java
===================================================================
diff -u -r03ee7b3a8cdecd210e54619377d06f9c7cb4014b -r683b1a0cbc595fe30e200900d081c88105aae63c
--- 3rdParty_sources/reactor/reactor/core/publisher/FluxFromMonoOperator.java (.../FluxFromMonoOperator.java) (revision 03ee7b3a8cdecd210e54619377d06f9c7cb4014b)
+++ 3rdParty_sources/reactor/reactor/core/publisher/FluxFromMonoOperator.java (.../FluxFromMonoOperator.java) (revision 683b1a0cbc595fe30e200900d081c88105aae63c)
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
+ * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -63,6 +63,7 @@
public Object scanUnsafe(Attr key) {
if (key == Attr.PREFETCH) return getPrefetch();
if (key == Attr.PARENT) return source;
+ if (key == InternalProducerAttr.INSTANCE) return true;
return null;
}
@@ -80,6 +81,7 @@
}
OptimizableOperator newSource = operator.nextOptimizableSource();
if (newSource == null) {
+ subscriber = Operators.restoreContextOnSubscriberIfPublisherNonInternal(operator.source(), subscriber);
operator.source().subscribe(subscriber);
return;
}
Index: 3rdParty_sources/reactor/reactor/core/publisher/FluxGenerate.java
===================================================================
diff -u -rc4ce08dc0aae7d9da822088a3d5710484f6b0402 -r683b1a0cbc595fe30e200900d081c88105aae63c
--- 3rdParty_sources/reactor/reactor/core/publisher/FluxGenerate.java (.../FluxGenerate.java) (revision c4ce08dc0aae7d9da822088a3d5710484f6b0402)
+++ 3rdParty_sources/reactor/reactor/core/publisher/FluxGenerate.java (.../FluxGenerate.java) (revision 683b1a0cbc595fe30e200900d081c88105aae63c)
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
+ * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -74,21 +74,24 @@
@Override
public void subscribe(CoreSubscriber super T> actual) {
+ CoreSubscriber super T> wrapped =
+ Operators.restoreContextOnSubscriberIfAutoCPEnabled(this, actual);
+
S state;
try {
state = stateSupplier.call();
} catch (Throwable e) {
- Operators.error(actual, Operators.onOperatorError(e, actual.currentContext()));
+ Operators.error(wrapped, Operators.onOperatorError(e, wrapped.currentContext()));
return;
}
- actual.onSubscribe(new GenerateSubscription<>(actual, state, generator, stateConsumer));
+ wrapped.onSubscribe(new GenerateSubscription<>(wrapped, state, generator, stateConsumer));
}
@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
- return null;
+ return SourceProducer.super.scanUnsafe(key);
}
static final class GenerateSubscription
Index: 3rdParty_sources/reactor/reactor/core/publisher/FluxGroupJoin.java
===================================================================
diff -u -r03ee7b3a8cdecd210e54619377d06f9c7cb4014b -r683b1a0cbc595fe30e200900d081c88105aae63c
--- 3rdParty_sources/reactor/reactor/core/publisher/FluxGroupJoin.java (.../FluxGroupJoin.java) (revision 03ee7b3a8cdecd210e54619377d06f9c7cb4014b)
+++ 3rdParty_sources/reactor/reactor/core/publisher/FluxGroupJoin.java (.../FluxGroupJoin.java) (revision 683b1a0cbc595fe30e200900d081c88105aae63c)
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
+ * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -81,7 +81,7 @@
Supplier extends Queue