Skip to content

Instantly share code, notes, and snippets.

@luvarqpp
Created October 13, 2022 13:06
Show Gist options
  • Save luvarqpp/8ea6ad5ad32a8fcbf3264c00ebe351b2 to your computer and use it in GitHub Desktop.
Save luvarqpp/8ea6ad5ad32a8fcbf3264c00ebe351b2 to your computer and use it in GitHub Desktop.
Sample code for a Stack Overflow question
import lombok.RequiredArgsConstructor;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import java.util.function.Function;
/**
 * Manual experiment showing how a Reactor {@link Sinks.Many} hands values to subscribers that
 * attach at different moments and consume at different speeds.
 *
 * <p>The test contains no assertions; its outcome is read from the lines printed to standard
 * output.
 */
public class SinkTests {

    /**
     * Small helper that pushes strings into a {@link Sinks.Many} and prints every failed emission.
     *
     * <p>Declared {@code static}: it never uses the enclosing test instance, and a non-static inner
     * class may only declare static members on Java 16 and newer.
     */
    @RequiredArgsConstructor
    static class Emitter {

        /** Length of one sleep step in milliseconds; {@code long} so the multiplication cannot overflow. */
        private static final long SLEEP_STEP_MILLIS = 100L;

        /**
         * Failure handler shared by all emissions: prints the signal type and the emit result, then
         * returns {@code false} so the emission is not retried.
         */
        private static final Sinks.EmitFailureHandler EMIT_FAILURE_HANDLER = (signalType, emitResult) -> {
            System.out.println(signalType + " -> " + emitResult);
            return false;
        };

        /** Sink that receives every emitted value; supplied through the Lombok-generated constructor. */
        private final Sinks.Many<String> sink;

        /**
         * Blocks the calling thread for {@code dekaSeconds * 100} milliseconds.
         *
         * <p>Despite the parameter name, one unit is 100 ms. If the thread is interrupted while
         * sleeping, the exception is printed and the interrupt flag is restored so that callers
         * further up the stack can still observe the interruption.
         *
         * @param dekaSeconds number of 100 ms steps to sleep
         */
        private static void sleep(int dekaSeconds) {
            try {
                Thread.sleep(SLEEP_STEP_MILLIS * dekaSeconds);
            } catch (InterruptedException ex) {
                System.out.println(ex);
                // Catching InterruptedException clears the flag; set it again instead of swallowing it.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Emits one value into the sink, reporting a failed emission through the shared handler.
         *
         * @param str value to emit
         */
        public void emit(String str) {
            sink.emitNext(str, EMIT_FAILURE_HANDLER);
        }

        /**
         * Emits one value and then pauses the calling thread.
         *
         * @param str value to emit
         * @param dekaSeconds number of 100 ms steps to sleep after the emission
         */
        public void emitAndSleep(String str, int dekaSeconds) {
            emit(str);
            sleep(dekaSeconds);
        }

        /** Signals completion on the sink, reporting a failed emission through the shared handler. */
        public void emitComplete() {
            sink.emitComplete(EMIT_FAILURE_HANDLER);
        }
    }

    /**
     * Emits a series of values while two subscribers with different per-element delays
     * (400 ms and 900 ms) attach part-way through, then emits 17 values 100 ms apart and completes.
     *
     * <p>NOTE(review): the method returns right after {@code emitComplete()} without waiting for
     * the subscribers, and the schedulers created for them are never disposed - confirm that this
     * is intended for the experiment.
     */
    @Test
    public void replay() {
        // there is "synchronization" between clients
        //final Sinks.Many<String> sink = Sinks.many().replay().limit(1, Duration.ZERO);
        final Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer(5);

        // Builds a subscriber-side view of the sink: every call creates its own single-threaded
        // scheduler for the subscription and adds a per-subscriber buffer of 6 elements.
        final Function<Sinks.Many<String>, Flux<String>> getFlux = (Sinks.Many<String> snk) ->
                snk.asFlux()
                        .subscribeOn(Schedulers.newSingle("asdf"))
                        .onBackpressureBuffer(6);

        final Emitter s = new Emitter(sink);

        // Emitted before any subscriber has been attached.
        s.emitAndSleep("nula", 10);
        //getFlux.apply(sink).subscribe(x -> System.out.println("Consumer 0: " + x));
        s.emit("jedna");
        s.emitAndSleep("dva", 10);

        // First subscriber: spends 400 ms on every element.
        getFlux.apply(sink).subscribe(x -> {
            Emitter.sleep(4);
            System.out.println("Consumer 1: " + x);
        });
        s.emitAndSleep("tri", 10);

        // Second subscriber: spends 900 ms on every element.
        getFlux.apply(sink).subscribe(x -> {
            Emitter.sleep(9);
            System.out.println(" SLOW er 2: " + x);
        });
        s.emitAndSleep("štyri", 10);

        // Burst of 17 values, 100 ms apart - faster than either subscriber consumes.
        for (int i = 1; i < 18; i++) {
            s.emitAndSleep("rýchlo " + i, 1);
        }
        Emitter.sleep(1);
        s.emitComplete();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment