Skip to content

Instantly share code, notes, and snippets.

@lucasteles
Last active October 29, 2024 20:01
Show Gist options
  • Save lucasteles/4a551714011289c95d433d404ae6f7e6 to your computer and use it in GitHub Desktop.
Save lucasteles/4a551714011289c95d433d404ae6f7e6 to your computer and use it in GitHub Desktop.
Multi API Request Samples
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading;
using System.Threading.Channels;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Diagnostics;
{
// Test Parameters
const int apiErrorPercentage = 10;
const int maxApiRetries = 3;
const int maxSimultaneousRequests = 10;
const int requestTotalCount = 120;
var timeout = TimeSpan.FromSeconds(10);
var apiLatency = (Min: 10, Max: 500);
const int randomSeed = 42;
var requestParamList = Enumerable.Range(0, requestTotalCount).Select(n => $"R{n + 1:D3}").ToArray();
// Execution Path
ApiService api = new()
{
ErrorPercentage = apiErrorPercentage,
MaxSimultaneousRequests = maxSimultaneousRequests,
Latency = apiLatency,
};
ApiProcessor processor = new(api, maxSimultaneousRequests, maxApiRetries);
TestExecutor executor = new( api, processor, timeout, randomSeed);
await executor.Run("V0", p => p.RequestAllV0, requestParamList);
await executor.Run("V1", p => p.RequestAllV1, requestParamList);
await executor.Run("V2", p => p.RequestAllV2, requestParamList);
await executor.Run("V3", p => p.RequestAllV3, requestParamList);
await executor.Run("V4", p => p.RequestAllV4, requestParamList);
executor.PrintSummary();
}
// Execution helper/benchmark
public class TestExecutor(
ApiService api,
ApiProcessor processor,
TimeSpan timeout,
int randomSeed
)
{
readonly List<Summary> summaries = [];
public async Task Run(
string name,
Func<ApiProcessor, Func<IEnumerable<string>, CancellationToken, Task<IReadOnlyList<string>>>> request,
IReadOnlyCollection<string> parameters
)
{
BeforeRun(name);
var startTimestamp = Stopwatch.GetTimestamp();
using CancellationTokenSource timeoutToken = new(timeout);
var responses = await request(processor).Invoke(parameters, timeoutToken.Token);
foreach (var response in responses)
Logger.Success(response);
Footer(name, startTimestamp, parameters.Count);
}
public async Task Run(
string name,
Func<ApiProcessor, Func<IEnumerable<string>, CancellationToken, IAsyncEnumerable<string>>> request,
IReadOnlyCollection<string> parameters
)
{
BeforeRun(name);
var startTimestamp = Stopwatch.GetTimestamp();
using CancellationTokenSource timeoutToken = new(timeout);
var responses = request(processor).Invoke(parameters, timeoutToken.Token);
await foreach (var response in responses)
Logger.Success(response);
Footer(name, startTimestamp, parameters.Count);
}
void BeforeRun(string message)
{
api.LocalRandom = new(Seed: randomSeed);
Console.WriteLine(Environment.NewLine);
Logger.Header($"{message}: Begin Requests");
}
void Footer(string name, long startTimeStamp, int totalRequests)
{
var elapsedTime = Stopwatch.GetElapsedTime(startTimeStamp);
// Show dead-letter
var deadLetter = processor.DeadLetter;
var deadLetterCount = deadLetter.Count;
if (deadLetterCount > 0)
{
Logger.Header("Dead Letter", color: ConsoleColor.Red);
var failed = string.Join("; ", deadLetter);
Logger.Important(failed);
}
// Show Results
Logger.Line();
var processedCount = totalRequests - deadLetterCount;
Summary summary = new(
Name: name,
ElapsedTime: elapsedTime,
Processed: $"{processedCount}/{totalRequests}",
SuccessRate: $"{processedCount * 100 / totalRequests}%"
);
summaries.Add(summary);
Logger.Info($"Results:\n {summary}");
}
public void PrintSummary()
{
Console.WriteLine(Environment.NewLine);
Logger.Header("Final Results: ");
foreach (var summary in summaries.OrderBy(x => x.ElapsedTime))
Logger.Info(summary.ToString());
}
record Summary(string Name, TimeSpan ElapsedTime, string Processed, string SuccessRate);
}
/// <summary>
/// Fake API
/// </summary>
public class ApiService
{
public class ApiException(string err) : Exception(err);
/// <summary>
/// Percentage of errors to be forced
/// </summary>
public int ErrorPercentage;
public int MaxSimultaneousRequests;
public (int Min, int Max) Latency = (0, 100);
public Random LocalRandom { private get; set; } = Random.Shared;
int requestCounter;
int simultaneousCounter;
public async Task<string> RequestOne(string param, CancellationToken ct = default)
{
if (Interlocked.Increment(ref simultaneousCounter) > MaxSimultaneousRequests)
throw new InvalidOperationException(
$"TEST BROKEN, MAX simultaneous requests overflow: {simultaneousCounter}");
try
{
if (LocalRandom.Next(0, 100) < ErrorPercentage)
throw new ApiException("Connection Refused!");
await Task.Delay(LocalRandom.Next(Latency.Min, Latency.Max), ct);
return $"{param.ToUpper()}-{Interlocked.Increment(ref requestCounter)}";
}
finally
{
Interlocked.Decrement(ref simultaneousCounter);
}
}
}
/// <summary>
/// Concurrent Request Logic
/// </summary>
public class ApiProcessor(
ApiService api,
int? maxSimultaneousRequests = null,
int maxRetries = 0
)
{
class MaxRetryException : Exception;
readonly int maxSimultaneousRequests = maxSimultaneousRequests ?? Environment.ProcessorCount;
readonly ConcurrentBag<string> deadLetter = [];
public IReadOnlyList<string> DeadLetter => deadLetter.ToArray().AsReadOnly();
async Task<string> RequestWithRetry(string request, CancellationToken ct)
{
// retry/dead-letter logic
var retryCount = 0;
while (retryCount < maxRetries)
try
{
if (retryCount > 0)
{
var retryDelay = TimeSpan.FromSeconds(Math.Pow(retryCount, 2));
Logger.Important($"Retrying({retryCount}) '{request}', waiting {retryDelay}");
await Task.Delay(retryDelay, ct);
}
// actual request
return await api.RequestOne(request, ct);
}
catch (ApiService.ApiException)
{
// increment on API Error
retryCount++;
}
deadLetter.Add(request);
throw new MaxRetryException();
}
public async Task<IReadOnlyList<string>> RequestAllV0(IEnumerable<string> requestParams,
CancellationToken ct)
{
deadLetter.Clear();
List<string> result = [];
foreach (var r in requestParams)
try
{
var response = await RequestWithRetry(r, ct);
result.Add(response);
}
catch (MaxRetryException)
{
// was sent to dead letter
Logger.Error($"Dead '{r}'");
}
catch (OperationCanceledException)
{
// stop if cancellation was requested
Logger.Note($"Skipping '{r}', cancellation requested");
}
return result.AsReadOnly();
}
public async IAsyncEnumerable<string> RequestAllV1(
IEnumerable<string> requestParams,
[EnumeratorCancellation] CancellationToken ct)
{
deadLetter.Clear();
// Channels are basically pub-subs
// I'm using it as a real-time output queue
var channel = Channel.CreateUnbounded<string>();
// using only one extra thread to asynchronously make all the requests
var writer = Task.Run(async () =>
{
using SemaphoreSlim semaphore = new(maxSimultaneousRequests, maxSimultaneousRequests);
await Task.WhenAll(requestParams.Select(async r =>
{
try
{
// so, yes, we need 2 try/catch blocks
// otherwise if this operation throws TaskCanceledException
// we will try to "release" the semaphore while it is full
// ReSharper disable AccessToDisposedClosure
await semaphore.WaitAsync(ct);
try
{
var response = await RequestWithRetry(r, ct);
await channel.Writer.WriteAsync(response, ct);
}
finally
{
semaphore.Release();
}
// ReSharper restore AccessToDisposedClosure
}
catch (MaxRetryException)
{
// was sent to dead letter
Logger.Error($"Dead '{r}'");
}
catch (OperationCanceledException)
{
// stop if cancellation was requested
Logger.Note($"Skipping '{r}', cancellation requested");
}
}));
// tell reader tha no more data will be written
channel.Writer.Complete();
}, ct);
while (true)
{
try
{
// wait data to be available
if (!await channel.Reader.WaitToReadAsync(ct))
break;
}
// check cancellation
catch (OperationCanceledException)
{
break;
}
// return each value as it became available
if (channel.Reader.TryRead(out var next))
yield return next;
}
await writer;
}
public async Task<IReadOnlyList<string>> RequestAllV2(IEnumerable<string> requestParams, CancellationToken ct)
{
deadLetter.Clear();
ConcurrentBag<string> result = [];
using SemaphoreSlim semaphore = new(maxSimultaneousRequests, maxSimultaneousRequests);
// using only one extra thread to asynchronously make all the requests
await Task.WhenAll(requestParams.Select(async r =>
{
try
{
// so, yes, we need 2 try/catch blocks
// otherwise if this operation throws TaskCanceledException
// we will try to "release" the semaphore while it is full
// ReSharper disable AccessToDisposedClosure
await semaphore.WaitAsync(ct);
try
{
Logger.Info($"waiting: {r}");
var response = await RequestWithRetry(r, ct);
result.Add(response);
}
finally
{
semaphore.Release();
// ReSharper restore AccessToDisposedClosure
}
}
catch (MaxRetryException)
{
// was sent to dead letter
Logger.Error($"Dead '{r}'");
}
catch (OperationCanceledException)
{
// stop if cancellation was requested
Logger.Note($"Skipping '{r}', cancellation requested");
}
}));
return result.ToArray().AsReadOnly();
}
public async Task<IReadOnlyList<string>> RequestAllV3(IEnumerable<string> requestParams, CancellationToken ct)
{
deadLetter.Clear();
ConcurrentBag<string> result = [];
var queue = new ConcurrentQueue<string>(requestParams);
// One thread per simultaneous item
var threads = Enumerable.Range(0, maxSimultaneousRequests).Select(_ => Task.Run(async () =>
{
while (queue.TryDequeue(out var r))
{
try
{
result.Add(await RequestWithRetry(r, ct));
}
catch (MaxRetryException)
{
// was sent to dead letter
Logger.Error($"Dead '{r}'");
}
catch (OperationCanceledException)
{
// stop if cancellation was requested
Logger.Note($"Skipping '{r}', cancellation requested");
break;
}
}
}, ct));
await Task.WhenAll(threads);
return result.ToArray().AsReadOnly();
}
public async Task<IReadOnlyList<string>> RequestAllV4(IEnumerable<string> requestParams, CancellationToken ct)
{
deadLetter.Clear();
ConcurrentBag<string> result = [];
await Parallel.ForEachAsync(
requestParams,
new ParallelOptions { CancellationToken = ct, MaxDegreeOfParallelism = maxSimultaneousRequests },
async (r, c) =>
{
try
{
var response = await RequestWithRetry(r, c);
result.Add(response);
}
catch (MaxRetryException)
{
// was sent to dead letter
Logger.Error($"Dead '{r}'");
}
catch (OperationCanceledException)
{
// stop if cancellation was requested
Logger.Note($"Skipping '{r}', cancellation requested");
}
});
return result.ToArray().AsReadOnly();
}
}
/// <summary>
/// Logger Helper
/// </summary>
public static class Logger
{
static void Log(string message, string prefix, ConsoleColor color)
{
Console.ForegroundColor = color;
Console.WriteLine($"{prefix} [{DateTime.UtcNow:T}] {message}");
Console.ForegroundColor = ConsoleColor.White;
}
public static void Info(string message) => Log(message, "==", ConsoleColor.White);
public static void Success(string message) => Log(message, "=>", ConsoleColor.Cyan);
public static void Important(string message) => Log(message, "->", ConsoleColor.Yellow);
public static void Note(string message) => Log(message, "- ", ConsoleColor.Blue);
public static void Error(string message) => Log(message, "*>", ConsoleColor.Red);
public static void Header(string message, int size = 40, ConsoleColor color = ConsoleColor.White)
{
Console.WriteLine();
Line(size);
Log($"-- {message} --", "", color);
Line(size);
}
public static void Line(int size = 40) => Console.WriteLine(new string('-', size));
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment