Skip to content

Instantly share code, notes, and snippets.

@egil
Last active February 14, 2025 08:16
Show Gist options
  • Save egil/f532ba1a9dd6e07250dc8d7abc0159ee to your computer and use it in GitHub Desktop.
Save egil/f532ba1a9dd6e07250dc8d7abc0159ee to your computer and use it in GitHub Desktop.
Make any Orleans grain storage observable during testing
using Orleans.Storage;
using R3;
namespace Egil.Orleans.Storage.Testing;
public sealed class StorageObserver(string storageName, IGrainStorage observableTarget) : IGrainStorage
{
private readonly Subject<StorageOperation> operationsFeed = new();
public Observable<StorageOperation> StorageOperationFeed => operationsFeed.AsObservable();
public async Task ClearStateAsync<T>(string stateName, GrainId grainId, IGrainState<T> grainState)
{
await observableTarget.ClearStateAsync(stateName, grainId, grainState);
operationsFeed.OnNext(
new StorageOperation(
StorageOperationKind.Clear,
grainId,
storageName,
stateName,
grainState.ETag,
grainState.State));
}
public async Task ReadStateAsync<T>(string stateName, GrainId grainId, IGrainState<T> grainState)
{
await observableTarget.ReadStateAsync(stateName, grainId, grainState);
operationsFeed.OnNext(
new StorageOperation(
StorageOperationKind.Read,
grainId,
storageName,
stateName,
grainState.ETag,
grainState.State));
}
public async Task WriteStateAsync<T>(string stateName, GrainId grainId, IGrainState<T> grainState)
{
await observableTarget.WriteStateAsync(stateName, grainId, grainState);
operationsFeed.OnNext(
new StorageOperation(
StorageOperationKind.Write,
grainId,
storageName,
stateName,
grainState.ETag,
grainState.State));
}
}
public enum StorageOperationKind
{
Clear,
Read,
Write
}
public record class StorageOperation(
StorageOperationKind Kind,
GrainId GrainId,
string StorageName,
string StateName,
string? Etag,
object? State);
using R3;
namespace Egil.Orleans.Storage.Testing;
public sealed class StorageObserverAggregator : IDisposable
{
private readonly Lock subLock = new();
private readonly ReplaySubject<StorageOperation> operationsFeed = new();
private IDisposable childObserverSubscriptions = Disposable.Empty;
public Observable<StorageOperation> StorageOperationFeed => operationsFeed.AsObservable();
public void Dispose()
{
childObserverSubscriptions.Dispose();
operationsFeed.Dispose();
}
public Observable<StorageOperation> GetOperationFeed(GrainId grainId)
{
return StorageOperationFeed.Where(x => x.GrainId == grainId).AsObservable();
}
internal void AddObserver(StorageObserver observer)
{
var subscription = observer.StorageOperationFeed.Subscribe(operationsFeed.OnNext);
lock (subLock)
{
childObserverSubscriptions = Disposable.Combine(childObserverSubscriptions, subscription);
}
}
}
using Orleans.Providers;
using Orleans.Storage;
namespace Egil.Orleans.Storage.Testing;
public static class StorageObserverSiloBuilderExtensions
{
public static ISiloBuilder MakeDefaultGrainStorageObservable(this ISiloBuilder siloBuilder, StorageObserverAggregator observerAggregator)
=> siloBuilder.MakeGrainStorageObservable(ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME, observerAggregator);
public static ISiloBuilder MakeGrainStorageObservable(this ISiloBuilder siloBuilder, string name, StorageObserverAggregator observerAggregator)
{
ArgumentNullException.ThrowIfNull(siloBuilder);
ArgumentNullException.ThrowIfNull(name);
ArgumentNullException.ThrowIfNull(observerAggregator);
var target = siloBuilder.Services.LastOrDefault(x => x.IsKeyedService && x.ServiceKey?.Equals(name) == true && x.KeyedImplementationFactory is not null)
?? throw new InvalidOperationException($"No grain storage provider with name '{name}' was found.");
siloBuilder.Services.Remove(target);
siloBuilder.Services.AddKeyedSingleton<IGrainStorage>(
name,
(sp, _) =>
{
var inner = (IGrainStorage)target.KeyedImplementationFactory!(sp, name);
var observer = new StorageObserver(name, inner);
observerAggregator.AddObserver(observer);
return observer;
});
return siloBuilder;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment