Index: lams_common/src/java/org/lamsfoundation/lams/flux/FluxMap.java =================================================================== diff -u -rf7b8a16c38d3569a3a675eaaef3be8d3a99e74ff -r4dbd12afbfcb6eb768ceb772768779e7619dc2f8 --- lams_common/src/java/org/lamsfoundation/lams/flux/FluxMap.java (.../FluxMap.java) (revision f7b8a16c38d3569a3a675eaaef3be8d3a99e74ff) +++ lams_common/src/java/org/lamsfoundation/lams/flux/FluxMap.java (.../FluxMap.java) (revision 4dbd12afbfcb6eb768ceb772768779e7619dc2f8) @@ -1,6 +1,7 @@ -package org.lamsfoundation.lams.util; +package org.lamsfoundation.lams.flux; import java.time.Duration; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -19,43 +20,44 @@ * * @author Marcin Cieslak */ -public class FluxMap extends ConcurrentHashMap> { - private static final long serialVersionUID = -8271928054306412858L; - +public class FluxMap { private static Logger log = Logger.getLogger(FluxMap.class.getName()); public static final int STANDARD_THROTTLE = 1; public static final int STANDARD_TIMEOUT = 5 * 60; + private final Map> map; + // only for logging purposes - private final String operationDescription; + private final String name; private final Flux source; // default timeout is null, i.e. never expire private final Integer timeoutSeconds; // default throttle time is null, i.e. no throttling private final Integer throttleSeconds; private final Function fetchFunction; - public FluxMap(String operationDescritpion, Flux source, Function fetchFunction, Integer throttleSeconds, + public FluxMap(String name, Flux source, Function fetchFunction, Integer throttleSeconds, Integer timeoutSeconds) { - this.operationDescription = operationDescritpion; + this.name = name; this.source = source; this.fetchFunction = fetchFunction; this.throttleSeconds = throttleSeconds; this.timeoutSeconds = timeoutSeconds; + this.map = new ConcurrentHashMap<>(); } /** * Get a hot publisher Flux for the given key. */ - public Flux getFlux(T key) { + public Flux get(T key) { // try to get existing Flux - Flux flux = get(key); + Flux flux = map.get(key); if (flux == null) { // create a new Flux if (log.isDebugEnabled()) { - log.debug("Creating new flux for \"" + operationDescription + "\" with key " + key); + log.debug("Creating new flux for \"" + name + "\" with key " + key); } // filter out signals which do not match the key @@ -73,10 +75,10 @@ int counter = subscriberCounter.get(); if (counter <= 0) { if (log.isDebugEnabled()) { - log.debug("Completing and removing flux for \"" + operationDescription + "\" with key " + key); + log.debug("Completing and removing flux for \"" + name + "\" with key " + key); } sink.complete(); - remove(key); + map.remove(key); return; } sink.next(item); @@ -99,7 +101,7 @@ } if (log.isDebugEnabled()) { - log.debug("Subscribed (" + counter + ") to flux \"" + operationDescription + "\" with key " + log.debug("Subscribed (" + counter + ") to flux \"" + name + "\" with key " + key); } }) @@ -109,7 +111,7 @@ int counter = subscriberCounter.decrementAndGet(); if (log.isDebugEnabled()) { - log.debug("Cancelling (" + counter + ") subscription to flux for \"" + operationDescription + log.debug("Cancelling (" + counter + ") subscription to flux for \"" + name + "\" with key " + key); } }); @@ -120,16 +122,16 @@ throwable -> { if (log.isDebugEnabled()) { log.debug( - "Removing timed out flux for \"" + operationDescription + "\" with key " + key); + "Removing timed out flux for \"" + name + "\" with key " + key); } // remove terminated Flux from the map - remove(key); + map.remove(key); // switch subscribers to a dummy Flux which completes and cancels their subscriptions return Flux.empty(); }); } - put(key, flux); + map.put(key, flux); } return flux; Index: lams_common/src/java/org/lamsfoundation/lams/flux/FluxRegistry.java =================================================================== diff -u --- lams_common/src/java/org/lamsfoundation/lams/flux/FluxRegistry.java (revision 0) +++ lams_common/src/java/org/lamsfoundation/lams/flux/FluxRegistry.java (revision 4dbd12afbfcb6eb768ceb772768779e7619dc2f8) @@ -0,0 +1,49 @@ +package org.lamsfoundation.lams.flux; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; + +import reactor.core.publisher.Flux; + +public class FluxRegistry { + private static final Map fluxRegistry = new ConcurrentHashMap<>(); + private static final Map sinkRegistry = new ConcurrentHashMap<>(); + + public static void initSink(String sinkName) { + if (sinkRegistry.containsKey(sinkName)) { + throw new IllegalArgumentException("Sink for \"" + sinkName + "\" was already initialised"); + } + SharedSink sink = new SharedSink<>(sinkName); + sinkRegistry.put(sinkName, sink); + } + + public static void initFluxMap(String fluxName, String sinkName, Function fetchFunction, + Integer throttleSeconds, Integer timeoutSeconds) { + if (fluxRegistry.containsKey(fluxName)) { + throw new IllegalArgumentException("FluxMap for \"" + fluxName + "\" was already initialised"); + } + SharedSink sink = sinkRegistry.get(sinkName); + if (sink == null) { + throw new IllegalArgumentException("Sink for \"" + sinkName + "\" was not initialised"); + } + FluxMap fluxMap = new FluxMap<>(fluxName, sink.getFlux(), fetchFunction, throttleSeconds, timeoutSeconds); + fluxRegistry.put(fluxName, fluxMap); + } + + public static Flux get(String fluxName, T key) { + FluxMap fluxMap = fluxRegistry.get(fluxName); + if (fluxMap == null) { + throw new IllegalArgumentException("FluxMap for \"" + fluxName + "\" was not initialised"); + } + return fluxMap.get(key); + } + + public static void emit(String sinkName, T item) { + SharedSink sink = sinkRegistry.get(sinkName); + if (sink == null) { + throw new IllegalArgumentException("Sink for \"" + sinkName + "\" was not initialised"); + } + sink.emit(item); + } +} \ No newline at end of file Index: lams_common/src/java/org/lamsfoundation/lams/flux/SharedSink.java =================================================================== diff -u -rf7b8a16c38d3569a3a675eaaef3be8d3a99e74ff -r4dbd12afbfcb6eb768ceb772768779e7619dc2f8 --- lams_common/src/java/org/lamsfoundation/lams/flux/SharedSink.java (.../SharedSink.java) (revision f7b8a16c38d3569a3a675eaaef3be8d3a99e74ff) +++ lams_common/src/java/org/lamsfoundation/lams/flux/SharedSink.java (.../SharedSink.java) (revision 4dbd12afbfcb6eb768ceb772768779e7619dc2f8) @@ -1,4 +1,4 @@ -package org.lamsfoundation.lams.util; +package org.lamsfoundation.lams.flux; import org.apache.log4j.Logger; @@ -18,14 +18,14 @@ private final Sinks.Many sink; private final Flux flux; - public SharedSink(String operationDescription) { + public SharedSink(String name) { if (log.isDebugEnabled()) { - log.debug("Created sink for \"" + operationDescription + "\""); + log.debug("Created sink for \"" + name + "\""); } sink = Sinks.many().replay().latest(); flux = sink.asFlux().doFinally((signalType) -> { if (log.isDebugEnabled()) { - log.debug("Terminated (" + signalType + ") sink for \"" + operationDescription + "\""); + log.debug("Terminated (" + signalType + ") sink for \"" + name + "\""); } }).publish().autoConnect(); } Index: lams_tool_assessment/src/java/org/lamsfoundation/lams/tool/assessment/AssessmentConstants.java =================================================================== diff -u -rf86d00ef0e3fc17667feabcec9a04b4b3ca4d619 -r4dbd12afbfcb6eb768ceb772768779e7619dc2f8 --- lams_tool_assessment/src/java/org/lamsfoundation/lams/tool/assessment/AssessmentConstants.java (.../AssessmentConstants.java) (revision f86d00ef0e3fc17667feabcec9a04b4b3ca4d619) +++ lams_tool_assessment/src/java/org/lamsfoundation/lams/tool/assessment/AssessmentConstants.java (.../AssessmentConstants.java) (revision 4dbd12afbfcb6eb768ceb772768779e7619dc2f8) @@ -213,8 +213,13 @@ public static final String CONFIG_KEY_HIDE_TITLES = "hideTitles"; public static final String ATTR_IS_QUESTION_ETHERPAD_ENABLED = "isQuestionEtherpadEnabled"; - + public static final String ATTR_CODE_STYLES = "codeStyles"; public static final String ATTR_ALL_GROUP_USERS = "allGroupUsers"; + + //flux management + public static final String ANSWERS_UPDATED_SINK_NAME = "assessment learner answers update"; + + public static final String COMPLETION_CHARTS_UPDATE_FLUX_NAME = "assessment completion chart update"; } Index: lams_tool_assessment/src/java/org/lamsfoundation/lams/tool/assessment/service/AssessmentServiceImpl.java =================================================================== diff -u -rd0685b6bd29bf47ed67fbc4eb6abcc69f0c1acad -r4dbd12afbfcb6eb768ceb772768779e7619dc2f8 --- lams_tool_assessment/src/java/org/lamsfoundation/lams/tool/assessment/service/AssessmentServiceImpl.java (.../AssessmentServiceImpl.java) (revision d0685b6bd29bf47ed67fbc4eb6abcc69f0c1acad) +++ lams_tool_assessment/src/java/org/lamsfoundation/lams/tool/assessment/service/AssessmentServiceImpl.java (.../AssessmentServiceImpl.java) (revision 4dbd12afbfcb6eb768ceb772768779e7619dc2f8) @@ -58,6 +58,8 @@ import org.lamsfoundation.lams.confidencelevel.VsaAnswerDTO; import org.lamsfoundation.lams.contentrepository.client.IToolContentHandler; import org.lamsfoundation.lams.events.IEventNotificationService; +import org.lamsfoundation.lams.flux.FluxMap; +import org.lamsfoundation.lams.flux.FluxRegistry; import org.lamsfoundation.lams.learning.service.ILearnerService; import org.lamsfoundation.lams.learningdesign.Grouping; import org.lamsfoundation.lams.learningdesign.ToolActivity; @@ -129,11 +131,9 @@ import org.lamsfoundation.lams.usermanagement.dto.UserDTO; import org.lamsfoundation.lams.usermanagement.service.IUserManagementService; import org.lamsfoundation.lams.util.FileUtil; -import org.lamsfoundation.lams.util.FluxMap; import org.lamsfoundation.lams.util.JsonUtil; import org.lamsfoundation.lams.util.MessageService; import org.lamsfoundation.lams.util.NumberUtil; -import org.lamsfoundation.lams.util.SharedSink; import org.lamsfoundation.lams.util.WebUtil; import org.lamsfoundation.lams.util.excel.ExcelCell; import org.lamsfoundation.lams.util.excel.ExcelRow; @@ -198,18 +198,12 @@ private ILearnerInteractionService learnerInteractionService; - // sink that accepts tool content ID for which learners' answers changed - private final SharedSink answersUpdatedSink; - - // flux map for updating completion charts - private final FluxMap completionChartUpdateFluxMap; - public AssessmentServiceImpl() { - answersUpdatedSink = new SharedSink<>("assessment learner answers update"); - - completionChartUpdateFluxMap = new FluxMap<>("assessment completion chart update", answersUpdatedSink.getFlux(), - toolContentId -> getCompletionChartsData(toolContentId), FluxMap.STANDARD_THROTTLE, - FluxMap.STANDARD_TIMEOUT); + FluxRegistry.initSink(AssessmentConstants.ANSWERS_UPDATED_SINK_NAME); + FluxRegistry.initFluxMap(AssessmentConstants.COMPLETION_CHARTS_UPDATE_FLUX_NAME, + AssessmentConstants.ANSWERS_UPDATED_SINK_NAME, + (Function) toolContentId -> getCompletionChartsData(toolContentId), + FluxMap.STANDARD_THROTTLE, FluxMap.STANDARD_TIMEOUT); } // ******************************************************************************* @@ -727,7 +721,7 @@ if (isAnswerModified) { // need to flush so subscribers to sink see new answers in DB assessmentResultDao.flush(); - answersUpdatedSink.emit(assessment.getContentId()); + FluxRegistry.emit(AssessmentConstants.ANSWERS_UPDATED_SINK_NAME, assessment.getContentId()); } return true; @@ -844,7 +838,7 @@ @Override public Flux getCompletionChartsDataFlux(long toolContentId) { - return completionChartUpdateFluxMap.getFlux(toolContentId); + return FluxRegistry.get(AssessmentConstants.COMPLETION_CHARTS_UPDATE_FLUX_NAME, toolContentId); } /**