Skip to content

Instantly share code, notes, and snippets.

@AntonSmolkov
Last active June 17, 2024 15:31
Show Gist options
  • Select an option

  • Save AntonSmolkov/e561fa2bf92eaab9c1423c2f818754ed to your computer and use it in GitHub Desktop.

Select an option

Save AntonSmolkov/e561fa2bf92eaab9c1423c2f818754ed to your computer and use it in GitHub Desktop.
NATS JetStreams partitions publishing code
using NATS.Client.Core;
using NATS.Client.JetStream;
namespace Zyfra.Udl.Tools.NatsPerfTester.Producer.Publishers;
/// <summary>
/// <see cref="INatsPublisher"/> implementation that publishes through a plain (core) NATS connection.
/// </summary>
public class CoreNatsPublisher(INatsConnection connection) : INatsPublisher
{
    /// <summary>
    /// Publishes <paramref name="data"/> to <paramref name="subject"/> on the wrapped connection.
    /// </summary>
    /// <param name="subject">Subject to publish to.</param>
    /// <param name="data">Payload to serialize and send.</param>
    /// <param name="headers">Optional message headers; forwarded to the connection.</param>
    /// <param name="replyTo">Optional reply subject; forwarded to the connection.</param>
    /// <param name="serializer">Optional serializer for <typeparamref name="T"/>.</param>
    /// <param name="opts">Optional publish options, passed through as the connection's publish options.</param>
    /// <param name="cancellationToken">Token used to cancel the publish.</param>
    /// <returns>The <see cref="ValueTask"/> returned by the connection's publish call.</returns>
    public ValueTask PublishAsync<T>(
        string subject,
        T? data,
        NatsHeaders? headers = default,
        string? replyTo = default,
        INatsSerialize<T>? serializer = default,
        NatsJSPubOpts? opts = default,
        CancellationToken cancellationToken = default) =>
        // headers and replyTo used to be accepted but silently dropped; pass them on so the
        // method honours every argument it advertises.
        connection.PublishAsync(
            subject,
            data: data,
            headers: headers,
            replyTo: replyTo,
            serializer: serializer,
            opts: opts,
            cancellationToken: cancellationToken);
}
using NATS.Client.Core;
using NATS.Client.JetStream;
namespace Zyfra.Udl.Tools.NatsPerfTester.Producer.Publishers;
/// <summary>
/// <see cref="INatsPublisher"/> implementation that publishes through a JetStream context.
/// </summary>
public class JsNatsPublisher(INatsJSContext jsContext) : INatsPublisher
{
    /// <summary>
    /// Publishes <paramref name="data"/> to <paramref name="subject"/> via JetStream and verifies
    /// the publish acknowledgement.
    /// </summary>
    /// <param name="subject">Subject to publish to.</param>
    /// <param name="data">Payload to serialize and send.</param>
    /// <param name="headers">Optional message headers; forwarded to the JetStream context.</param>
    /// <param name="replyTo">
    /// Not used by this implementation: the JetStream publish call made here is not given a reply subject.
    /// </param>
    /// <param name="serializer">Optional serializer for <typeparamref name="T"/>.</param>
    /// <param name="opts">Optional JetStream publish options (for example a message id).</param>
    /// <param name="cancellationToken">Token used to cancel the publish.</param>
    /// <exception cref="NatsJSApiException">The publish acknowledgement reports an error.</exception>
    public async ValueTask PublishAsync<T>(
        string subject,
        T? data,
        NatsHeaders? headers = default,
        string? replyTo = default,
        INatsSerialize<T>? serializer = default,
        NatsJSPubOpts? opts = default,
        CancellationToken cancellationToken = default)
    {
        var ack = await jsContext.PublishAsync(
            subject,
            data: data,
            serializer: serializer,
            opts: opts,
            headers: headers,
            cancellationToken: cancellationToken);

        // The acknowledgement used to be discarded, so a publish rejected by the server still
        // looked successful to the caller. Surface such failures as exceptions instead.
        ack.EnsureSuccess();
    }
}
using System.Diagnostics;
using System.IO.Hashing;
using Google.Protobuf.WellKnownTypes;
using Microsoft.Extensions.Options;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
using Prometheus;
using Zyfra.Udl.Tools.NatsPerfTester.Dtos;
using Zyfra.Udl.Tools.NatsPerfTester.Misc;
using Zyfra.Udl.Tools.NatsPerfTester.Producer.Publishers;
namespace Zyfra.Udl.Tools.NatsPerfTester.Producer;
/// <summary>
/// Background service that generates load against NATS: it optionally creates the partition
/// streams, then runs the configured number of parallel producers plus a once-per-second
/// throughput reporter until the host requests shutdown.
/// </summary>
public class ProducerWorker : BackgroundService
{
    /// <summary>JetStream API error code for "stream name already in use".</summary>
    private const int StreamNameAlreadyInUseErrCode = 10058;

