Skip to content

Instantly share code, notes, and snippets.

@BlackDante
Created May 9, 2025 06:47
Show Gist options
  • Save BlackDante/bdb8cc786e58451518a5008536d1e668 to your computer and use it in GitHub Desktop.
Save BlackDante/bdb8cc786e58451518a5008536d1e668 to your computer and use it in GitHub Desktop.
/// <summary>
/// A single transaction flowing through the enrichment pipeline.
/// </summary>
public class Transaction
{
    /// <summary>Identifier of the transaction.</summary>
    public string Id { get; set; }

    /// <summary>Payload carried by the transaction.</summary>
    public string Data { get; set; }
}
/// <summary>
/// Rx.NET demo: several concurrent producers publish transactions into a subject,
/// the stream is buffered into batches (by time or by count), and each batch is
/// sent to an asynchronous enrichment step.
/// </summary>
class Program
{
    // Shared stream that every producer publishes into.
    private static readonly Subject<Transaction> _transactionStream = new();

    /// <summary>
    /// Wires up the batching pipeline, runs ten concurrent producers that publish
    /// ten transactions each, then completes the stream and waits until every
    /// batch has been processed.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static async Task Main(string[] args)
    {
        Console.WriteLine("Starting Rx.NET enrichment demo...");

        // Completed (or faulted) by the pipeline itself, so Main can await the real
        // end of processing instead of guessing with a fixed delay.
        var pipelineDone = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        // Setup reactive batching pipeline.
        // Each batch is projected into an async unit of work and the units are
        // concatenated, so batches are processed one at a time, in order, and any
        // exception surfaces through OnError rather than vanishing inside an
        // async-void Subscribe callback.
        using var subscription = _transactionStream
            .Buffer(TimeSpan.FromMilliseconds(100), 500)
            .Where(batch => batch.Count > 0)
            .Select(batch => Observable.FromAsync(() => ProcessBatchAsync(batch)))
            .Concat()
            .Subscribe(
                _ => { },
                error => pipelineDone.TrySetException(error),
                () => pipelineDone.TrySetResult(true));

        // Rx requires notifications to be serialized; the producers below run on
        // multiple threads, so they publish through a synchronized view of the subject.
        var publisher = Subject.Synchronize(_transactionStream);

        var tasks = new List<Task>();
        for (int i = 0; i < 10; i++)
        {
            var accountId = i; // per-iteration copy so each closure sees its own value
            tasks.Add(Task.Run(() =>
            {
                for (int j = 0; j < 10; j++)
                {
                    var tx = new Transaction
                    {
                        Id = $"{accountId}-{j}",
                        Data = $"Data_{accountId}_{j}"
                    };
                    Console.WriteLine($"[Thread {accountId}] Publishing transaction {tx.Id}");
                    publisher.OnNext(tx);
                }
            }));
        }

        await Task.WhenAll(tasks);

        // Completing the stream flushes the final, partially filled buffer;
        // awaiting the pipeline then guarantees that last batch was processed.
        publisher.OnCompleted();
        await pipelineDone.Task;

        Console.WriteLine("All transactions processed.");
    }

    /// <summary>
    /// Enriches one batch and writes the enriched transactions to the console.
    /// </summary>
    /// <param name="batch">Non-empty batch emitted by the buffering stage.</param>
    private static async Task ProcessBatchAsync(IList<Transaction> batch)
    {
        Console.WriteLine($"[Batcher] Processing batch of {batch.Count} transactions.");
        var enriched = await CallEnrichmentServiceAsync(batch);
        foreach (var tx in enriched)
        {
            Console.WriteLine($"[Batcher] Enriched: {tx.Id} -> {tx.Data}");
        }
    }

    /// <summary>
    /// Stand-in for an enrichment service: returns a new transaction per input with
    /// the same <c>Id</c> and <c>"_enriched"</c> appended to <c>Data</c>.
    /// </summary>
    /// <param name="transactions">Transactions to enrich; the inputs are not modified.</param>
    /// <returns>An already-completed task holding the enriched copies, in input order.</returns>
    static Task<List<Transaction>> CallEnrichmentServiceAsync(IList<Transaction> transactions)
    {
        // Output size is known up front, so presize to avoid list regrowth.
        var enriched = new List<Transaction>(transactions.Count);
        foreach (var tx in transactions)
        {
            enriched.Add(new Transaction
            {
                Id = tx.Id,
                Data = tx.Data + "_enriched"
            });
        }
        return Task.FromResult(enriched);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment