Skip to content

Instantly share code, notes, and snippets.

@dazinator
Last active January 23, 2025 16:25
Show Gist options
  • Save dazinator/2eb58bda2aaa1c9694b7ab07c1a5e196 to your computer and use it in GitHub Desktop.
Save dazinator/2eb58bda2aaa1c9694b7ab07c1a5e196 to your computer and use it in GitHub Desktop.
Mass Transit multibus outbox test
public static class MassTransitExtensions
{
public static IBusRegistrationConfigurator AddEntityFrameworkOutboxMultibus<TDbContext>(
this IBusRegistrationConfigurator configurator,
Action<EntityFrameworkOutboxConfigurator<TDbContext>>? configure)
where TDbContext : DbContext
{
AddEntityFrameworkOutbox<TDbContext>((IServiceCollection)configurator, configure);
return configurator;
}
public static IServiceCollection AddEntityFrameworkOutbox<TDbContext>(this IServiceCollection services, Action<EntityFrameworkOutboxConfigurator<TDbContext>>? configure)
where TDbContext : DbContext
{
var configurator = new EntityFrameworkOutboxConfigurator<TDbContext>(services);
configure?.Invoke(configurator);
return services;
}
public class EntityFrameworkOutboxConfigurator<TDbContext>
where TDbContext : DbContext
{
private readonly IServiceCollection _services;
private Action<EntityFrameworkOutboxOptions>? ConfigureEntityFrameworkOutboxOptionsCallback { get; set; }
public EntityFrameworkOutboxConfigurator<TDbContext> ConfigureProvider(Action<EntityFrameworkOutboxOptions> configureOutboxOptions)
{
this.ConfigureEntityFrameworkOutboxOptionsCallback = configureOutboxOptions;
return this;
}
public EntityFrameworkOutboxConfigurator(IServiceCollection services)
{
// there can only be one per dbcontext type
// however its using the
services.TryAddScoped<IOutboxContextFactory<TDbContext>, EntityFrameworkOutboxContextFactory<TDbContext>>();
services.AddOptions<EntityFrameworkOutboxOptions<TDbContext>>()
.Configure(options =>
{
ConfigureEntityFrameworkOutboxOptionsCallback?.Invoke(options);
});
_services = services;
}
public EntityFrameworkOutboxConfigurator<TDbContext> AddBusOutbox<TBus>(Action<EntityFrameworkBusOutboxConfigurator<TBus, TDbContext>>? configureBusOutboxOptions)
where TBus : class, IBus, IBusControl
{
var builder = new EntityFrameworkBusOutboxConfigurator<TBus, TDbContext>();
configureBusOutboxOptions?.Invoke(builder);
builder.Build(_services);
return this;
}
}
public class EntityFrameworkBusOutboxConfigurator<TBus, TDbContext>
where TBus : class, IBus, IBusControl
where TDbContext : DbContext
{
private Action<OutboxDeliveryServiceOptions> ConfigureOutboxDeliveryOptionsCallback { get; set; }
public bool DeliveryServiceEnabled { get; private set; } = true;
public bool InboxCleanupServiceEnabled { get; private set; } = true;
private Action<InboxCleanupServiceOptions> ConfigureInboxCleanupServiceOptionsCallback { get; set; }
public EntityFrameworkBusOutboxConfigurator<TBus, TDbContext> ConfigureOutboxDeliveryOptions(Action<OutboxDeliveryServiceOptions> configureOutboxOptions)
{
this.ConfigureOutboxDeliveryOptionsCallback = configureOutboxOptions;
return this;
}
public EntityFrameworkBusOutboxConfigurator<TBus, TDbContext> ConfigureInboxCleanupOptions(Action<InboxCleanupServiceOptions> configureOutboxOptions)
{
this.ConfigureInboxCleanupServiceOptionsCallback = configureOutboxOptions;
return this;
}
/// <summary>
/// Don't register the hosted service that delivers messages from the outbox for this bus type.
/// </summary>
public EntityFrameworkBusOutboxConfigurator<TBus, TDbContext> DisableDeliveryService()
{
this.DeliveryServiceEnabled = false;
return this;
}
public EntityFrameworkBusOutboxConfigurator<TBus, TDbContext> DisableInboxCleanupService()
{
this.InboxCleanupServiceEnabled = false;
return this;
}
internal void Build(IServiceCollection services)
{
// see: https://github.com/MassTransit/MassTransit/issues/5781
// https://github.com/MassTransit/MassTransit/blob/29ec12e7a5c8e504d8babf6a63f57655db3a5a81/src/Persistence/MassTransit.EntityFrameworkCoreIntegration/EntityFrameworkCoreIntegration/EntityFrameworkScopedBusContext.cs#L89
// https://github.com/MassTransit/MassTransit/blob/29ec12e7a5c8e504d8babf6a63f57655db3a5a81/src/Persistence/MassTransit.EntityFrameworkCoreIntegration/Configuration/Configuration/EntityFrameworkBusOutboxConfigurator.cs#L43
// fix up the services that are not registered at a bus instance level for multi-bus support.
services.AddScoped<IScopedBusContextProvider<TBus>, MultibusEntityFrameworkScopedBusContextProvider<TBus, TDbContext>>();
services.AddSingleton<BusOutboxNotification<TBus>>();
services.AddOptions<OutboxDeliveryServiceOptions<TBus>>()
.Configure(options =>
{
ConfigureOutboxDeliveryOptionsCallback?.Invoke(options);
});
if (DeliveryServiceEnabled)
{
// replace with one scoped for bus instance.
services.AddHostedService<BusOutboxDeliveryService<TBus, TDbContext>>();
}
services.AddOptions<InboxCleanupServiceOptions<TBus>>()
.Configure(options =>
{
ConfigureInboxCleanupServiceOptionsCallback?.Invoke(options);
});
if (InboxCleanupServiceEnabled)
{
services.AddHostedService<InboxCleanupService<TBus, TDbContext>>();
}
}
}
public class BusOutboxNotification<TBus> : BusOutboxNotification
where TBus : class, IBus
{
public BusOutboxNotification(IOptions<OutboxDeliveryServiceOptions<TBus>> options) : base(options)
{
}
}
/// <summary>
/// A version of <see cref="EntityFrameworkScopedBusContextProvider{TBus, TDbContext}"/> that passes correctly scoped dependencies when using a multi-bus setup.
/// </summary>
/// <typeparam name="TBus"></typeparam>
/// <typeparam name="TDbContext"></typeparam>
/// <remarks>See: https://github.com/MassTransit/MassTransit/issues/5781</remarks>
public class MultibusEntityFrameworkScopedBusContextProvider<TBus, TDbContext> : EntityFrameworkScopedBusContextProvider<TBus, TDbContext>
where TBus : class, IBus
where TDbContext : DbContext
{
public MultibusEntityFrameworkScopedBusContextProvider(TBus bus,
TDbContext dbContext,
BusOutboxNotification<TBus> notification, // Note: scoped to bus instance here
Bind<TBus, IClientFactory> clientFactory,
Bind<TBus, IScopedConsumeContextProvider> consumeContextProvider,
IScopedConsumeContextProvider globalConsumeContextProvider,
IServiceProvider provider) : base(bus, dbContext, notification, clientFactory, consumeContextProvider, globalConsumeContextProvider, provider)
{
// this.AddSingleton(provider => Bind<IBus, TRider>.Create(CreateRegistrationContext(provider)));
}
}
public class OutboxDeliveryServiceOptions<TBus> : OutboxDeliveryServiceOptions
where TBus : class, IBus
{
}
// Added once per db context.
// if multiple buses use the same db context, they'll use the same options here.
// doesn't make sense to configure these differently per bus only per dbcontext.
public class EntityFrameworkOutboxOptions<TDbContext> : EntityFrameworkOutboxOptions
where TDbContext : DbContext
{
}
public class BusOutboxDeliveryService<TBus, TDbContext> : BusOutboxDeliveryService<TDbContext>
where TBus : class, IBus, IBusControl
where TDbContext : DbContext
{
public BusOutboxDeliveryService(
TBus busControl, // Note: Using typed bus instance here
IOptions<OutboxDeliveryServiceOptions<TBus>> options, // Note: Using typed bus instance here
IOptions<EntityFrameworkOutboxOptions<TDbContext>> outboxOptions,
BusOutboxNotification<TBus> notification, // Note: scoped to bus instance here
ILogger<BusOutboxDeliveryService<TDbContext>> logger,
IServiceProvider provider
)
: base(busControl, options, outboxOptions, notification, logger, provider)
{
}
}
public class InboxCleanupService<TBus, TDbContext> : InboxCleanupService<TDbContext>
where TBus : class, IBus, IBusControl
where TDbContext : DbContext
{
public InboxCleanupService(
IOptions<InboxCleanupServiceOptions<TBus>> options,
ILogger<InboxCleanupService<TDbContext>> logger,
IServiceProvider provider
)
: base(options, logger, provider)
{
}
}
public class InboxCleanupServiceOptions<TBus> : InboxCleanupServiceOptions
where TBus : class, IBus
{
}
public class EntityFrameworkOutboxContextFactory<TBus, TDbContext> : EntityFrameworkOutboxContextFactory<TDbContext>
where TBus : class, IBus
where TDbContext : DbContext
{
public EntityFrameworkOutboxContextFactory(TDbContext dbContext, IServiceProvider provider, IOptions<EntityFrameworkOutboxOptions<TDbContext>> options)
: base(dbContext, provider, options)
{
}
}
}
[Exploratory]
public class MassTransitTests
{
[Fact]
public async Task Outbox_DoesntWorkWhenUsingTypedBusRegistration_ReproFixAttemptMultipleBuses()
{
var services = new ServiceCollection();
AddMassTransitBus<IModuleBusA, TestDbContext>(services);
AddMassTransitBus<IModuleBusB, TestDbContext>(services);
using var sharedConnection = CreateSqliteInMemoryConnection();
sharedConnection.Open();
services.AddDbContext<TestDbContext>((sp, builder) =>
{
builder.UseSqlite(sharedConnection, (o) =>
{
});
}, ServiceLifetime.Scoped, ServiceLifetime.Singleton);
var sp = services.BuildServiceProvider();
var scopeCreateDb = sp.CreateScope();
{
var dbContext = scopeCreateDb.ServiceProvider.GetRequiredService<TestDbContext>();
await dbContext.Database.EnsureCreatedAsync();
}
var scope = sp.CreateScope();
{
var dbContext = scope.ServiceProvider.GetRequiredService<TestDbContext>();
var cancelToken = CancellationToken.None;
await using var newTrans = await dbContext.Database.BeginTransactionAsync(cancelToken);
await AssertSendToBusOutbox<IModuleBusA>(scope);
await AssertSendToBusOutbox<IModuleBusB>(scope);
await dbContext.SaveChangesAsync();
//
var outboxMessages = await dbContext.OutboxMessages.ToListAsync();
Assert.NotEmpty(outboxMessages);
Assert.Equal(2, outboxMessages.Count);
}
}
private static async Task AssertSendToBusOutbox<TBus>(IServiceScope scope, CancellationToken cancellation = default)
where TBus : class, IBus
{
var bus = scope.ServiceProvider.GetRequiredService<TBus>();
string myqueueUri = "queue:foo";
// incorrect endpoint types - none are using the outbox
var endpint = await bus.GetPublishSendEndpoint<TestMessage>();
var endpoint2 = scope.ServiceProvider.GetRequiredService<Bind<TBus, IPublishEndpoint>>();
var endpointProvider = scope.ServiceProvider.GetRequiredService<Bind<TBus, ISendEndpointProvider>>();
var endpoint3 = await endpointProvider.Value.GetSendEndpoint(new Uri(myqueueUri));
Assert.IsType<MassTransit.Middleware.Outbox.OutboxSendEndpointProvider>(endpointProvider.Value);
var message = new TestMessage();
await endpoint3.Send(message, cancellation);
}
private void AddMassTransitBus<T, TDbContext>(ServiceCollection services)
where T : class, IBus
where TDbContext : DbContext
{
services.AddMassTransit<T>(x =>
{
x.AddEntityFrameworkOutbox<TDbContext>(options =>
{
options.QueryDelay = TimeSpan.FromSeconds(5);
// TODO: Check to see what db provider is being used.
options.UseSqlServer()
.UseBusOutbox(a =>
{
a.DisableDeliveryService();
});
options.DisableInboxCleanupService();
});
// https://github.com/MassTransit/MassTransit/blob/29ec12e7a5c8e504d8babf6a63f57655db3a5a81/src/Persistence/MassTransit.EntityFrameworkCoreIntegration/EntityFrameworkCoreIntegration/EntityFrameworkScopedBusContext.cs#L89
// https://github.com/MassTransit/MassTransit/blob/29ec12e7a5c8e504d8babf6a63f57655db3a5a81/src/Persistence/MassTransit.EntityFrameworkCoreIntegration/Configuration/Configuration/EntityFrameworkBusOutboxConfigurator.cs#L43
x.ReplaceScoped<IScopedBusContextProvider<T>, EntityFrameworkScopedBusContextProvider<T, TDbContext>>(); // this appears to fix send endpoint provider.
x.UsingRabbitMq((x, cfg) =>
{
//cfg.UseMessageScope(x);
});
x.RemoveMassTransitHostedService();
});
}
private const string InMemoryPrivateConnectionString = "DataSource=:memory:";
private const string InMemorySharedConnectionString = "DataSource=Sharable;Mode=Memory;Cache=Shared";
public static SqliteConnection CreateSqliteInMemoryConnection(bool useSharedMemory = false)
{
var connString = useSharedMemory ? InMemorySharedConnectionString : InMemoryPrivateConnectionString;
var connection = new SqliteConnection(connString);
// see https://stackoverflow.com/questions/47292970/testing-asp-net-core-application-with-sqlite-throws-unknown-function-newid
connection.CreateFunction("NEWSEQUENTIALID", () => Guid.NewGuid());
connection.CreateFunction("GETUTCDATE", () => DateTime.UtcNow);
return connection;
}
public interface IModuleBusA : IBus, IPublishEndpoint, IBusControl, ISendEndpointProvider
{
// No additional members needed - just a marker interface
}
public interface IModuleBusB : IBus, IPublishEndpoint, IBusControl, ISendEndpointProvider
{
// No additional members needed - just a marker interface
}
public record TestMessage
{
}
public class TestDbContext : DbContext
{
public TestDbContext(DbContextOptions<TestDbContext> options) : base(options)
{
}
public DbSet<TestEntity> TestEntities { get; set; }
public DbSet<OutboxMessage> OutboxMessages { get; set; }
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder.Entity<TestEntity>().HasKey(e => e.Id);
modelBuilder.AddInboxStateEntity();
modelBuilder.AddOutboxMessageEntity();
modelBuilder.AddOutboxStateEntity();
}
}
public class TestEntity
{
public int Id { get; set; }
public string Name { get; set; }
}
}
services.AddMassTransit<TBus>(x =>
{
// See https://github.com/MassTransit/MassTransit/issues/5781
x.AddEntityFrameworkOutboxMultibus<TDbContext>(options =>
{
options.ConfigureProvider(x =>
{
// sql server is the default already.
//x.LockStatementProvider = new SqlServerLockStatementProvider();
})
.AddBusOutbox<TBus>(a => // stuff here is now scoped at the types bus level
{
a
// .DisableDeliveryService() // delivers message from the table scoped to this specific bus
// .DisableInboxCleanupService() // cleans from the table scoped to this specific bus
.ConfigureOutboxDeliveryOptions(a =>
{
a.QueryDelay = TimeSpan.FromSeconds(5);
})
.ConfigureInboxCleanupOptions(a =>
{
a.QueryDelay = TimeSpan.FromSeconds(5);
});
});
});
// other stuff
}
@dazinator
Copy link
Author

not sure if a singleton IBusNotificationService is going to be problematic

https://github.com/MassTransit/MassTransit/blob/29ec12e7a5c8e504d8babf6a63f57655db3a5a81/src/Persistence/MassTransit.EntityFrameworkCoreIntegration/Configuration/Configuration/EntityFrameworkBusOutboxConfigurator.cs#L44

@dazinator
Copy link
Author

Yes I think this outbox notification service should honour the outbox settings for the particular bus, which it doesn't presently.
I'll most likely derive a generic IBusOutboxNotification<TBus> and then register EntityFrameworkScopedBusContextProvider and resolv eit myself passing the resolved IBusOutboxNotification<TBus> as the IBusNotificationService .
It feels like I could use Bind<> but I don't really know how that works.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment