Index: lams_common/src/java/org/lamsfoundation/lams/util/FluxMap.java =================================================================== diff -u -rc33d4aa11d22778bd16a874f0a95237da3f2f10c -rd0685b6bd29bf47ed67fbc4eb6abcc69f0c1acad --- lams_common/src/java/org/lamsfoundation/lams/util/FluxMap.java (.../FluxMap.java) (revision c33d4aa11d22778bd16a874f0a95237da3f2f10c) +++ lams_common/src/java/org/lamsfoundation/lams/util/FluxMap.java (.../FluxMap.java) (revision d0685b6bd29bf47ed67fbc4eb6abcc69f0c1acad) @@ -3,6 +3,7 @@ import java.time.Duration; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import org.apache.log4j.Logger; @@ -11,8 +12,9 @@ /** * Utility class for serving updates to shared Fluxes. - * It receives signals from a source Flux, probably a Sink.Many. + * It receives signals from a source Flux, probably a part of SharedSink. * For each requested key it creates a hot publisher Flux which fetches data for the key. + * Supports time-based throttling. * If the sink does not produce any matching signals for given timeout, the flux gets removed. * * @author Marcin Cieslak @@ -58,37 +60,67 @@ // filter out signals which do not match the key Flux filteringFlux = source.filter(item -> item.equals(key)); + // do not emit more often than this amount of time if (throttleSeconds != null) { filteringFlux = filteringFlux.sample(Duration.ofSeconds(throttleSeconds)); } - // fetch data based on the key - flux = filteringFlux.map(item -> fetchFunction.apply(item)); - // push initial value to the Flux so data is available immediately after subscribing - flux = flux.startWith(fetchFunction.apply(key)) + // Manually complete this flux if all subscribers are gone. + // Using available factory methods it does not seem possible to have a cached and shared Flux. 
+ AtomicInteger subscriberCounter = new AtomicInteger(); + filteringFlux = filteringFlux.handle((item, sink) -> { + int counter = subscriberCounter.get(); + if (counter <= 0) { + if (log.isDebugEnabled()) { + log.debug("Completing and removing flux for \"" + operationDescription + "\" with key " + key); + } + sink.complete(); + remove(key); + return; + } + sink.next(item); + }); + + // map items from sink to fetch function result + flux = filteringFlux.map(item -> fetchFunction.apply(item)) + // push initial value to the Flux so data is available immediately after subscribing + .startWith(fetchFunction.apply(key)) + // make sure the subsequent subscribers also have data immediately available .cache(1) + // just some logging .doOnSubscribe(subscription -> { + int counter = subscriberCounter.incrementAndGet(); + if (counter <= 0) { + subscriberCounter.set(1); + counter = 1; + } + if (log.isDebugEnabled()) { - log.debug("Subscribed to flux \"" + operationDescription + "\" with key " + key); + log.debug("Subscribed (" + counter + ") to flux \"" + operationDescription + "\" with key " + + key); } }) + // just some logging .doOnCancel(() -> { + int counter = subscriberCounter.decrementAndGet(); + if (log.isDebugEnabled()) { - log.debug("Cancelling subscription to flux for \"" + operationDescription + "\" with key " - + key); + log.debug("Cancelling (" + counter + ") subscription to flux for \"" + operationDescription + + "\" with key " + key); } }); - // remove Flux on timeout + // remove Flux when source Flux did not emit an accepted signal before given timeout if (timeoutSeconds != null) { flux = flux.timeout(Duration.ofSeconds(timeoutSeconds)).onErrorResume(TimeoutException.class, throwable -> { if (log.isDebugEnabled()) { - log.debug("Removing flux for \"" + operationDescription + "\" with key " + key); + log.debug( + "Removing timed out flux for \"" + operationDescription + "\" with key " + key); } // remove terminated Flux from the map remove(key); Index: 
lams_common/src/java/org/lamsfoundation/lams/util/SharedSink.java =================================================================== diff -u --- lams_common/src/java/org/lamsfoundation/lams/util/SharedSink.java (revision 0) +++ lams_common/src/java/org/lamsfoundation/lams/util/SharedSink.java (revision d0685b6bd29bf47ed67fbc4eb6abcc69f0c1acad) @@ -0,0 +1,40 @@ +package org.lamsfoundation.lams.util; + +import org.apache.log4j.Logger; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Sinks; + +/** + * This class allows both sink functionality (manually pushing elements to Flux) + * and hot publisher (so multiple other Fluxes can use it as source). + * + * @author Marcin Cieslak + */ +public class SharedSink<T> { + + private static Logger log = Logger.getLogger(SharedSink.class.getName()); + + private final Sinks.Many<T> sink; + private final Flux<T> flux; + + public SharedSink(String operationDescription) { + if (log.isDebugEnabled()) { + log.debug("Created sink for \"" + operationDescription + "\""); + } + sink = Sinks.many().replay().latest(); + flux = sink.asFlux().doFinally((signalType) -> { + if (log.isDebugEnabled()) { + log.debug("Terminated (" + signalType + ") sink for \"" + operationDescription + "\""); + } + }).publish().autoConnect(); + } + + public void emit(T item) { + sink.tryEmitNext(item); + } + + public Flux<T> getFlux() { + return flux; + } +} \ No newline at end of file Index: lams_tool_assessment/src/java/org/lamsfoundation/lams/tool/assessment/service/AssessmentServiceImpl.java =================================================================== diff -u -rc33d4aa11d22778bd16a874f0a95237da3f2f10c -rd0685b6bd29bf47ed67fbc4eb6abcc69f0c1acad --- lams_tool_assessment/src/java/org/lamsfoundation/lams/tool/assessment/service/AssessmentServiceImpl.java (.../AssessmentServiceImpl.java) (revision c33d4aa11d22778bd16a874f0a95237da3f2f10c) +++ lams_tool_assessment/src/java/org/lamsfoundation/lams/tool/assessment/service/AssessmentServiceImpl.java 
(.../AssessmentServiceImpl.java) (revision d0685b6bd29bf47ed67fbc4eb6abcc69f0c1acad) @@ -133,6 +133,7 @@ import org.lamsfoundation.lams.util.JsonUtil; import org.lamsfoundation.lams.util.MessageService; import org.lamsfoundation.lams.util.NumberUtil; +import org.lamsfoundation.lams.util.SharedSink; import org.lamsfoundation.lams.util.WebUtil; import org.lamsfoundation.lams.util.excel.ExcelCell; import org.lamsfoundation.lams.util.excel.ExcelRow; @@ -144,7 +145,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import reactor.core.publisher.Flux; -import reactor.core.publisher.Sinks; /** * @author Andrey Balan @@ -199,15 +199,15 @@ private ILearnerInteractionService learnerInteractionService; // sink that accepts tool content ID for which learners' answers changed - private final Sinks.Many answersUpdatedSink; + private final SharedSink answersUpdatedSink; // flux map for updating completion charts private final FluxMap completionChartUpdateFluxMap; public AssessmentServiceImpl() { - answersUpdatedSink = Sinks.many().multicast().onBackpressureBuffer(1); + answersUpdatedSink = new SharedSink<>("assessment learner answers update"); - completionChartUpdateFluxMap = new FluxMap<>("assessment completion chart update", answersUpdatedSink.asFlux(), + completionChartUpdateFluxMap = new FluxMap<>("assessment completion chart update", answersUpdatedSink.getFlux(), toolContentId -> getCompletionChartsData(toolContentId), FluxMap.STANDARD_THROTTLE, FluxMap.STANDARD_TIMEOUT); } @@ -727,7 +727,7 @@ if (isAnswerModified) { // need to flush so subscribers to sink see new answers in DB assessmentResultDao.flush(); - answersUpdatedSink.tryEmitNext(assessment.getContentId()); + answersUpdatedSink.emit(assessment.getContentId()); } return true;