Index: idea_project/.idea/libraries/3rdParty.xml =================================================================== diff -u -r0830e8691458c7833c2aa84cfd18806b34daeaa8 -rbaf2503b7b75aa6c2d59b3df2e3a16f62a3d80d5 --- idea_project/.idea/libraries/3rdParty.xml (.../3rdParty.xml) (revision 0830e8691458c7833c2aa84cfd18806b34daeaa8) +++ idea_project/.idea/libraries/3rdParty.xml (.../3rdParty.xml) (revision baf2503b7b75aa6c2d59b3df2e3a16f62a3d80d5) @@ -55,6 +55,7 @@ + \ No newline at end of file Index: lams_common/src/java/org/lamsfoundation/lams/flux/FluxMap.java =================================================================== diff -u -r6af10a84f90327690b640c49390c0c4084353cdd -rbaf2503b7b75aa6c2d59b3df2e3a16f62a3d80d5 --- lams_common/src/java/org/lamsfoundation/lams/flux/FluxMap.java (.../FluxMap.java) (revision 6af10a84f90327690b640c49390c0c4084353cdd) +++ lams_common/src/java/org/lamsfoundation/lams/flux/FluxMap.java (.../FluxMap.java) (revision baf2503b7b75aa6c2d59b3df2e3a16f62a3d80d5) @@ -1,5 +1,6 @@ package org.lamsfoundation.lams.flux; +import java.sql.Time; import java.time.Duration; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -11,13 +12,12 @@ import org.apache.log4j.Logger; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** - * Utility class for serving updates to shared Fluxes. - * It receives signals from a source Flux, probably a part of SharedSink. - * For each requested key it creates a hot publisher Flux which fetches data for the key. - * Supports time-based throttling. - * If the sink does not produce any matching signals for given timeout, the flux gets removed. + * Utility class for serving updates to shared Fluxes. It receives signals from a source Flux, probably a part of + * SharedSink. For each requested key it creates a hot publisher Flux which fetches data for the key. Supports + * time-based throttling. If the sink does not produce any matching signals for given timeout, the flux gets removed. * * @author Marcin Cieslak */ @@ -125,18 +125,25 @@ // remove Flux when source Flux did not emit an accepted signal before given timeout if (timeoutSeconds != null) { - flux = flux.timeout(Duration.ofSeconds(timeoutSeconds)).onErrorResume(TimeoutException.class, - throwable -> { - if (log.isDebugEnabled()) { - log.debug("Removing timed out flux for \"" + name + "\" with key " + key); - } - // remove terminated Flux from the map - map.remove(key); - // switch subscribers to a dummy Flux which completes and cancels their subscriptions - return Flux.empty(); - }); + flux = flux.timeout(Duration.ofSeconds(timeoutSeconds)); } + // backpressure and error handling + flux = flux.onBackpressureLatest().onErrorResume(throwable -> { + if (throwable instanceof TimeoutException) { + if (log.isDebugEnabled()) { + log.debug("Removing timed out flux for \"" + name + "\" with key " + key); + } + } else { + log.error("Error while processing flux for \"" + name + "\" with key " + key, throwable); + } + + // remove terminated Flux from the map + map.remove(key); + // switch subscribers to a dummy Flux which completes and cancels their subscriptions + return Flux.empty(); + }); + map.put(key, flux); } Index: lams_common/src/java/org/lamsfoundation/lams/flux/SharedSink.java =================================================================== diff -u -r6af10a84f90327690b640c49390c0c4084353cdd -rbaf2503b7b75aa6c2d59b3df2e3a16f62a3d80d5 --- lams_common/src/java/org/lamsfoundation/lams/flux/SharedSink.java (.../SharedSink.java) (revision 6af10a84f90327690b640c49390c0c4084353cdd) +++ lams_common/src/java/org/lamsfoundation/lams/flux/SharedSink.java (.../SharedSink.java) (revision baf2503b7b75aa6c2d59b3df2e3a16f62a3d80d5) @@ -2,13 +2,15 @@ import org.apache.log4j.Logger; +import reactor.core.publisher.BufferOverflowStrategy; import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.EmitFailureHandler; /** - * This class allows both sink functionality (manually pushing elements to Flux) - * and hot publisher (so multiple other Fluxes can use it as source). + * This class allows both sink functionality (manually pushing elements to Flux) and hot publisher (so multiple other + * Fluxes can use it as source). * * @author Marcin Cieslak */ @@ -24,7 +26,7 @@ log.debug("Created sink for \"" + name + "\""); } sink = Sinks.many().replay().latest(); - flux = sink.asFlux().doFinally((signalType) -> { + flux = sink.asFlux().onBackpressureLatest().doFinally((signalType) -> { if (log.isDebugEnabled()) { log.debug("Terminated (" + signalType + ") sink for \"" + name + "\""); }