Last active
March 7, 2025 14:26
-
-
Save bijukunjummen/a12ab5d3e823c5f052ce608b5fc7b6a4 to your computer and use it in GitHub Desktop.
Demonstration of caching with Project Reactor and Caffeine
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.github.benmanes.caffeine.cache.AsyncLoadingCache; | |
import com.github.benmanes.caffeine.cache.Caffeine; | |
import com.github.benmanes.caffeine.cache.LoadingCache; | |
import org.jetbrains.annotations.NotNull; | |
import reactor.core.publisher.Mono; | |
import reactor.core.scheduler.Schedulers; | |
import java.time.Duration; | |
import java.util.function.Function; | |
import java.util.function.Supplier; | |
public class CachingUtils { | |
private static final String FIXED_KEY = "FIXED_KEY"; | |
private CachingUtils() { | |
} | |
public static <T> Function<String, T> of(@NotNull Duration duration, @NotNull Function<String, T> fn) { | |
final LoadingCache<String, T> cache = Caffeine.newBuilder() | |
.expireAfterWrite(duration) | |
.build((String k) -> fn.apply(k)); | |
return (String key) -> cache.get(key); | |
} | |
public static <T> Supplier<T> of(@NotNull Duration duration, @NotNull Supplier<T> supplier) { | |
Function<String, T> fn = of(duration, k -> supplier.get()); | |
return () -> fn.apply(FIXED_KEY); | |
} | |
public static <T> Function<String, Mono<T>> ofMono(@NotNull Duration duration, @NotNull Function<String, Mono<T>> fn) { | |
final AsyncLoadingCache<String, T> cache = Caffeine.newBuilder() | |
.expireAfterWrite(duration.multipliedBy(2)) | |
.refreshAfterWrite(duration) | |
.buildAsync((k, e) -> | |
fn.apply(k) | |
.subscribeOn(Schedulers.fromExecutor(e)) | |
.toFuture()); | |
return (k) -> Mono.fromFuture(cache.get(k)); | |
} | |
public static <T> Mono<T> ofMonoFixedKey(@NotNull Duration duration, @NotNull Mono<T> mono) { | |
Function<String, Mono<T>> monoFn = ofMono(duration, key -> mono); | |
return Mono.defer(() -> monoFn.apply(FIXED_KEY)); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.junit.jupiter.api.Test; | |
import reactor.core.publisher.Mono; | |
import reactor.test.StepVerifier; | |
import java.time.Duration; | |
import java.util.Random; | |
import java.util.concurrent.ThreadLocalRandom; | |
import java.util.function.Function; | |
import java.util.function.Supplier; | |
import static org.assertj.core.api.Assertions.assertThat; | |
class CachingUtilsTest { | |
@Test | |
void testSyncCaching() { | |
Random random = new Random(); | |
Function<String, Integer> fn = (k) -> random.nextInt(); | |
Function<String, Integer> wrappedFn = CachingUtils.of(Duration.ofSeconds(10), fn); | |
int result1 = wrappedFn.apply("key1"); | |
assertThat(wrappedFn.apply("key1")).isEqualTo(result1); | |
assertThat(wrappedFn.apply("key1")).isEqualTo(result1); | |
assertThat(wrappedFn.apply("key2")).isNotEqualTo(result1); | |
} | |
@Test | |
void testSupplierCaching() { | |
Random random = new Random(); | |
Supplier<Integer> fn = () -> random.nextInt(); | |
Supplier<Integer> wrappedSupplier = CachingUtils.of(Duration.ofSeconds(60), fn); | |
int result1 = wrappedSupplier.get(); | |
assertThat(wrappedSupplier.get()).isEqualTo(result1); | |
assertThat(wrappedSupplier.get()).isEqualTo(result1); | |
} | |
private Mono<String> get(String key) { | |
Random random = ThreadLocalRandom.current(); | |
return Mono.fromSupplier(() -> key + random.nextInt()); | |
} | |
@Test | |
void testMonoCaching() { | |
Function<String, Mono<String>> fn = (k) -> get(k); | |
Function<String, Mono<String>> wrappedFn = CachingUtils.ofMono(Duration.ofSeconds(10), fn); | |
StepVerifier.create(wrappedFn.apply("key1")) | |
.assertNext(result1 -> { | |
StepVerifier.create(wrappedFn.apply("key1")) | |
.assertNext(result2 -> { | |
assertThat(result2).isEqualTo(result1); | |
}) | |
.verifyComplete(); | |
StepVerifier.create(wrappedFn.apply("key1")) | |
.assertNext(result2 -> { | |
assertThat(result2).isEqualTo(result1); | |
}) | |
.verifyComplete(); | |
StepVerifier.create(wrappedFn.apply("key2")) | |
.assertNext(result2 -> { | |
assertThat(result2).isNotEqualTo(result1); | |
}) | |
.verifyComplete(); | |
}) | |
.verifyComplete(); | |
} | |
@Test | |
void testMonoCachingFixedKey() { | |
Random random = new Random(); | |
Mono<Integer> mono = Mono.fromCallable(() -> random.nextInt()); | |
Mono<Integer> wrapped = CachingUtils.ofMonoFixedKey(Duration.ofSeconds(10), mono); | |
StepVerifier.create(wrapped) | |
.assertNext(result1 -> { | |
StepVerifier.create(wrapped) | |
.assertNext(result2 -> { | |
assertThat(result2).isEqualTo(result1); | |
}) | |
.verifyComplete(); | |
}) | |
.verifyComplete(); | |
} | |
} |
Thanks for the reminder @ben-manes . I've even used that feature before! :D
You can use
AsyncCache
so that the in-flight load is available in the cache for other threads to obtain. This avoids cache stampedes. This can be done using the future converters,public static <T> Function<String, Mono<T>> ofMono(Duration duration, Function<String, Mono<T>> fn) { AsyncCache<String, T> cache = Caffeine.newBuilder() .expireAfterWrite(duration) .buildAsync(); return key -> Mono.fromFuture(cache.get(key, k -> fn.apply(k).toFuture())); }
Awesome!! thanks Ben-manes! Yes, makes sense to use the AsyncCache version, I was indeed running into cache stampede with the sync cache. I have modified the gist now.
Love Caffeine by the way and thank you for creating/maintaining it.
You can use
AsyncCache
so that the in-flight load is available in the cache for other threads to obtain. This avoids cache stampedes. This can be done using the future converters,public static <T> Function<String, Mono<T>> ofMono(Duration duration, Function<String, Mono<T>> fn) { AsyncCache<String, T> cache = Caffeine.newBuilder() .expireAfterWrite(duration) .buildAsync(); return key -> Mono.fromFuture(cache.get(key, k -> fn.apply(k).toFuture())); }
public static <T> Function<String, Mono<T>> ofMono(Duration duration, Function<String, Mono<T>> fn) {
AsyncCache<String, T> cache = Caffeine.newBuilder()
.expireAfterWrite(duration)
.buildAsync();
return key -> Mono.fromFuture(cache.get(key, (k, executor) -> fn.apply(k).toFuture()));
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
You can use
AsyncCache
so that the in-flight load is available in the cache for other threads to obtain. This avoids cache stampedes. This can be done using the future converters,