// Source file: lams_common/src/java/org/lamsfoundation/lams/flux/FluxMap.java
// (added in revision f7b8a16c38d3569a3a675eaaef3be8d3a99e74ff)
// NOTE(review): the file is located under .../lams/flux/ but declares the "util" package - confirm which is intended.
package org.lamsfoundation.lams.util;

import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import org.apache.log4j.Logger;

import reactor.core.publisher.Flux;

/**
 * Utility class for serving updates to shared Fluxes.
 * It receives signals from a source Flux, probably a part of SharedSink.
 * For each requested key it creates a hot publisher Flux which fetches data for the key.
 * Supports time-based throttling.
 * If the sink does not produce any matching signals for given timeout, the flux gets removed.
 *
 * @param <T>
 *            type of the keys, which is also the type of signals emitted by the source Flux
 * @param <U>
 *            type of the data produced by the fetch function and emitted by the Fluxes stored in this map
 *
 * @author Marcin Cieslak
 */
public class FluxMap<T, U> extends ConcurrentHashMap<T, Flux<U>> {
    private static final long serialVersionUID = -8271928054306412858L;

    private static final Logger log = Logger.getLogger(FluxMap.class.getName());

    public static final int STANDARD_THROTTLE = 1;
    public static final int STANDARD_TIMEOUT = 5 * 60;

    // only for logging purposes
    private final String operationDescription;
    private final Flux<T> source;
    // null means never expire
    private final Integer timeoutSeconds;
    // null means no throttling
    private final Integer throttleSeconds;
    private final Function<T, U> fetchFunction;

    /**
     * @param operationDescription
     *            label used only in log messages
     * @param source
     *            Flux of signals; each signal is compared with requested keys using equals()
     * @param fetchFunction
     *            produces the data to emit for a given key / signal
     * @param throttleSeconds
     *            minimum period between emissions, in seconds; null disables throttling
     * @param timeoutSeconds
     *            period without emissions after which a Flux is removed, in seconds; null disables the timeout
     */
    public FluxMap(String operationDescription, Flux<T> source, Function<T, U> fetchFunction, Integer throttleSeconds,
            Integer timeoutSeconds) {
        this.operationDescription = operationDescription;
        this.source = source;
        this.fetchFunction = fetchFunction;
        this.throttleSeconds = throttleSeconds;
        this.timeoutSeconds = timeoutSeconds;
    }

    /**
     * Get a hot publisher Flux for the given key.
     * If there is no Flux registered for the key yet, a new one is built and registered.
     *
     * @param key
     *            key for which updates are requested
     * @return the Flux registered in this map for the key
     */
    public Flux<U> getFlux(T key) {
        // try to get existing Flux
        Flux<U> flux = get(key);
        if (flux != null) {
            return flux;
        }

        // create a new Flux
        if (log.isDebugEnabled()) {
            log.debug("Creating new flux for \"" + operationDescription + "\" with key " + key);
        }

        // filter out signals which do not match the key
        Flux<T> filteringFlux = source.filter(item -> item.equals(key));

        // do not emit more often than this amount of time
        if (throttleSeconds != null) {
            filteringFlux = filteringFlux.sample(Duration.ofSeconds(throttleSeconds));
        }

        // Manually complete this flux if all subscribers are gone.
        // Using available factory methods it does not seem possible to have a cached and shared Flux.
        // The check runs only when a matching signal arrives from the source.
        AtomicInteger subscriberCounter = new AtomicInteger();
        filteringFlux = filteringFlux.handle((item, sink) -> {
            int counter = subscriberCounter.get();
            if (counter <= 0) {
                if (log.isDebugEnabled()) {
                    log.debug("Completing and removing flux for \"" + operationDescription + "\" with key " + key);
                }
                sink.complete();
                remove(key);
                return;
            }
            sink.next(item);
        });

        // map items from sink to fetch function result
        flux = filteringFlux.map(fetchFunction)
                // push initial value to the Flux so data is available immediately after subscribing;
                // the fetch function is called right here, while the Flux is being built
                .startWith(fetchFunction.apply(key))

                // make sure the subsequent subscribers also have data immediately available
                .cache(1)

                // count the new subscriber and log it
                .doOnSubscribe(subscription -> {
                    int counter = subscriberCounter.incrementAndGet();
                    if (counter <= 0) {
                        // the counter is still not positive after incrementing, so reset it to this single subscriber
                        subscriberCounter.set(1);
                        counter = 1;
                    }

                    if (log.isDebugEnabled()) {
                        log.debug("Subscribed (" + counter + ") to flux \"" + operationDescription + "\" with key "
                                + key);
                    }
                })

                // count the leaving subscriber and log it
                .doOnCancel(() -> {
                    int counter = subscriberCounter.decrementAndGet();

                    if (log.isDebugEnabled()) {
                        log.debug("Cancelling (" + counter + ") subscription to flux for \"" + operationDescription
                                + "\" with key " + key);
                    }
                });

        // remove Flux when source Flux did not emit an accepted signal before given timeout
        if (timeoutSeconds != null) {
            flux = flux.timeout(Duration.ofSeconds(timeoutSeconds)).onErrorResume(TimeoutException.class,
                    throwable -> {
                        if (log.isDebugEnabled()) {
                            log.debug(
                                    "Removing timed out flux for \"" + operationDescription + "\" with key " + key);
                        }
                        // remove terminated Flux from the map
                        remove(key);
                        // switch subscribers to a dummy Flux which completes and cancels their subscriptions
                        return Flux.empty();
                    });
        }

        // Another thread could have registered a Flux for the same key in the meantime.
        // Register ours only if the key is still free, so all callers share a single Flux per key.
        Flux<U> existingFlux = putIfAbsent(key, flux);
        return existingFlux == null ? flux : existingFlux;
    }
}
=================================================================== diff -u --- lams_common/src/java/org/lamsfoundation/lams/flux/SharedSink.java (revision 0) +++ lams_common/src/java/org/lamsfoundation/lams/flux/SharedSink.java (revision f7b8a16c38d3569a3a675eaaef3be8d3a99e74ff) @@ -0,0 +1,40 @@ +package org.lamsfoundation.lams.util; + +import org.apache.log4j.Logger; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Sinks; + +/** + * This class allows both sink functionality (manually pushing elements to Flux) + * and hot publisher (so multiple other Fluxes can use it as source). + * + * @author Marcin Cieslak + */ +public class SharedSink { + + private static Logger log = Logger.getLogger(SharedSink.class.getName()); + + private final Sinks.Many sink; + private final Flux flux; + + public SharedSink(String operationDescription) { + if (log.isDebugEnabled()) { + log.debug("Created sink for \"" + operationDescription + "\""); + } + sink = Sinks.many().replay().latest(); + flux = sink.asFlux().doFinally((signalType) -> { + if (log.isDebugEnabled()) { + log.debug("Terminated (" + signalType + ") sink for \"" + operationDescription + "\""); + } + }).publish().autoConnect(); + } + + public void emit(T item) { + sink.tryEmitNext(item); + } + + public Flux getFlux() { + return flux; + } +} \ No newline at end of file Fisheye: Tag f7b8a16c38d3569a3a675eaaef3be8d3a99e74ff refers to a dead (removed) revision in file `lams_common/src/java/org/lamsfoundation/lams/util/FluxMap.java'. Fisheye: No comparison available. Pass `N' to diff? Fisheye: Tag f7b8a16c38d3569a3a675eaaef3be8d3a99e74ff refers to a dead (removed) revision in file `lams_common/src/java/org/lamsfoundation/lams/util/SharedSink.java'. Fisheye: No comparison available. Pass `N' to diff?