Index: lams_build/3rdParty.userlibraries
===================================================================
diff -u -r50f28fabcac6dcd46979f8da77041931065f632d -rc33d4aa11d22778bd16a874f0a95237da3f2f10c
--- lams_build/3rdParty.userlibraries (.../3rdParty.userlibraries) (revision 50f28fabcac6dcd46979f8da77041931065f632d)
+++ lams_build/3rdParty.userlibraries (.../3rdParty.userlibraries) (revision c33d4aa11d22778bd16a874f0a95237da3f2f10c)
@@ -46,5 +46,6 @@
+
Index: lams_build/conf/j2ee/jboss-deployment-structure.xml
===================================================================
diff -u -r50f28fabcac6dcd46979f8da77041931065f632d -rc33d4aa11d22778bd16a874f0a95237da3f2f10c
--- lams_build/conf/j2ee/jboss-deployment-structure.xml (.../jboss-deployment-structure.xml) (revision 50f28fabcac6dcd46979f8da77041931065f632d)
+++ lams_build/conf/j2ee/jboss-deployment-structure.xml (.../jboss-deployment-structure.xml) (revision c33d4aa11d22778bd16a874f0a95237da3f2f10c)
@@ -41,6 +41,7 @@
+
Index: lams_common/src/java/org/lamsfoundation/lams/util/FluxMap.java
===================================================================
diff -u
--- lams_common/src/java/org/lamsfoundation/lams/util/FluxMap.java (revision 0)
+++ lams_common/src/java/org/lamsfoundation/lams/util/FluxMap.java (revision c33d4aa11d22778bd16a874f0a95237da3f2f10c)
@@ -0,0 +1,105 @@
+package org.lamsfoundation.lams.util;
+
+import java.time.Duration;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+
+import org.apache.log4j.Logger;
+
+import reactor.core.publisher.Flux;
+
+/**
+ * Utility class for serving updates to shared Fluxes.
+ * It receives signals from a source Flux, probably backed by a Sinks.Many.
+ * For each requested key it creates a hot publisher Flux which fetches data for the key.
+ * If a key's Flux does not emit anything for the given timeout, it gets removed from the map.
+ *
+ * @param <T> type of keys, which is also the type of signals emitted by the source Flux
+ * @param <U> type of data produced by the fetch function and emitted to subscribers
+ * @author Marcin Cieslak
+ */
+public class FluxMap<T, U> extends ConcurrentHashMap<T, Flux<U>> {
+    private static final long serialVersionUID = -8271928054306412858L;
+
+    private static final Logger log = Logger.getLogger(FluxMap.class.getName());
+
+    public static final int STANDARD_THROTTLE = 1;
+    public static final int STANDARD_TIMEOUT = 5 * 60;
+
+    // only for logging purposes
+    private final String operationDescription;
+    private final Flux<T> source;
+    // in seconds; null means never expire
+    private final Integer timeoutSeconds;
+    // in seconds; null means no throttling
+    private final Integer throttleSeconds;
+    private final Function<T, U> fetchFunction;
+
+    public FluxMap(String operationDescription, Flux<T> source, Function<T, U> fetchFunction,
+            Integer throttleSeconds, Integer timeoutSeconds) {
+        this.operationDescription = operationDescription;
+        this.source = source;
+        this.fetchFunction = fetchFunction;
+        this.throttleSeconds = throttleSeconds;
+        this.timeoutSeconds = timeoutSeconds;
+    }
+
+    /**
+     * Get a hot publisher Flux for the given key, creating and registering it on first request.
+     */
+    public Flux<U> getFlux(T key) {
+        // reuse an already registered Flux
+        Flux<U> flux = get(key);
+        if (flux != null) {
+            return flux;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Creating new flux for \"" + operationDescription + "\" with key " + key);
+        }
+
+        // filter out signals which do not match the key
+        Flux<T> filteringFlux = source.filter(item -> item.equals(key));
+        // do not emit more often than this amount of time
+        if (throttleSeconds != null) {
+            filteringFlux = filteringFlux.sample(Duration.ofSeconds(throttleSeconds));
+        }
+        // fetch data based on the key
+        flux = filteringFlux.map(fetchFunction)
+                // push initial value to the Flux so data is available immediately after subscribing
+                .startWith(fetchFunction.apply(key))
+                // make sure the subsequent subscribers also have data immediately available
+                // NOTE(review): cache() seems to stay subscribed to source even after this Flux is removed - confirm
+                .cache(1)
+                // just some logging
+                .doOnSubscribe(subscription -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Subscribed to flux \"" + operationDescription + "\" with key " + key);
+                    }
+                })
+                .doOnCancel(() -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Cancelling subscription to flux for \"" + operationDescription
+                                + "\" with key " + key);
+                    }
+                });
+
+        // remove Flux on timeout
+        if (timeoutSeconds != null) {
+            flux = flux.timeout(Duration.ofSeconds(timeoutSeconds)).onErrorResume(TimeoutException.class,
+                    throwable -> {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Removing flux for \"" + operationDescription + "\" with key " + key);
+                        }
+                        // remove terminated Flux from the map
+                        remove(key);
+                        // switch subscribers to a dummy Flux which completes and cancels their subscriptions
+                        return Flux.empty();
+                    });
+        }
+        // register atomically; if another thread won the race, share its Flux and drop this one
+        Flux<U> registeredFlux = putIfAbsent(key, flux);
+        return registeredFlux == null ? flux : registeredFlux;
+    }
+}
\ No newline at end of file
Index: lams_tool_assessment/src/java/org/lamsfoundation/lams/tool/assessment/service/AssessmentServiceImpl.java
===================================================================
diff -u -ra13c7c0eabe99a281bc65537d779610d07a56f04 -rc33d4aa11d22778bd16a874f0a95237da3f2f10c
--- lams_tool_assessment/src/java/org/lamsfoundation/lams/tool/assessment/service/AssessmentServiceImpl.java (.../AssessmentServiceImpl.java) (revision a13c7c0eabe99a281bc65537d779610d07a56f04)
+++ lams_tool_assessment/src/java/org/lamsfoundation/lams/tool/assessment/service/AssessmentServiceImpl.java (.../AssessmentServiceImpl.java) (revision c33d4aa11d22778bd16a874f0a95237da3f2f10c)
@@ -45,7 +45,6 @@
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -130,6 +129,7 @@
import org.lamsfoundation.lams.usermanagement.dto.UserDTO;
import org.lamsfoundation.lams.usermanagement.service.IUserManagementService;
import org.lamsfoundation.lams.util.FileUtil;
+import org.lamsfoundation.lams.util.FluxMap;
import org.lamsfoundation.lams.util.JsonUtil;
import org.lamsfoundation.lams.util.MessageService;
import org.lamsfoundation.lams.util.NumberUtil;
@@ -143,6 +143,9 @@
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Sinks;
+
/**
* @author Andrey Balan
*/
@@ -195,8 +198,20 @@
private ILearnerInteractionService learnerInteractionService;
- private final Map LEARNER_RESULTS_UPDATED_MAP = new ConcurrentHashMap<>();
+    // sink that accepts tool content ID for which learners' answers changed
+    private final Sinks.Many<Long> answersUpdatedSink;
+    // flux map for updating completion charts
+    private final FluxMap<Long, String> completionChartUpdateFluxMap;
+
+    public AssessmentServiceImpl() {
+        answersUpdatedSink = Sinks.many().multicast().onBackpressureBuffer(1);
+        // each tool content ID emitted by the sink makes the map fetch fresh chart data for that ID
+        completionChartUpdateFluxMap = new FluxMap<>("assessment completion chart update", answersUpdatedSink.asFlux(),
+                toolContentId -> getCompletionChartsData(toolContentId), FluxMap.STANDARD_THROTTLE,
+                FluxMap.STANDARD_TIMEOUT);
+    }
+
// *******************************************************************************
// Service method
// *******************************************************************************
@@ -662,10 +677,6 @@
}
}
- if (isAnswerModified) {
- LEARNER_RESULTS_UPDATED_MAP.put(assessment.getContentId(), true);
- }
-
// store finished date only on user hitting submit all answers button (and not submit mark hedging
// question)
int maximumGrade = 0;
@@ -713,6 +724,12 @@
learnerService.createCommandForLearners(assessment.getContentId(), userIds, jsonCommand.toString());
}
+ if (isAnswerModified) {
+ // need to flush so subscribers to sink see new answers in DB
+ assessmentResultDao.flush();
+ answersUpdatedSink.tryEmitNext(assessment.getContentId());
+ }
+
return true;
}
@@ -802,11 +819,32 @@
return questionResult;
}
+    /** Builds completion chart data as a JSON string for the given tool content ID; returns "" on failure. */
+    private String getCompletionChartsData(long toolContentId) {
+        try {
+            ObjectNode chartJson = JsonNodeFactory.instance.objectNode();
+            chartJson.put("possibleLearners", getCountLessonLearnersByContentId(toolContentId));
+            chartJson.put("startedLearners", getCountUsersByContentId(toolContentId));
+            chartJson.put("completedLearners", getCountLearnersWithFinishedCurrentAttempt(toolContentId));
+            chartJson.put("sessionCount", getSessionsByContentId(toolContentId).size());
+            var answeredQuestionsByUsers = getAnsweredQuestionsByUsers(toolContentId);
+            if (!answeredQuestionsByUsers.isEmpty()) {
+                chartJson.set("answeredQuestionsByUsers", JsonUtil.readObject(answeredQuestionsByUsers));
+                var answeredQuestionsByUsersCount = answeredQuestionsByUsers.entrySet().stream()
+                        .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().size()));
+                chartJson.set("answeredQuestionsByUsersCount", JsonUtil.readObject(answeredQuestionsByUsersCount));
+            }
+            return chartJson.toString();
+        } catch (Exception e) {
+            // pass the exception to the logger so the cause and stack trace are not lost
+            log.error("Unable to fetch completion charts data for tool content ID " + toolContentId, e);
+            return "";
+        }
+    }
+
@Override
- public boolean isAnswersUpdated(long toolContentId) {
- Boolean isAnswersUpdated = LEARNER_RESULTS_UPDATED_MAP.get(toolContentId);
- LEARNER_RESULTS_UPDATED_MAP.put(toolContentId, false);
- return isAnswersUpdated == null ? true : isAnswersUpdated;
+ public Flux getCompletionChartsDataFlux(long toolContentId) {
+ return completionChartUpdateFluxMap.getFlux(toolContentId);
}
/**
Index: lams_tool_assessment/src/java/org/lamsfoundation/lams/tool/assessment/service/IAssessmentService.java
===================================================================
diff -u -ra13c7c0eabe99a281bc65537d779610d07a56f04 -rc33d4aa11d22778bd16a874f0a95237da3f2f10c
--- lams_tool_assessment/src/java/org/lamsfoundation/lams/tool/assessment/service/IAssessmentService.java (.../IAssessmentService.java) (revision a13c7c0eabe99a281bc65537d779610d07a56f04)
+++ lams_tool_assessment/src/java/org/lamsfoundation/lams/tool/assessment/service/IAssessmentService.java (.../IAssessmentService.java) (revision c33d4aa11d22778bd16a874f0a95237da3f2f10c)
@@ -50,6 +50,8 @@
import org.lamsfoundation.lams.usermanagement.User;
import org.lamsfoundation.lams.util.excel.ExcelSheet;
+import reactor.core.publisher.Flux;
+
/**
* Interface that defines the contract that all ShareAssessment service provider must follow.
*
@@ -239,7 +241,7 @@
boolean storeUserAnswers(Assessment assessment, Long userId, List> pagedQuestions,
boolean isAutosave) throws IllegalAccessException, InvocationTargetException, NoSuchMethodException;
- boolean isAnswersUpdated(long toolContentId);
+ Flux getCompletionChartsDataFlux(long toolContentId);
void loadupLastAttempt(Long assessmentUid, Long userId, List> pagedQuestionDtos);
Index: lams_tool_assessment/src/java/org/lamsfoundation/lams/tool/assessment/web/controller/MonitoringController.java
===================================================================
diff -u -ra13c7c0eabe99a281bc65537d779610d07a56f04 -rc33d4aa11d22778bd16a874f0a95237da3f2f10c
--- lams_tool_assessment/src/java/org/lamsfoundation/lams/tool/assessment/web/controller/MonitoringController.java (.../MonitoringController.java) (revision a13c7c0eabe99a281bc65537d779610d07a56f04)
+++ lams_tool_assessment/src/java/org/lamsfoundation/lams/tool/assessment/web/controller/MonitoringController.java (.../MonitoringController.java) (revision c33d4aa11d22778bd16a874f0a95237da3f2f10c)
@@ -25,7 +25,6 @@
import java.io.IOException;
import java.security.InvalidParameterException;
-import java.time.Duration;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.util.ArrayList;
@@ -261,52 +260,13 @@
return "pages/monitoring/monitoring";
}
- @RequestMapping(path = "/getCompletionChartsData")
+ @RequestMapping(path = "/getCompletionChartsData", method = RequestMethod.GET, produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
- public String getCompletionChartsData(@RequestParam long toolContentId, HttpServletResponse response)
+ public Flux getCompletionChartsData(@RequestParam long toolContentId)
throws JsonProcessingException, IOException {
- String chartData = getCompletionChartsData(toolContentId);
- response.setContentType("application/json;charset=utf-8");
- return chartData;
+ return service.getCompletionChartsDataFlux(toolContentId);
}
- @RequestMapping(path = "/getCompletionChartsDataFlux", method = RequestMethod.GET, produces = MediaType.TEXT_EVENT_STREAM_VALUE)
- @ResponseBody
- public Flux getCompletionChartsDataFlux(@RequestParam long toolContentId)
- throws JsonProcessingException, IOException {
- return Flux.interval(Duration.ZERO, Duration.ofSeconds(1)).map(sequence -> {
- boolean isAnswersUpdated = service.isAnswersUpdated(toolContentId);
- if (!isAnswersUpdated) {
- return "";
- }
- String chartData = null;
- try {
- chartData = getCompletionChartsData(toolContentId);
- } catch (IOException e) {
- log.error(e);
- }
- return chartData;
- }).filter(chartData -> !chartData.isBlank()).distinctUntilChanged();
- }
-
- private String getCompletionChartsData(long toolContentId) throws JsonProcessingException, IOException {
- ObjectNode chartJson = JsonNodeFactory.instance.objectNode();
-
- chartJson.put("possibleLearners", service.getCountLessonLearnersByContentId(toolContentId));
- chartJson.put("startedLearners", service.getCountUsersByContentId(toolContentId));
- chartJson.put("completedLearners", service.getCountLearnersWithFinishedCurrentAttempt(toolContentId));
-
- chartJson.put("sessionCount", service.getSessionsByContentId(toolContentId).size());
- Map> answeredQuestionsByUsers = service.getAnsweredQuestionsByUsers(toolContentId);
- if (!answeredQuestionsByUsers.isEmpty()) {
- chartJson.set("answeredQuestionsByUsers", JsonUtil.readObject(answeredQuestionsByUsers));
- Map answeredQuestionsByUsersCount = answeredQuestionsByUsers.entrySet().stream()
- .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().size()));
- chartJson.set("answeredQuestionsByUsersCount", JsonUtil.readObject(answeredQuestionsByUsersCount));
- }
- return chartJson.toString();
- }
-
@RequestMapping("/userMasterDetail")
public String userMasterDetail(HttpServletRequest request, HttpServletResponse response) {
Long userId = WebUtil.readLongParam(request, AttributeNames.PARAM_USER_ID);
Index: lams_tool_assessment/web/includes/javascript/chart.js
===================================================================
diff -u -r62f565df2936fcc7978cb55d3e6b3f9f5a9621a5 -rc33d4aa11d22778bd16a874f0a95237da3f2f10c
--- lams_tool_assessment/web/includes/javascript/chart.js (.../chart.js) (revision 62f565df2936fcc7978cb55d3e6b3f9f5a9621a5)
+++ lams_tool_assessment/web/includes/javascript/chart.js (.../chart.js) (revision c33d4aa11d22778bd16a874f0a95237da3f2f10c)
@@ -1,8 +1,11 @@
function drawCompletionCharts(toolContentId, useGroups, animate) {
- const source = new EventSource( WEB_APP_URL + 'monitoring/getCompletionChartsDataFlux.do?toolContentId=' + toolContentId);
+ const source = new EventSource( WEB_APP_URL + 'monitoring/getCompletionChartsData.do?toolContentId=' + toolContentId);
source.onmessage = function (event) {
+ if (!event.data) {
+ return;
+ }
var data = JSON.parse(event.data);
drawActivityCompletionChart(data, animate);
drawAnsweredQuestionsChart(data, useGroups, animate);