Index: lams_common/src/java/org/lamsfoundation/lams/flux/SharedSink.java =================================================================== diff -u -rbaf2503b7b75aa6c2d59b3df2e3a16f62a3d80d5 -r92c9b190efccbd14176e74c4b1b6346e5e02b75a --- lams_common/src/java/org/lamsfoundation/lams/flux/SharedSink.java (.../SharedSink.java) (revision baf2503b7b75aa6c2d59b3df2e3a16f62a3d80d5) +++ lams_common/src/java/org/lamsfoundation/lams/flux/SharedSink.java (.../SharedSink.java) (revision 92c9b190efccbd14176e74c4b1b6346e5e02b75a) @@ -1,13 +1,13 @@ package org.lamsfoundation.lams.flux; import org.apache.log4j.Logger; - -import reactor.core.publisher.BufferOverflowStrategy; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.EmitFailureHandler; +import reactor.util.Loggers; +import java.util.logging.Level; + /** * This class allows both sink functionality (manually pushing elements to Flux) and hot publisher (so multiple other * Fluxes can use it as source). @@ -21,12 +21,16 @@ private final Sinks.Many sink; private final Flux flux; + static { + Loggers.useJdkLoggers(); + } + public SharedSink(String name) { if (log.isDebugEnabled()) { log.debug("Created sink for \"" + name + "\""); } sink = Sinks.many().replay().latest(); - flux = sink.asFlux().onBackpressureLatest().doFinally((signalType) -> { + flux = sink.asFlux().log("reactor.", Level.FINE, true).onBackpressureLatest().doFinally((signalType) -> { if (log.isDebugEnabled()) { log.debug("Terminated (" + signalType + ") sink for \"" + name + "\""); }