Index: lams_common/src/java/org/lamsfoundation/lams/flux/FluxMap.java =================================================================== diff -u -r0d071e756d526e552dace9143c013ca5f9537692 -r88e06a551e39097393f1716da6eb5b7774752568 --- lams_common/src/java/org/lamsfoundation/lams/flux/FluxMap.java (.../FluxMap.java) (revision 0d071e756d526e552dace9143c013ca5f9537692) +++ lams_common/src/java/org/lamsfoundation/lams/flux/FluxMap.java (.../FluxMap.java) (revision 88e06a551e39097393f1716da6eb5b7774752568) @@ -10,6 +10,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiPredicate; +import java.util.function.Consumer; import java.util.function.Function; /** @@ -25,7 +26,7 @@ public static final int SHORT_THROTTLE = 5; public static final int STANDARD_THROTTLE = 10; public static final int LONG_THROTTLE = 30; - public static final int STANDARD_TIMEOUT = 5 * 60; + public static final int STANDARD_TIMEOUT = 60; private final Map> map; @@ -79,15 +80,31 @@ int counter = subscriberCounter.get(); if (counter <= 0) { if (log.isDebugEnabled()) { - log.debug("Completing and removing flux for \"" + name + "\" with key " + key); + log.debug("Completing flux with no subscribers for \"" + name + "\" with key " + key); } sink.complete(); - map.remove(key); return; } sink.next(item); }); + // function to run when subscription is terminated is the same for cancel and complete + Consumer subscriptionTerminateConsumer = (closeType) -> { + int counter = subscriberCounter.decrementAndGet(); + + if (log.isDebugEnabled()) { + log.debug("Terminated (" + closeType + ") subscription #" + (counter + 1) + " to flux for \"" + + name + "\" with key " + key); + } + + if (counter <= 0) { + if (log.isDebugEnabled()) { + log.debug("Removing flux with no subscribers for \"" + name + "\" with key " + key); + } + map.remove(key); + } + }; + // map items from sink to fetch function result flux = filteringFlux.map(item -> fetchFunction.apply(item)) // push initial value to the Flux so data is available immediately after subscribing @@ -99,25 +116,18 @@ // just some logging .doOnSubscribe(subscription -> { int counter = subscriberCounter.incrementAndGet(); - if (counter <= 0) { - subscriberCounter.set(1); - counter = 1; - } if (log.isDebugEnabled()) { - log.debug("Subscribed (" + counter + ") to flux \"" + name + "\" with key " + key); + log.debug("Create subscription #" + counter + " to flux for \"" + name + "\" with key " + + key); } }) - // just some logging - .doOnCancel(() -> { - int counter = subscriberCounter.decrementAndGet(); + // just some logging when subscription is cancelled, for example on timeout + .doOnCancel(() -> subscriptionTerminateConsumer.accept("cancel")) - if (log.isDebugEnabled()) { - log.debug("Cancelling (" + counter + ") subscription to flux for \"" + name - + "\" with key " + key); - } - }) + // just some logging when subscription is completed + .doOnComplete(() -> subscriptionTerminateConsumer.accept("complete")) // detach all subscribers when flux is complete .onTerminateDetach(); @@ -131,15 +141,14 @@ flux = flux.onBackpressureLatest().onErrorResume(throwable -> { if (throwable instanceof TimeoutException) { if (log.isDebugEnabled()) { - log.debug("Removing timed out flux for \"" + name + "\" with key " + key); + log.debug("Removing timed out subscription to flux for \"" + name + "\" with key " + key); } } else { - log.error("Error while processing flux for \"" + name + "\" with key " + key, throwable); + log.error("Error while processing subscription for flux for \"" + name + "\" with key " + key, + throwable); } - // remove terminated Flux from the map - map.remove(key); - // switch subscribers to a dummy Flux which completes and cancels their subscriptions + // switch subscriber to a dummy Flux which completes their subscription return Flux.empty(); });