Ниже — рабочий “скелет”, который можно положить в проект и дальше обрастить Kafka/RabbitMQ/NATS, HTTP-клиентами и нормальной бизнес-логикой.
Идея такая:
Transactional Outbox: бизнес-изменение и запись события делаются в одной MySQL-транзакции. Потом отдельный воркер читает outbox_events и публикует события. В Go для этого нормально использовать database/sql, BeginTx, Commit, Rollback; официальная документация Go отдельно предупреждает не смешивать sql.Tx и обычные sql.DB-методы внутри одной транзакции. (go.dev) Для MySQL можно использовать github.com/go-sql-driver/mysql, он подключается как драйвер для database/sql. (GitHub)
Saga Compensation: каждый шаг саги хранит состояние. Если шаг упал, уже выполненные шаги откатываются компенсационными действиями в обратном порядке.
CREATE TABLE orders (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
customer_id BIGINT NOT NULL,
status VARCHAR(32) NOT NULL,
total_cents BIGINT NOT NULL DEFAULT 0,
created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
updated_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6)
ON UPDATE CURRENT_TIMESTAMP(6)
) ENGINE=InnoDB;
CREATE TABLE order_items (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
order_id BIGINT NOT NULL,
sku VARCHAR(64) NOT NULL,
qty INT NOT NULL,
price_cents BIGINT NOT NULL,
FOREIGN KEY (order_id) REFERENCES orders(id)
) ENGINE=InnoDB;
CREATE TABLE outbox_events (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
aggregate_type VARCHAR(64) NOT NULL,
aggregate_id BIGINT NOT NULL,
event_type VARCHAR(128) NOT NULL,
payload JSON NOT NULL,
status ENUM('PENDING', 'PROCESSING', 'PUBLISHED', 'FAILED')
NOT NULL DEFAULT 'PENDING',
attempts INT NOT NULL DEFAULT 0,
locked_by VARCHAR(128) NULL,
locked_until DATETIME(6) NULL,
last_error TEXT NULL,
published_at DATETIME(6) NULL,
-- ключ идемпотентности, чтобы не создать одно и то же событие дважды
idempotency_key VARCHAR(255) NOT NULL,
created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
updated_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6)
ON UPDATE CURRENT_TIMESTAMP(6),
UNIQUE KEY uq_outbox_idempotency_key (idempotency_key),
KEY idx_outbox_fetch (status, created_at),
KEY idx_outbox_lock (status, locked_until)
) ENGINE=InnoDB;Для Saga:
CREATE TABLE inventory (
sku VARCHAR(64) PRIMARY KEY,
available INT NOT NULL,
reserved INT NOT NULL DEFAULT 0
) ENGINE=InnoDB;
CREATE TABLE payments (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
order_id BIGINT NOT NULL,
amount_cents BIGINT NOT NULL,
status VARCHAR(32) NOT NULL,
created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
UNIQUE KEY uq_payment_order (order_id)
) ENGINE=InnoDB;
CREATE TABLE shipments (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
order_id BIGINT NOT NULL,
status VARCHAR(32) NOT NULL,
created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
UNIQUE KEY uq_shipment_order (order_id)
) ENGINE=InnoDB;
CREATE TABLE saga_instances (
id CHAR(36) PRIMARY KEY,
saga_type VARCHAR(128) NOT NULL,
correlation_id VARCHAR(255) NOT NULL,
status ENUM('RUNNING', 'COMPLETED', 'FAILED', 'COMPENSATING', 'COMPENSATED', 'COMPENSATION_FAILED')
NOT NULL DEFAULT 'RUNNING',
payload JSON NOT NULL,
created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
updated_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6)
ON UPDATE CURRENT_TIMESTAMP(6),
UNIQUE KEY uq_saga_correlation (saga_type, correlation_id)
) ENGINE=InnoDB;
CREATE TABLE saga_steps (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
saga_id CHAR(36) NOT NULL,
step_order INT NOT NULL,
step_name VARCHAR(128) NOT NULL,
status ENUM('PENDING', 'RUNNING', 'COMPLETED', 'FAILED', 'COMPENSATING', 'COMPENSATED', 'COMPENSATION_FAILED')
NOT NULL DEFAULT 'PENDING',
error TEXT NULL,
created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
updated_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6)
ON UPDATE CURRENT_TIMESTAMP(6),
UNIQUE KEY uq_saga_step (saga_id, step_order),
FOREIGN KEY (saga_id) REFERENCES saga_instances(id)
) ENGINE=InnoDB;package main
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"log"
"time"
_ "github.com/go-sql-driver/mysql"
)
type OrderItem struct {
SKU string
Qty int
PriceCents int64
}
type OrderCreatedEvent struct {
OrderID int64 `json:"order_id"`
CustomerID int64 `json:"customer_id"`
Items []OrderItem `json:"items"`
TotalCents int64 `json:"total_cents"`
CreatedAt time.Time `json:"created_at"`
}
func CreateOrderWithOutbox(
ctx context.Context,
db *sql.DB,
customerID int64,
items []OrderItem,
) (int64, error) {
tx, err := db.BeginTx(ctx, &sql.TxOptions{
Isolation: sql.LevelReadCommitted,
})
if err != nil {
return 0, err
}
// После успешного Commit Rollback фактически уже ничего не откатит.
// Это стандартный удобный паттерн для database/sql.
defer func() {
_ = tx.Rollback()
}()
var total int64
for _, item := range items {
total += int64(item.Qty) * item.PriceCents
}
res, err := tx.ExecContext(ctx, `
INSERT INTO orders (customer_id, status, total_cents)
VALUES (?, 'CREATED', ?)
`, customerID, total)
if err != nil {
return 0, err
}
orderID, err := res.LastInsertId()
if err != nil {
return 0, err
}
for _, item := range items {
_, err = tx.ExecContext(ctx, `
INSERT INTO order_items (order_id, sku, qty, price_cents)
VALUES (?, ?, ?, ?)
`, orderID, item.SKU, item.Qty, item.PriceCents)
if err != nil {
return 0, err
}
}
event := OrderCreatedEvent{
OrderID: orderID,
CustomerID: customerID,
Items: items,
TotalCents: total,
CreatedAt: time.Now().UTC(),
}
payload, err := json.Marshal(event)
if err != nil {
return 0, err
}
idempotencyKey := fmt.Sprintf("order:%d:created", orderID)
_, err = tx.ExecContext(ctx, `
INSERT INTO outbox_events (
aggregate_type,
aggregate_id,
event_type,
payload,
idempotency_key
)
VALUES (?, ?, ?, CAST(? AS JSON), ?)
`, "order", orderID, "OrderCreated", string(payload), idempotencyKey)
if err != nil {
return 0, err
}
if err := tx.Commit(); err != nil {
return 0, err
}
return orderID, nil
}Ключевой момент: если приложение упадёт до commit — нет ни заказа, ни события. Если упадёт после commit — заказ и событие уже сохранены, а outbox-воркер позже его опубликует.
В MySQL 8+ удобно брать пачку событий через FOR UPDATE SKIP LOCKED. SELECT ... FOR UPDATE ставит эксклюзивные блокировки на выбранные строки, а SKIP LOCKED позволяет нескольким воркерам не ждать друг друга и пропускать уже заблокированные строки. (MySQL)
type OutboxEvent struct {
ID int64
AggregateType string
AggregateID int64
EventType string
Payload []byte
IdempotencyKey string
}
type EventPublisher interface {
Publish(ctx context.Context, event OutboxEvent) error
}
type LogPublisher struct{}
func (p LogPublisher) Publish(ctx context.Context, event OutboxEvent) error {
log.Printf(
"publish event id=%d type=%s aggregate=%s:%d payload=%s",
event.ID,
event.EventType,
event.AggregateType,
event.AggregateID,
string(event.Payload),
)
return nil
}
func ProcessOutboxBatch(
ctx context.Context,
db *sql.DB,
publisher EventPublisher,
workerID string,
limit int,
) error {
events, err := lockOutboxEvents(ctx, db, workerID, limit)
if err != nil {
return err
}
for _, event := range events {
err := publisher.Publish(ctx, event)
if err != nil {
_ = markOutboxFailed(ctx, db, event.ID, err)
continue
}
if err := markOutboxPublished(ctx, db, event.ID); err != nil {
return err
}
}
return nil
}
func lockOutboxEvents(
ctx context.Context,
db *sql.DB,
workerID string,
limit int,
) ([]OutboxEvent, error) {
tx, err := db.BeginTx(ctx, &sql.TxOptions{
Isolation: sql.LevelReadCommitted,
})
if err != nil {
return nil, err
}
defer func() {
_ = tx.Rollback()
}()
rows, err := tx.QueryContext(ctx, `
SELECT
id,
aggregate_type,
aggregate_id,
event_type,
CAST(payload AS CHAR),
idempotency_key
FROM outbox_events
WHERE status IN ('PENDING', 'FAILED')
AND attempts < 10
AND (locked_until IS NULL OR locked_until < NOW(6))
ORDER BY id
LIMIT ?
FOR UPDATE SKIP LOCKED
`, limit)
if err != nil {
return nil, err
}
defer rows.Close()
var events []OutboxEvent
var ids []int64
for rows.Next() {
var e OutboxEvent
var payload string
if err := rows.Scan(
&e.ID,
&e.AggregateType,
&e.AggregateID,
&e.EventType,
&payload,
&e.IdempotencyKey,
); err != nil {
return nil, err
}
e.Payload = []byte(payload)
events = append(events, e)
ids = append(ids, e.ID)
}
if err := rows.Err(); err != nil {
return nil, err
}
if len(ids) == 0 {
if err := tx.Commit(); err != nil {
return nil, err
}
return nil, nil
}
inSQL, args := buildInClause(ids)
args = append([]any{workerID}, args...)
_, err = tx.ExecContext(ctx, fmt.Sprintf(`
UPDATE outbox_events
SET
status = 'PROCESSING',
locked_by = ?,
locked_until = DATE_ADD(NOW(6), INTERVAL 2 MINUTE)
WHERE id IN (%s)
`, inSQL), args...)
if err != nil {
return nil, err
}
if err := tx.Commit(); err != nil {
return nil, err
}
return events, nil
}
func markOutboxPublished(ctx context.Context, db *sql.DB, eventID int64) error {
_, err := db.ExecContext(ctx, `
UPDATE outbox_events
SET
status = 'PUBLISHED',
published_at = NOW(6),
locked_by = NULL,
locked_until = NULL,
last_error = NULL
WHERE id = ?
`, eventID)
return err
}
func markOutboxFailed(ctx context.Context, db *sql.DB, eventID int64, publishErr error) error {
_, err := db.ExecContext(ctx, `
UPDATE outbox_events
SET
status = CASE
WHEN attempts + 1 >= 10 THEN 'FAILED'
ELSE 'PENDING'
END,
attempts = attempts + 1,
locked_by = NULL,
locked_until = NULL,
last_error = ?
WHERE id = ?
`, publishErr.Error(), eventID)
return err
}
func buildInClause(ids []int64) (string, []any) {
placeholders := ""
args := make([]any, 0, len(ids))
for i, id := range ids {
if i > 0 {
placeholders += ","
}
placeholders += "?"
args = append(args, id)
}
return placeholders, args
}Запуск воркера:
func RunOutboxWorker(ctx context.Context, db *sql.DB, publisher EventPublisher, workerID string) {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := ProcessOutboxBatch(ctx, db, publisher, workerID, 100); err != nil {
log.Printf("outbox worker error: %v", err)
}
}
}
}Важно: outbox обычно даёт at-least-once delivery. То есть событие может быть опубликовано повторно, например если публикация прошла, а приложение упало до markOutboxPublished. Поэтому потребители должны быть идемпотентными: хранить idempotency_key / event_id и не применять одно событие дважды.
Пример саги:
- Зарезервировать товар.
- Списать оплату.
- Создать доставку.
Если доставка упала:
- Отменить оплату.
- Освободить резерв товара.
Это не “rollback одной большой транзакции”. Это набор локальных транзакций + компенсации.
type SagaPayload struct {
OrderID int64 `json:"order_id"`
CustomerID int64 `json:"customer_id"`
Items []OrderItem `json:"items"`
TotalCents int64 `json:"total_cents"`
}
type SagaStep struct {
Name string
Action func(ctx context.Context, db *sql.DB, sagaID string, payload SagaPayload) error
Compensate func(ctx context.Context, db *sql.DB, sagaID string, payload SagaPayload) error
}
func StartOrderSaga(
ctx context.Context,
db *sql.DB,
sagaID string,
payload SagaPayload,
) error {
payloadJSON, err := json.Marshal(payload)
if err != nil {
return err
}
_, err = db.ExecContext(ctx, `
INSERT INTO saga_instances (
id,
saga_type,
correlation_id,
status,
payload
)
VALUES (?, 'CreateOrderSaga', ?, 'RUNNING', CAST(? AS JSON))
ON DUPLICATE KEY UPDATE id = id
`, sagaID, fmt.Sprintf("order:%d", payload.OrderID), string(payloadJSON))
return err
}
func RunOrderSaga(
ctx context.Context,
db *sql.DB,
sagaID string,
payload SagaPayload,
) error {
steps := []SagaStep{
{
Name: "ReserveStock",
Action: reserveStock,
Compensate: releaseStock,
},
{
Name: "ChargePayment",
Action: chargePayment,
Compensate: refundPayment,
},
{
Name: "CreateShipment",
Action: createShipment,
Compensate: cancelShipment,
},
}
var completed []int
for i, step := range steps {
stepOrder := i + 1
if err := markSagaStep(ctx, db, sagaID, stepOrder, step.Name, "RUNNING", ""); err != nil {
return err
}
if err := step.Action(ctx, db, sagaID, payload); err != nil {
_ = markSagaStep(ctx, db, sagaID, stepOrder, step.Name, "FAILED", err.Error())
_ = markSagaStatus(ctx, db, sagaID, "COMPENSATING")
compErr := compensateCompletedSteps(ctx, db, sagaID, payload, steps, completed)
if compErr != nil {
_ = markSagaStatus(ctx, db, sagaID, "COMPENSATION_FAILED")
return fmt.Errorf("saga failed: %w; compensation failed: %v", err, compErr)
}
_ = markSagaStatus(ctx, db, sagaID, "COMPENSATED")
return fmt.Errorf("saga compensated after failure: %w", err)
}
if err := markSagaStep(ctx, db, sagaID, stepOrder, step.Name, "COMPLETED", ""); err != nil {
return err
}
completed = append(completed, i)
}
return markSagaStatus(ctx, db, sagaID, "COMPLETED")
}
func compensateCompletedSteps(
ctx context.Context,
db *sql.DB,
sagaID string,
payload SagaPayload,
steps []SagaStep,
completed []int,
) error {
for i := len(completed) - 1; i >= 0; i-- {
stepIndex := completed[i]
step := steps[stepIndex]
stepOrder := stepIndex + 1
if err := markSagaStep(ctx, db, sagaID, stepOrder, step.Name, "COMPENSATING", ""); err != nil {
return err
}
if err := step.Compensate(ctx, db, sagaID, payload); err != nil {
_ = markSagaStep(ctx, db, sagaID, stepOrder, step.Name, "COMPENSATION_FAILED", err.Error())
return err
}
if err := markSagaStep(ctx, db, sagaID, stepOrder, step.Name, "COMPENSATED", ""); err != nil {
return err
}
}
return nil
}
func markSagaStep(
ctx context.Context,
db *sql.DB,
sagaID string,
stepOrder int,
stepName string,
status string,
errText string,
) error {
_, err := db.ExecContext(ctx, `
INSERT INTO saga_steps (
saga_id,
step_order,
step_name,
status,
error
)
VALUES (?, ?, ?, ?, NULLIF(?, ''))
ON DUPLICATE KEY UPDATE
status = VALUES(status),
error = VALUES(error),
updated_at = NOW(6)
`, sagaID, stepOrder, stepName, status, errText)
return err
}
func markSagaStatus(ctx context.Context, db *sql.DB, sagaID string, status string) error {
_, err := db.ExecContext(ctx, `
UPDATE saga_instances
SET status = ?
WHERE id = ?
`, status, sagaID)
return err
}Каждый шаг делает свою локальную транзакцию. И если шаг меняет данные, он тоже может писать событие в outbox.
Тут нужен SELECT ... FOR UPDATE, чтобы два заказа одновременно не зарезервировали один и тот же остаток. В MySQL FOR UPDATE блокирует выбранные строки до конца транзакции. (MySQL)
func reserveStock(
ctx context.Context,
db *sql.DB,
sagaID string,
payload SagaPayload,
) error {
tx, err := db.BeginTx(ctx, &sql.TxOptions{
Isolation: sql.LevelReadCommitted,
})
if err != nil {
return err
}
defer func() {
_ = tx.Rollback()
}()
for _, item := range payload.Items {
var available int
err := tx.QueryRowContext(ctx, `
SELECT available
FROM inventory
WHERE sku = ?
FOR UPDATE
`, item.SKU).Scan(&available)
if err != nil {
return err
}
if available < item.Qty {
return fmt.Errorf("not enough stock for sku=%s", item.SKU)
}
_, err = tx.ExecContext(ctx, `
UPDATE inventory
SET
available = available - ?,
reserved = reserved + ?
WHERE sku = ?
`, item.Qty, item.Qty, item.SKU)
if err != nil {
return err
}
}
eventPayload, _ := json.Marshal(map[string]any{
"saga_id": sagaID,
"order_id": payload.OrderID,
"items": payload.Items,
})
_, err = tx.ExecContext(ctx, `
INSERT INTO outbox_events (
aggregate_type,
aggregate_id,
event_type,
payload,
idempotency_key
)
VALUES (?, ?, ?, CAST(? AS JSON), ?)
`, "order", payload.OrderID, "StockReserved", string(eventPayload),
fmt.Sprintf("saga:%s:stock-reserved", sagaID))
if err != nil {
return err
}
return tx.Commit()
}func releaseStock(
ctx context.Context,
db *sql.DB,
sagaID string,
payload SagaPayload,
) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer func() {
_ = tx.Rollback()
}()
for _, item := range payload.Items {
_, err = tx.ExecContext(ctx, `
UPDATE inventory
SET
available = available + ?,
reserved = reserved - ?
WHERE sku = ?
AND reserved >= ?
`, item.Qty, item.Qty, item.SKU, item.Qty)
if err != nil {
return err
}
}
eventPayload, _ := json.Marshal(map[string]any{
"saga_id": sagaID,
"order_id": payload.OrderID,
"items": payload.Items,
})
_, err = tx.ExecContext(ctx, `
INSERT INTO outbox_events (
aggregate_type,
aggregate_id,
event_type,
payload,
idempotency_key
)
VALUES (?, ?, ?, CAST(? AS JSON), ?)
ON DUPLICATE KEY UPDATE id = id
`, "order", payload.OrderID, "StockReleased", string(eventPayload),
fmt.Sprintf("saga:%s:stock-released", sagaID))
if err != nil {
return err
}
return tx.Commit()
}func chargePayment(
ctx context.Context,
db *sql.DB,
sagaID string,
payload SagaPayload,
) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer func() {
_ = tx.Rollback()
}()
_, err = tx.ExecContext(ctx, `
INSERT INTO payments (
order_id,
amount_cents,
status
)
VALUES (?, ?, 'CHARGED')
ON DUPLICATE KEY UPDATE id = id
`, payload.OrderID, payload.TotalCents)
if err != nil {
return err
}
eventPayload, _ := json.Marshal(map[string]any{
"saga_id": sagaID,
"order_id": payload.OrderID,
"amount_cents": payload.TotalCents,
})
_, err = tx.ExecContext(ctx, `
INSERT INTO outbox_events (
aggregate_type,
aggregate_id,
event_type,
payload,
idempotency_key
)
VALUES (?, ?, ?, CAST(? AS JSON), ?)
ON DUPLICATE KEY UPDATE id = id
`, "order", payload.OrderID, "PaymentCharged", string(eventPayload),
fmt.Sprintf("saga:%s:payment-charged", sagaID))
if err != nil {
return err
}
return tx.Commit()
}func refundPayment(
ctx context.Context,
db *sql.DB,
sagaID string,
payload SagaPayload,
) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer func() {
_ = tx.Rollback()
}()
_, err = tx.ExecContext(ctx, `
UPDATE payments
SET status = 'REFUNDED'
WHERE order_id = ?
AND status = 'CHARGED'
`, payload.OrderID)
if err != nil {
return err
}
eventPayload, _ := json.Marshal(map[string]any{
"saga_id": sagaID,
"order_id": payload.OrderID,
})
_, err = tx.ExecContext(ctx, `
INSERT INTO outbox_events (
aggregate_type,
aggregate_id,
event_type,
payload,
idempotency_key
)
VALUES (?, ?, ?, CAST(? AS JSON), ?)
ON DUPLICATE KEY UPDATE id = id
`, "order", payload.OrderID, "PaymentRefunded", string(eventPayload),
fmt.Sprintf("saga:%s:payment-refunded", sagaID))
if err != nil {
return err
}
return tx.Commit()
}func createShipment(
ctx context.Context,
db *sql.DB,
sagaID string,
payload SagaPayload,
) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer func() {
_ = tx.Rollback()
}()
_, err = tx.ExecContext(ctx, `
INSERT INTO shipments (
order_id,
status
)
VALUES (?, 'CREATED')
ON DUPLICATE KEY UPDATE id = id
`, payload.OrderID)
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, `
UPDATE orders
SET status = 'CONFIRMED'
WHERE id = ?
`, payload.OrderID)
if err != nil {
return err
}
eventPayload, _ := json.Marshal(map[string]any{
"saga_id": sagaID,
"order_id": payload.OrderID,
})
_, err = tx.ExecContext(ctx, `
INSERT INTO outbox_events (
aggregate_type,
aggregate_id,
event_type,
payload,
idempotency_key
)
VALUES (?, ?, ?, CAST(? AS JSON), ?)
ON DUPLICATE KEY UPDATE id = id
`, "order", payload.OrderID, "ShipmentCreated", string(eventPayload),
fmt.Sprintf("saga:%s:shipment-created", sagaID))
if err != nil {
return err
}
return tx.Commit()
}func cancelShipment(
ctx context.Context,
db *sql.DB,
sagaID string,
payload SagaPayload,
) error {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer func() {
_ = tx.Rollback()
}()
_, err = tx.ExecContext(ctx, `
UPDATE shipments
SET status = 'CANCELLED'
WHERE order_id = ?
AND status = 'CREATED'
`, payload.OrderID)
if err != nil {
return err
}
eventPayload, _ := json.Marshal(map[string]any{
"saga_id": sagaID,
"order_id": payload.OrderID,
})
_, err = tx.ExecContext(ctx, `
INSERT INTO outbox_events (
aggregate_type,
aggregate_id,
event_type,
payload,
idempotency_key
)
VALUES (?, ?, ?, CAST(? AS JSON), ?)
ON DUPLICATE KEY UPDATE id = id
`, "order", payload.OrderID, "ShipmentCancelled", string(eventPayload),
fmt.Sprintf("saga:%s:shipment-cancelled", sagaID))
if err != nil {
return err
}
return tx.Commit()
}func main() {
ctx := context.Background()
dsn := "app:secret@tcp(127.0.0.1:3306)/appdb?parseTime=true&multiStatements=false"
db, err := sql.Open("mysql", dsn)
if err != nil {
log.Fatal(err)
}
defer db.Close()
db.SetMaxOpenConns(20)
db.SetMaxIdleConns(20)
db.SetConnMaxLifetime(3 * time.Minute)
if err := db.PingContext(ctx); err != nil {
log.Fatal(err)
}
items := []OrderItem{
{
SKU: "iphone-15",
Qty: 1,
PriceCents: 100_000,
},
}
orderID, err := CreateOrderWithOutbox(ctx, db, 123, items)
if err != nil {
log.Fatal(err)
}
payload := SagaPayload{
OrderID: orderID,
CustomerID: 123,
Items: items,
TotalCents: 100_000,
}
sagaID := fmt.Sprintf("saga-%d", time.Now().UnixNano())
if err := StartOrderSaga(ctx, db, sagaID, payload); err != nil {
log.Fatal(err)
}
if err := RunOrderSaga(ctx, db, sagaID, payload); err != nil {
log.Printf("saga result: %v", err)
}
go RunOutboxWorker(ctx, db, LogPublisher{}, "worker-1")
select {}
}Для MySQL 5.7 FOR UPDATE есть, но SKIP LOCKED лучше не использовать. Документация 5.7 описывает FOR UPDATE как блокирующий выбранные строки до конца транзакции, но без удобного SKIP LOCKED для очередей. (MySQL)
Тогда outbox можно “захватывать” через UPDATE ... LIMIT:
UPDATE outbox_events
SET
status = 'PROCESSING',
locked_by = ?,
locked_until = DATE_ADD(NOW(6), INTERVAL 2 MINUTE)
WHERE status IN ('PENDING', 'FAILED')
AND attempts < 10
AND (locked_until IS NULL OR locked_until < NOW(6))
ORDER BY id
LIMIT 100;Потом воркер выбирает только свои события:
SELECT
id,
aggregate_type,
aggregate_id,
event_type,
CAST(payload AS CHAR),
idempotency_key
FROM outbox_events
WHERE status = 'PROCESSING'
AND locked_by = ?
ORDER BY id;Это чуть менее красиво, чем SKIP LOCKED, но для MySQL 5.7 вполне нормальный практический вариант. Главное — иметь locked_until, чтобы другой воркер смог подобрать события, если текущий умер.