Last active
August 20, 2018 09:52
-
-
Save bboyle1234/9ac5a469dbeb5ea38e7994837b008cda to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Apex.LoggingUtils; | |
using Microsoft.Extensions.DependencyInjection; | |
using Microsoft.Extensions.DependencyInjection.Extensions; | |
using Microsoft.Extensions.Logging; | |
using Nito.AsyncEx; | |
using Orleans; | |
using Orleans.Runtime; | |
using Orleans.Streams; | |
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Text; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using static Apex.TaskUtilities.Tasks; | |
namespace Apex.Grains { | |
/// <summary>
/// An <see cref="IClusterClient"/> wrapper that recreates and reconnects the underlying client after a
/// disconnection, raising events to inform consumers.
/// An example of such a disconnection is the interval while a cluster is being restarted or upgraded.
/// </summary>
/// <remarks>
/// Connection attempts are started from the constructor; consumers should use one of the
/// <c>WaitForConnection</c> overloads (or the <see cref="Connected"/> event) rather than
/// <see cref="Connect(Func{Exception, Task{bool}})"/>, which is deliberately not implemented.
/// </remarks>
public abstract class ReconnectingClusterClient : IClusterClient {

    /// <summary>Pause between two consecutive connection attempts.</summary>
    static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(1);

    /// <summary>Only this many consecutive connection failures are logged per connect cycle, to limit log noise.</summary>
    const int MaxLoggedConnectionFailures = 5;

    /// <summary>Raised after a client has connected (initially, or again after a disconnection).</summary>
    public event Action<ReconnectingClusterClient> Connected;

    /// <summary>Raised when the cluster connection is lost, before a reconnection is started.</summary>
    public event Action<ReconnectingClusterClient> Disconnected;

    /// <summary>True between a successful connection and the next connection-lost notification.</summary>
    // NOTE(review): the setter is public, so consumers can overwrite this flag. Kept as-is for compatibility.
    public bool IsConnected { get; set; }

    /// <summary>The most recent exception recorded while configuring or connecting a client. It is not cleared on success.</summary>
    public Exception Error { get; private set; }

    readonly ILogger Log;

    // Cancelled (never disposed) by Dispose(); its token stops the reconnect loop and releases pending waiters.
    readonly CancellationTokenSource Disposed = new CancellationTokenSource();

    // This ManualResetEvent allows multiple consumers to await the same connection event without affecting each other.
    // The trick is to make sure we manually reset it when disconnected.
    readonly AsyncManualResetEvent ConnectedEvent = new AsyncManualResetEvent(false);

    // The client of the current connection attempt / connection. Null until the first attempt has built one.
    IClusterClient _client;

    /// <summary>
    /// Creates the wrapper and immediately starts connecting in the background.
    /// </summary>
    /// <param name="loggerFactory">Factory used to create a logger named after the concrete (derived) type.</param>
    /// <exception cref="ArgumentNullException"><paramref name="loggerFactory"/> is null.</exception>
    public ReconnectingClusterClient(ILoggerFactory loggerFactory) {
        if (loggerFactory == null) throw new ArgumentNullException(nameof(loggerFactory));
        // Because we're an abstract class we don't use the generic method.
        Log = loggerFactory.CreateLogger(GetType());
        // NOTE(review): depending on how FireAndForget schedules its delegate, Configure() may be invoked
        // before the derived class's constructor body has run - confirm against Apex.TaskUtilities.
        FireAndForget(Connect);
    }

    /// <summary>
    /// Waits until the client is connected, giving up after <paramref name="waitTime"/>.
    /// </summary>
    /// <param name="waitTime">Maximum time to wait; the wait is canceled when it elapses.</param>
    public async Task WaitForConnection(TimeSpan waitTime) {
        using (var cts = new CancellationTokenSource(waitTime)) {
            await WaitForConnection(cts.Token);
        }
    }

    /// <summary>
    /// Waits until the client is connected. The wait is canceled when <paramref name="ct"/> is canceled
    /// or when this instance is disposed.
    /// </summary>
    /// <param name="ct">Token used to abandon the wait.</param>
    public async Task WaitForConnection(CancellationToken ct) {
        using (var cts = CancellationTokenSource.CreateLinkedTokenSource(ct, Disposed.Token)) {
            await ConnectedEvent.WaitAsync(cts.Token);
        }
    }

    /// <summary>
    /// Runs one full connect cycle: keeps creating clients until one connects, then publishes the
    /// connected state and raises <see cref="Connected"/>.
    /// </summary>
    async Task Connect() {
        await CreateAndConnect();
        if (Disposed.IsCancellationRequested) return; // We never managed to connect before we were disposed.
        IsConnected = true;
        ConnectedEvent.Set();
        try {
            Connected?.Invoke(this);
        } catch (Exception x) {
            // A faulty subscriber must not break the connection cycle.
            Log.LogError(x, "Exception thrown while executing the 'Connected' event handler.");
        }
    }

    /// <summary>
    /// Builds a fresh client and tries to connect it, repeating once per <see cref="ReconnectDelay"/>
    /// until a connection succeeds or this instance is disposed.
    /// </summary>
    /// <remarks>
    /// A configuration/build failure is logged, stored in <see cref="Error"/> and rethrown (no retry).
    /// A connection failure is stored in <see cref="Error"/>, the failed client is disposed, and the loop retries.
    /// </remarks>
    async Task CreateAndConnect() {
        // The orleans client has the ability to keep retrying connections before its first connection. (It won't retry after a disconnection)
        // In this implementation we don't use the built-in client's ability to retry connections during its first connection attempt, because
        // we found that it messed up stream subscriptions. Therefore we now recreate the client for each connection attempt.
        int connectionFailCount = 0;
        while (!Disposed.IsCancellationRequested) {
            IClusterClient client;
            try {
                var builder = new ClientBuilder();
                Configure(builder);
                builder.AddClusterConnectionLostHandler(OnDisconnected);
                client = builder.Build();
                // Publish immediately so Dispose() can reach a client whose connection attempt is still in flight.
                _client = client;
            } catch (Exception x) {
                Error = x;
                Log.LogError(x, "Exception thrown while configuring client.");
                throw;
            }
            try {
                // This overload does not allow the client to reattempt connection if it fails the first time.
                // Tests showed that stream subscriptions didn't work so well when re-attempted connections were made,
                // so it was decided that on failed connection attempt, the client would be disposed, and a new one created.
                await client.Connect();
                Log.LogInformation("Connected.");
                return;
            } catch (Exception x) {
                Error = x;
                client.Dispose();
                // Only the first few failures of a cycle are logged; later ones are silent by design.
                if (connectionFailCount++ < MaxLoggedConnectionFailures)
                    Log.LogWarning(x, "Failed to connect.");
            }
            try {
                // Attempt reconnection once per second, but wake up immediately when disposed
                // instead of lingering for the remainder of the delay.
                await Task.Delay(ReconnectDelay, Disposed.Token);
            } catch (OperationCanceledException) {
                return; // Disposed while waiting; the caller re-checks the disposal flag.
            }
        }
    }

    /// <summary>
    /// Implemented by derived classes to configure the builder used for every (re)connection attempt.
    /// It is called once per attempt, each time with a new <see cref="ClientBuilder"/>.
    /// </summary>
    /// <param name="builder">The builder for the client about to be created.</param>
    protected abstract void Configure(ClientBuilder builder);

    /// <summary>
    /// Connection-lost handler registered on every client: clears the connected state, disposes the
    /// dead client, raises <see cref="Disconnected"/> and starts a new connect cycle (unless disposed).
    /// </summary>
    void OnDisconnected(object sender, EventArgs e) {
        IsConnected = false;
        ConnectedEvent.Reset();
        _client?.Dispose();
        if (Disposed.IsCancellationRequested) return;
        try {
            Disconnected?.Invoke(this);
        } catch (Exception x) {
            // A faulty subscriber must not prevent the reconnection below.
            Log.LogError(x, "Exception thrown while executing the 'Disconnected' event handler.");
        }
        FireAndForget(Connect);
    }

    /// <summary>
    /// Stops all reconnection activity, cancels pending <c>WaitForConnection</c> calls and shuts down the
    /// current client, if any.
    /// </summary>
    public void Dispose() {
        Disposed.Cancel();
        // Read the field once so Close() and Dispose() act on the same instance.
        var client = _client;
        if (client == null) return;
        // Close() is required for graceful (informing silo) disconnection. It is asynchronous and Dispose()
        // is not, so it is started here and its outcome is observed in the background instead of being dropped.
        // NOTE(review): whether the Dispose() call below lets the graceful close run to completion depends on
        // the Orleans client implementation - confirm.
        _ = CloseAndLogFailure(client);
        client.Dispose(); // does not inform silo
    }

    /// <summary>Awaits <c>Close()</c> on <paramref name="client"/> and logs (rather than loses) any failure.</summary>
    async Task CloseAndLogFailure(IClusterClient client) {
        try {
            await client.Close().ConfigureAwait(false);
        } catch (Exception x) {
            Log.LogWarning(x, "Exception thrown while closing the client.");
        }
    }

    #region IClusterClient

    // There is potential for null reference / object disposed exceptions here if these properties/methods are called while the _client is not properly connected.
    // For code simplicity, and because the consumers "know" about it, I've decided not to add handling for that scenario.

    public bool IsInitialized => _client.IsInitialized;

    public IServiceProvider ServiceProvider => _client.ServiceProvider;

    // uhhh --- yep, code smell, I know.
    public Task Connect(Func<Exception, Task<bool>> retryFilter = null) => throw new NotImplementedException("This client starts connection attempts immediately. Try using 'WaitForConnection()' instead.");

    public Task Close() => throw new NotImplementedException("Use Dispose() instead.");

    public void Abort() => throw new NotImplementedException("Use Dispose() instead.");

    public IStreamProvider GetStreamProvider(string name) => _client.GetStreamProvider(name);

    public TGrainInterface GetGrain<TGrainInterface>(Guid primaryKey, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithGuidKey
        => _client.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix);

    public TGrainInterface GetGrain<TGrainInterface>(long primaryKey, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithIntegerKey
        => _client.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix);

    public TGrainInterface GetGrain<TGrainInterface>(string primaryKey, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithStringKey
        => _client.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix);

    public TGrainInterface GetGrain<TGrainInterface>(Guid primaryKey, string keyExtension, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithGuidCompoundKey
        => _client.GetGrain<TGrainInterface>(primaryKey, keyExtension, grainClassNamePrefix);

    public TGrainInterface GetGrain<TGrainInterface>(long primaryKey, string keyExtension, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithIntegerCompoundKey
        => _client.GetGrain<TGrainInterface>(primaryKey, keyExtension, grainClassNamePrefix);

    public Task<TGrainObserverInterface> CreateObjectReference<TGrainObserverInterface>(IGrainObserver obj) where TGrainObserverInterface : IGrainObserver
        => _client.CreateObjectReference<TGrainObserverInterface>(obj);

    public Task DeleteObjectReference<TGrainObserverInterface>(IGrainObserver obj) where TGrainObserverInterface : IGrainObserver
        => _client.DeleteObjectReference<TGrainObserverInterface>(obj);

    public void BindGrainReference(IAddressable grain)
        => _client.BindGrainReference(grain);

    #endregion
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@martinothamar, updated.