    private readonly ILogger<ProducerWorker> _logger;
    private readonly MainOptions _mainOptions;
    private readonly Stopwatch _sw;
    private readonly Counter _producersPublishedTotal;
    private readonly NatsOpts _natsOpts;

    /// <summary>
    /// Builds the NATS connection options from <paramref name="mainOptions"/> and registers the
    /// published-messages counter.
    /// </summary>
    /// <param name="mainOptions">Application options (NATS URL, TLS, credentials, producer settings).</param>
    /// <param name="loggerFactory">Logger factory handed to the NATS client.</param>
    /// <param name="logger">Logger used by this worker.</param>
    public ProducerWorker(IOptions<MainOptions> mainOptions, ILoggerFactory loggerFactory, ILogger<ProducerWorker> logger)
    {
        _logger = logger;
        _mainOptions = mainOptions.Value;
        _natsOpts = new NatsOpts
        {
            Url = _mainOptions.NatsUrl,
            LoggerFactory = loggerFactory,
            SerializerRegistry = new NatsProtoBufSerializerRegistry(),
            Name = "NATS UDL producer",
            TlsOpts = new NatsTlsOpts()
            {
                InsecureSkipVerify = _mainOptions.NatsTlsSkipVerify,
                // A CA file is only supplied when certificate verification is enabled.
                CaFile = _mainOptions.NatsTlsSkipVerify ? null : _mainOptions.NatsTlsCaCertPath
            },
            AuthOpts = new NatsAuthOpts()
            {
                Username = _mainOptions.NatsUserName,
                Password = _mainOptions.NatsPassword
            }
        };
        _sw = new Stopwatch();
        _producersPublishedTotal = Metrics.CreateCounter(
            "udl_lt_producers_js_published_total",
            "Total number of published messages to JetStreams");
    }

    /// <summary>
    /// Creates the streams when configured, starts the producers and the reporter, and stays
    /// running until all of them have finished.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        if (_mainOptions.NatsJetStreamsCreate)
        {
            await CreateStreamsAsync(stoppingToken);
        }

        _sw.Start();

        // Keep every started task so it can be awaited: this lets the host wait for the
        // producers to wind down on shutdown and keeps their failures from going unobserved.
        var workers = new List<Task>();
        for (var i = 0; i < _mainOptions.NatsProducersParallel; i++)
        {
            var producerIndex = i;
            workers.Add(Task.Run(() => GenerateAndPublishMessagesAsync(producerIndex, stoppingToken), stoppingToken));
        }

        workers.Add(Task.Run(() => StartReportingAsync(stoppingToken), stoppingToken));

        try
        {
            await Task.WhenAll(workers);
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Tasks cancelled because the service is stopping; nothing to report.
        }
    }

    /// <summary>
    /// Creates one stream per partition (<c>udl-js-partition-{i}</c> bound to
    /// <c>*.at-least-once.{i}</c>) using a short-lived connection.
    /// </summary>
    /// <param name="cancellationToken">Token used to cancel stream creation.</param>
    private async Task CreateStreamsAsync(CancellationToken cancellationToken)
    {
        await using var natsConnection = new NatsConnection(_natsOpts);
        var jsContext = new NatsJSContext(natsConnection);

        for (var i = 0; i < _mainOptions.NatsJetStreamsPartitionsCount; i++)
        {
            var streamName = $"udl-js-partition-{i}";
            var streamConfig = new StreamConfig(streamName, [$"*.at-least-once.{i}"])
            {
                Compression = _mainOptions.NatsJetStreamsCompressionEnabled ? StreamConfigCompression.S2 : StreamConfigCompression.None,
                Storage = _mainOptions.NatsJetStreamsUseInMemoryStorage ? StreamConfigStorage.Memory : StreamConfigStorage.File,
                NumReplicas = _mainOptions.NatsJetStreamsReplicasCount,
                DuplicateWindow = TimeSpan.FromSeconds(_mainOptions.NatsJetStreamsDedupWindowSec),
                NoAck = !_mainOptions.NatsJetStreamsAcksEnabled
            };

            try
            {
                await jsContext.CreateStreamAsync(streamConfig, cancellationToken: cancellationToken);
            }
            catch (NatsJSApiException e) when (e.Error.ErrCode == StreamNameAlreadyInUseErrCode)
            {
                // The stream is left as it is on the server; make that visible instead of
                // swallowing the error without a trace.
                _logger.LogInformation("Stream {StreamName} already exists, skipping creation", streamName);
            }
        }
    }

    /// <summary>
    /// Producer loop: opens its own connection, then keeps publishing randomly generated
    /// <c>UdlValue</c> messages until cancellation is requested.
    /// </summary>
    /// <param name="producerIndex">Index of this producer, used in log messages only.</param>
    /// <param name="cancellationToken">Token that stops the loop.</param>
    private async Task GenerateAndPublishMessagesAsync(int producerIndex, CancellationToken cancellationToken)
    {
        await using var natsConnection = new NatsConnection(_natsOpts);
        INatsPublisher natsPublisher = _mainOptions.NatsProducersUseCoreInsteadOfJetStream
            ? new CoreNatsPublisher(natsConnection)
            : new JsNatsPublisher(new NatsJSContext(natsConnection));

        _logger.LogInformation("Producer {ProducerIndex} started", producerIndex);

        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                var propertyId = Guid.NewGuid();
                var propertyIdStr = propertyId.ToString();

                string msgSubject;
                if (_mainOptions.NatsProducersUseSingleSubject)
                {
                    // Single-subject mode: fixed first token, partition picked at random.
                    var partition = Random.Shared.NextInt64(0, _mainOptions.NatsJetStreamsPartitionsCount);
                    msgSubject = $"single.at-least-once.{partition}";
                }
                else
                {
                    // Per-property subject: the partition is derived from a CRC32 of the id.
                    var partition = Crc32.HashToUInt32(propertyId.ToByteArray()) % _mainOptions.NatsJetStreamsPartitionsCount;
                    msgSubject = $"{propertyId}.at-least-once.{partition}";
                }

                // NOTE(review): the subject doubles as the message id. In single-subject mode it is
                // not unique per message, so JetStream deduplication may drop most messages there.
                // Confirm whether that is intended.
                var natsOpts = _mainOptions.NatsProducersIncludeMessageId
                    ? new NatsJSPubOpts() { MsgId = msgSubject }
                    : null;

                var udlValue = new UdlValue()
                {
                    PropertyId = propertyIdStr,
                    Value = new TypedValue() { Double = Random.Shared.NextDouble() },
                    Timestamp = Timestamp.FromDateTime(DateTime.UtcNow),
                    QualityType = UdlValueQualityType.Good,
                    QualityStatus = UdlValueQualityStatus.StubStatus1
                };

                await natsPublisher.PublishAsync(msgSubject,
                    data: udlValue,
                    opts: natsOpts,
                    serializer: NatsProtoBufSerializer<UdlValue>.Default,
                    cancellationToken: cancellationToken);

                _producersPublishedTotal.Inc();
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // Shutdown in progress. Checking the token itself, rather than the token carried by
                // the exception, also covers cancellations surfaced through a linked token.
            }
            catch (Exception e)
            {
                _logger.LogError(e, "Producer {ProducerIndex}. An error has occured while generating and publishing message", producerIndex);
            }
        }

        _logger.LogInformation("Producer {ProducerIndex} finished", producerIndex);
    }

    /// <summary>
    /// Logs the average producing rate once per second until cancellation is requested.
    /// </summary>
    /// <param name="cancellationToken">Token that stops the reporting loop.</param>
    private async Task StartReportingAsync(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                await Task.Delay(1000, cancellationToken);
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // Normal shutdown: leave the loop instead of faulting the task.
                break;
            }

            // Fractional seconds avoid the integer truncation that could yield a zero divisor
            // (and therefore an infinite or NaN rate) and keep the average precise.
            var elapsedSeconds = _sw.Elapsed.TotalSeconds;
            if (elapsedSeconds <= 0)
            {
                continue;
            }

            // Read the counter once so the rate and the total in one log line agree.
            var producedTotal = _producersPublishedTotal.Value;
            var producedPerSecond = producedTotal / elapsedSeconds;

            _logger.LogInformation("Producing rate: {ProducedPerSecond} values per second. (Elapsed sec: {ElapsedSeconds}. Produced: {ProducedTotal})", producedPerSecond, (long)elapsedSeconds, producedTotal);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment