Skip to content

Instantly share code, notes, and snippets.

@egil
Last active April 27, 2025 10:24
Show Gist options
  • Save egil/be9c2510d43ab1269c4dca5e136de024 to your computer and use it in GitHub Desktop.
Save egil/be9c2510d43ab1269c4dca5e136de024 to your computer and use it in GitHub Desktop.
Orleans test set up for xUnit 3 using class fixture, stream support, observable storage writing, service injection
public class ExampleTest(SiloFixture fixture) : IClassFixture<SiloFixture>
{
[Fact]
public async Task Sending_data_to_stream()
{
var message = "foo bar baz";
var stream = fixture.GetStream<string>("grainId1");
await stream.OnNextAsync(message);
await WaitForWrites("grainId1", writeCount: 1); // blocks tests until grain has written once to its storage provider
var latestGrainMessage = await GetGrain<IMyGrain)("grainId1").GetLastMessageAsync();
Assert.Equal(message, latestGrainMessage);
}
}
using Egil.Orleans.Storage.Testing;
using Microsoft.Extensions.DependencyInjection;
using Orleans.Streams;
using Orleans.TestingHost;
using TimeProviderExtensions;
using R3;
public sealed class SiloFixture : IAsyncLifetime, IGrainFactory
{
private const string TestStreamProviderName = "TestStreamProvider";
private IStreamProvider? streamProvider;
private InProcessTestCluster? cluster;
private IGrainFactory? grainFactory;
public StorageObserverAggregator StorageObserver { get; }
public ManualTimeProvider TimeProvider { get; }
public SiloFixture()
{
TimeProvider = new ManualTimeProvider(DateTimeOffset.UtcNow);
StorageObserver = new StorageObserverAggregator();
}
public async ValueTask InitializeAsync()
{
var builder = new InProcessTestClusterBuilder(initialSilosCount: 1);
builder.ConfigureSilo((options, siloBuilder) =>
{
// add other service registrations here. to make the services accessible in tests,
// provide instances of the service directly and keep a reference to them
// in the SiloFixture, e.g., through properties, as done with TimeProvider below.
siloBuilder.Services.AddKeyedSingleton<TimeProvider>("DomainTimeProvider", TimeProvider);
// Set up default storage and make that observable in tests
siloBuilder.AddMemoryGrainStorageAsDefault(options => options.GrainStorageSerializer = new SystemTextJsonGrainStorageSerializer(PricingEngineJsonSerializerOptions.Instance))
.MakeDefaultGrainStorageObservable(StorageObserver);
// Set up custom storage and make that observable in tests
siloBuilder.AddMemoryGrainStorage(
"Custom",
options => options.GrainStorageSerializer = new SystemTextJsonGrainStorageSerializer(PricingEngineJsonSerializerOptions.Instance))
.MakeGrainStorageObservable(
"Custom",
StorageObserver);
siloBuilder.AddMemoryStreams(
TestStreamProviderName,
configurator =>
{
configurator.ConfigureStreamPubSub(StreamPubSubType.ImplicitOnly);
});
});
builder.ConfigureClient((clientBuilder) =>
{
clientBuilder.AddMemoryStreams(TestStreamProviderName);
});
cluster = builder.Build();
await cluster.DeployAsync();
grainFactory = cluster.Client;
streamProvider = cluster.Client.GetStreamProvider(TestStreamProviderName);
}
public async ValueTask DisposeAsync()
{
if (cluster is not null)
{
await cluster.StopAllSilosAsync();
}
StorageObserver.Dispose();
}
public IAsyncStream<T> GetStream<T>(string @namespace, string key)
=> streamProvider?.GetStream<T>(StreamId.Create(@namespace, key))
?? throw new InvalidOperationException("Cluster not ready");
public async Task WaitForWrites<T>(string grainId, int writeCount) where T : IGrainWithStringKey
=> await StorageObserver
.GetOperationFeed(GetGrain<T>(grainId).GetGrainId())
.Where(x => x.Kind is StorageOperationKind.Write)
.Index()
.TakeUntil(x => x.Index == writeCount - 1)
.WaitAsync(TestContext.Current.CancellationToken);
public TGrainInterface GetGrain<TGrainInterface>(Guid primaryKey, string? grainClassNamePrefix = null) where TGrainInterface : IGrainWithGuidKey
=> (grainFactory ?? throw new InvalidOperationException("Cluster not ready")).GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix);
public TGrainInterface GetGrain<TGrainInterface>(long primaryKey, string? grainClassNamePrefix = null) where TGrainInterface : IGrainWithIntegerKey
=> (grainFactory ?? throw new InvalidOperationException("Cluster not ready")).GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix);
public TGrainInterface GetGrain<TGrainInterface>(string primaryKey, string? grainClassNamePrefix = null) where TGrainInterface : IGrainWithStringKey
=> (grainFactory ?? throw new InvalidOperationException("Cluster not ready")).GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix);
public TGrainInterface GetGrain<TGrainInterface>(Guid primaryKey, string keyExtension, string? grainClassNamePrefix = null) where TGrainInterface : IGrainWithGuidCompoundKey
=> (grainFactory ?? throw new InvalidOperationException("Cluster not ready")).GetGrain<TGrainInterface>(primaryKey, keyExtension, grainClassNamePrefix);
public TGrainInterface GetGrain<TGrainInterface>(long primaryKey, string keyExtension, string? grainClassNamePrefix = null) where TGrainInterface : IGrainWithIntegerCompoundKey
=> (grainFactory ?? throw new InvalidOperationException("Cluster not ready")).GetGrain<TGrainInterface>(primaryKey, keyExtension, grainClassNamePrefix);
public TGrainObserverInterface CreateObjectReference<TGrainObserverInterface>(IGrainObserver obj) where TGrainObserverInterface : IGrainObserver
=> (grainFactory ?? throw new InvalidOperationException("Cluster not ready")).CreateObjectReference<TGrainObserverInterface>(obj);
public void DeleteObjectReference<TGrainObserverInterface>(IGrainObserver obj) where TGrainObserverInterface : IGrainObserver
=> (grainFactory ?? throw new InvalidOperationException("Cluster not ready")).DeleteObjectReference<TGrainObserverInterface>(obj);
public IGrain GetGrain(Type grainInterfaceType, Guid grainPrimaryKey)
=> (grainFactory ?? throw new InvalidOperationException("Cluster not ready")).GetGrain(grainInterfaceType, grainPrimaryKey);
public IGrain GetGrain(Type grainInterfaceType, long grainPrimaryKey)
=> (grainFactory ?? throw new InvalidOperationException("Cluster not ready")).GetGrain(grainInterfaceType, grainPrimaryKey);
public IGrain GetGrain(Type grainInterfaceType, string grainPrimaryKey)
=> (grainFactory ?? throw new InvalidOperationException("Cluster not ready")).GetGrain(grainInterfaceType, grainPrimaryKey);
public IGrain GetGrain(Type grainInterfaceType, Guid grainPrimaryKey, string keyExtension)
=> (grainFactory ?? throw new InvalidOperationException("Cluster not ready")).GetGrain(grainInterfaceType, grainPrimaryKey, keyExtension);
public IGrain GetGrain(Type grainInterfaceType, long grainPrimaryKey, string keyExtension)
=> (grainFactory ?? throw new InvalidOperationException("Cluster not ready")).GetGrain(grainInterfaceType, grainPrimaryKey, keyExtension);
public TGrainInterface GetGrain<TGrainInterface>(GrainId grainId) where TGrainInterface : IAddressable
=> (grainFactory ?? throw new InvalidOperationException("Cluster not ready")).GetGrain<TGrainInterface>(grainId);
public IAddressable GetGrain(GrainId grainId)
=> (grainFactory ?? throw new InvalidOperationException("Cluster not ready")).GetGrain(grainId);
public IAddressable GetGrain(GrainId grainId, GrainInterfaceType interfaceType)
=> (grainFactory ?? throw new InvalidOperationException("Cluster not ready")).GetGrain(grainId, interfaceType);
}
using Orleans.Storage;
using R3;
namespace Egil.Orleans.Storage.Testing;
public sealed class StorageObserver(string storageName, IGrainStorage observableTarget) : IGrainStorage
{
private readonly Subject<StorageOperation> operationsFeed = new();
public Observable<StorageOperation> StorageOperationFeed => operationsFeed.AsObservable();
public string StorageName { get; } = storageName;
public async Task ClearStateAsync<T>(string stateName, GrainId grainId, IGrainState<T> grainState)
{
await observableTarget.ClearStateAsync(stateName, grainId, grainState);
operationsFeed.OnNext(
new StorageOperation(
StorageOperationKind.Clear,
grainId,
StorageName,
stateName,
grainState.ETag,
grainState.State));
}
public async Task ReadStateAsync<T>(string stateName, GrainId grainId, IGrainState<T> grainState)
{
await observableTarget.ReadStateAsync(stateName, grainId, grainState);
operationsFeed.OnNext(
new StorageOperation(
StorageOperationKind.Read,
grainId,
StorageName,
stateName,
grainState.ETag,
grainState.State));
}
public async Task WriteStateAsync<T>(string stateName, GrainId grainId, IGrainState<T> grainState)
{
await observableTarget.WriteStateAsync(stateName, grainId, grainState);
operationsFeed.OnNext(
new StorageOperation(
StorageOperationKind.Write,
grainId,
StorageName,
stateName,
grainState.ETag,
grainState.State));
}
}
public enum StorageOperationKind
{
Clear,
Read,
Write
}
public record class StorageOperation(
StorageOperationKind Kind,
GrainId GrainId,
string StorageName,
string StateName,
string? Etag,
object? State);
using R3;
namespace Egil.Orleans.Storage.Testing;
public sealed class StorageObserverAggregator : IDisposable
{
private readonly Lock subLock = new();
private readonly ReplaySubject<StorageOperation> operationsFeed = new();
private readonly HashSet<string> observers = [];
private IDisposable childObserverSubscriptions = Disposable.Empty;
public Observable<StorageOperation> StorageOperationFeed => operationsFeed.AsObservable();
public void Dispose()
{
childObserverSubscriptions.Dispose();
operationsFeed.Dispose();
}
public Observable<StorageOperation> GetOperationFeed(GrainId grainId)
{
return StorageOperationFeed.Where(x => x.GrainId == grainId).AsObservable();
}
public Observable<StorageOperation> GetOperationFeed(string grainId)
{
var id = IdSpan.Create(grainId);
return StorageOperationFeed.Where(x => x.GrainId.Key == id).AsObservable();
}
internal void AddObserver(StorageObserver observer)
{
var subscription = observer.StorageOperationFeed.Subscribe(operationsFeed.OnNext);
lock (subLock)
{
if (observers.Contains(observer.StorageName))
{
throw new InvalidOperationException($"Observer for storage name {observer.StorageName} has already been added.");
}
observers.Add(observer.StorageName);
childObserverSubscriptions = Disposable.Combine(childObserverSubscriptions, subscription);
}
}
}
using Microsoft.Extensions.DependencyInjection;
using Orleans.Providers;
using Orleans.Storage;
namespace Egil.Orleans.Storage.Testing;
public static class StorageObserverSiloBuilderExtensions
{
public static ISiloBuilder MakeDefaultGrainStorageObservable(this ISiloBuilder siloBuilder, StorageObserverAggregator observerAggregator)
=> siloBuilder.MakeGrainStorageObservable(ProviderConstants.DEFAULT_STORAGE_PROVIDER_NAME, observerAggregator);
public static ISiloBuilder MakeGrainStorageObservable(this ISiloBuilder siloBuilder, string name, StorageObserverAggregator observerAggregator)
{
ArgumentNullException.ThrowIfNull(siloBuilder);
ArgumentNullException.ThrowIfNull(name);
ArgumentNullException.ThrowIfNull(observerAggregator);
var target = siloBuilder.Services.LastOrDefault(x => x.IsKeyedService && x.ServiceKey?.Equals(name) == true && x.KeyedImplementationFactory is not null)
?? throw new InvalidOperationException($"No grain storage provider with name '{name}' was found.");
siloBuilder.Services.Remove(target);
siloBuilder.Services.AddKeyedSingleton<IGrainStorage>(
name,
(sp, _) =>
{
var inner = (IGrainStorage)target.KeyedImplementationFactory!(sp, name);
var observer = new StorageObserver(name, inner);
observerAggregator.AddObserver(observer);
return observer;
});
return siloBuilder;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment