Index: lams_common/src/java/org/lamsfoundation/lams/flux/FluxMap.java =================================================================== diff -u -r20e62542c73330b5ccd425dc727597660abdd129 -r6af10a84f90327690b640c49390c0c4084353cdd --- lams_common/src/java/org/lamsfoundation/lams/flux/FluxMap.java (.../FluxMap.java) (revision 20e62542c73330b5ccd425dc727597660abdd129) +++ lams_common/src/java/org/lamsfoundation/lams/flux/FluxMap.java (.../FluxMap.java) (revision 6af10a84f90327690b640c49390c0c4084353cdd) @@ -118,8 +118,11 @@ log.debug("Cancelling (" + counter + ") subscription to flux for \"" + name + "\" with key " + key); } - }); + }) + // detach all subscribers when flux is complete + .onTerminateDetach(); + // remove Flux when source Flux did not emit an accepted signal before given timeout if (timeoutSeconds != null) { flux = flux.timeout(Duration.ofSeconds(timeoutSeconds)).onErrorResume(TimeoutException.class, Index: lams_common/src/java/org/lamsfoundation/lams/flux/FluxRegistry.java =================================================================== diff -u -rd65bab224e24187e92bdc65393320d915b7b1e9f -r6af10a84f90327690b640c49390c0c4084353cdd --- lams_common/src/java/org/lamsfoundation/lams/flux/FluxRegistry.java (.../FluxRegistry.java) (revision d65bab224e24187e92bdc65393320d915b7b1e9f) +++ lams_common/src/java/org/lamsfoundation/lams/flux/FluxRegistry.java (.../FluxRegistry.java) (revision 6af10a84f90327690b640c49390c0c4084353cdd) @@ -6,9 +6,13 @@ import java.util.function.BiPredicate; import java.util.function.Function; +import org.apache.log4j.Logger; + import reactor.core.publisher.Flux; public class FluxRegistry { + private static Logger log = Logger.getLogger(FluxMap.class.getName()); + @SuppressWarnings("rawtypes") private static final Map fluxRegistry = new ConcurrentHashMap<>(); @SuppressWarnings("rawtypes") @@ -50,8 +54,10 @@ public static void initFluxMap(String fluxName, String sinkName, BiPredicate itemEqualsPredicate, Function fetchFunction, Integer throttleSeconds, Integer timeoutSeconds) { if (fluxRegistry.containsKey(fluxName)) { - throw new IllegalArgumentException("FluxMap for \"" + fluxName + "\" was already initialised"); + log.warn("FluxMap for \"" + fluxName + "\" was already initialised"); + return; } + SharedSink sink = sinkRegistry.get(sinkName); if (sink == null) { sink = FluxRegistry.getSink(sinkName); @@ -90,6 +96,16 @@ for (Entry binding : sinkBindings.entrySet()) { FluxRegistry.emit(binding.getKey(), binding.getValue().apply(item)); } + } + @SuppressWarnings("rawtypes") + public static void shutdown() { + for (SharedSink sink : sinkRegistry.values()) { + try { + sink.shutdown(); + } catch (Exception e) { + log.warn("Exception while closing Flux sink: " + e.getMessage()); + } + } } } \ No newline at end of file Index: lams_common/src/java/org/lamsfoundation/lams/flux/SharedSink.java =================================================================== diff -u -r4dbd12afbfcb6eb768ceb772768779e7619dc2f8 -r6af10a84f90327690b640c49390c0c4084353cdd --- lams_common/src/java/org/lamsfoundation/lams/flux/SharedSink.java (.../SharedSink.java) (revision 4dbd12afbfcb6eb768ceb772768779e7619dc2f8) +++ lams_common/src/java/org/lamsfoundation/lams/flux/SharedSink.java (.../SharedSink.java) (revision 6af10a84f90327690b640c49390c0c4084353cdd) @@ -4,6 +4,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.EmitFailureHandler; /** * This class allows both sink functionality (manually pushing elements to Flux) @@ -37,4 +38,8 @@ public Flux getFlux() { return flux; } + + public void shutdown() { + sink.emitComplete(EmitFailureHandler.FAIL_FAST); + } } \ No newline at end of file Index: lams_common/src/java/org/lamsfoundation/lams/web/filter/LamsContextLoaderListener.java =================================================================== diff -u -r9986a8084b318ccc9c025ec56407dc09a705dd60 -r6af10a84f90327690b640c49390c0c4084353cdd --- lams_common/src/java/org/lamsfoundation/lams/web/filter/LamsContextLoaderListener.java (.../LamsContextLoaderListener.java) (revision 9986a8084b318ccc9c025ec56407dc09a705dd60) +++ lams_common/src/java/org/lamsfoundation/lams/web/filter/LamsContextLoaderListener.java (.../LamsContextLoaderListener.java) (revision 6af10a84f90327690b640c49390c0c4084353cdd) @@ -1,7 +1,9 @@ package org.lamsfoundation.lams.web.filter; import javax.servlet.ServletContext; +import javax.servlet.ServletContextEvent; +import org.lamsfoundation.lams.flux.FluxRegistry; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.web.context.ContextLoaderListener; @@ -29,4 +31,15 @@ } return parentContext; } + + /** + * Close the root web application context. + */ + @Override + public void contextDestroyed(ServletContextEvent event) { + // close all sinks and underlying fluxes + FluxRegistry.shutdown(); + + super.contextDestroyed(event); + } } \ No newline at end of file