- Reactive Streams Spec: The purpose of Reactive Streams is to provide a standard for asynchronous stream processing with non-blocking backpressure.
  - Publisher: `void subscribe(Subscriber<? super T> s)`
  - Subscriber: `void onSubscribe(Subscription s); void onNext(T t); void onError(Throwable t); void onComplete();`
  - Subscription: `void request(long n); void cancel();`
  - Processor: `interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}`
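To make the interfaces above concrete, here is a minimal sketch of the signal exchange. It assumes the JDK's `java.util.concurrent.Flow` (Java 9+), whose nested interfaces mirror the Reactive Streams ones, so no extra dependency is needed. `HandshakeDemo` and `RangePublisher` are hypothetical names, and the publisher is deliberately simplistic (synchronous, single-threaded, not fully spec-compliant).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical demo: records the signals exchanged between a publisher and a subscriber.
class HandshakeDemo {

    // Runs the exchange and returns the observed signals in order.
    static List<String> run() {
        List<String> trace = new ArrayList<>();
        new RangePublisher(3).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                trace.add("onSubscribe");
                trace.add("request(1)");
                s.request(1);                 // pull the first item
            }

            @Override
            public void onNext(Integer item) {
                trace.add("onNext(" + item + ")");
                trace.add("request(1)");
                subscription.request(1);      // pull the next item
            }

            @Override public void onError(Throwable t) { trace.add("onError"); }
            @Override public void onComplete() { trace.add("onComplete"); }
        });
        return trace;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}

// Toy publisher (an assumption for illustration): emits 1..count, never more than was requested.
class RangePublisher implements Flow.Publisher<Integer> {
    private final int count;

    RangePublisher(int count) { this.count = count; }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 1;
            private long demand;
            private boolean draining;
            private boolean done;

            @Override
            public void request(long n) {
                if (done) return;
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("n must be > 0"));
                    return;
                }
                demand += n;
                if (draining) return;         // re-entrant call from onNext: the outer loop picks it up
                draining = true;
                while (!done && demand > 0 && next <= count) {
                    demand--;
                    subscriber.onNext(next++);
                }
                draining = false;
                if (!done && next > count) {
                    done = true;
                    subscriber.onComplete();  // completion needs no outstanding demand
                }
            }

            @Override
            public void cancel() { done = true; }
        });
    }
}
```

Nothing is emitted until the subscriber calls `request`, which is the pull half of the model; calling `cancel()` instead would stop the emission loop.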
- Publisher/Subscriber flow (push/pull communication model)

          <------------ subscribe() -------------
          ------ onSubscribe (Subscription) ---->
      Pub <------------ request(n) -------------- Sub
          ------------- onNext(data) ----------->
          ... (n times)
          ------------- onComplete() ----------->
- Main types
  - Flux: (0-n items async seq): https://projectreactor.io/docs/core/release/reference/images/flux.svg
  - Mono: (0-1 async result): https://projectreactor.io/docs/core/release/reference/images/mono.svg
- Notes:
  - Flux stops emitting after an error

        Flux.just("testin", "flux", "out")
            .concatWith(Flux.error(new RuntimeException("Some error")))
            .concatWith(Flux.just("Won't be published"))
            .subscribe(System.out::println, System.err::println);
- Schedulers: https://projectreactor.io/docs/core/release/reference/#schedulers
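- Operations
  - flatMap (inner publishers run concurrently, results interleave) vs concatMap (one inner publisher at a time, source order kept) vs flatMapSequential (inner publishers run concurrently, results emitted in source order)
  - merge (subscribes to all sources eagerly, interleaves elements as they arrive) vs concat (subscribes to the next source only after the previous one completes) vs zip (combines the i-th elements of each source)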
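The differences between these operators are easiest to see side by side. The sketch below assumes `reactor-core` is on the classpath; `OperatorsDemo`, `slowUpper`, and the delays are hypothetical choices made only to force inner publishers to finish out of order.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class; requires reactor-core on the classpath.
class OperatorsDemo {

    // Inner publisher that finishes sooner for later letters: a=300ms, b=200ms, c=100ms.
    static Mono<String> slowUpper(String s) {
        long delayMs = 300 - 100L * (s.charAt(0) - 'a');
        return Mono.just(s.toUpperCase()).delayElement(Duration.ofMillis(delayMs));
    }

    // One inner publisher at a time, so source order is kept.
    static List<String> viaConcatMap() {
        return Flux.just("a", "b", "c").concatMap(OperatorsDemo::slowUpper).collectList().block();
    }

    // Inner publishers run concurrently, but results are emitted in source order.
    static List<String> viaFlatMapSequential() {
        return Flux.just("a", "b", "c").flatMapSequential(OperatorsDemo::slowUpper).collectList().block();
    }

    // Inner publishers run concurrently; results arrive as they complete (order not guaranteed).
    static List<String> viaFlatMap() {
        return Flux.just("a", "b", "c").flatMap(OperatorsDemo::slowUpper).collectList().block();
    }

    // Subscribes to the second source only after the first completes.
    static List<String> viaConcat() {
        return Flux.concat(Flux.just("A", "B"), Flux.just("C", "D")).collectList().block();
    }

    // Subscribes to both sources at once; elements interleave by arrival time.
    static List<String> viaMerge() {
        return Flux.merge(
                Flux.just("A", "B").delayElements(Duration.ofMillis(100)),
                Flux.just("C", "D").delayElements(Duration.ofMillis(60)))
            .collectList().block();
    }

    // Pairs the i-th elements; stops when the shorter source completes, so "X" is dropped.
    static List<String> viaZip() {
        return Flux.zip(Flux.just("A", "B", "X"), Flux.just("C", "D"), (l, r) -> l + r)
            .collectList().block();
    }

    public static void main(String[] args) {
        System.out.println("concatMap:         " + viaConcatMap());          // [A, B, C]
        System.out.println("flatMapSequential: " + viaFlatMapSequential());  // [A, B, C]
        System.out.println("flatMap:           " + viaFlatMap());            // timing-dependent order
        System.out.println("concat:            " + viaConcat());             // [A, B, C, D]
        System.out.println("merge:             " + viaMerge());              // timing-dependent order
        System.out.println("zip:               " + viaZip());                // [AC, BD]
    }
}
```

`block()` is used here only to collect results in a demo; in reactive code the pipeline would normally be returned or subscribed to instead.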
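- Backpressure
  - a `subscribe` overload takes a subscription handler (`Consumer<? super Subscription>`) as its last argument, which can be used to request n elements or cancel
  - for custom logic, provide an implementation of `BaseSubscriber` and override `hookOnSubscribe`/`hookOnNext`

        finiteFlux.subscribe(new BaseSubscriber<Integer>() {
            // Subscription here is org.reactivestreams.Subscription
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(1); // start with one element; the default implementation requests an unbounded amount
            }

            @Override
            protected void hookOnNext(Integer value) {
                request(1); // ask for the next element
                System.out.println("Value received is : " + value);
                if (value == 4) {
                    cancel(); // stop receiving once 4 is seen
                }
            }
        });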
- Hot Publisher

      Flux<String> stringFlux = Flux.just("A", "B", "C", "D", "E", "F")
              .delayElements(Duration.ofSeconds(1));
      ConnectableFlux<String> connectableFlux = stringFlux.publish();
      connectableFlux.connect();
      connectableFlux.subscribe(s -> System.out.println("Subscriber 1 : " + s));
      Thread.sleep(3000);
      connectableFlux.subscribe(s -> System.out.println("Subscriber 2 : " + s)); // does not emit the values from beginning
      Thread.sleep(4000);
      /* Output
      Subscriber 1 : A
      Subscriber 1 : B
      Subscriber 1 : C
      Subscriber 2 : C
      Subscriber 1 : D
      Subscriber 2 : D
      Subscriber 1 : E
      Subscriber 2 : E
      Subscriber 1 : F
      Subscriber 2 : F
      */
- Spring WebFlux Internals
  - Netty event loop
  - Reactive streams adapter
  - WebFilter
  - WebHandler (DispatcherHandler)
  - Controller/Router
- Spring WebFlux Controller:
  - Return flux as stream:

        @GetMapping(value = "/fluxstream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
        public Flux<Long> returnInfiniteFluxStream() {
            return Flux.interval(Duration.ofSeconds(1));
        }
  - Test class to be annotated with `@RunWith(SpringRunner.class)` & `@WebFluxTest`
    - Doesn't scan `@Component`, `@Repository` or `@Service`, only `@Controller`
  - Testing approaches for static flux using `webTestClient.get().uri("/flux").accept(MediaType.APPLICATION_JSON_UTF8).exchange().expectStatus().isOk()`:
    - Get flux using `.returnResult(<Class>).getResponseBody()` and use `StepVerifier`
    - Assert size using `.expectBodyList(<Class>).hasSize(<n>)`
    - Get result using `.expectBodyList(<Class>).returnResult().getResponseBody()` and use assert with expected list
    - Get result consumer using `.expectBodyList(<Class>).consumeWith(response -> /* assert with expected list */)`
  - Testing flux as stream:

        Flux<Long> longStreamFlux = webTestClient.get().uri("/fluxstream")
                .accept(MediaType.APPLICATION_STREAM_JSON)
                .exchange()
                .expectStatus().isOk()
                .returnResult(Long.class)
                .getResponseBody();

        // The stream is infinite, so check the first few values and then cancel.
        StepVerifier.create(longStreamFlux)
                .expectNext(0L)
                .expectNext(1L)
                .expectNext(2L)
                .expectNext(3L)
                .thenCancel()
                .verify();
  - Testing mono: use approach 4 for static flux (the `consumeWith` one)
  - Exception Handling: `@ExceptionHandler(Class<T extends Exception>)` annotated method returning `ResponseEntity` (not `Mono<ResponseEntity>`) in controller class for local handling or in `@ControllerAdvice` annotated class for global handling
    - Use `.expectStatus().is5xxServerError()` for testing
- Spring WebFlux Router:
  - Use `@SpringBootTest` instead of `@WebFluxTest` in tests to scan services etc. for routers/handlers
    - Need to explicitly add `@AutoConfigureWebTestClient` too
  - Exception Handling: `DefaultErrorWebExceptionHandler` used by default
    - Extend `AbstractErrorWebExceptionHandler`
      - Add `ServerCodecConfigurer` to constructor, and set writers & readers on super
      - Override `getRoutingFunction`, use `RouterFunctions.route(RequestPredicates.all(), this::myExceptionHandler)`
      - Use `getErrorAttributes` to get a map of error attributes like the key-values in the default error response json
      - Example: https://programmer.help/blogs/webflux-rest-api-global-exception-handling-error-handling.html
- Netty
  - Channels used for communication b/w server and client through events
    - Stages:
      - Created
      - Registered with event loop (so the event loop can forward events to it in future)
      - Active (C/S connected, ready to send & receive events)
      - Inactive (C/S are not connected)
      - Unregistered from event loop
  - Non-blocking client requests data, Netty returns a promise
  - Event queue is processed by event loop
    - Inbound events: Request for data, post data
    - Outbound events: Connection open/close, response
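The event-loop idea above can be imitated with plain JDK classes. This is a toy model, not Netty's API: a single-threaded executor stands in for the event loop, and `CompletableFuture` stands in for the promise returned to the caller. `ToyEventLoop`, `submit`, and the event names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Toy model of an event loop (hypothetical, not Netty's API).
class ToyEventLoop implements AutoCloseable {
    // A single thread drains the queue, so events are handled one at a time, in submission order.
    private final ExecutorService loop = Executors.newSingleThreadExecutor();

    // Enqueues an event and immediately returns a promise for its result.
    <T> CompletableFuture<T> submit(Supplier<T> event) {
        return CompletableFuture.supplyAsync(event, loop);
    }

    @Override
    public void close() {
        loop.shutdown();
    }

    // Submits three events without blocking and returns the order in which they were handled.
    static List<String> demo() {
        List<String> handled = new CopyOnWriteArrayList<>();
        try (ToyEventLoop eventLoop = new ToyEventLoop()) {
            CompletableFuture<Integer> a = eventLoop.submit(() -> { handled.add("connect"); return 1; });
            CompletableFuture<Integer> b = eventLoop.submit(() -> { handled.add("read"); return 2; });
            CompletableFuture<Integer> c = eventLoop.submit(() -> { handled.add("write"); return 3; });
            // The caller blocks only here, when it actually needs the results.
            CompletableFuture.allOf(a, b, c).join();
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [connect, read, write]
    }
}
```

Because one thread handles every event, a handler that blocks stalls everything queued behind it, which is why blocking calls are avoided on an event loop.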
- Streaming Endpoint (SSE: Server Sent Events)
  - Connection with the client remains open and server sends items/events as and when they come
  - MongoDB:
    - Tailable Cursors: Connection remains open even after all results are retrieved (`@Tailable` annotation on `Flux` returning methods in the repository)
    - Capped Collections: Fixed size collections that preserve insertion order (Can't be used for permanent storage)
  - `MediaType.APPLICATION_STREAM_JSON_VALUE` & `@Tailable`: All entries in MongoDB get delivered to the client in real time
- Tidbits
  - Spring has `CommandLineRunner` interface, to run tasks on startup
  - `@DataMongoTest` can be used for mongo-only tests (Does not create service & component beans)
  - `@Profile("!test")` can be used to create service bean for profiles other than test
  - `@ResponseStatus(HttpStatus.CREATED)` can be used on a controller method to change default status code when return type is `T` instead of `ResponseEntity<T>`