Write a client that iterates to the next page using a cursor, a value coming from the first response

Here's an example of a Java client that iterates through pages using cursor-based pagination:
```java
import java.util.List;
import java.util.Optional;

import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CursorPaginationClient {
    private static final String BASE_URL = "https://api.example.com";
    private static final int PAGE_SIZE = 100;

    private final WebClient webClient;

    public CursorPaginationClient() {
        this.webClient = WebClient.create(BASE_URL);
    }

    // Emits every item of every page, following nextCursor until it is null.
    public Flux<String> fetchAllItems() {
        return fetchPage(null)
                .expand(response -> {
                    String nextCursor = response.getNextCursor();
                    return nextCursor != null ? fetchPage(nextCursor) : Mono.empty();
                })
                .flatMapIterable(PageResponse::getItems);
    }

    // Requests one page; the cursor parameter is omitted for the first page.
    private Mono<PageResponse> fetchPage(String cursor) {
        return webClient.get()
                .uri(uriBuilder -> uriBuilder
                        .path("/items")
                        .queryParam("pageSize", PAGE_SIZE)
                        .queryParamIfPresent("cursor", Optional.ofNullable(cursor))
                        .build())
                .retrieve()
                .bodyToMono(PageResponse.class);
    }

    private static class PageResponse {
        private List<String> items;
        private String nextCursor;

        // A missing "items" field is treated as an empty page.
        public List<String> getItems() { return items != null ? items : List.of(); }
        public void setItems(List<String> items) { this.items = items; }
        public String getNextCursor() { return nextCursor; }
        public void setNextCursor(String nextCursor) { this.nextCursor = nextCursor; }
    }
}
```
This client uses Spring WebFlux's WebClient to make asynchronous HTTP requests. Here's how it works:

1. The `fetchAllItems()` method initiates the pagination process by calling `fetchPage(null)` for the first page.
2. The `expand()` operator is used to continue fetching subsequent pages. It checks if there's a `nextCursor` in the response and fetches the next page if available.
3. The `fetchPage()` method constructs the URI with the appropriate query parameters, including the cursor when provided.
4. The `PageResponse` class represents the structure of the API response, containing the list of items and the next cursor.
5. The `flatMapIterable()` operator is used to flatten the stream of page responses into a stream of individual items.

To use this client:
```java
CursorPaginationClient client = new CursorPaginationClient();
client.fetchAllItems()
        .subscribe(
                item -> System.out.println("Received item: " + item),
                error -> System.err.println("Error: " + error),
                () -> System.out.println("All items fetched")
        );
```
This implementation handles cursor-based pagination without blocking: it fetches pages one after another and emits items as each page arrives [1][3][5].
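
To see the cursor-following logic in isolation, here is a minimal blocking sketch in plain Java with no WebClient or Reactor dependency. `CursorLoopSketch`, `Page`, and the in-memory `PAGES` map are hypothetical stand-ins for the remote API; a real endpoint may name or shape its cursor differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class CursorLoopSketch {

    // Hypothetical page shape: a batch of items plus the cursor of the next page (null on the last page).
    static final class Page {
        final List<String> items;
        final String nextCursor;

        Page(List<String> items, String nextCursor) {
            this.items = items;
            this.nextCursor = nextCursor;
        }
    }

    // In-memory stand-in for the remote API, keyed by cursor; "" plays the role of "no cursor yet".
    static final Map<String, Page> PAGES = Map.of(
            "", new Page(List.of("a", "b"), "c1"),
            "c1", new Page(List.of("c", "d"), "c2"),
            "c2", new Page(List.of("e"), null));

    // Pretend HTTP call: a null cursor requests the first page.
    static Page fetchPage(String cursor) {
        return PAGES.get(cursor == null ? "" : cursor);
    }

    // Follows nextCursor from page to page until the API stops returning one.
    static List<String> fetchAllItems() {
        List<String> all = new ArrayList<>();
        String cursor = null;
        do {
            Page page = fetchPage(cursor);
            all.addAll(page.items);
            cursor = page.nextCursor;
        } while (cursor != null);
        return all;
    }

    public static void main(String[] args) {
        System.out.println(fetchAllItems());
    }
}
```

The loop plays the same role as `expand()` in the reactive client: each response decides whether another request is made.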
Citations:
[1] Cursor Based Pagination with Spring Boot and JPA - GitHub https://github.com/hedza06/jpa-cursor-based-pagination
[2] Paginating through lists using cursor pagination https://developer.zendesk.com/documentation/api-basics/pagination/paginating-through-lists-using-cursor-pagination/
[3] GraphQL pagination: Cursor and offset tutorials | Contentful https://www.contentful.com/blog/graphql-pagination-cursor-offset-tutorials/
[4] Example implementation of cursor-based pagination - GitHub Gist https://gist.github.com/ahx/2fe6bdd1ab8925ce3ec7c8556730bfdd
[5] It is possible in cursor-based pagination to get the prev and next ... https://stackoverflow.com/questions/71699810/it-is-possible-in-cursor-based-pagination-to-get-the-prev-and-next-cursor-in-the
[6] Client Cursor API via REST example - GitHub https://github.com/andrei-punko/client-cursor
[7] REST Pagination in Spring - Baeldung https://www.baeldung.com/rest-api-pagination-in-spring
[8] Spring for GraphQL Documentation https://docs.spring.io/spring-graphql/docs/1.2.0-RC1/reference/html/
[9] Request Execution :: Spring GraphQL https://docs.spring.io/spring-graphql/reference/request-execution.html
[10] Spring Boot GraphQL Tutorial - Part 7 Cursor based pagination https://www.youtube.com/watch?v=R2CrjOoJ4NU
[11] Does applying Pagination on response using Spring Webflux make ... https://stackoverflow.com/questions/66848041/does-applying-pagination-on-response-using-spring-webflux-make-it-blocking
[12] Spring Boot WebFlux Pagination for Reactive Spring Data example https://github.com/hendisantika/webflux-reactive
[13] Learn about cursor-based pagination in Spring Boot - LinkedIn https://www.linkedin.com/posts/keshavramk_webdevelopment-pagination-cursorpagination-activity-7127543949719707648-netv
[14] Pagination in Spring Webflux and Spring Data Reactive | Baeldung https://www.baeldung.com/spring-data-webflux-pagination
[15] Cursor based pagination - Client SDKs - Apollo Community https://community.apollographql.com/t/cursor-based-pagination/867
[16] Why Use Cursor Based Pagination Over Offset Pagination? https://blog.stackademic.com/cursor-based-pagination-0d898719655c
[17] WebClient :: Spring Framework https://docs.spring.io/spring-framework/reference/web/webflux-webclient.html
[18] Using Pagination and cursor how to refresh existing results for use ... https://forums.fauna.com/t/using-pagination-and-cursor-how-to-refresh-existing-results-for-use-in-grids/2983
[19] How to create loop within loop within loop to read document and ... https://tech.forums.softwareag.com/t/how-to-create-loop-within-loop-within-loop-to-read-document-and-grab-a-key-to-get-the-value-using-webmethods/238231?page=2
[20] How to implement cursors for pagination in an api - Stack Overflow https://stackoverflow.com/questions/18314687/how-to-implement-cursors-for-pagination-in-an-api
[21] Spring WebClient exchange() vs retrieve() | Baeldung https://www.baeldung.com/spring-webclient-exchange-vs-retrieve
[22] Pagination with Spring Webflux - YouTube https://www.youtube.com/watch?v=mlM5qKPS15k
[23] Setup and Examples for the Spring WebFlux WebClient - rieckpil https://rieckpil.de/spring-webclient-for-restful-communication-setup-and-examples/
[24] Java Api Client Pagination | Restackio https://www.restack.io/p/java-api-client-answer-pagination-techniques
[25] GraphQL Pagination with Java Spring Boot's “GraphQL for Spring” https://compositecode.blog/2023/06/23/graphql-pagination-with-java-spring-boots-graphql-for-spring/
[26] Paginating Real-Time Data with Cursor Based Pagination - SitePoint https://www.sitepoint.com/paginating-real-time-data-cursor-based-pagination/
[27] STUCK: How to run a cursor paramater for API pagination? https://forum.bubble.io/t/stuck-how-to-run-a-cursor-paramater-for-api-pagination/203606
[28] Offset and Cursor Pagination explained - DEV Community https://dev.to/jackmarchant/offset-and-cursor-pagination-explained-b89
[29] Paginate data with query cursors | Firestore - Firebase - Google https://firebase.google.com/docs/firestore/query-data/query-cursors
[30] Dynamic API Pagination with next page Cursor in a Record https://community.powerbi.com/t5/Desktop/Dynamic-API-Pagination-with-next-page-Cursor-in-a-Record/td-p/2446595

Avoid `Thread.sleep`

You're right, using `Thread.sleep()` is not ideal in a reactive context. Here is an improved version of the client that avoids `Thread.sleep()` and takes a more reactive approach:

    import org.springframework.web.reactive.function.client.WebClient;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;

    import java.util.List;
    import java.util.Optional;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.atomic.AtomicInteger;

    public class ParallelCursorPaginationClient {
        private static final String BASE_URL = "https://api.example.com";
        private static final int PAGE_SIZE = 100;
        private static final int MAX_PARALLEL_REQUESTS = 10;

        private final WebClient webClient;
        private final Queue<String> cursorQueue = new ConcurrentLinkedQueue<>();
        private final AtomicInteger activeRequests = new AtomicInteger(0);

        public ParallelCursorPaginationClient() {
            this.webClient = WebClient.create(BASE_URL);
        }

        public Flux<String> fetchAllItems() {
            return fetchPage(null)
                    // flatMapMany, not flatMap: the lambda returns a Flux, not a Mono
                    .flatMapMany(response -> {
                        cursorQueue.addAll(response.getNextCursors());
                        return Flux.fromIterable(response.getItems());
                    })
                    .concatWith(processRemainingCursors())
                    .subscribeOn(Schedulers.boundedElastic());
        }

        private Flux<String> processRemainingCursors() {
            return Flux.defer(() -> {
                if (cursorQueue.isEmpty() && activeRequests.get() == 0) {
                    return Flux.empty();
                }
                // Process up to MAX_PARALLEL_REQUESTS cursors concurrently, then start another round.
                // No takeWhile here: stopping on a per-item check could drop buffered items.
                return Flux.merge(
                        Flux.range(0, MAX_PARALLEL_REQUESTS)
                                .map(i -> processNextCursor())
                ).concatWith(processRemainingCursors());
            });
        }

        private Flux<String> processNextCursor() {
            // Flux.defer, not Mono.defer: the chain below yields a Flux of items
            return Flux.defer(() -> {
                String cursor = cursorQueue.poll();
                if (cursor == null) {
                    return Flux.empty();
                }
                activeRequests.incrementAndGet();
                return fetchPage(cursor)
                        // Queue the cursors discovered on this page for a later round
                        .doOnNext(response -> cursorQueue.addAll(response.getNextCursors()))
                        .flatMapIterable(PageResponse::getItems)
                        // Decrement before the terminal signal propagates, so the next round sees an accurate count
                        .doOnTerminate(activeRequests::decrementAndGet)
                        .doOnCancel(activeRequests::decrementAndGet);
            });
        }

        private Mono<PageResponse> fetchPage(String cursor) {
            return webClient.get()
                    .uri(uriBuilder -> uriBuilder
                            .path("/items")
                            .queryParam("pageSize", PAGE_SIZE)
                            .queryParamIfPresent("cursor", Optional.ofNullable(cursor))
                            .build())
                    .retrieve()
                    .bodyToMono(PageResponse.class);
        }

        private static class PageResponse {
            private List<String> items;
            private List<String> nextCursors;

            // Missing fields are treated as empty lists.
            public List<String> getItems() {
                return items != null ? items : List.of();
            }

            public void setItems(List<String> items) {
                this.items = items;
            }

            public List<String> getNextCursors() {
                return nextCursors != null ? nextCursors : List.of();
            }

            public void setNextCursors(List<String> nextCursors) {
                this.nextCursors = nextCursors;
            }
        }
    }

Here are the main improvements:

- Removal of `Thread.sleep()`: every use of `Thread.sleep()` has been eliminated.
- Reactive approach to cursor processing: `processRemainingCursors()` now uses `Flux.defer()` to create a flux that renews itself as long as there are cursors to process or requests in flight.
- Improved parallel processing: `Flux.merge()` is used to process up to `MAX_PARALLEL_REQUESTS` cursors concurrently.
- Reactive cursor handling: `processNextCursor()` handles a single cursor reactively, incrementing and decrementing the active-request counter as appropriate.
- Reactive recursion: `processRemainingCursors()` calls itself recursively, in a reactive manner, to keep going until all cursors have been processed.

This version of the client is fully reactive and avoids any form of blocking or busy-waiting. It keeps the efficiency of parallel processing while respecting the limit on concurrent requests.

Usage of the client stays the same:

    ParallelCursorPaginationClient client = new ParallelCursorPaginationClient();
    client.fetchAllItems()
            .subscribe(
                    item -> System.out.println("Received item: " + item),
                    error -> System.err.println("Error: " + error),
                    () -> System.out.println("All items fetched")
            );

Because no thread is parked while waiting, this implementation offers better performance and makes more efficient use of resources in a reactive environment.

Make queries in parallel for better performance, don't care about the order, but apply a limit on parallel threads

Certainly. Here is an improved version of the client that runs requests in parallel for better performance, without regard to order, while applying a limit on parallel threads:

Here are the main changes and improvements:

- Parallel requests: the code now uses a queue (`ConcurrentLinkedQueue`) to store cursors and process several pages in parallel.
- Parallel request limit: `MAX_PARALLEL_REQUESTS` caps the number of concurrent requests at 10.
- Active request tracking: `AtomicInteger activeRequests` tracks the number of requests in flight.
- Asynchronous processing: `processRemainingCursors()` handles the remaining cursors asynchronously.
- Modified API response: `PageResponse` now returns a list of next cursors (`nextCursors`) instead of a single one.
- Unordered flux: items are emitted as soon as they are available, with no ordering guarantee.

To use this client:

This code offers better performance by processing pages in parallel, while controlling the number of concurrent requests to avoid overloading the API or the client.
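
The round-based idea behind `processRemainingCursors()` can be tried without Reactor or a live API. The following is a minimal plain-Java sketch, deliberately blocking so that it stays self-contained; it is not a substitute for the reactive client. `BoundedParallelSketch`, `Page`, the in-memory `PAGES` map, the `"first"` key, and the limit of 2 are all hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BoundedParallelSketch {
    // Hypothetical limit, kept small so the cap is easy to observe.
    static final int MAX_PARALLEL_REQUESTS = 2;

    // Hypothetical page shape: items plus any number of follow-up cursors.
    static final class Page {
        final List<String> items;
        final List<String> nextCursors;

        Page(List<String> items, List<String> nextCursors) {
            this.items = items;
            this.nextCursors = nextCursors;
        }
    }

    // In-memory stand-in for the remote API; "first" plays the role of the cursor-less first request.
    static final Map<String, Page> PAGES = Map.of(
            "first", new Page(List.of("a"), List.of("c1", "c2", "c3")),
            "c1", new Page(List.of("b"), List.of("c4")),
            "c2", new Page(List.of("c"), List.of()),
            "c3", new Page(List.of("d"), List.of()),
            "c4", new Page(List.of("e"), List.of()));

    // Pretend HTTP call.
    static Page fetchPage(String cursor) {
        return PAGES.get(cursor);
    }

    static List<String> fetchAllItems() {
        // The fixed pool size is what caps the number of pages fetched at once.
        ExecutorService pool = Executors.newFixedThreadPool(MAX_PARALLEL_REQUESTS);
        try {
            List<String> items = new ArrayList<>();
            List<String> pending = List.of("first");
            while (!pending.isEmpty()) {
                List<Callable<Page>> requests = new ArrayList<>();
                for (String cursor : pending) {
                    requests.add(() -> fetchPage(cursor));
                }
                List<String> discovered = new ArrayList<>();
                // invokeAll blocks until every request of this round has finished.
                for (Future<Page> response : pool.invokeAll(requests)) {
                    Page page = response.get();
                    items.addAll(page.items);
                    discovered.addAll(page.nextCursors);
                }
                pending = discovered;
            }
            return items;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("page fetch failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchAllItems());
    }
}
```

Each round fetches all currently known cursors, collects the cursors those pages reveal, and repeats until none are left. A round waits for its slowest page before the next one starts, which is simpler than a continuously refilled queue but can leave workers idle.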