Index: idea_project/.idea/libraries/3rdParty.xml
===================================================================
diff -u -r0830e8691458c7833c2aa84cfd18806b34daeaa8 -rbaf2503b7b75aa6c2d59b3df2e3a16f62a3d80d5
--- idea_project/.idea/libraries/3rdParty.xml (.../3rdParty.xml) (revision 0830e8691458c7833c2aa84cfd18806b34daeaa8)
+++ idea_project/.idea/libraries/3rdParty.xml (.../3rdParty.xml) (revision baf2503b7b75aa6c2d59b3df2e3a16f62a3d80d5)
@@ -55,6 +55,7 @@
+
\ No newline at end of file
Index: lams_common/src/java/org/lamsfoundation/lams/flux/FluxMap.java
===================================================================
diff -u -r6af10a84f90327690b640c49390c0c4084353cdd -rbaf2503b7b75aa6c2d59b3df2e3a16f62a3d80d5
--- lams_common/src/java/org/lamsfoundation/lams/flux/FluxMap.java (.../FluxMap.java) (revision 6af10a84f90327690b640c49390c0c4084353cdd)
+++ lams_common/src/java/org/lamsfoundation/lams/flux/FluxMap.java (.../FluxMap.java) (revision baf2503b7b75aa6c2d59b3df2e3a16f62a3d80d5)
@@ -1,5 +1,6 @@
package org.lamsfoundation.lams.flux;
+import java.sql.Time;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -11,13 +12,12 @@
import org.apache.log4j.Logger;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
/**
- * Utility class for serving updates to shared Fluxes.
- * It receives signals from a source Flux, probably a part of SharedSink.
- * For each requested key it creates a hot publisher Flux which fetches data for the key.
- * Supports time-based throttling.
- * If the sink does not produce any matching signals for given timeout, the flux gets removed.
+ * Utility class for serving updates to shared Fluxes. It receives signals from a source Flux, probably a part of
+ * SharedSink. For each requested key it creates a hot publisher Flux which fetches data for the key. Supports
+ * time-based throttling. If the sink does not produce any matching signals for given timeout, the flux gets removed.
*
* @author Marcin Cieslak
*/
@@ -125,18 +125,25 @@
// remove Flux when source Flux did not emit an accepted signal before given timeout
if (timeoutSeconds != null) {
- flux = flux.timeout(Duration.ofSeconds(timeoutSeconds)).onErrorResume(TimeoutException.class,
- throwable -> {
- if (log.isDebugEnabled()) {
- log.debug("Removing timed out flux for \"" + name + "\" with key " + key);
- }
- // remove terminated Flux from the map
- map.remove(key);
- // switch subscribers to a dummy Flux which completes and cancels their subscriptions
- return Flux.empty();
- });
+ flux = flux.timeout(Duration.ofSeconds(timeoutSeconds));
}
+ // backpressure and error handling
+ flux = flux.onBackpressureLatest().onErrorResume(throwable -> {
+ if (throwable instanceof TimeoutException) {
+ if (log.isDebugEnabled()) {
+ log.debug("Removing timed out flux for \"" + name + "\" with key " + key);
+ }
+ } else {
+ log.error("Error while processing flux for \"" + name + "\" with key " + key, throwable);
+ }
+
+ // remove terminated Flux from the map
+ map.remove(key);
+ // switch subscribers to a dummy Flux which completes and cancels their subscriptions
+ return Flux.empty();
+ });
+
map.put(key, flux);
}
Index: lams_common/src/java/org/lamsfoundation/lams/flux/SharedSink.java
===================================================================
diff -u -r6af10a84f90327690b640c49390c0c4084353cdd -rbaf2503b7b75aa6c2d59b3df2e3a16f62a3d80d5
--- lams_common/src/java/org/lamsfoundation/lams/flux/SharedSink.java (.../SharedSink.java) (revision 6af10a84f90327690b640c49390c0c4084353cdd)
+++ lams_common/src/java/org/lamsfoundation/lams/flux/SharedSink.java (.../SharedSink.java) (revision baf2503b7b75aa6c2d59b3df2e3a16f62a3d80d5)
@@ -2,13 +2,15 @@
import org.apache.log4j.Logger;
+import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitFailureHandler;
/**
- * This class allows both sink functionality (manually pushing elements to Flux)
- * and hot publisher (so multiple other Fluxes can use it as source).
+ * This class allows both sink functionality (manually pushing elements to Flux) and hot publisher (so multiple other
+ * Fluxes can use it as source).
*
* @author Marcin Cieslak
*/
@@ -24,7 +26,7 @@
log.debug("Created sink for \"" + name + "\"");
}
sink = Sinks.many().replay().latest();
- flux = sink.asFlux().doFinally((signalType) -> {
+ flux = sink.asFlux().onBackpressureLatest().doFinally((signalType) -> {
if (log.isDebugEnabled()) {
log.debug("Terminated (" + signalType + ") sink for \"" + name + "\"");
}