Index: lams_common/src/java/org/lamsfoundation/lams/commonContext.xml =================================================================== diff -u -r5a5a88e2992d006dbfd34513422ad6262f7222ae -r551d792837f89602da3be00578e9d474decc6605 --- lams_common/src/java/org/lamsfoundation/lams/commonContext.xml (.../commonContext.xml) (revision 5a5a88e2992d006dbfd34513422ad6262f7222ae) +++ lams_common/src/java/org/lamsfoundation/lams/commonContext.xml (.../commonContext.xml) (revision 551d792837f89602da3be00578e9d474decc6605) @@ -18,10 +18,14 @@ - - + + + + - @@ -811,4 +815,4 @@ - + \ No newline at end of file Index: lams_common/src/java/org/lamsfoundation/lams/flux/FluxMap.java =================================================================== diff -u -re88a78540d7b5e9e6cdbe66d04fc861f647dc121 -r551d792837f89602da3be00578e9d474decc6605 --- lams_common/src/java/org/lamsfoundation/lams/flux/FluxMap.java (.../FluxMap.java) (revision e88a78540d7b5e9e6cdbe66d04fc861f647dc121) +++ lams_common/src/java/org/lamsfoundation/lams/flux/FluxMap.java (.../FluxMap.java) (revision 551d792837f89602da3be00578e9d474decc6605) @@ -32,16 +32,16 @@ // only for logging purposes private final String name; - private final Flux source; + private final SharedSink source; private final BiPredicate itemEqualsPredicate; private final Function fetchFunction; // default timeout is null, i.e. never expire private final Integer timeoutSeconds; // default throttle time is null, i.e. no throttling private final Integer throttleSeconds; - public FluxMap(String name, Flux source, BiPredicate itemEqualsPredicate, Function fetchFunction, - Integer throttleSeconds, Integer timeoutSeconds) { + public FluxMap(String name, SharedSink source, BiPredicate itemEqualsPredicate, + Function fetchFunction, Integer throttleSeconds, Integer timeoutSeconds) { this.name = name; this.source = source; this.itemEqualsPredicate = itemEqualsPredicate; @@ -66,7 +66,7 @@ } // filter out signals which do not match the key - Flux filteringFlux = source.filter(item -> itemEqualsPredicate.test(item, key)); + Flux filteringFlux = source.getFlux().filter(item -> itemEqualsPredicate.test(item, key)); // do not emit more often than this amount of time if (throttleSeconds != null) { @@ -79,10 +79,15 @@ filteringFlux = filteringFlux.handle((item, sink) -> { int counter = subscriberCounter.get(); if (counter <= 0) { + // all subscribers are gone, complete the Flux + sink.complete(); + // remove Flux from mapping + synchronized (map) { + map.remove(key); + } if (log.isDebugEnabled()) { - log.debug("Completing flux with no subscribers for \"" + name + "\" with key " + key); + log.debug("Removed flux with no subscribers for \"" + name + "\" with key " + key); } - sink.complete(); return; } sink.next(item); @@ -99,9 +104,10 @@ if (counter <= 0) { if (log.isDebugEnabled()) { - log.debug("Removing flux with no subscribers for \"" + name + "\" with key " + key); + log.debug("No subscribers left for flux for \"" + name + "\" with key " + key); } - map.remove(key); + // send final signal so Flux can complete + source.emit(key); } }; Index: lams_common/src/java/org/lamsfoundation/lams/flux/FluxRegistry.java =================================================================== diff -u -r976df526805398d7daa7d49e4e11b81272ec3cb5 -r551d792837f89602da3be00578e9d474decc6605 --- lams_common/src/java/org/lamsfoundation/lams/flux/FluxRegistry.java (.../FluxRegistry.java) (revision 976df526805398d7daa7d49e4e11b81272ec3cb5) +++ lams_common/src/java/org/lamsfoundation/lams/flux/FluxRegistry.java (.../FluxRegistry.java) (revision 551d792837f89602da3be00578e9d474decc6605) @@ -1,15 +1,14 @@ package org.lamsfoundation.lams.flux; +import org.apache.log4j.Logger; +import reactor.core.publisher.Flux; + import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiPredicate; import java.util.function.Function; -import org.apache.log4j.Logger; - -import reactor.core.publisher.Flux; - public class FluxRegistry { private static Logger log = Logger.getLogger(FluxMap.class.getName()); @@ -27,6 +26,9 @@ return sink; } sink = new SharedSink<>(sinkName); + if (log.isDebugEnabled()) { + log.debug("Created new sink for name \"" + sinkName + "\""); + } sinkRegistry.put(sinkName, sink); return sink; } @@ -47,6 +49,9 @@ } if (!sourceSinkBindings.containsKey(targetSinkName)) { sourceSinkBindings.put(targetSinkName, emitTransformer); + if (log.isDebugEnabled()) { + log.debug("Bound sink with name \"" + sourceSinkName + " to sink with name \"" + targetSinkName + "\""); + } } } @@ -65,9 +70,12 @@ if (itemEqualsPredicate == null) { itemEqualsPredicate = (item, key) -> item.equals(key); } - FluxMap fluxMap = new FluxMap<>(fluxName, sink.getFlux(), itemEqualsPredicate, fetchFunction, - throttleSeconds, timeoutSeconds); + FluxMap fluxMap = new FluxMap<>(fluxName, sink, itemEqualsPredicate, fetchFunction, throttleSeconds, + timeoutSeconds); fluxRegistry.put(fluxName, fluxMap); + if (log.isDebugEnabled()) { + log.debug("Initialised FluxMap for \"" + fluxName + "\""); + } } @SuppressWarnings({ "unchecked" }) Index: lams_monitoring/src/java/org/lamsfoundation/lams/monitoring/web/MonitoringController.java =================================================================== diff -u -re88a78540d7b5e9e6cdbe66d04fc861f647dc121 -r551d792837f89602da3be00578e9d474decc6605 --- lams_monitoring/src/java/org/lamsfoundation/lams/monitoring/web/MonitoringController.java (.../MonitoringController.java) (revision e88a78540d7b5e9e6cdbe66d04fc861f647dc121) +++ lams_monitoring/src/java/org/lamsfoundation/lams/monitoring/web/MonitoringController.java (.../MonitoringController.java) (revision 551d792837f89602da3be00578e9d474decc6605) @@ -49,7 +49,6 @@ import org.lamsfoundation.lams.learningdesign.ToolActivity; import org.lamsfoundation.lams.learningdesign.Transition; import org.lamsfoundation.lams.learningdesign.dao.IActivityDAO; -import org.lamsfoundation.lams.learningdesign.dao.ILearningDesignDAO; import org.lamsfoundation.lams.learningdesign.exception.LearningDesignException; import org.lamsfoundation.lams.learningdesign.service.ILearningDesignService; import org.lamsfoundation.lams.lesson.LearnerProgress; @@ -75,7 +74,6 @@ import org.lamsfoundation.lams.usermanagement.WorkspaceFolder; import org.lamsfoundation.lams.usermanagement.dto.UserDTO; import org.lamsfoundation.lams.usermanagement.exception.UserException; -import org.lamsfoundation.lams.usermanagement.exception.WorkspaceFolderException; import org.lamsfoundation.lams.usermanagement.service.IUserManagementService; import org.lamsfoundation.lams.util.CommonConstants; import org.lamsfoundation.lams.util.DateUtil; @@ -88,6 +86,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.http.MediaType; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; @@ -111,10 +110,8 @@ import java.util.Collection; import java.util.Collections; import java.util.Date; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Locale; @@ -142,6 +139,10 @@ private static final int USER_PAGE_SIZE = 10; @Autowired + @Qualifier("serverTaskExecutor") + private ThreadPoolTaskExecutor serverTaskExecutor; + + @Autowired private ILogEventService logEventService; @Autowired private ILearningDesignService learningDesignService; @@ -1005,6 +1006,13 @@ @RequestMapping("/monitorLesson") public String monitorLesson(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { + if (log.isDebugEnabled()) { + // temporary (?) debugging information on thread pool used for async requests + StringBuilder executorLog = new StringBuilder(); + executorLog.append("Active thread count: " + serverTaskExecutor.getActiveCount()); + executorLog.append(" / Pool size: " + serverTaskExecutor.getPoolSize()); + log.debug(executorLog.toString()); + } Long lessonId = WebUtil.readLongParam(request, AttributeNames.PARAM_LESSON_ID); LessonDetailsDTO lessonDTO = lessonService.getLessonDetails(lessonId);