This is a long one.
My problem: I'm having trouble applying back-pressure to a function that currently uses an old pub/sub device for one of its events (that is to say, I have a producer that runs too fast for my consumer). I'm using Guava's `EventBus`, which is old and very stateful. It works fine for many use cases, but in this particular back-pressure case I get a deadlock, for reasons I don't entirely understand, if a subscriber calls `runBlocking {}` to try to block `ImportantService` when it fires a message.
`EventBus` is effectively deprecated. It's really convenient for UI (read: giant stateful objects), but it's awful for any service trying to maintain referential transparency (read: using pure functions and composing message processing with things like `flow.map`).
At its core, I have this kind of object:
```kotlin
class ImportantService(..., private val eventPublisher: EventBus) {
    fun startLargeWork(): CompletableFuture<LargeResultID> { ... }
}
```
And I want this:

```kotlin
class ImportantService(...) {
    fun startLargeWork(): FlowOrStreamOrChannel<WorkOutputMessage> { ... }
}

sealed interface WorkOutputMessage {
    class PublishSubscribeMessage(...) : WorkOutputMessage
    class FinishedWithLargeResultID(val id: LargeResultID) : WorkOutputMessage
}
```
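A minimal sketch of that target shape, assuming a plain cold `flow {}` is acceptable as the `FlowOrStreamOrChannel`. `LargeResultID(Long)`, the `iterations` parameter, the loop body and `collectAllForDemo` are hypothetical stand-ins for the real work; the point is only that the result ID becomes the last message of the stream rather than a separate `CompletableFuture`.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-ins for the real result and message types.
data class LargeResultID(val value: Long)

sealed interface WorkOutputMessage {
    data class PublishSubscribeMessage(val iteration: Int) : WorkOutputMessage
    data class FinishedWithLargeResultID(val id: LargeResultID) : WorkOutputMessage
}

class ImportantService {
    // Cold flow: nothing runs until collection starts, and every emit() suspends
    // until the collector has handled the message.
    fun startLargeWork(iterations: Int): Flow<WorkOutputMessage> = flow {
        for (i in 1..iterations) {
            // ... one step of the (hypothetical) large computation goes here ...
            emit(WorkOutputMessage.PublishSubscribeMessage(i))
        }
        emit(WorkOutputMessage.FinishedWithLargeResultID(LargeResultID(iterations.toLong())))
    }
}

// Demo only: gathers every message; real callers would process each one as it arrives.
fun collectAllForDemo(): List<WorkOutputMessage> =
    runBlocking { ImportantService().startLargeWork(3).toList() }
```

`collectAllForDemo()` returns three `PublishSubscribeMessage`s followed by `FinishedWithLargeResultID(LargeResultID(3))`.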
The key thing I'm looking for here is clear back-pressure. I want it to be the default that when a message is emitted to the `FlowStreamish` thing, the subscriber can trivially block the upstream publisher. What I really want is the ability to put a breakpoint on a subscriber and have the publisher simply be up-stack.
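To make the "publisher is up-stack" property concrete: with a plain `flow {}` and no `buffer`/`flowOn` in between, `emit` calls the collector's lambda directly, so a breakpoint in the collector shows the producer loop further up the (coroutine) stack, and the producer cannot continue until the collector returns. This sketch (assuming kotlinx.coroutines 1.6+ for `collect { }` with a lambda; `backPressureTrace` is a hypothetical name) just records the order of events, with `delay` simulating a slow consumer.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Runs a producer and a deliberately slow consumer, recording the order of events.
fun backPressureTrace(): List<String> {
    val log = mutableListOf<String>()
    val producer = flow<Int> {
        for (i in 1..3) {
            log += "emit $i"
            emit(i) // calls the collector lambda directly; returns only once it has finished
            log += "resumed after $i"
        }
    }
    runBlocking {
        producer.collect { value ->
            delay(10) // stand-in for slow processing, or for sitting on a breakpoint
            log += "handled $value"
        }
    }
    return log
}
```

The trace is strictly lock-step: `emit 1`, `handled 1`, `resumed after 1`, `emit 2`, and so on. Adding `buffer()` or `flowOn()`, or switching to `channelFlow`, puts a channel between the two sides, which breaks this lock-step ordering and moves the producer onto a different stack.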
I've sort of achieved this now, but it doesn't play nicely at all with coroutines, which are becoming increasingly common. Right now I have this:
```kotlin
class ImportantService(..., private val eventPublisher: EventBus) {
    fun startLargeWork(): CompletableFuture<LargeResultID> {
        ...
        require(!Thread.currentThread().isMainThread())
        runBlocking(Dispatchers.Main) {
            eventPublisher.post(PublishSubscribeMessage(...))
        }
    }
}

// eliding the Guava event bus's semantics a bit
eventPublisher.register<PublishSubscribeMessage> { message: PublishSubscribeMessage ->
    coroutineScope.launch {
        state = generateNewState(message)
    }
}
```
So my first question is: why might replacing `coroutineScope.launch` with `runBlocking` cause a deadlock? I have some theories, but I'm wondering if anybody has a simple explanation for me. In any event, I've come to consider liberal use of `runBlocking` harmful. It's fine for an entry point, but stay away from nesting it (i.e., calling `runBlocking` when your own code is up-stack).
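One simple explanation that fits the snippet, though it can't be confirmed without seeing `generateNewState` or how `coroutineScope` is configured: the `post` happens inside `runBlocking(Dispatchers.Main)`, and Guava dispatches synchronously by default, so the subscriber runs on the Main thread. `runBlocking` parks its calling thread until the block finishes and does not process work queued on `Dispatchers.Main`. If the subscriber's `runBlocking` waits on anything that must be dispatched to Main (a `withContext(Dispatchers.Main)` somewhere in `generateNewState`, a Main-bound scope, a UI callback), nothing can ever run it, because the only thread that could is the one being parked. The same shape can be reproduced without coroutines; here a single-threaded executor plays the role of Main, and a timeout (hypothetical, 200 ms) stands in for the hang.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

// A single-threaded executor stands in for the Main/UI thread.
fun nestedBlockingOnConfinedThread(): String {
    val mainLike = Executors.newSingleThreadExecutor()
    try {
        // Outer task: plays the EventBus subscriber, already running on "Main".
        val outer = mainLike.submit(Callable<String> {
            // Inner task: work that also needs "Main" (like a withContext(Dispatchers.Main)).
            val inner = mainLike.submit(Callable<String> { "new state" })
            try {
                // Blocking here is what runBlocking does: the only thread that could run
                // `inner` is the one we are blocking, so `inner` can never start.
                inner.get(200, TimeUnit.MILLISECONDS)
            } catch (e: TimeoutException) {
                "deadlocked (timed out)"
            }
        })
        return outer.get()
    } finally {
        mainLike.shutdownNow()
    }
}
```

`coroutineScope.launch` avoids this because `launch` returns immediately instead of parking the Main thread.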
Some extra wrinkles:

On the stack, between `startLargeWork` and `eventPublisher.post`, there are native frames. In other words, our code actually looks like this:
```kotlin
class ImportantService(
    ...,
    private val nativeCode: JNA_NativeBridge,
    private val eventPublisher: EventBus
) {
    fun startLargeWork(): CompletableFuture<LargeResultID> {
        ...
        nativeCode.doMathAndCallback(runIterationCallback = formatToJNAUpcall { bigOlFortranMatrix ->
            ...
            require(!Thread.currentThread().isMainThread())
            runBlocking(Dispatchers.Main) {
                eventPublisher.post(PublishSubscribeMessage(...))
            }
        })
    }
}
```
From what I understand, this kills any hope of either the Kotlin coroutines compiler or Project Loom being able to insert a suspension point at `eventPublisher.post()`. As written, `eventPublisher.post()` (or `flowBuilder.emit()`, or `stream.send`, etc.) must block.
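Even when the callback itself must block, back-pressure can still reach a coroutine consumer: one option is a `callbackFlow` whose channel is rendezvous, with the native thread parked in `trySendBlocking`. A minimal sketch; `fakeDoMathAndCallback` is a hypothetical stand-in for `nativeCode.doMathAndCallback`, and giving the native call its own thread is an assumption made for illustration. Note that `trySendBlocking` only waits until the collector has taken the element, not until it has finished processing it, and it uses `runBlocking` internally, so it belongs on threads no coroutine depends on.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlin.concurrent.thread

// Hypothetical stand-in for nativeCode.doMathAndCallback: a plain blocking call that
// invokes the callback synchronously on the calling thread.
fun fakeDoMathAndCallback(iterations: Int, runIterationCallback: (Int) -> Unit) {
    for (i in 1..iterations) runIterationCallback(i)
}

fun startLargeWork(iterations: Int): Flow<Int> = callbackFlow<Int> {
    // The blocking native call runs on its own (non-coroutine) thread.
    thread(name = "native-worker", isDaemon = true) {
        fakeDoMathAndCallback(iterations) { i ->
            // Parks the native thread until the collector takes `i`. Once the flow is
            // closed or cancelled this returns a failed result instead of blocking.
            trySendBlocking(i)
        }
        channel.close() // normal completion of the flow
    }
    awaitClose() // keep the producer scope open until close() or cancellation
}.buffer(Channel.RENDEZVOUS) // fuses with callbackFlow's channel: no internal buffering

// Demo only: collects everything the "native" loop produced.
fun collectAllForDemo(): List<Int> = runBlocking { startLargeWork(5).toList() }
```

`collectAllForDemo()` returns `[1, 2, 3, 4, 5]`; with a slow collector, the native thread stays parked in `trySendBlocking` until each element is taken.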
Another wrinkle is that there are a couple of flags on the return value of `startLargeWork()` that I'd need to support. Specifically, something like `isPaused`, which is a state check (and can reasonably elegantly be implemented as a message processor). That is, I want to write:
```kotlin
fun mainIsh() {
    val runningWork = importantService.startLargeWork(stuff)
    ...
    val paused = runningWork.isPaused
    if (paused) { ... }
}
```
The concept of `paused` (et al.) is fairly trivial to implement by keeping track of state on the stream, but we do this kind of stateful polling in a number of places and it would be obnoxious to write it out every time, e.g.:
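```kotlin
import kotlinx.coroutines.flow.onEach

fun mainIsh() {
    var runningWork = importantService.startLargeWork(stuff)
    var isPaused = false
    // onEach, not map: an assignment is not an expression, so map { isPaused = ... }
    // would produce a Flow<Unit> instead of passing the messages through.
    runningWork = runningWork.onEach { message ->
        isPaused = when (message) {
            is PauseStartedMessage -> true
            is PauseEndedMessage -> false
            else -> isPaused
        }
    }
    // ... (isPaused only changes while runningWork is being collected)
    if (isPaused) { ... }
}
```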
My instinct is to subclass: something like a custom `class OptimizerMessageFlow: Flow<OptimizerMessage>` implementation that delegates its scary flow operators to a `ChannelFlow` (or similar) and adds these state-check functions. But I'm not sure that's best, and the channel/flow operators really are pretty scary (read: easy to get wrong).
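An alternative to subclassing (the `Flow` KDoc describes the interface as not stable for inheritance outside kotlinx.coroutines) is a small wrapper that exposes the messages and the derived state side by side. A minimal sketch, assuming kotlinx.coroutines 1.6+ and a single collector; `RunningWork`, `Progress` and the exact shape of `OptimizerMessage` are hypothetical. The state is updated in `onEach`, on the collector's own call stack, so the cold flow's back-pressure and stack shape are unchanged.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// Hypothetical message hierarchy; only the two pause messages matter here.
sealed interface OptimizerMessage {
    object PauseStartedMessage : OptimizerMessage
    object PauseEndedMessage : OptimizerMessage
    data class Progress(val iteration: Int) : OptimizerMessage
}

// Wraps the flow instead of subclassing Flow.
class RunningWork(upstream: Flow<OptimizerMessage>) {
    private val paused = MutableStateFlow(false)

    /** Current pause state: read `.value` to poll, or collect it to observe changes. */
    val isPaused: StateFlow<Boolean> = paused.asStateFlow()

    /** The original messages, passed through unchanged; updates `isPaused` as they go by. */
    val messages: Flow<OptimizerMessage> = upstream.onEach { message ->
        paused.value = when (message) {
            OptimizerMessage.PauseStartedMessage -> true
            OptimizerMessage.PauseEndedMessage -> false
            else -> paused.value
        }
    }
}
```

Call sites then read `runningWork.isPaused.value` while `runningWork.messages` is being collected. Collecting the cold `messages` flow twice would run the work twice, hence the single-collector assumption.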
So this is my first big question: how would you guys go about migrating from this old pub/sub Guava `EventBus` to a newer `Stream`, `Channel` or `Flow`? I'm personally really concerned about keeping nice stack traces, but I get the feeling that I'm going to have to give that up (at least somewhat).
Firstly, the native code uses a bunch of global variables, which means it's not reentrant, so calling it on a worker thread is odd. It's safer to wrap it in some kind of thread confinement, and if I do that behind some kind of message-passing surface then I lose my really clean stacks, but I can then use async code top-to-bottom on the consumer end. Exactly how to do this escapes me right now; TBD.
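One common form of that thread confinement is a single-threaded dispatcher: every call into the library is funnelled through one dedicated thread, and callers suspend rather than block while they wait. A minimal sketch, assuming the native entry points are ordinary blocking functions; `NativeConfinement`, `call` and the thread name are hypothetical. If the native code calls back synchronously (as `doMathAndCallback` appears to), those callbacks also run on the confined thread, so nothing there may wait on a coroutine that itself needs that thread.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.io.Closeable
import java.util.concurrent.Executors

// Hypothetical wrapper: the non-reentrant library's global state is only ever
// touched from one dedicated thread.
class NativeConfinement : Closeable {
    private val dispatcher = Executors.newSingleThreadExecutor { runnable ->
        Thread(runnable, "native-confined").apply { isDaemon = true }
    }.asCoroutineDispatcher()

    // Suspends the caller (instead of blocking it) until the confined thread has run `block`.
    suspend fun <T> call(block: () -> T): T = withContext(dispatcher) { block() }

    // Shuts the confined thread down once the library is no longer needed.
    override fun close() = dispatcher.close()
}
```

A caller on any dispatcher writes `confinement.call { /* native call */ }`; concurrent callers are serialized onto the one thread.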
Next, I need to find the kind of flow that I want. That flow will have to suspend on the above-mentioned native code, which I think means it has to be a `channelFlow`, which in turn means I think I lose my really nice stack traces here too, which kinda sucks.
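A channel-based flow does not have to let the publisher run ahead of the subscriber. If the goal is EventBus-like semantics, where the native callback does not continue until the subscriber has finished with a message, one option is to send an acknowledgement along with each message and have the native thread wait for it. A minimal sketch, assuming kotlinx.coroutines 1.6+ and that the native call may run on its own thread; `Acked`, `acknowledgedFlow` and `collectAcked` are hypothetical names. The collector still runs on a different stack from the native code, so this recovers the blocking semantics but not a single contiguous stack trace.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.runBlocking
import java.util.Collections
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicReference
import kotlin.concurrent.thread

// Hypothetical envelope: the consumer completes `processed` once it is done with `value`.
class Acked<T>(val value: T) {
    val processed = CompletableFuture<Unit>()
}

// `produce` stands in for the blocking native call; it calls `emit` from its own thread.
fun <T> acknowledgedFlow(produce: (emit: (T) -> Unit) -> Unit): Flow<Acked<T>> = callbackFlow<Acked<T>> {
    val inFlight = AtomicReference<Acked<T>?>(null)
    thread(name = "native-worker", isDaemon = true) {
        produce { value ->
            val envelope = Acked(value)
            inFlight.set(envelope)
            // Block until the collector has taken the envelope (rendezvous channel)...
            if (trySendBlocking(envelope).isSuccess) {
                // ...and then until it has finished processing it.
                envelope.processed.join()
            }
        }
        channel.close()
    }
    // On close or cancellation, release a native thread that may still be waiting for an ack.
    awaitClose { inFlight.get()?.processed?.complete(Unit) }
}.buffer(Channel.RENDEZVOUS)

// Consumer side: always acknowledge, even if `action` throws.
suspend fun <T> Flow<Acked<T>>.collectAcked(action: suspend (T) -> Unit) =
    collect { envelope ->
        try {
            action(envelope.value)
        } finally {
            envelope.processed.complete(Unit)
        }
    }
```

With a native loop that calls `emit(i)` and then logs, the log interleaves strictly (`handled 1`, `native resumed after 1`, `handled 2`, ...), which matches the blocking behaviour of the synchronous `EventBus.post`.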
I really want this thing to be simple in the sense that it's debuggable (i.e., has nice stack traces). TBD.