Index: lams_common/src/java/org/lamsfoundation/lams/commonContext.xml
===================================================================
diff -u -r5a5a88e2992d006dbfd34513422ad6262f7222ae -r551d792837f89602da3be00578e9d474decc6605
--- lams_common/src/java/org/lamsfoundation/lams/commonContext.xml (.../commonContext.xml) (revision 5a5a88e2992d006dbfd34513422ad6262f7222ae)
+++ lams_common/src/java/org/lamsfoundation/lams/commonContext.xml (.../commonContext.xml) (revision 551d792837f89602da3be00578e9d474decc6605)
@@ -18,10 +18,14 @@
-
-
+
+
+
+
-
@@ -811,4 +815,4 @@
-
+
\ No newline at end of file
Index: lams_common/src/java/org/lamsfoundation/lams/flux/FluxMap.java
===================================================================
diff -u -re88a78540d7b5e9e6cdbe66d04fc861f647dc121 -r551d792837f89602da3be00578e9d474decc6605
--- lams_common/src/java/org/lamsfoundation/lams/flux/FluxMap.java (.../FluxMap.java) (revision e88a78540d7b5e9e6cdbe66d04fc861f647dc121)
+++ lams_common/src/java/org/lamsfoundation/lams/flux/FluxMap.java (.../FluxMap.java) (revision 551d792837f89602da3be00578e9d474decc6605)
@@ -32,16 +32,16 @@
// only for logging purposes
private final String name;
- private final Flux source;
+ private final SharedSink source;
private final BiPredicate itemEqualsPredicate;
private final Function fetchFunction;
// default timeout is null, i.e. never expire
private final Integer timeoutSeconds;
// default throttle time is null, i.e. no throttling
private final Integer throttleSeconds;
- public FluxMap(String name, Flux source, BiPredicate itemEqualsPredicate, Function fetchFunction,
- Integer throttleSeconds, Integer timeoutSeconds) {
+ public FluxMap(String name, SharedSink source, BiPredicate itemEqualsPredicate,
+ Function fetchFunction, Integer throttleSeconds, Integer timeoutSeconds) {
this.name = name;
this.source = source;
this.itemEqualsPredicate = itemEqualsPredicate;
@@ -66,7 +66,7 @@
}
// filter out signals which do not match the key
- Flux filteringFlux = source.filter(item -> itemEqualsPredicate.test(item, key));
+ Flux filteringFlux = source.getFlux().filter(item -> itemEqualsPredicate.test(item, key));
// do not emit more often than this amount of time
if (throttleSeconds != null) {
@@ -79,10 +79,15 @@
filteringFlux = filteringFlux.handle((item, sink) -> {
int counter = subscriberCounter.get();
if (counter <= 0) {
+ // all subscribers are gone, complete the Flux
+ sink.complete();
+ // remove Flux from mapping
+ synchronized (map) {
+ map.remove(key);
+ }
if (log.isDebugEnabled()) {
- log.debug("Completing flux with no subscribers for \"" + name + "\" with key " + key);
+ log.debug("Removed flux with no subscribers for \"" + name + "\" with key " + key);
}
- sink.complete();
return;
}
sink.next(item);
@@ -99,9 +104,10 @@
if (counter <= 0) {
if (log.isDebugEnabled()) {
- log.debug("Removing flux with no subscribers for \"" + name + "\" with key " + key);
+ log.debug("No subscribers left for flux for \"" + name + "\" with key " + key);
}
- map.remove(key);
+ // send final signal so Flux can complete
+ source.emit(key);
}
};
Index: lams_common/src/java/org/lamsfoundation/lams/flux/FluxRegistry.java
===================================================================
diff -u -r976df526805398d7daa7d49e4e11b81272ec3cb5 -r551d792837f89602da3be00578e9d474decc6605
--- lams_common/src/java/org/lamsfoundation/lams/flux/FluxRegistry.java (.../FluxRegistry.java) (revision 976df526805398d7daa7d49e4e11b81272ec3cb5)
+++ lams_common/src/java/org/lamsfoundation/lams/flux/FluxRegistry.java (.../FluxRegistry.java) (revision 551d792837f89602da3be00578e9d474decc6605)
@@ -1,15 +1,14 @@
package org.lamsfoundation.lams.flux;
+import org.apache.log4j.Logger;
+import reactor.core.publisher.Flux;
+
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiPredicate;
import java.util.function.Function;
-import org.apache.log4j.Logger;
-
-import reactor.core.publisher.Flux;
-
public class FluxRegistry {
private static Logger log = Logger.getLogger(FluxMap.class.getName());
@@ -27,6 +26,9 @@
return sink;
}
sink = new SharedSink<>(sinkName);
+ if (log.isDebugEnabled()) {
+ log.debug("Created new sink for name \"" + sinkName + "\"");
+ }
sinkRegistry.put(sinkName, sink);
return sink;
}
@@ -47,6 +49,9 @@
}
if (!sourceSinkBindings.containsKey(targetSinkName)) {
sourceSinkBindings.put(targetSinkName, emitTransformer);
+ if (log.isDebugEnabled()) {
+ log.debug("Bound sink with name \"" + sourceSinkName + " to sink with name \"" + targetSinkName + "\"");
+ }
}
}
@@ -65,9 +70,12 @@
if (itemEqualsPredicate == null) {
itemEqualsPredicate = (item, key) -> item.equals(key);
}
- FluxMap fluxMap = new FluxMap<>(fluxName, sink.getFlux(), itemEqualsPredicate, fetchFunction,
- throttleSeconds, timeoutSeconds);
+ FluxMap fluxMap = new FluxMap<>(fluxName, sink, itemEqualsPredicate, fetchFunction, throttleSeconds,
+ timeoutSeconds);
fluxRegistry.put(fluxName, fluxMap);
+ if (log.isDebugEnabled()) {
+ log.debug("Initialised FluxMap for \"" + fluxName + "\"");
+ }
}
@SuppressWarnings({ "unchecked" })
Index: lams_monitoring/src/java/org/lamsfoundation/lams/monitoring/web/MonitoringController.java
===================================================================
diff -u -re88a78540d7b5e9e6cdbe66d04fc861f647dc121 -r551d792837f89602da3be00578e9d474decc6605
--- lams_monitoring/src/java/org/lamsfoundation/lams/monitoring/web/MonitoringController.java (.../MonitoringController.java) (revision e88a78540d7b5e9e6cdbe66d04fc861f647dc121)
+++ lams_monitoring/src/java/org/lamsfoundation/lams/monitoring/web/MonitoringController.java (.../MonitoringController.java) (revision 551d792837f89602da3be00578e9d474decc6605)
@@ -49,7 +49,6 @@
import org.lamsfoundation.lams.learningdesign.ToolActivity;
import org.lamsfoundation.lams.learningdesign.Transition;
import org.lamsfoundation.lams.learningdesign.dao.IActivityDAO;
-import org.lamsfoundation.lams.learningdesign.dao.ILearningDesignDAO;
import org.lamsfoundation.lams.learningdesign.exception.LearningDesignException;
import org.lamsfoundation.lams.learningdesign.service.ILearningDesignService;
import org.lamsfoundation.lams.lesson.LearnerProgress;
@@ -75,7 +74,6 @@
import org.lamsfoundation.lams.usermanagement.WorkspaceFolder;
import org.lamsfoundation.lams.usermanagement.dto.UserDTO;
import org.lamsfoundation.lams.usermanagement.exception.UserException;
-import org.lamsfoundation.lams.usermanagement.exception.WorkspaceFolderException;
import org.lamsfoundation.lams.usermanagement.service.IUserManagementService;
import org.lamsfoundation.lams.util.CommonConstants;
import org.lamsfoundation.lams.util.DateUtil;
@@ -88,6 +86,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.MediaType;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
@@ -111,10 +110,8 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
@@ -142,6 +139,10 @@
private static final int USER_PAGE_SIZE = 10;
@Autowired
+ @Qualifier("serverTaskExecutor")
+ private ThreadPoolTaskExecutor serverTaskExecutor;
+
+ @Autowired
private ILogEventService logEventService;
@Autowired
private ILearningDesignService learningDesignService;
@@ -1005,6 +1006,13 @@
@RequestMapping("/monitorLesson")
public String monitorLesson(HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException {
+ if (log.isDebugEnabled()) {
+ // temporary (?) debugging information on thread pool used for async requests
+ StringBuilder executorLog = new StringBuilder();
+ executorLog.append("Active thread count: " + serverTaskExecutor.getActiveCount());
+ executorLog.append(" / Pool size: " + serverTaskExecutor.getPoolSize());
+ log.debug(executorLog.toString());
+ }
Long lessonId = WebUtil.readLongParam(request, AttributeNames.PARAM_LESSON_ID);
LessonDetailsDTO lessonDTO = lessonService.getLessonDetails(lessonId);