Created
April 20, 2016 07:14
-
-
Save mookid8000/accd90b15dc23f8221c4a3bfcd89cf17 to your computer and use it in GitHub Desktop.
Subscriber caching extension for Rebus2
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static class SubscriberCacheExtensions | |
{ | |
public static void EnableSubscriberCache(this OptionsConfigurer configurer) | |
{ | |
configurer.Decorate<ISubscriptionStorage>(c => | |
{ | |
var subscriptionStorage = c.Get<ISubscriptionStorage>(); | |
return new CachingSubscriptionStorage(subscriptionStorage); | |
}); | |
} | |
class CachingSubscriptionStorage : ISubscriptionStorage | |
{ | |
readonly ISubscriptionStorage _subscriptionStorage; | |
public CachingSubscriptionStorage(ISubscriptionStorage subscriptionStorage) | |
{ | |
_subscriptionStorage = subscriptionStorage; | |
} | |
public async Task<string[]> GetSubscriberAddresses(string topic) | |
{ | |
if (MessageContext.Current == null) | |
{ | |
return await _subscriptionStorage.GetSubscriberAddresses(topic); | |
} | |
var cache = GetCache(); | |
string[] subscribers; | |
if (cache.TryGetValue(topic, out subscribers)) | |
{ | |
return subscribers; | |
} | |
subscribers = await _subscriptionStorage.GetSubscriberAddresses(topic); | |
cache[topic] = subscribers; | |
return subscribers; | |
} | |
public async Task RegisterSubscriber(string topic, string subscriberAddress) | |
{ | |
if (MessageContext.Current != null) | |
{ | |
FlushCache(topic); | |
} | |
await _subscriptionStorage.RegisterSubscriber(topic, subscriberAddress); | |
} | |
public async Task UnregisterSubscriber(string topic, string subscriberAddress) | |
{ | |
if (MessageContext.Current != null) | |
{ | |
FlushCache(topic); | |
} | |
await _subscriptionStorage.UnregisterSubscriber(topic, subscriberAddress); | |
} | |
public bool IsCentralized => _subscriptionStorage.IsCentralized; | |
static ConcurrentDictionary<string, string[]> GetCache() | |
{ | |
var transactionContext = MessageContext.Current.TransactionContext; | |
var cache = transactionContext.GetOrAdd("cached-subscribers", () => new ConcurrentDictionary<string, string[]>()); | |
return cache; | |
} | |
static void FlushCache(string topic) | |
{ | |
var cache = GetCache(); | |
string[] temp; | |
cache.TryRemove(topic, out temp); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment