Skip to content

Instantly share code, notes, and snippets.

@OnurGumus
Last active May 2, 2025 11:32
Show Gist options
  • Save OnurGumus/b5b313c02a56be9ca08238c994c8d0e3 to your computer and use it in GitHub Desktop.
Save OnurGumus/b5b313c02a56be9ca08238c994c8d0e3 to your computer and use it in GitHub Desktop.
akka.net pubsub
// See https://aka.ms/new-console-template for more information
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;
// Bootstrap the actor system that hosts every stream in this program.
// The using declaration disposes it when the top-level scope is left.
using ActorSystem system = ActorSystem.Create("StreamSystem");
// Materializer used by every Run(...) call below to start the stream graphs.
using ActorMaterializer materializer = system.Materializer();
// Build the "bus": a MergeHub collects elements from any number of producers,
// every element gets " Foo" appended, and a BroadcastHub fans the result out
// to any number of consumers.
var mergedProducers = MergeHub.Source<string>(perProducerBufferSize: 16);
var decoratedMessages = mergedProducers.Select(msg => $"{msg} Foo");
var hubGraph = decoratedMessages.ToMaterialized(BroadcastHub.Sink<string>(bufferSize: 256), Keep.Both);
// Running the graph yields both ends of the bus:
//   hubSink   - attach producers here to publish to the bus
//   hubSource - attach consumers here to receive from the bus
var (hubSink, hubSource) = hubGraph.Run(materializer);
// Attach a default consumer that discards everything, so the hub is always
// drained and does not buffer while no other consumer is attached.
var defaultDrain = hubSource.To(Sink.Ignore<string>());
defaultDrain.Run(materializer);
// Bus flow: whatever is pushed into the flow is published to hubSink, and
// whatever the flow emits comes from hubSource.
var hubEndpoints = Flow.FromSinkAndSource(hubSink, hubSource);
// Join with a bidirectional kill switch and keep the switch (Keep.Right) as the
// flow's materialized value, so one switch controls both directions.
var switchableBus = hubEndpoints.JoinMaterialized(KillSwitches.SingleBidi<string, string>(), Keep.Right);
// Apply a 3 second backpressure timeout to the flow.
Flow<string, string, UniqueKillSwitch> busFlow = switchableBus.BackpressureTimeout(TimeSpan.FromSeconds(3));
// Stand-alone kill switch stage used by the second source, which publishes to
// the hub directly instead of going through busFlow.
var dedicatedKillSwitchFlow = KillSwitches.Single<string>();
// --- Stream Setups ---
// Console input source: a bounded queue (100 elements, DropHead overflow strategy)
// whose elements are published straight to the hub. Nothing is appended here;
// the " Foo" suffix is added by the Select stage inside the hub itself.
// (The previous identity stage `.Select(msg => $"{msg}")` was a no-op whose comment
// wrongly claimed it appended "Foo", so it has been removed.)
var consoleQueue = Source
    .Queue<string>(bufferSize: 100, overflowStrategy: OverflowStrategy.DropHead)
    .To(hubSink) // Connect to the hub sink; the queue handle stays the materialized value
    .Run(materializer);
// Start a background task that forwards each console line to the hub via consoleQueue.
// Console.In.ReadLineAsync() yields null once standard input reaches end-of-stream
// (redirected input exhausted, or Ctrl+Z / Ctrl+D typed). Every later read would
// return null again immediately, so the loop has to stop at that point; looping
// on `while (true)` would otherwise spin at full CPU forever.
_ = Task.Run(async () =>
{
    while (await Console.In.ReadLineAsync() is { } line)
    {
        await consoleQueue.OfferAsync(line);
    }

    // No further input can arrive: complete the queue so its stream finishes cleanly.
    consoleQueue.Complete();
});
// First producer/consumer pair, wired through busFlow.
// Producer: "Hello World 1", "Hello World 2", ... throttled to one element per 10 seconds.
var helloWorldTicks = Source
    .Repeat("Hello World")
    .ZipWithIndex()
    .Select(indexed => $"{indexed.Item1} {indexed.Item2 + 1}") // index is 0-based, print 1-based
    .Throttle(1, TimeSpan.FromSeconds(10), 1, ThrottleMode.Shaping);
// Consumer: prints everything that comes back out of the bus.
var busPrinter = Sink.ForEach<string>(received => Console.WriteLine($"BusFlow Received: {received}"));
// Keep.Right keeps busFlow's UniqueKillSwitch as the materialized value of the run.
var busKillSwitch = helloWorldTicks
    .ViaMaterialized(busFlow, Keep.Right)
    .To(busPrinter)
    .Run(materializer);
// Second producer: publishes straight to the hub sink and is guarded by its own kill switch.
// Emits "Another message 1", "Another message 2", ... throttled to one element per 20 seconds.
var anotherMessageTicks = Source
    .Repeat("Another message")
    .ZipWithIndex()
    .Select(indexed => $"{indexed.Item1} {indexed.Item2 + 1}") // index is 0-based, print 1-based
    .Throttle(1, TimeSpan.FromSeconds(20), 1, ThrottleMode.Shaping);
// Keep.Right materializes the dedicated switch; Keep.Left then retains that switch
// instead of the hub sink's own materialized value.
var dedicatedKillSwitch = anotherMessageTicks
    .ViaMaterialized(dedicatedKillSwitchFlow, Keep.Right)
    .ToMaterialized(hubSink, Keep.Left)
    .Run(materializer);
// --- Control Logic ---
// Block the main flow forever so the streams keep running; an infinite delay never completes.
await Task.Delay(System.Threading.Timeout.InfiniteTimeSpan);
// Console.WriteLine("--- Both sources running. Press key to stop 'Another message' (dedicated switch)...");
// Console.ReadKey();
// Console.WriteLine("--- Shutting down 'Another message' source via dedicated KillSwitch...");
// dedicatedKillSwitch.Shutdown();
// Console.WriteLine("--- Only 'Hello World' source running (via busFlow). Press key to stop busFlow (kills producer AND consumer)...");
// Console.ReadKey();
// Console.WriteLine("--- Shutting down busFlow via its KillSwitch...");
// busKillSwitch.Shutdown();
// Console.WriteLine("--- No sources or consumers active. Press key to exit...");
// Console.ReadKey();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment