Created
August 24, 2024 07:59
-
-
Save chemicL/9e5e01cda24a97f16dbecba97c5fdfbc to your computer and use it in GitHub Desktop.
Observable arbiters around a streaming request-response chain
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package dev.jedrzejczyk.reactorlab; | |
import java.util.List; | |
import reactor.core.publisher.Flux; | |
import reactor.core.scheduler.Schedulers; | |
public class WrapAroundExercise { | |
public static void main(String[] args) { | |
ArbiterAppender a = new ArbiterAppender("A"); | |
ArbiterAppender b = new ArbiterAppender("B"); | |
Request request = new Request("Request"); | |
List<Arbiter> arbiters = List.of(a, b); | |
// imperativeStyle(request, arbiters); | |
reactiveStyle(request, arbiters); | |
} | |
static void reactiveStyle(Request request, List<Arbiter> arbiters) { | |
Flux.fromIterable(arbiters) | |
.transformDeferredContextual((f, ctx) -> f | |
// This allows us to call blocking code in reduce | |
.publishOn(Schedulers.boundedElastic()) | |
.reduce(request, (r, a) -> { | |
// TODO: Here we can combine reactive with imperative by wrapping | |
// the imperative call with an Observation that has the | |
// parent set to ctx.get(ObservationThreadLocalAccessor.KEY) | |
return a.before(r); | |
})) | |
// Because the above accepts a Publisher we need to enforce mapping to Mono | |
.single() | |
.doOnNext(r -> System.out.println("Request: " + r)) | |
.flatMapMany(r -> { | |
// We can apply a similar strategy to the after arbitters for each | |
// response or for an aggregation of the response. | |
return streamResponse(r); | |
}) | |
.blockLast(); | |
} | |
static void imperativeStyle(Request request, List<Arbiter> arbiters) { | |
for (Arbiter arbiter : arbiters) { | |
request = arbiter.before(request); | |
} | |
System.out.println("Request: " + request); | |
Response response = makeRequest(request); | |
for (Arbiter arbiter : arbiters.reversed()) { | |
response = arbiter.after(response); | |
} | |
System.out.println("Response: " + response); | |
} | |
static Response makeRequest(Request request) { | |
return new Response("Response"); | |
} | |
static Flux<Response> streamResponse(Request request) { | |
return Flux.just(new Response("Streaming response")); | |
} | |
static class Request { | |
String content; | |
public Request(String content) { | |
this.content = content; | |
} | |
String content() { | |
return content; | |
} | |
@Override | |
public String toString() { | |
return content; | |
} | |
} | |
static class Response { | |
String content; | |
public Response(String content) { | |
this.content = content; | |
} | |
String content() { | |
return content; | |
} | |
@Override | |
public String toString() { | |
return content; | |
} | |
} | |
static interface Arbiter { | |
Request before(Request request); | |
Response after(Response response); | |
} | |
static class ArbiterAppender implements Arbiter { | |
String suffix; | |
public ArbiterAppender(String suffix) { | |
this.suffix = suffix; | |
} | |
@Override | |
public Request before(Request request) { | |
return new Request(request.content() + " _ " + suffix); | |
} | |
@Override | |
public Response after(Response response) { | |
return new Response(response.content() + " _ " + suffix); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment