Created
November 22, 2020 22:05
-
-
Save razorcd/32b3da7c02f9f2df93da088e13240a47 to your computer and use it in GitHub Desktop.
Scaling reactive APIs - www.razorcodes.com
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Value | |
public class EventStream { | |
private final String eventStreamId; | |
private final Long offsetTimeMs; | |
private final Long offsetCount; | |
private final Set<FluxSink<Object>> fluxSinks; | |
public void addStream(FluxSink<Object> newFluxSinkObject) { | |
synchronized(this) { | |
fluxSinks.add(newFluxSinkObject); | |
} | |
} | |
public void removeStream(FluxSink<Object> existingFluxSinkObject) { | |
synchronized(this) { | |
fluxSinks.remove(existingFluxSinkObject); | |
} | |
} | |
public boolean hasListeners() { | |
return !fluxSinks.isEmpty(); | |
} | |
//closes all stream connections with an error message. | |
public void sendError(String message) { | |
fluxSinks.forEach(fluxSink -> fluxSink.error(new ConnectionClosedException(message))); | |
} | |
//publish an event to all streams | |
public void publishEvent(String event) { | |
fluxSinks.forEach(fluxSink -> fluxSink.next(event)); | |
} | |
//creates a new object with updated offsets | |
public EventStream withOffset(Long offsetMs, Long offsetCount) { | |
return new EventStream(this.eventStreamId, offsetMs, offsetCount, this.fluxSinks); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment