Created
December 22, 2023 11:03
-
-
Save sauthieg/7a9730c35da43042e7fbaf4d2a1ed1b0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.forgerock.openig.launcher; | |
import java.util.Deque; | |
import java.util.concurrent.ConcurrentLinkedDeque; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.function.Function; | |
import io.reactivex.rxjava3.core.Flowable; | |
import io.reactivex.rxjava3.schedulers.Schedulers; | |
import io.vertx.codegen.annotations.Fluent; | |
import io.vertx.codegen.annotations.Nullable; | |
import io.vertx.core.AbstractVerticle; | |
import io.vertx.core.AsyncResult; | |
import io.vertx.core.Future; | |
import io.vertx.core.Handler; | |
import io.vertx.core.Promise; | |
import io.vertx.core.Vertx; | |
import io.vertx.core.impl.ContextInternal; | |
import io.vertx.core.streams.ReadStream; | |
import io.vertx.core.streams.WriteStream; | |
import io.vertx.rxjava3.impl.ReadStreamSubscriber; | |
public class StreamIssue { | |
public static void main(String[] args) throws InterruptedException { | |
Vertx vertx = Vertx.vertx(); | |
vertx.deployVerticle(new AbstractVerticle() { | |
@Override | |
public void start() throws Exception { | |
for (int i = 0; i < 10; i++) { | |
CountDownLatch latch = new CountDownLatch(1); | |
Flowable<String> flowable = Flowable.<String>generate(emitter -> { | |
emitter.onNext("hello"); | |
emitter.onComplete(); | |
latch.countDown(); | |
}).subscribeOn(Schedulers.io()); // subscription happens on a NON event-loop thread | |
ReadStream<String> rs = new ReadStreamSubscriber<>(Function.identity(), flowable); | |
WriteInQueueStream ws = new WriteInQueueStream((ContextInternal) vertx.getOrCreateContext()); | |
ReadStream<String> rs2 = new DelegateReadStream(rs) { | |
@Override | |
public ReadStream<String> resume() { | |
// the RS is "open", accepting items from the flowable | |
// an initial request() is done, that triggers a subscription | |
// on a different thread: | |
// items are placed in the pending queue and consumed immediately on the | |
// non-Vert.x thread | |
super.resume(); | |
try { | |
// We block here, waiting for the flowable to complete | |
// So that, in the pipe, when | |
// `result.future().onComplete(...)` | |
// is executed, the RS has completed, and the completion handler | |
// is executed right away | |
latch.await(); | |
} catch (InterruptedException e) { | |
throw new RuntimeException(e); | |
} | |
return this; | |
} | |
}; | |
rs2.pipeTo(ws, result -> { | |
if (result.succeeded()) { | |
System.out.println(ws.list); | |
// => [RxCachedThreadScheduler-1 hello, vert.x-eventloop-thread-0 end] | |
} else { | |
System.out.println("KO " + result.cause()); | |
} | |
}); | |
} | |
} | |
}); | |
} | |
private static class WriteInQueueStream implements WriteStream<String> { | |
private final ContextInternal context; | |
Deque<String> list = new ConcurrentLinkedDeque<>(); | |
public WriteInQueueStream(final ContextInternal context) { | |
this.context = context; | |
} | |
@Override | |
public WriteStream<String> exceptionHandler(@Nullable final Handler<Throwable> handler) { | |
return this; | |
} | |
@Override | |
public Future<Void> write(final String data) { | |
Promise<Void> p = Promise.promise(); | |
write(data, p); | |
return p.future(); | |
} | |
@Override | |
public void write(final String data, final Handler<AsyncResult<Void>> handler) { | |
String receiver = " | Received on " + Thread.currentThread().getName(); | |
context.execute(() -> { | |
String executed = " | Executed by " + Thread.currentThread().getName(); | |
list.add(data + receiver + executed); | |
handler.handle(Future.succeededFuture()); | |
}); | |
/* | |
String executed = " | Executed by " + Thread.currentThread().getName(); | |
list.add(data + receiver + executed); | |
handler.handle(Future.succeededFuture()); | |
*/ | |
} | |
@Override | |
public Future<Void> end() { | |
Promise<Void> p = Promise.promise(); | |
end(p); | |
return p.future(); | |
} | |
@Override | |
public void end(final Handler<AsyncResult<Void>> handler) { | |
write("end", handler); | |
} | |
@Override | |
public WriteStream<String> setWriteQueueMaxSize(final int maxSize) { | |
return this; | |
} | |
@Override | |
public boolean writeQueueFull() { | |
return false; | |
} | |
@Override | |
public WriteStream<String> drainHandler(@Nullable final Handler<Void> handler) { | |
return this; | |
} | |
} | |
private static abstract class DelegateReadStream implements ReadStream<String> { | |
ReadStream<String> delegate; | |
public DelegateReadStream(final ReadStream<String> delegate) { | |
this.delegate = delegate; | |
} | |
@Override | |
public ReadStream<String> exceptionHandler(@Nullable final Handler<Throwable> handler) { | |
delegate.exceptionHandler(handler); | |
return this; | |
} | |
@Override | |
@Fluent | |
public ReadStream<String> handler(@Nullable final Handler<String> handler) { | |
delegate.handler(handler); | |
return this; | |
} | |
@Override | |
@Fluent | |
public ReadStream<String> pause() { | |
delegate.pause(); | |
return this; | |
} | |
@Override | |
@Fluent | |
public ReadStream<String> resume() { | |
delegate.resume(); | |
return this; | |
} | |
@Override | |
@Fluent | |
public ReadStream<String> fetch(final long amount) { | |
delegate.fetch(amount); | |
return this; | |
} | |
@Override | |
@Fluent | |
public ReadStream<String> endHandler(@Nullable final Handler<Void> endHandler) { | |
delegate.endHandler(endHandler); | |
return this; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I use `ContextInternal.execute()` in `WriteInQueueStream.write()` to mimic the behaviour of the `HttpClientRequest` (which comes from the `Http1xClientConnection.Stream` impl), which enforces that write operations on the Netty channel happen on the event loop thread: https://github.com/eclipse-vertx/vert.x/blob/4.x/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java#L574-L581
If you switch the comment block, you'll see that the output differs depending on whether the write stream performs its operations synchronously (order is respected, all good) or queues some of them (only "end" ends up in the list).