Created
August 22, 2019 09:55
-
-
Save bboyle1234/ddfea297465ce0faa3039491c3645713 to your computer and use it in GitHub Desktop.
ReactiveExperiments
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Microsoft.VisualStudio.TestTools.UnitTesting; | |
using Newtonsoft.Json; | |
using Newtonsoft.Json.Linq; | |
using Nito.AsyncEx; | |
using Rx.Net.Plus; | |
using System; | |
using System.Collections.Generic; | |
using System.Collections.Immutable; | |
using System.Globalization; | |
using System.IO; | |
using System.Linq; | |
using System.Net; | |
using System.Reactive; | |
using System.Reactive.Disposables; | |
using System.Reactive.Linq; | |
using System.Reactive.Subjects; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace ConsoleApp5 { | |
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed | |
#pragma warning disable ConfigureAwaitEnforcer // ConfigureAwaitEnforcer | |
interface IDeviceView : ICloneable { | |
Guid DeviceId { get; } | |
} | |
class DeviceTotalView : IDeviceView { | |
public Guid DeviceId { get; set; } | |
public int Voltage { get; set; } | |
public int Currents { get; set; } | |
public object Clone() => this.MemberwiseClone(); | |
} | |
class DeviceVoltagesUpdateView : IDeviceView { | |
public Guid DeviceId { get; set; } | |
public int Voltage { get; set; } | |
public object Clone() => this.MemberwiseClone(); | |
} | |
class DeviceCurrentsUpdateView : IDeviceView { | |
public Guid DeviceId { get; set; } | |
public int Current { get; set; } | |
public object Clone() => this.MemberwiseClone(); | |
} | |
class DeviceUpdateEvent { | |
public DeviceTotalView View; | |
public IDeviceView LastUpdate; | |
} | |
static class DeviceUtils { | |
public static DeviceUpdateEvent Update(DeviceUpdateEvent previousUpdate, IDeviceView update) { | |
if (update.DeviceId != previousUpdate.View.DeviceId) throw new InvalidOperationException("Device ids do not match (numskull exception)."); | |
var view = (DeviceTotalView)previousUpdate.View.Clone(); | |
switch (update) { | |
case DeviceVoltagesUpdateView x: { | |
view.Voltage = x.Voltage; | |
break; | |
} | |
case DeviceCurrentsUpdateView x: { | |
view.Currents = x.Current; | |
break; | |
} | |
} | |
return new DeviceUpdateEvent { View = view, LastUpdate = update }; | |
} | |
} | |
interface IDeviceService { | |
/// <summary> | |
/// Gets an observable that produces aggregated update events for the device with the given deviceId. | |
/// On subscription, the most recent event is immediately pushed to the subscriber. | |
/// There can be multiple subscribers. | |
/// </summary> | |
IObservable<DeviceUpdateEvent> GetDeviceStream(Guid deviceId); | |
} | |
public static class AsyncExExtensions { | |
public static IConnectableObservable<T> AsConnectableObservable<T>(this AsyncProducerConsumerQueue<T> queue) { | |
return new ConnectableObservableForAsyncProducerConsumerQueue<T>(queue); | |
} | |
} | |
class ConnectableObservableForAsyncProducerConsumerQueue<T> : IConnectableObservable<T> { | |
readonly AsyncProducerConsumerQueue<T> Queue; | |
long _isConnected = 0; | |
ImmutableList<IObserver<T>> Observers = ImmutableList<IObserver<T>>.Empty; | |
public ConnectableObservableForAsyncProducerConsumerQueue(AsyncProducerConsumerQueue<T> queue) { | |
Queue = queue; | |
} | |
public IDisposable Connect() { | |
if (Interlocked.Exchange(ref _isConnected, 1) == 1) throw new InvalidOperationException("Observable cannot be connected more than once."); | |
var cts = new CancellationTokenSource(); | |
var token = cts.Token; | |
Task.Run(async () => { | |
try { | |
while (true) { | |
token.ThrowIfCancellationRequested(); | |
var @event = await Queue.DequeueAsync(token).ConfigureAwait(false); | |
foreach (var observer in Observers) | |
observer.OnNext(@event); | |
} | |
} catch (Exception x) when (x is OperationCanceledException || x is InvalidOperationException) { | |
foreach (var observer in Observers) | |
observer.OnCompleted(); | |
} | |
}); | |
return Disposable.Create(() => { | |
cts.Cancel(); | |
cts.Dispose(); | |
}); | |
} | |
readonly object subscriberListMutex = new object(); | |
public IDisposable Subscribe(IObserver<T> observer) { | |
lock (subscriberListMutex) { | |
Observers = Observers.Add(observer); | |
} | |
return Disposable.Create(() => { | |
lock (subscriberListMutex) { | |
Observers = Observers.Remove(observer); | |
} | |
}); | |
} | |
} | |
class DeviceService : IDeviceService { | |
readonly IObservable<IDeviceView> Source; | |
public DeviceService(IConnectableObservable<IDeviceView> source) { | |
/// When injected here, "source" is cold in the sense that it won't produce events until the first time it is subscribed. | |
/// However, for all subsequent subscriptions, it will be "hot" in the sense that it won't deliver any historical events, only new events. | |
/// "source" will throw an exception if its "Subscribe" method is called more than once as it is intended to have only one observer and | |
/// be read all the way from the beginning. | |
Source = source; | |
/// Callers of the "Subscribe" method below will expect data to be preloaded and will expect to be immediately delivered the most | |
/// recent event. So we need to immediately subscribe to "source" and start preloading the aggregate streams. | |
/// I'm assuming there is going to need to be a groupby to split the stream by device id. | |
var groups = source.GroupBy(x => x.DeviceId); | |
/// Now somehow we need to perform the aggregrate function on each grouping. | |
/// And create an observable that immediately delivers the most recent aggregated event when "Subscribe" is called below. | |
source.Connect(); | |
} | |
public IObservable<DeviceUpdateEvent> GetDeviceStream(Guid deviceId) { | |
/// How do we implement this? The observable that we return must be pre-loaded with the latest | |
throw new NotImplementedException(); | |
} | |
} | |
[TestClass] | |
public class Tests { | |
[TestMethod] | |
public async Task Test1() { | |
DeviceUpdateEvent deviceView1 = null; | |
DeviceUpdateEvent deviceView2 = null; | |
var input = new AsyncProducerConsumerQueue<IDeviceView>(); | |
var source = new ConnectableObservableForAsyncProducerConsumerQueue<IDeviceView>(input); | |
var id1 = Guid.NewGuid(); | |
var id2 = Guid.NewGuid(); | |
await input.EnqueueAsync(new DeviceVoltagesUpdateView { DeviceId = id1, Voltage = 1 }); | |
await input.EnqueueAsync(new DeviceVoltagesUpdateView { DeviceId = id1, Voltage = 2 }); | |
await input.EnqueueAsync(new DeviceVoltagesUpdateView { DeviceId = id2, Voltage = 100 }); | |
var service = new DeviceService(source); | |
service.GetDeviceStream(id1).Subscribe(x => deviceView1 = x); | |
service.GetDeviceStream(id2).Subscribe(x => deviceView2 = x); | |
/// I believe there is no need to pause here because the Subscribe method calls above | |
/// block until the events have all been pushed into the subscribers above. | |
Assert.AreEqual(deviceView1.View.DeviceId, id1); | |
Assert.AreEqual(deviceView2.View.DeviceId, id2); | |
Assert.AreEqual(deviceView1.View.Voltage, 2); | |
Assert.AreEqual(deviceView2.View.Voltage, 100); | |
await input.EnqueueAsync(new DeviceVoltagesUpdateView { DeviceId = id2, Voltage = 101 }); | |
await Task.Delay(100); /// Give the event time to propagate. | |
Assert.AreEqual(deviceView2.View.Voltage, 101); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<Project Sdk="Microsoft.NET.Sdk"> | |
<PropertyGroup> | |
<OutputType>Exe</OutputType> | |
<TargetFramework>netcoreapp2.2</TargetFramework> | |
</PropertyGroup> | |
<ItemGroup> | |
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.0.1" /> | |
<PackageReference Include="MSTest.TestAdapter" Version="1.4.0" /> | |
<PackageReference Include="MSTest.TestFramework" Version="1.4.0" /> | |
<PackageReference Include="Newtonsoft.Json" Version="12.0.2" /> | |
<PackageReference Include="Nito.AsyncEx" Version="5.0.0" /> | |
<PackageReference Include="Rx.Net.Plus" Version="1.1.10" /> | |
<PackageReference Include="System.Reactive" Version="4.1.6" /> | |
</ItemGroup> | |
</Project> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment