Skip to content

Instantly share code, notes, and snippets.

@egil
Created February 13, 2025 15:56
Show Gist options
  • Save egil/a7624d6b7dfb340664eaa7f45003260a to your computer and use it in GitHub Desktop.
Save egil/a7624d6b7dfb340664eaa7f45003260a to your computer and use it in GitHub Desktop.
A fake/observable implementation of Orleans IGrainStorage that can be used during testing
using System.Collections.Concurrent;
using System.Globalization;
using Orleans.Storage;
// https://www.nuget.org/packages/R3
using R3;
using Xunit.Sdk;
namespace Egil.Orleans.Storage.Testing;
/// <summary>
/// An in-memory <see cref="IGrainStorage"/> fake intended for tests. Every clear, read,
/// and write it handles is published as a <see cref="StorageOperation"/> on an observable feed.
/// </summary>
/// <remarks>
/// Entries are keyed by <see cref="GrainId"/> only, so a grain holds at most one named state
/// at a time. A write or clear that uses a different state name than the stored entry is
/// reported as an <see cref="InconsistentStateException"/>; a read with a different state
/// name behaves as if no record exists.
/// </remarks>
public sealed class ObservableMemoryStorage : IGrainStorage
{
    private readonly ConcurrentDictionary<GrainId, (string StateName, string? Etag, object? State)> storage = new();

    // NOTE(review): a ReplaySubject is used here, presumably so that subscribers attaching
    // after an operation still observe it - confirm against R3's replay semantics.
    private readonly ReplaySubject<StorageOperation> changeFeed = new();

    /// <summary>
    /// Gets the feed of all storage operations performed through this instance.
    /// </summary>
    public Observable<StorageOperation> ChangeFeed => changeFeed.AsObservable();

    /// <summary>
    /// Looks up the stored entry whose <see cref="GrainId"/> key part equals <paramref name="grainId"/>.
    /// </summary>
    /// <param name="grainId">The key part of the grain id (the grain type is not considered).</param>
    /// <returns>
    /// The first matching stored entry, boxed as a <c>(StateName, Etag, State)</c> tuple,
    /// or <see langword="null"/> when no entry matches.
    /// </returns>
    public object? this[string grainId]
    {
        get
        {
            var id = IdSpan.Create(grainId);

            // Scan the entries directly: a miss must yield null rather than falling back to
            // a lookup of default(GrainId), and a hit needs no second dictionary lookup.
            foreach (var entry in storage)
            {
                if (entry.Key.Key == id)
                {
                    return entry.Value;
                }
            }

            return null;
        }
    }

    /// <summary>
    /// Gets the feed of storage operations that target a single grain.
    /// </summary>
    /// <param name="grainId">The grain whose operations should be observed.</param>
    /// <returns>The change feed filtered to operations whose grain id equals <paramref name="grainId"/>.</returns>
    public Observable<StorageOperation> GetStorageOperationFeed(GrainId grainId)
    {
        return changeFeed.Where(x => x.GrainId == grainId).AsObservable();
    }

    /// <summary>
    /// Removes the stored entry for <paramref name="grainId"/> (if any), resets
    /// <paramref name="grainState"/>, and publishes a <see cref="StorageOperationKind.Clear"/> operation.
    /// </summary>
    /// <exception cref="ArgumentException"><paramref name="stateName"/> is null, empty, or whitespace.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="grainState"/> is null.</exception>
    /// <exception cref="InconsistentStateException">
    /// An entry exists but its ETag or state name differs from the request, or the entry
    /// changed between being read and being removed.
    /// </exception>
    Task IGrainStorage.ClearStateAsync<T>(string stateName, GrainId grainId, IGrainState<T> grainState)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(stateName, nameof(stateName));
        ArgumentNullException.ThrowIfNull(grainState);

        if (storage.TryGetValue(grainId, out var oldValue))
        {
            var matchesRequest = oldValue.Etag == grainState.ETag && oldValue.StateName == stateName;

            // TryRemove with the full key/value pair only succeeds while the entry still
            // equals the value that was just validated, so a concurrent change is a conflict too.
            if (!matchesRequest
                || !storage.TryRemove(new KeyValuePair<GrainId, (string StateName, string? Etag, object? State)>(grainId, oldValue)))
            {
                throw new InconsistentStateException($"Storage condition not satisfied. CurrentETag: {grainState.ETag}", oldValue.Etag, grainState.ETag);
            }
        }

        grainState.ETag = null;
        grainState.State = default!;
        grainState.RecordExists = false;

        changeFeed.OnNext(
            new StorageOperation(
                StorageOperationKind.Clear,
                grainId,
                stateName,
                grainState.ETag,
                grainState.State));

        return Task.CompletedTask;
    }

    /// <summary>
    /// Loads the stored state for <paramref name="grainId"/> into <paramref name="grainState"/>
    /// and publishes a <see cref="StorageOperationKind.Read"/> operation.
    /// </summary>
    /// <remarks>
    /// When there is no entry, the state name differs, or the stored state is not a
    /// <typeparamref name="T"/>, the grain state is set to a freshly created instance with a
    /// null ETag and <c>RecordExists = false</c>.
    /// </remarks>
    /// <exception cref="ArgumentException"><paramref name="stateName"/> is null, empty, or whitespace.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="grainState"/> is null.</exception>
    Task IGrainStorage.ReadStateAsync<T>(string stateName, GrainId grainId, IGrainState<T> grainState)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(stateName, nameof(stateName));
        ArgumentNullException.ThrowIfNull(grainState);

        if (storage.TryGetValue(grainId, out var stateObject) && stateObject.StateName == stateName && stateObject.State is T state)
        {
            grainState.State = state;
            grainState.ETag = stateObject.Etag;
            grainState.RecordExists = true;
        }
        else
        {
            grainState.State = (T)ActivatorCreateInstance(typeof(T));
            grainState.ETag = null;
            grainState.RecordExists = false;
        }

        changeFeed.OnNext(
            new StorageOperation(
                StorageOperationKind.Read,
                grainId,
                stateName,
                grainState.ETag,
                grainState.State));

        return Task.CompletedTask;
    }

    /// <summary>
    /// Stores <paramref name="grainState"/>'s state for <paramref name="grainId"/>, updates its
    /// ETag and <c>RecordExists</c>, and publishes a <see cref="StorageOperationKind.Write"/> operation.
    /// </summary>
    /// <remarks>
    /// The new ETag is the state's <see cref="object.GetHashCode"/> formatted with the invariant
    /// culture, or null when the state is null. When no entry exists yet the write is accepted
    /// regardless of the ETag carried by <paramref name="grainState"/>.
    /// </remarks>
    /// <exception cref="ArgumentException"><paramref name="stateName"/> is null, empty, or whitespace.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="grainState"/> is null.</exception>
    /// <exception cref="InconsistentStateException">
    /// An entry exists and its ETag or state name differs from the request.
    /// </exception>
    Task IGrainStorage.WriteStateAsync<T>(string stateName, GrainId grainId, IGrainState<T> grainState)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(stateName, nameof(stateName));
        ArgumentNullException.ThrowIfNull(grainState);

        // NOTE(review): a hash-code ETag only changes when the state's hash changes; state types
        // that do not override GetHashCode keep the same ETag across writes of the same instance.
        var newEtag = grainState.State is not null
            ? grainState.State.GetHashCode().ToString(CultureInfo.InvariantCulture)
            : null;

        var newState = storage.AddOrUpdate(
            grainId,
            (stateName, newEtag, grainState.State),
            (key, oldValue) =>
            {
                if (oldValue.Etag == grainState.ETag && oldValue.StateName == stateName)
                {
                    return (stateName, newEtag, grainState.State);
                }
                else
                {
                    throw new InconsistentStateException($"Storage condition not satisfied. CurrentETag: {grainState.ETag}", oldValue.Etag, grainState.ETag);
                }
            });

        grainState.ETag = newState.Etag;
        grainState.RecordExists = newState.State is T;

        changeFeed.OnNext(
            new StorageOperation(
                StorageOperationKind.Write,
                grainId,
                stateName,
                grainState.ETag,
                grainState.State));

        return Task.CompletedTask;
    }

    /// <summary>
    /// Creates a placeholder instance of <paramref name="type"/> for use as an empty grain state.
    /// </summary>
    /// <remarks>
    /// Value types are created via <see cref="Activator.CreateInstance(Type)"/>. Reference types
    /// are created through their public constructor with the fewest parameters; each parameter
    /// is passed null when <c>IsNullable()</c> reports its type as nullable, and otherwise a
    /// recursively created instance. <c>IsNullable()</c> is an extension method that is not
    /// defined in this file (presumably supplied by the imported <c>Xunit.Sdk</c> namespace).
    /// </remarks>
    /// <param name="type">The type to instantiate.</param>
    /// <returns>The created instance.</returns>
    /// <exception cref="InvalidOperationException">
    /// <paramref name="type"/> is a reference type without any public constructor.
    /// </exception>
    private static object ActivatorCreateInstance(Type type)
    {
        // Value types always have a default value, but many of them (int, bool, enums, ...)
        // expose no public constructor, so the constructor search below would find nothing.
        // For Nullable<T> this yields null, which unboxes to an empty Nullable<T> at the call site.
        if (type.IsValueType)
        {
            return Activator.CreateInstance(type)!;
        }

        var ctor = type.GetConstructors()
            .OrderBy(x => x.GetParameters().Length)
            .FirstOrDefault()
            ?? throw new InvalidOperationException($"Cannot create an instance of '{type}': it has no public constructor.");

        var parameters = ctor
            .GetParameters()
            .Select(x => x.ParameterType.IsNullable() ? null : ActivatorCreateInstance(x.ParameterType))
            .ToArray();

        return ctor.Invoke(parameters);
    }
}
// NOTE(review): this namespace differs from the Egil.Orleans.Storage.Testing namespace used by
// the other declarations in this listing, which reference this type (and which this type
// references) without a using directive - confirm whether that is intentional.
namespace Egil.Orleans.Testing;
/// <summary>
/// Describes a single storage operation together with the grain state values associated with it.
/// </summary>
/// <param name="Kind">The kind of operation that was performed.</param>
/// <param name="GrainId">The id of the grain the operation targeted.</param>
/// <param name="StateName">The name of the grain state the operation targeted.</param>
/// <param name="Etag">The ETag associated with the operation, if any.</param>
/// <param name="State">The state value associated with the operation, if any.</param>
public record class StorageOperation(
StorageOperationKind Kind,
GrainId GrainId,
string StateName,
string? Etag,
object? State);
namespace Egil.Orleans.Storage.Testing;
/// <summary>
/// Identifies the kind of storage operation a <c>StorageOperation</c> describes.
/// </summary>
public enum StorageOperationKind
{
/// <summary>A clear-state operation.</summary>
Clear,
/// <summary>A read-state operation.</summary>
Read,
/// <summary>A write-state operation.</summary>
Write
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment