Index: lams_build/3rdParty.userlibraries =================================================================== diff -u -r50f28fabcac6dcd46979f8da77041931065f632d -rc33d4aa11d22778bd16a874f0a95237da3f2f10c --- lams_build/3rdParty.userlibraries (.../3rdParty.userlibraries) (revision 50f28fabcac6dcd46979f8da77041931065f632d) +++ lams_build/3rdParty.userlibraries (.../3rdParty.userlibraries) (revision c33d4aa11d22778bd16a874f0a95237da3f2f10c) @@ -46,5 +46,6 @@ + Index: lams_build/conf/j2ee/jboss-deployment-structure.xml =================================================================== diff -u -r50f28fabcac6dcd46979f8da77041931065f632d -rc33d4aa11d22778bd16a874f0a95237da3f2f10c --- lams_build/conf/j2ee/jboss-deployment-structure.xml (.../jboss-deployment-structure.xml) (revision 50f28fabcac6dcd46979f8da77041931065f632d) +++ lams_build/conf/j2ee/jboss-deployment-structure.xml (.../jboss-deployment-structure.xml) (revision c33d4aa11d22778bd16a874f0a95237da3f2f10c) @@ -41,6 +41,7 @@ + Index: lams_common/src/java/org/lamsfoundation/lams/util/FluxMap.java =================================================================== diff -u --- lams_common/src/java/org/lamsfoundation/lams/util/FluxMap.java (revision 0) +++ lams_common/src/java/org/lamsfoundation/lams/util/FluxMap.java (revision c33d4aa11d22778bd16a874f0a95237da3f2f10c) @@ -0,0 +1,105 @@ +package org.lamsfoundation.lams.util; + +import java.time.Duration; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; + +import org.apache.log4j.Logger; + +import reactor.core.publisher.Flux; + +/** + * Utility class for serving updates to shared Fluxes. + * It receives signals from a source Flux, probably a Sink.Many. + * For each requested key it creates a hot publisher Flux which fetches data for the key. + * If the sink does not produce any matching signals for given timeout, the flux gets removed. + * + * @author Marcin Cieslak + */ +public class FluxMap extends ConcurrentHashMap> { + private static final long serialVersionUID = -8271928054306412858L; + + private static Logger log = Logger.getLogger(FluxMap.class.getName()); + + public static final int STANDARD_THROTTLE = 1; + public static final int STANDARD_TIMEOUT = 5 * 60; + + // only for logging purposes + private final String operationDescription; + private final Flux source; + // default timeout is null, i.e. never expire + private final Integer timeoutSeconds; + // default throttle time is null, i.e. no throttling + private final Integer throttleSeconds; + private final Function fetchFunction; + + public FluxMap(String operationDescritpion, Flux source, Function fetchFunction, Integer throttleSeconds, + Integer timeoutSeconds) { + this.operationDescription = operationDescritpion; + this.source = source; + this.fetchFunction = fetchFunction; + this.throttleSeconds = throttleSeconds; + this.timeoutSeconds = timeoutSeconds; + } + + /** + * Get a hot publisher Flux for the given key. + */ + public Flux getFlux(T key) { + // try to get existing Flux + Flux flux = get(key); + + if (flux == null) { + // create a new Flux + if (log.isDebugEnabled()) { + log.debug("Creating new flux for \"" + operationDescription + "\" with key " + key); + } + + // filter out signals which do not match the key + Flux filteringFlux = source.filter(item -> item.equals(key)); + // do not emit more often than this amount of time + if (throttleSeconds != null) { + filteringFlux = filteringFlux.sample(Duration.ofSeconds(throttleSeconds)); + } + // fetch data based on the key + flux = filteringFlux.map(item -> fetchFunction.apply(item)); + + // push initial value to the Flux so data is available immediately after subscribing + flux = flux.startWith(fetchFunction.apply(key)) + // make sure the subsequent subscribers also have data immediately available + .cache(1) + // just some logging + .doOnSubscribe(subscription -> { + if (log.isDebugEnabled()) { + log.debug("Subscribed to flux \"" + operationDescription + "\" with key " + key); + } + }) + // just some logging + .doOnCancel(() -> { + if (log.isDebugEnabled()) { + log.debug("Cancelling subscription to flux for \"" + operationDescription + "\" with key " + + key); + } + }); + + // remove Flux on timeout + if (timeoutSeconds != null) { + flux = flux.timeout(Duration.ofSeconds(timeoutSeconds)).onErrorResume(TimeoutException.class, + throwable -> { + if (log.isDebugEnabled()) { + log.debug("Removing flux for \"" + operationDescription + "\" with key " + key); + } + // remove terminated Flux from the map + remove(key); + // switch subscribers to a dummy Flux which completes and cancels their subscriptions + return Flux.empty(); + }); + } + + put(key, flux); + } + + return flux; + } +} \ No newline at end of file Index: lams_tool_assessment/src/java/org/lamsfoundation/lams/tool/assessment/service/AssessmentServiceImpl.java =================================================================== diff -u -ra13c7c0eabe99a281bc65537d779610d07a56f04 -rc33d4aa11d22778bd16a874f0a95237da3f2f10c --- lams_tool_assessment/src/java/org/lamsfoundation/lams/tool/assessment/service/AssessmentServiceImpl.java (.../AssessmentServiceImpl.java) (revision a13c7c0eabe99a281bc65537d779610d07a56f04) +++ lams_tool_assessment/src/java/org/lamsfoundation/lams/tool/assessment/service/AssessmentServiceImpl.java (.../AssessmentServiceImpl.java) (revision c33d4aa11d22778bd16a874f0a95237da3f2f10c) @@ -45,7 +45,6 @@ import java.util.TreeMap; import java.util.TreeSet; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -130,6 +129,7 @@ import org.lamsfoundation.lams.usermanagement.dto.UserDTO; import org.lamsfoundation.lams.usermanagement.service.IUserManagementService; import org.lamsfoundation.lams.util.FileUtil; +import org.lamsfoundation.lams.util.FluxMap; import org.lamsfoundation.lams.util.JsonUtil; import org.lamsfoundation.lams.util.MessageService; import org.lamsfoundation.lams.util.NumberUtil; @@ -143,6 +143,9 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Sinks; + /** * @author Andrey Balan */ @@ -195,8 +198,20 @@ private ILearnerInteractionService learnerInteractionService; - private final Map LEARNER_RESULTS_UPDATED_MAP = new ConcurrentHashMap<>(); + // sink that accepts tool content ID for which learners' answers changed + private final Sinks.Many answersUpdatedSink; + // flux map for updating completion charts + private final FluxMap completionChartUpdateFluxMap; + + public AssessmentServiceImpl() { + answersUpdatedSink = Sinks.many().multicast().onBackpressureBuffer(1); + + completionChartUpdateFluxMap = new FluxMap<>("assessment completion chart update", answersUpdatedSink.asFlux(), + toolContentId -> getCompletionChartsData(toolContentId), FluxMap.STANDARD_THROTTLE, + FluxMap.STANDARD_TIMEOUT); + } + // ******************************************************************************* // Service method // ******************************************************************************* @@ -662,10 +677,6 @@ } } - if (isAnswerModified) { - LEARNER_RESULTS_UPDATED_MAP.put(assessment.getContentId(), true); - } - // store finished date only on user hitting submit all answers button (and not submit mark hedging // question) int maximumGrade = 0; @@ -713,6 +724,12 @@ learnerService.createCommandForLearners(assessment.getContentId(), userIds, jsonCommand.toString()); } + if (isAnswerModified) { + // need to flush so subscribers to sink see new answers in DB + assessmentResultDao.flush(); + answersUpdatedSink.tryEmitNext(assessment.getContentId()); + } + return true; } @@ -802,11 +819,32 @@ return questionResult; } + private String getCompletionChartsData(long toolContentId) { + try { + ObjectNode chartJson = JsonNodeFactory.instance.objectNode(); + + chartJson.put("possibleLearners", getCountLessonLearnersByContentId(toolContentId)); + chartJson.put("startedLearners", getCountUsersByContentId(toolContentId)); + chartJson.put("completedLearners", getCountLearnersWithFinishedCurrentAttempt(toolContentId)); + + chartJson.put("sessionCount", getSessionsByContentId(toolContentId).size()); + Map> answeredQuestionsByUsers = getAnsweredQuestionsByUsers(toolContentId); + if (!answeredQuestionsByUsers.isEmpty()) { + chartJson.set("answeredQuestionsByUsers", JsonUtil.readObject(answeredQuestionsByUsers)); + Map answeredQuestionsByUsersCount = answeredQuestionsByUsers.entrySet().stream() + .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().size())); + chartJson.set("answeredQuestionsByUsersCount", JsonUtil.readObject(answeredQuestionsByUsersCount)); + } + return chartJson.toString(); + } catch (Exception e) { + log.error("Unable to fetch completion charts data for tool content ID " + toolContentId); + return ""; + } + } + @Override - public boolean isAnswersUpdated(long toolContentId) { - Boolean isAnswersUpdated = LEARNER_RESULTS_UPDATED_MAP.get(toolContentId); - LEARNER_RESULTS_UPDATED_MAP.put(toolContentId, false); - return isAnswersUpdated == null ? true : isAnswersUpdated; + public Flux getCompletionChartsDataFlux(long toolContentId) { + return completionChartUpdateFluxMap.getFlux(toolContentId); } /** Index: lams_tool_assessment/src/java/org/lamsfoundation/lams/tool/assessment/service/IAssessmentService.java =================================================================== diff -u -ra13c7c0eabe99a281bc65537d779610d07a56f04 -rc33d4aa11d22778bd16a874f0a95237da3f2f10c --- lams_tool_assessment/src/java/org/lamsfoundation/lams/tool/assessment/service/IAssessmentService.java (.../IAssessmentService.java) (revision a13c7c0eabe99a281bc65537d779610d07a56f04) +++ lams_tool_assessment/src/java/org/lamsfoundation/lams/tool/assessment/service/IAssessmentService.java (.../IAssessmentService.java) (revision c33d4aa11d22778bd16a874f0a95237da3f2f10c) @@ -50,6 +50,8 @@ import org.lamsfoundation.lams.usermanagement.User; import org.lamsfoundation.lams.util.excel.ExcelSheet; +import reactor.core.publisher.Flux; + /** * Interface that defines the contract that all ShareAssessment service provider must follow. * @@ -239,7 +241,7 @@ boolean storeUserAnswers(Assessment assessment, Long userId, List> pagedQuestions, boolean isAutosave) throws IllegalAccessException, InvocationTargetException, NoSuchMethodException; - boolean isAnswersUpdated(long toolContentId); + Flux getCompletionChartsDataFlux(long toolContentId); void loadupLastAttempt(Long assessmentUid, Long userId, List> pagedQuestionDtos); Index: lams_tool_assessment/src/java/org/lamsfoundation/lams/tool/assessment/web/controller/MonitoringController.java =================================================================== diff -u -ra13c7c0eabe99a281bc65537d779610d07a56f04 -rc33d4aa11d22778bd16a874f0a95237da3f2f10c --- lams_tool_assessment/src/java/org/lamsfoundation/lams/tool/assessment/web/controller/MonitoringController.java (.../MonitoringController.java) (revision a13c7c0eabe99a281bc65537d779610d07a56f04) +++ lams_tool_assessment/src/java/org/lamsfoundation/lams/tool/assessment/web/controller/MonitoringController.java (.../MonitoringController.java) (revision c33d4aa11d22778bd16a874f0a95237da3f2f10c) @@ -25,7 +25,6 @@ import java.io.IOException; import java.security.InvalidParameterException; -import java.time.Duration; import java.time.LocalDateTime; import java.time.OffsetDateTime; import java.util.ArrayList; @@ -261,52 +260,13 @@ return "pages/monitoring/monitoring"; } - @RequestMapping(path = "/getCompletionChartsData") + @RequestMapping(path = "/getCompletionChartsData", method = RequestMethod.GET, produces = MediaType.TEXT_EVENT_STREAM_VALUE) @ResponseBody - public String getCompletionChartsData(@RequestParam long toolContentId, HttpServletResponse response) + public Flux getCompletionChartsData(@RequestParam long toolContentId) throws JsonProcessingException, IOException { - String chartData = getCompletionChartsData(toolContentId); - response.setContentType("application/json;charset=utf-8"); - return chartData; + return service.getCompletionChartsDataFlux(toolContentId); } - @RequestMapping(path = "/getCompletionChartsDataFlux", method = RequestMethod.GET, produces = MediaType.TEXT_EVENT_STREAM_VALUE) - @ResponseBody - public Flux getCompletionChartsDataFlux(@RequestParam long toolContentId) - throws JsonProcessingException, IOException { - return Flux.interval(Duration.ZERO, Duration.ofSeconds(1)).map(sequence -> { - boolean isAnswersUpdated = service.isAnswersUpdated(toolContentId); - if (!isAnswersUpdated) { - return ""; - } - String chartData = null; - try { - chartData = getCompletionChartsData(toolContentId); - } catch (IOException e) { - log.error(e); - } - return chartData; - }).filter(chartData -> !chartData.isBlank()).distinctUntilChanged(); - } - - private String getCompletionChartsData(long toolContentId) throws JsonProcessingException, IOException { - ObjectNode chartJson = JsonNodeFactory.instance.objectNode(); - - chartJson.put("possibleLearners", service.getCountLessonLearnersByContentId(toolContentId)); - chartJson.put("startedLearners", service.getCountUsersByContentId(toolContentId)); - chartJson.put("completedLearners", service.getCountLearnersWithFinishedCurrentAttempt(toolContentId)); - - chartJson.put("sessionCount", service.getSessionsByContentId(toolContentId).size()); - Map> answeredQuestionsByUsers = service.getAnsweredQuestionsByUsers(toolContentId); - if (!answeredQuestionsByUsers.isEmpty()) { - chartJson.set("answeredQuestionsByUsers", JsonUtil.readObject(answeredQuestionsByUsers)); - Map answeredQuestionsByUsersCount = answeredQuestionsByUsers.entrySet().stream() - .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().size())); - chartJson.set("answeredQuestionsByUsersCount", JsonUtil.readObject(answeredQuestionsByUsersCount)); - } - return chartJson.toString(); - } - @RequestMapping("/userMasterDetail") public String userMasterDetail(HttpServletRequest request, HttpServletResponse response) { Long userId = WebUtil.readLongParam(request, AttributeNames.PARAM_USER_ID); Index: lams_tool_assessment/web/includes/javascript/chart.js =================================================================== diff -u -r62f565df2936fcc7978cb55d3e6b3f9f5a9621a5 -rc33d4aa11d22778bd16a874f0a95237da3f2f10c --- lams_tool_assessment/web/includes/javascript/chart.js (.../chart.js) (revision 62f565df2936fcc7978cb55d3e6b3f9f5a9621a5) +++ lams_tool_assessment/web/includes/javascript/chart.js (.../chart.js) (revision c33d4aa11d22778bd16a874f0a95237da3f2f10c) @@ -1,8 +1,11 @@ function drawCompletionCharts(toolContentId, useGroups, animate) { - const source = new EventSource( WEB_APP_URL + 'monitoring/getCompletionChartsDataFlux.do?toolContentId=' + toolContentId); + const source = new EventSource( WEB_APP_URL + 'monitoring/getCompletionChartsData.do?toolContentId=' + toolContentId); source.onmessage = function (event) { + if (!event.data) { + return; + } var data = JSON.parse(event.data); drawActivityCompletionChart(data, animate); drawAnsweredQuestionsChart(data, useGroups, animate);