Skip to content

Instantly share code, notes, and snippets.

@5kr1p7
Last active June 26, 2026 13:15
Show Gist options
  • Select an option

  • Save 5kr1p7/a3783ddd6a722745abb8fe0bf05af902 to your computer and use it in GitHub Desktop.

Select an option

Save 5kr1p7/a3783ddd6a722745abb8fe0bf05af902 to your computer and use it in GitHub Desktop.

Transactional Outbox и Saga Compensation на Python + MySQL

Ниже — пример на Python + MySQL. Беру mysql-connector-python, без ORM, чтобы было видно, где именно начинается и коммитится транзакция. В mysql-connector-python параметры передаются через %s, а commit()/rollback() явно фиксируют или откатывают транзакцию; autocommit по умолчанию выключен, но в примере он всё равно задан явно — так спокойнее и читабельнее. (dev.mysql.com)

Установка:

pip install mysql-connector-python

Меню

  1. SQL-схема
  2. Общие утилиты для подключения и транзакций
  3. Transactional Outbox: создание заказа + событие в одной транзакции
  4. Outbox worker для MySQL 8+
  5. Saga Compensation
  6. Шаги Saga
  7. Пример запуска
  8. Вариант outbox worker для MySQL 5.7
  9. Дополнительные слои надёжности
  10. Пошаговый разбор сценариев
  11. Неуспешные сценарии
  12. Общая карта состояний
  13. Самая важная логика
  14. Итоговые сценарии коротко

1. SQL-схема

Схема включает бизнес-таблицы, outbox, таблицы Saga и дополнительные таблицы для идемпотентности, inbox, dead letter и командной очереди. В реальном проекте это часто лежит в разных сервисах и базах, но в одном примере удобнее видеть всё рядом.

CREATE TABLE orders (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    customer_id BIGINT NOT NULL,
    status VARCHAR(32) NOT NULL,
    total_cents BIGINT NOT NULL DEFAULT 0,
    created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
    updated_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6)
        ON UPDATE CURRENT_TIMESTAMP(6)
) ENGINE=InnoDB;

CREATE TABLE order_items (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    order_id BIGINT NOT NULL,
    sku VARCHAR(64) NOT NULL,
    qty INT NOT NULL,
    price_cents BIGINT NOT NULL,
    FOREIGN KEY (order_id) REFERENCES orders(id)
) ENGINE=InnoDB;

CREATE TABLE outbox_events (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,

    aggregate_type VARCHAR(64) NOT NULL,
    aggregate_id BIGINT NOT NULL,
    event_type VARCHAR(128) NOT NULL,

    payload JSON NOT NULL,

    status ENUM('PENDING', 'PROCESSING', 'PUBLISHED', 'FAILED')
        NOT NULL DEFAULT 'PENDING',

    attempts INT NOT NULL DEFAULT 0,
    locked_by VARCHAR(128) NULL,
    locked_until DATETIME(6) NULL,

    last_error TEXT NULL,
    published_at DATETIME(6) NULL,

    -- Нужен для идемпотентности.
    -- Например: "order:123:created" или "saga:abc:payment-refunded".
    idempotency_key VARCHAR(255) NOT NULL,

    created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
    updated_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6)
        ON UPDATE CURRENT_TIMESTAMP(6),

    UNIQUE KEY uq_outbox_idempotency_key (idempotency_key),
    KEY idx_outbox_fetch (status, created_at),
    KEY idx_outbox_lock (status, locked_until)
) ENGINE=InnoDB;

CREATE TABLE inventory (
    sku VARCHAR(64) PRIMARY KEY,
    available INT NOT NULL,
    reserved INT NOT NULL DEFAULT 0
) ENGINE=InnoDB;

CREATE TABLE inventory_reservations (
    saga_id CHAR(36) NOT NULL,
    sku VARCHAR(64) NOT NULL,
    qty INT NOT NULL,
    status ENUM('RESERVED', 'RELEASED') NOT NULL DEFAULT 'RESERVED',
    created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
    updated_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6)
        ON UPDATE CURRENT_TIMESTAMP(6),

    PRIMARY KEY (saga_id, sku),
    FOREIGN KEY (sku) REFERENCES inventory(sku)
) ENGINE=InnoDB;

CREATE TABLE payments (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    order_id BIGINT NOT NULL,
    amount_cents BIGINT NOT NULL,
    status VARCHAR(32) NOT NULL,
    created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
    UNIQUE KEY uq_payment_order (order_id)
) ENGINE=InnoDB;

CREATE TABLE shipments (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    order_id BIGINT NOT NULL,
    status VARCHAR(32) NOT NULL,
    created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
    UNIQUE KEY uq_shipment_order (order_id)
) ENGINE=InnoDB;

CREATE TABLE saga_instances (
    id CHAR(36) PRIMARY KEY,
    saga_type VARCHAR(128) NOT NULL,
    correlation_id VARCHAR(255) NOT NULL,
    status ENUM(
        'RUNNING',
        'COMPLETED',
        'FAILED',
        'COMPENSATING',
        'COMPENSATED',
        'COMPENSATION_FAILED'
    ) NOT NULL DEFAULT 'RUNNING',
    payload JSON NOT NULL,
    created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
    updated_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6)
        ON UPDATE CURRENT_TIMESTAMP(6),

    UNIQUE KEY uq_saga_correlation (saga_type, correlation_id)
) ENGINE=InnoDB;

CREATE TABLE saga_steps (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    saga_id CHAR(36) NOT NULL,
    step_order INT NOT NULL,
    step_name VARCHAR(128) NOT NULL,
    status ENUM(
        'PENDING',
        'RUNNING',
        'COMPLETED',
        'FAILED',
        'COMPENSATING',
        'COMPENSATED',
        'COMPENSATION_FAILED'
    ) NOT NULL DEFAULT 'PENDING',
    error TEXT NULL,
    created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
    updated_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6)
        ON UPDATE CURRENT_TIMESTAMP(6),

    UNIQUE KEY uq_saga_step (saga_id, step_order),
    FOREIGN KEY (saga_id) REFERENCES saga_instances(id)
) ENGINE=InnoDB;

CREATE TABLE idempotency_keys (
    idempotency_key VARCHAR(255) PRIMARY KEY,
    request_hash CHAR(64) NOT NULL,
    status ENUM('PROCESSING', 'COMPLETED', 'FAILED') NOT NULL,
    response_body JSON NULL,
    created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
    updated_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6)
        ON UPDATE CURRENT_TIMESTAMP(6)
) ENGINE=InnoDB;

CREATE TABLE inbox_events (
    event_id VARCHAR(255) PRIMARY KEY,
    event_type VARCHAR(128) NOT NULL,
    payload JSON NOT NULL,
    processed_at DATETIME(6) NULL,
    created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6)
) ENGINE=InnoDB;

CREATE TABLE dead_letter_events (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    source_event_id BIGINT NULL,
    event_type VARCHAR(128) NOT NULL,
    payload JSON NOT NULL,
    error TEXT NOT NULL,
    created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6)
) ENGINE=InnoDB;

CREATE TABLE command_queue (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    command_type VARCHAR(128) NOT NULL,
    payload JSON NOT NULL,
    status ENUM('PENDING', 'PROCESSING', 'DONE', 'FAILED')
        NOT NULL DEFAULT 'PENDING',
    attempts INT NOT NULL DEFAULT 0,
    locked_by VARCHAR(128) NULL,
    locked_until DATETIME(6) NULL,
    created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
    updated_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6)
        ON UPDATE CURRENT_TIMESTAMP(6)
) ENGINE=InnoDB;

2. Общие утилиты для подключения и транзакций

from __future__ import annotations

import json
import logging
import os
import time
import uuid

from contextlib import contextmanager
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from typing import Any, Callable, Protocol

import mysql.connector


logging.basicConfig(level=logging.INFO)


def make_connection():
    """
    Создаём новое соединение с MySQL.

    В реальном приложении лучше использовать pool соединений,
    но для примера отдельная функция проще и нагляднее.

    autocommit=False важен:
    мы хотим сами решать, когда делать commit/rollback.
    """
    return mysql.connector.connect(
        host=os.getenv("MYSQL_HOST", "127.0.0.1"),
        port=int(os.getenv("MYSQL_PORT", "3306")),
        user=os.getenv("MYSQL_USER", "app"),
        password=os.getenv("MYSQL_PASSWORD", "secret"),
        database=os.getenv("MYSQL_DATABASE", "appdb"),
        autocommit=False,
    )


@contextmanager
def transaction(conn, isolation_level: str | None = None):
    """
    Маленький context manager для транзакций.

    Использование:

        with transaction(conn):
            cursor.execute(...)
            cursor.execute(...)

    Если внутри блока не было исключения — commit.
    Если было исключение — rollback.

    isolation_level можно передать, например: "READ COMMITTED".
    """
    try:
        if isolation_level:
            conn.start_transaction(isolation_level=isolation_level)
        else:
            conn.start_transaction()

        yield

        conn.commit()

    except Exception:
        conn.rollback()
        raise


def utc_now_iso() -> str:
    """
    Возвращаем время в ISO-формате.

    Для JSON payload удобнее хранить строку,
    а не datetime-объект Python.
    """
    return datetime.now(timezone.utc).isoformat()


def json_dumps(value: Any) -> str:
    """
    Сериализуем payload в JSON.

    ensure_ascii=False оставляет кириллицу читаемой:
    {"message": "Привет"} вместо escaped unicode.
    """
    return json.dumps(value, ensure_ascii=False, separators=(",", ":"))


@dataclass
class OrderItem:
    sku: str
    qty: int
    price_cents: int


@dataclass
class SagaPayload:
    order_id: int
    customer_id: int
    items: list[OrderItem]
    total_cents: int

3. Transactional Outbox: создание заказа + событие в одной транзакции

Главная идея: заказ и outbox-событие пишутся в одной транзакции. Если транзакция откатилась — нет ни заказа, ни события. Если закоммитилась — есть и заказ, и событие, которое потом заберёт outbox worker.

def insert_outbox_event_tx(
    cursor,
    *,
    aggregate_type: str,
    aggregate_id: int,
    event_type: str,
    payload: dict[str, Any],
    idempotency_key: str,
    ignore_duplicate: bool = False,
):
    """
    Вставляет событие в outbox_events.

    Важно:
    функция НЕ делает commit сама.
    Она должна вызываться внутри уже открытой транзакции.

    Почему так?
    Потому что outbox-событие должно коммититься вместе
    с бизнес-изменением: созданием заказа, списанием оплаты и т.д.
    """

    sql = """
        INSERT INTO outbox_events (
            aggregate_type,
            aggregate_id,
            event_type,
            payload,
            idempotency_key
        )
        VALUES (%s, %s, %s, %s, %s)
    """

    # Для компенсационных операций и повторов удобно сделать вставку идемпотентной:
    # если событие уже есть, второй раз оно не создаётся.
    if ignore_duplicate:
        sql += """
            ON DUPLICATE KEY UPDATE id = id
        """

    cursor.execute(
        sql,
        (
            aggregate_type,
            aggregate_id,
            event_type,
            json_dumps(payload),
            idempotency_key,
        ),
    )


def create_order_tx(
    cursor,
    *,
    customer_id: int,
    items: list[OrderItem],
) -> int:
    """
    Создаём заказ и пишем OrderCreated в outbox внутри уже открытой транзакции.

    Функция принимает cursor и НЕ делает commit сама.
    Это удобно для обычного create_order_with_outbox() и для
    process_idempotent_request(), где idempotency key должен коммититься
    вместе с заказом и outbox-событием.

    Операции:
    - orders INSERT
    - order_items INSERT
    - outbox_events INSERT
    """

    total_cents = sum(item.qty * item.price_cents for item in items)

    # 1. Создаём заказ.
    cursor.execute(
        """
        INSERT INTO orders (customer_id, status, total_cents)
        VALUES (%s, 'CREATED', %s)
        """,
        (customer_id, total_cents),
    )

    order_id = cursor.lastrowid

    # 2. Создаём позиции заказа.
    for item in items:
        cursor.execute(
            """
            INSERT INTO order_items (order_id, sku, qty, price_cents)
            VALUES (%s, %s, %s, %s)
            """,
            (order_id, item.sku, item.qty, item.price_cents),
        )

    # 3. Готовим payload события.
    payload = {
        "order_id": order_id,
        "customer_id": customer_id,
        "items": [asdict(item) for item in items],
        "total_cents": total_cents,
        "created_at": utc_now_iso(),
    }

    # 4. Пишем событие в outbox.
    # Оно будет опубликовано отдельным воркером позже.
    insert_outbox_event_tx(
        cursor,
        aggregate_type="order",
        aggregate_id=order_id,
        event_type="OrderCreated",
        payload=payload,
        idempotency_key=f"order:{order_id}:created",
    )

    return order_id


def create_order_with_outbox(
    conn,
    *,
    customer_id: int,
    items: list[OrderItem],
) -> int:
    """
    Самостоятельная операция создания заказа.

    Открывает транзакцию, вызывает create_order_tx() и коммитит результат.
    """

    with transaction(conn, isolation_level="READ COMMITTED"):
        cursor = conn.cursor()
        try:
            return create_order_tx(
                cursor,
                customer_id=customer_id,
                items=items,
            )
        finally:
            cursor.close()

4. Outbox worker для MySQL 8+

В MySQL 8+ удобно использовать FOR UPDATE SKIP LOCKED: один воркер блокирует выбранные строки, а другие воркеры их пропускают и берут следующие. Это прям отлично ложится на outbox-очередь. SKIP LOCKED не ждёт заблокированные строки, а исключает их из результата. (dev.mysql.com)

@dataclass
class OutboxEvent:
    id: int
    aggregate_type: str
    aggregate_id: int
    event_type: str
    payload: str
    idempotency_key: str


class EventPublisher(Protocol):
    """
    Протокол публикатора.

    В реальном проекте тут может быть Kafka, RabbitMQ, NATS,
    HTTP-вызов, gRPC и т.д.
    """

    def publish(self, event: OutboxEvent) -> None:
        ...


class LogPublisher:
    """
    Простейший publisher для примера.

    Он ничего реально не отправляет, а просто пишет в лог.
    Позже вместо него можно подставить KafkaPublisher.
    """

    def publish(self, event: OutboxEvent) -> None:
        logging.info(
            "publish event id=%s type=%s aggregate=%s:%s payload=%s",
            event.id,
            event.event_type,
            event.aggregate_type,
            event.aggregate_id,
            event.payload,
        )


def lock_outbox_events_mysql8(
    conn,
    *,
    worker_id: str,
    limit: int = 100,
    lock_seconds: int = 120,
) -> list[OutboxEvent]:
    """
    Забираем пачку событий из outbox.

    Алгоритм:
    1. Начинаем транзакцию.
    2. SELECT ... FOR UPDATE SKIP LOCKED выбирает незаблокированные события.
    3. Помечаем их как PROCESSING.
    4. Коммитим транзакцию.
    5. Возвращаем события вызывающему коду.

    Почему locked_until важен:
    если воркер умер после захвата событий, другой воркер сможет
    повторно забрать эти события после истечения locked_until.
    """

    lock_token = f"{worker_id}:{uuid.uuid4()}"

    with transaction(conn, isolation_level="READ COMMITTED"):
        # dictionary=True возвращает строки как dict:
        # row["id"], row["payload"], ...
        cursor = conn.cursor(dictionary=True)

        cursor.execute(
            """
            SELECT
                id,
                aggregate_type,
                aggregate_id,
                event_type,
                payload,
                idempotency_key
            FROM outbox_events
            WHERE status IN ('PENDING', 'FAILED')
              AND attempts < 10
              AND (locked_until IS NULL OR locked_until < NOW(6))
            ORDER BY id
            LIMIT %s
            FOR UPDATE SKIP LOCKED
            """,
            (limit,),
        )

        rows = cursor.fetchall()

        if not rows:
            cursor.close()
            return []

        ids = [row["id"] for row in rows]

        # Для IN (%s, %s, %s) нужно динамически создать placeholders.
        placeholders = ",".join(["%s"] * len(ids))

        cursor.execute(
            f"""
            UPDATE outbox_events
            SET
                status = 'PROCESSING',
                locked_by = %s,
                locked_until = DATE_ADD(NOW(6), INTERVAL %s SECOND)
            WHERE id IN ({placeholders})
            """,
            [lock_token, lock_seconds, *ids],
        )

        cursor.close()

    # Транзакция уже закоммичена, блокировки отпущены.
    # Но события помечены как PROCESSING и имеют locked_until.
    return [
        OutboxEvent(
            id=row["id"],
            aggregate_type=row["aggregate_type"],
            aggregate_id=row["aggregate_id"],
            event_type=row["event_type"],
            payload=row["payload"],
            idempotency_key=row["idempotency_key"],
        )
        for row in rows
    ]


def mark_outbox_published(conn, event_id: int) -> None:
    """
    Помечаем событие опубликованным.

    Важно:
    публикация во внешний брокер и UPDATE outbox_events
    не являются одной атомарной транзакцией.

    Поэтому возможен дубль:
    - событие отправилось в брокер;
    - приложение упало до mark_outbox_published;
    - после рестарта событие отправится ещё раз.

    Поэтому consumer должен быть идемпотентным.
    """

    with transaction(conn):
        cursor = conn.cursor()
        cursor.execute(
            """
            UPDATE outbox_events
            SET
                status = 'PUBLISHED',
                published_at = NOW(6),
                locked_by = NULL,
                locked_until = NULL,
                last_error = NULL
            WHERE id = %s
            """,
            (event_id,),
        )
        cursor.close()


def mark_outbox_failed(conn, event_id: int, exc: Exception) -> None:
    """
    Если публикация не удалась — увеличиваем attempts.

    До 10 попыток возвращаем событие в PENDING,
    чтобы воркер попробовал позже.

    После 10 попыток оставляем FAILED.
    В реальном проекте можно добавить dead-letter queue.
    """

    with transaction(conn):
        cursor = conn.cursor()
        cursor.execute(
            """
            UPDATE outbox_events
            SET
                status = CASE
                    WHEN attempts + 1 >= 10 THEN 'FAILED'
                    ELSE 'PENDING'
                END,
                attempts = attempts + 1,
                locked_by = NULL,
                locked_until = NULL,
                last_error = %s
            WHERE id = %s
            """,
            (str(exc), event_id),
        )
        cursor.close()


def process_outbox_batch(
    *,
    publisher: EventPublisher,
    worker_id: str,
    limit: int = 100,
) -> None:
    """
    Один проход outbox worker.

    1. Берём пачку событий.
    2. Публикуем каждое событие.
    3. Если успешно — PUBLISHED.
    4. Если ошибка — PENDING/FAILED с увеличением attempts.
    """

    conn = make_connection()

    try:
        events = lock_outbox_events_mysql8(
            conn,
            worker_id=worker_id,
            limit=limit,
        )

        for event in events:
            try:
                publisher.publish(event)
            except Exception as exc:
                logging.exception("failed to publish event id=%s", event.id)
                mark_outbox_failed(conn, event.id, exc)
            else:
                mark_outbox_published(conn, event.id)

    finally:
        conn.close()


def run_outbox_worker(
    *,
    publisher: EventPublisher,
    worker_id: str,
    sleep_seconds: float = 1.0,
) -> None:
    """
    Бесконечный цикл воркера.

    В production обычно добавляют:
    - graceful shutdown;
    - метрики;
    - exponential backoff;
    - dead-letter queue;
    - алерты на FAILED events.
    """

    while True:
        try:
            process_outbox_batch(
                publisher=publisher,
                worker_id=worker_id,
                limit=100,
            )
        except Exception:
            logging.exception("outbox worker error")

        time.sleep(sleep_seconds)

Важный момент: outbox почти всегда даёт at-least-once delivery, то есть событие может быть доставлено больше одного раза. Поэтому на стороне потребителя стоит хранить event_id или idempotency_key и не применять одно событие повторно.


5. Saga Compensation

Пример саги:

  1. Зарезервировать товар.
  2. Списать оплату.
  3. Создать доставку.

Если доставка не создалась:

  1. Отменяем оплату.
  2. Освобождаем резерв товара.

Это не одна большая транзакция. Это последовательность локальных транзакций + компенсации.

@dataclass
class SagaStep:
    """
    Описание одного шага Saga.

    action — прямое действие.
    compensate — обратное действие.
    """

    name: str
    action: Callable[[Any, str, SagaPayload], None]
    compensate: Callable[[Any, str, SagaPayload], None]


def start_order_saga(conn, *, saga_id: str, payload: SagaPayload) -> None:
    """
    Создаём запись saga_instances.

    correlation_id нужен, чтобы не создать две одинаковые саги
    для одного и того же заказа.
    """

    payload_json = {
        "order_id": payload.order_id,
        "customer_id": payload.customer_id,
        "items": [asdict(item) for item in payload.items],
        "total_cents": payload.total_cents,
    }

    with transaction(conn):
        cursor = conn.cursor()
        cursor.execute(
            """
            INSERT INTO saga_instances (
                id,
                saga_type,
                correlation_id,
                status,
                payload
            )
            VALUES (%s, 'CreateOrderSaga', %s, 'RUNNING', %s)
            ON DUPLICATE KEY UPDATE id = id
            """,
            (
                saga_id,
                f"order:{payload.order_id}",
                json_dumps(payload_json),
            ),
        )
        cursor.close()


def mark_saga_status(conn, saga_id: str, status: str) -> None:
    """
    Обновляем общий статус саги.
    """

    with transaction(conn):
        cursor = conn.cursor()
        cursor.execute(
            """
            UPDATE saga_instances
            SET status = %s
            WHERE id = %s
            """,
            (status, saga_id),
        )
        cursor.close()


def mark_saga_step(
    conn,
    *,
    saga_id: str,
    step_order: int,
    step_name: str,
    status: str,
    error: str | None = None,
) -> None:
    """
    Фиксируем состояние конкретного шага.

    ON DUPLICATE KEY UPDATE делает операцию повторяемой:
    если шаг уже был создан, мы просто обновим его статус.
    """

    with transaction(conn):
        cursor = conn.cursor()
        cursor.execute(
            """
            INSERT INTO saga_steps (
                saga_id,
                step_order,
                step_name,
                status,
                error
            )
            VALUES (%s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
                status = VALUES(status),
                error = VALUES(error),
                updated_at = NOW(6)
            """,
            (
                saga_id,
                step_order,
                step_name,
                status,
                error,
            ),
        )
        cursor.close()


def set_order_status(conn, order_id: int, status: str) -> None:
    """
    Вспомогательная функция для изменения статуса заказа.
    """

    with transaction(conn):
        cursor = conn.cursor()
        cursor.execute(
            """
            UPDATE orders
            SET status = %s
            WHERE id = %s
            """,
            (status, order_id),
        )
        cursor.close()


def run_order_saga(conn, *, saga_id: str, payload: SagaPayload) -> None:
    """
    Основной исполнитель Saga.

    Логика:
    - идём по шагам слева направо;
    - каждый успешный шаг добавляем в completed_steps;
    - если шаг упал — компенсируем completed_steps справа налево.
    """

    steps = [
        SagaStep(
            name="ReserveStock",
            action=reserve_stock,
            compensate=release_stock,
        ),
        SagaStep(
            name="ChargePayment",
            action=charge_payment,
            compensate=refund_payment,
        ),
        SagaStep(
            name="CreateShipment",
            action=create_shipment,
            compensate=cancel_shipment,
        ),
    ]

    completed_steps: list[int] = []

    for index, step in enumerate(steps):
        step_order = index + 1

        try:
            mark_saga_step(
                conn,
                saga_id=saga_id,
                step_order=step_order,
                step_name=step.name,
                status="RUNNING",
            )

            step.action(conn, saga_id, payload)

            mark_saga_step(
                conn,
                saga_id=saga_id,
                step_order=step_order,
                step_name=step.name,
                status="COMPLETED",
            )

            completed_steps.append(index)

        except Exception as exc:
            logging.exception("saga step failed: %s", step.name)

            mark_saga_step(
                conn,
                saga_id=saga_id,
                step_order=step_order,
                step_name=step.name,
                status="FAILED",
                error=str(exc),
            )

            mark_saga_status(conn, saga_id, "COMPENSATING")

            try:
                compensate_completed_steps(
                    conn,
                    saga_id=saga_id,
                    payload=payload,
                    steps=steps,
                    completed_steps=completed_steps,
                )

                set_order_status(conn, payload.order_id, "CANCELLED")
                mark_saga_status(conn, saga_id, "COMPENSATED")

            except Exception as compensation_exc:
                logging.exception("saga compensation failed")

                mark_saga_status(conn, saga_id, "COMPENSATION_FAILED")

                raise RuntimeError(
                    f"saga failed: {exc}; "
                    f"compensation failed: {compensation_exc}"
                ) from compensation_exc

            raise RuntimeError(f"saga compensated after failure: {exc}") from exc

    mark_saga_status(conn, saga_id, "COMPLETED")


def compensate_completed_steps(
    conn,
    *,
    saga_id: str,
    payload: SagaPayload,
    steps: list[SagaStep],
    completed_steps: list[int],
) -> None:
    """
    Компенсируем только те шаги, которые уже были успешно выполнены.

    Идём в обратном порядке:
    если было:
        ReserveStock -> ChargePayment -> CreateShipment

    то компенсация:
        CancelShipment -> RefundPayment -> ReleaseStock

    Но если CreateShipment упал и не был COMPLETED,
    то cancelShipment вызываться не будет.
    """

    for step_index in reversed(completed_steps):
        step = steps[step_index]
        step_order = step_index + 1

        mark_saga_step(
            conn,
            saga_id=saga_id,
            step_order=step_order,
            step_name=step.name,
            status="COMPENSATING",
        )

        try:
            step.compensate(conn, saga_id, payload)

        except Exception as exc:
            mark_saga_step(
                conn,
                saga_id=saga_id,
                step_order=step_order,
                step_name=step.name,
                status="COMPENSATION_FAILED",
                error=str(exc),
            )
            raise

        mark_saga_step(
            conn,
            saga_id=saga_id,
            step_order=step_order,
            step_name=step.name,
            status="COMPENSATED",
        )

6. Шаги Saga

ReserveStock

Здесь используем SELECT ... FOR UPDATE, чтобы заблокировать строку остатка товара на время транзакции. Это защищает от ситуации, когда два заказа одновременно читают один и тот же остаток и оба думают, что товар доступен. В InnoDB locking read через FOR UPDATE блокирует выбранные строки до конца транзакции. (dev.mysql.com)

def reserve_stock(conn, saga_id: str, payload: SagaPayload) -> None:
    """
    Резервируем товар.

    Это локальная транзакция:
    - блокируем строку inventory через FOR UPDATE;
    - проверяем доступный остаток;
    - уменьшаем available;
    - увеличиваем reserved;
    - пишем StockReserved в outbox.
    """

    with transaction(conn, isolation_level="READ COMMITTED"):
        cursor = conn.cursor(dictionary=True)

        # Блокируем товары в стабильном порядке, чтобы снизить риск deadlock.
        for item in sorted(payload.items, key=lambda x: x.sku):
            cursor.execute(
                """
                SELECT status
                FROM inventory_reservations
                WHERE saga_id = %s
                  AND sku = %s
                FOR UPDATE
                """,
                (saga_id, item.sku),
            )

            reservation = cursor.fetchone()

            if reservation:
                if reservation["status"] == "RESERVED":
                    # Повторный запуск шага: резерв уже создан, второй раз
                    # available/reserved не меняем.
                    continue

                raise RuntimeError(
                    f"stock reservation already released: "
                    f"saga_id={saga_id}, sku={item.sku}"
                )

            cursor.execute(
                """
                SELECT available
                FROM inventory
                WHERE sku = %s
                FOR UPDATE
                """,
                (item.sku,),
            )

            row = cursor.fetchone()

            if row is None:
                raise RuntimeError(f"inventory item not found: sku={item.sku}")

            if row["available"] < item.qty:
                raise RuntimeError(
                    f"not enough stock for sku={item.sku}: "
                    f"available={row['available']}, required={item.qty}"
                )

            cursor.execute(
                """
                UPDATE inventory
                SET
                    available = available - %s,
                    reserved = reserved + %s
                WHERE sku = %s
                """,
                (item.qty, item.qty, item.sku),
            )

            cursor.execute(
                """
                INSERT INTO inventory_reservations (
                    saga_id,
                    sku,
                    qty,
                    status
                )
                VALUES (%s, %s, %s, 'RESERVED')
                """,
                (saga_id, item.sku, item.qty),
            )

        insert_outbox_event_tx(
            cursor,
            aggregate_type="order",
            aggregate_id=payload.order_id,
            event_type="StockReserved",
            payload={
                "saga_id": saga_id,
                "order_id": payload.order_id,
                "items": [asdict(item) for item in payload.items],
            },
            idempotency_key=f"saga:{saga_id}:stock-reserved",
            ignore_duplicate=True,
        )

        cursor.close()

ReleaseStock — компенсация

def release_stock(conn, saga_id: str, payload: SagaPayload) -> None:
    """
    Компенсация ReserveStock.

    Освобождаем резерв:
    - available увеличиваем;
    - reserved уменьшаем.

    Условие reserved >= qty защищает от ухода в отрицательный reserved.
    """

    with transaction(conn, isolation_level="READ COMMITTED"):
        cursor = conn.cursor(dictionary=True)

        cursor.execute(
            """
            SELECT sku, qty, status
            FROM inventory_reservations
            WHERE saga_id = %s
            FOR UPDATE
            """,
            (saga_id,),
        )

        reservations = cursor.fetchall()

        for reservation in reservations:
            if reservation["status"] != "RESERVED":
                continue

            cursor.execute(
                """
                UPDATE inventory
                SET
                    available = available + %s,
                    reserved = reserved - %s
                WHERE sku = %s
                  AND reserved >= %s
                """,
                (
                    reservation["qty"],
                    reservation["qty"],
                    reservation["sku"],
                    reservation["qty"],
                ),
            )

            if cursor.rowcount != 1:
                raise RuntimeError(
                    f"cannot release stock reservation: "
                    f"saga_id={saga_id}, sku={reservation['sku']}"
                )

            cursor.execute(
                """
                UPDATE inventory_reservations
                SET status = 'RELEASED'
                WHERE saga_id = %s
                  AND sku = %s
                  AND status = 'RESERVED'
                """,
                (saga_id, reservation["sku"]),
            )

        insert_outbox_event_tx(
            cursor,
            aggregate_type="order",
            aggregate_id=payload.order_id,
            event_type="StockReleased",
            payload={
                "saga_id": saga_id,
                "order_id": payload.order_id,
                "items": [asdict(item) for item in payload.items],
            },
            idempotency_key=f"saga:{saga_id}:stock-released",
            ignore_duplicate=True,
        )

        cursor.close()

ChargePayment

def charge_payment(conn, saga_id: str, payload: SagaPayload) -> None:
    """
    Списываем оплату.

    В примере это просто INSERT в payments.
    В реальности тут может быть внешний платёжный сервис.

    Если используется внешний сервис:
    - обязательно передавай idempotency key;
    - иначе при ретраях можно списать деньги дважды.
    """

    with transaction(conn):
        cursor = conn.cursor(dictionary=True)

        cursor.execute(
            """
            INSERT INTO payments (
                order_id,
                amount_cents,
                status
            )
            VALUES (%s, %s, 'CHARGED')
            ON DUPLICATE KEY UPDATE id = id
            """,
            (payload.order_id, payload.total_cents),
        )

        cursor.execute(
            """
            SELECT status
            FROM payments
            WHERE order_id = %s
            FOR UPDATE
            """,
            (payload.order_id,),
        )

        payment = cursor.fetchone()

        if payment is None or payment["status"] != "CHARGED":
            raise RuntimeError(
                f"payment is not chargeable: order_id={payload.order_id}"
            )

        insert_outbox_event_tx(
            cursor,
            aggregate_type="order",
            aggregate_id=payload.order_id,
            event_type="PaymentCharged",
            payload={
                "saga_id": saga_id,
                "order_id": payload.order_id,
                "amount_cents": payload.total_cents,
            },
            idempotency_key=f"saga:{saga_id}:payment-charged",
            ignore_duplicate=True,
        )

        cursor.close()

RefundPayment — компенсация

def refund_payment(conn, saga_id: str, payload: SagaPayload) -> None:
    """
    Компенсация ChargePayment.

    Если оплата была списана — переводим её в REFUNDED.

    Повторный вызов безопасен:
    WHERE status = 'CHARGED' не даст второй раз "вернуть" уже refunded payment.
    """

    with transaction(conn):
        cursor = conn.cursor(dictionary=True)

        cursor.execute(
            """
            UPDATE payments
            SET status = 'REFUNDED'
            WHERE order_id = %s
              AND status = 'CHARGED'
            """,
            (payload.order_id,),
        )

        insert_outbox_event_tx(
            cursor,
            aggregate_type="order",
            aggregate_id=payload.order_id,
            event_type="PaymentRefunded",
            payload={
                "saga_id": saga_id,
                "order_id": payload.order_id,
                "amount_cents": payload.total_cents,
            },
            idempotency_key=f"saga:{saga_id}:payment-refunded",
            ignore_duplicate=True,
        )

        cursor.close()

CreateShipment

def create_shipment(conn, saga_id: str, payload: SagaPayload) -> None:
    """
    Создаём доставку.

    В примере доставка создаётся локально в таблице shipments.
    В реальном проекте это может быть вызов службы доставки.

    После успешного создания доставки заказ переводим в CONFIRMED.
    """

    with transaction(conn):
        cursor = conn.cursor()

        cursor.execute(
            """
            INSERT INTO shipments (
                order_id,
                status
            )
            VALUES (%s, 'CREATED')
            ON DUPLICATE KEY UPDATE id = id
            """,
            (payload.order_id,),
        )

        cursor.execute(
            """
            SELECT status
            FROM shipments
            WHERE order_id = %s
            FOR UPDATE
            """,
            (payload.order_id,),
        )

        shipment = cursor.fetchone()

        if shipment is None or shipment["status"] != "CREATED":
            raise RuntimeError(
                f"shipment is not creatable: order_id={payload.order_id}"
            )

        cursor.execute(
            """
            UPDATE orders
            SET status = 'CONFIRMED'
            WHERE id = %s
            """,
            (payload.order_id,),
        )

        insert_outbox_event_tx(
            cursor,
            aggregate_type="order",
            aggregate_id=payload.order_id,
            event_type="ShipmentCreated",
            payload={
                "saga_id": saga_id,
                "order_id": payload.order_id,
            },
            idempotency_key=f"saga:{saga_id}:shipment-created",
            ignore_duplicate=True,
        )

        cursor.close()

CancelShipment — компенсация

def cancel_shipment(conn, saga_id: str, payload: SagaPayload) -> None:
    """
    Компенсация CreateShipment.

    Обычно этот шаг нужен, если доставка уже была создана,
    но следующий шаг Saga упал.

    В нашем примере CreateShipment последний,
    поэтому cancel_shipment почти не понадобится.
    Но в реальной Saga после доставки могут быть ещё шаги.
    """

    with transaction(conn):
        cursor = conn.cursor()

        cursor.execute(
            """
            UPDATE shipments
            SET status = 'CANCELLED'
            WHERE order_id = %s
              AND status = 'CREATED'
            """,
            (payload.order_id,),
        )

        insert_outbox_event_tx(
            cursor,
            aggregate_type="order",
            aggregate_id=payload.order_id,
            event_type="ShipmentCancelled",
            payload={
                "saga_id": saga_id,
                "order_id": payload.order_id,
            },
            idempotency_key=f"saga:{saga_id}:shipment-cancelled",
            ignore_duplicate=True,
        )

        cursor.close()

7. Пример запуска

def main():
    """
    Демо-сценарий:

    1. Создаём заказ.
    2. Создаём Saga instance.
    3. Запускаем Saga.
    4. Один раз прогоняем outbox worker.

    В production outbox worker обычно живёт отдельным процессом.
    """

    conn = make_connection()

    try:
        items = [
            OrderItem(
                sku="iphone-15",
                qty=1,
                price_cents=100_000,
            )
        ]

        order_id = create_order_with_outbox(
            conn,
            customer_id=123,
            items=items,
        )

        payload = SagaPayload(
            order_id=order_id,
            customer_id=123,
            items=items,
            total_cents=100_000,
        )

        saga_id = str(uuid.uuid4())

        start_order_saga(
            conn,
            saga_id=saga_id,
            payload=payload,
        )

        try:
            run_order_saga(
                conn,
                saga_id=saga_id,
                payload=payload,
            )
        except Exception as exc:
            logging.warning("saga finished with error: %s", exc)

    finally:
        conn.close()

    # Для примера — один проход outbox worker.
    # В реальном проекте это лучше запускать отдельным процессом.
    process_outbox_batch(
        publisher=LogPublisher(),
        worker_id="worker-1",
        limit=100,
    )


if __name__ == "__main__":
    main()

8. Вариант outbox worker для MySQL 5.7

Если у тебя MySQL 5.7, FOR UPDATE SKIP LOCKED лучше не использовать. Можно сделать захват пачки через UPDATE ... LIMIT, а потом выбрать события по уникальному lock_token.

def lock_outbox_events_mysql57(
    conn,
    *,
    worker_id: str,
    limit: int = 100,
    lock_seconds: int = 120,
) -> list[OutboxEvent]:
    """
    Вариант для MySQL 5.7.

    Вместо SELECT ... FOR UPDATE SKIP LOCKED делаем так:

    1. UPDATE ... LIMIT помечает пачку событий как PROCESSING.
    2. locked_by ставим уникальный lock_token.
    3. Потом SELECT выбирает только события с этим lock_token.

    Это чуть грубее, чем SKIP LOCKED, но рабочий паттерн.
    """

    lock_token = f"{worker_id}:{uuid.uuid4()}"

    with transaction(conn, isolation_level="READ COMMITTED"):
        cursor = conn.cursor()

        cursor.execute(
            """
            UPDATE outbox_events
            SET
                status = 'PROCESSING',
                locked_by = %s,
                locked_until = DATE_ADD(NOW(6), INTERVAL %s SECOND)
            WHERE status IN ('PENDING', 'FAILED')
              AND attempts < 10
              AND (locked_until IS NULL OR locked_until < NOW(6))
            ORDER BY id
            LIMIT %s
            """,
            (lock_token, lock_seconds, limit),
        )

        cursor.close()

    # После UPDATE выбираем именно те события, которые захватил этот воркер.
    cursor = conn.cursor(dictionary=True)

    cursor.execute(
        """
        SELECT
            id,
            aggregate_type,
            aggregate_id,
            event_type,
            payload,
            idempotency_key
        FROM outbox_events
        WHERE status = 'PROCESSING'
          AND locked_by = %s
        ORDER BY id
        """,
        (lock_token,),
    )

    rows = cursor.fetchall()
    cursor.close()

    return [
        OutboxEvent(
            id=row["id"],
            aggregate_type=row["aggregate_type"],
            aggregate_id=row["aggregate_id"],
            event_type=row["event_type"],
            payload=row["payload"],
            idempotency_key=row["idempotency_key"],
        )
        for row in rows
    ]

Если будешь использовать MySQL 5.7, в process_outbox_batch() просто замени:

events = lock_outbox_events_mysql8(...)

на:

events = lock_outbox_events_mysql57(...)

Главное, что стоит запомнить:

Transactional Outbox решает проблему “данные в БД записали, а событие отправить не успели”.

Saga Compensation решает проблему “часть распределённой операции уже выполнена, но один из следующих шагов упал”.

А вместе они дают довольно надёжную базу для микросервисной/модульной архитектуры: локальные транзакции + события + повторяемые компенсации.


9. Дополнительные слои надёжности

Transactional Outbox и Saga Compensation — это уже хорошая база, но для реальной надёжности обычно добавляют ещё несколько слоёв. Я бы разделил на 3 зоны: входящие запросы, исходящие запросы к другим сервисам, работа с БД/очередями.


9.1. Idempotency Key для входящих запросов

Самое важное: клиент может повторить запрос, а сервер должен не создать второй заказ/платёж/заявку.

Например клиент отправляет:

POST /orders
Idempotency-Key: 8d2a5d8f-9a3a-4f8a-8c2e-123456789abc

Эта таблица уже есть в общей схеме выше. Отдельно фрагмент выглядит так:

CREATE TABLE idempotency_keys (
    idempotency_key VARCHAR(255) PRIMARY KEY,
    request_hash CHAR(64) NOT NULL,
    status ENUM('PROCESSING', 'COMPLETED', 'FAILED') NOT NULL,
    response_body JSON NULL,
    created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
    updated_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6)
        ON UPDATE CURRENT_TIMESTAMP(6)
) ENGINE=InnoDB;

Пример логики:

import hashlib
import json


def request_hash(payload: dict) -> str:
    """
    Хэшируем тело запроса.

    Это нужно, чтобы один и тот же Idempotency-Key
    нельзя было случайно использовать с разными payload.
    """
    raw = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


def process_idempotent_request(conn, idempotency_key: str, payload: dict):
    """
    Упрощённый пример.

    Идея:
    - если ключ новый — создаём PROCESSING;
    - если уже COMPLETED — возвращаем сохранённый ответ;
    - если PROCESSING — можно вернуть 409/202;
    - если ключ тот же, но payload другой — ошибка.
    """

    payload_hash = request_hash(payload)

    with transaction(conn):
        cursor = conn.cursor(dictionary=True)

        cursor.execute(
            """
            SELECT idempotency_key, request_hash, status, response_body
            FROM idempotency_keys
            WHERE idempotency_key = %s
            FOR UPDATE
            """,
            (idempotency_key,),
        )

        row = cursor.fetchone()

        if row:
            if row["request_hash"] != payload_hash:
                raise RuntimeError("same Idempotency-Key used with different payload")

            if row["status"] == "COMPLETED":
                return json.loads(row["response_body"])

            if row["status"] == "PROCESSING":
                raise RuntimeError("request is already processing")

        else:
            cursor.execute(
                """
                INSERT INTO idempotency_keys (
                    idempotency_key,
                    request_hash,
                    status
                )
                VALUES (%s, %s, 'PROCESSING')
                """,
                (idempotency_key, payload_hash),
            )

        items = [
            OrderItem(
                sku=item["sku"],
                qty=item["qty"],
                price_cents=item["price_cents"],
            )
            for item in payload["items"]
        ]

        order_id = create_order_tx(
            cursor,
            customer_id=payload["customer_id"],
            items=items,
        )

        response = {
            "order_id": order_id,
            "status": "CREATED",
        }

        cursor.execute(
            """
            UPDATE idempotency_keys
            SET status = 'COMPLETED',
                response_body = %s
            WHERE idempotency_key = %s
            """,
            (json_dumps(response), idempotency_key),
        )

        cursor.close()

    return response

Это защищает от повторного нажатия кнопки, сетевых ретраев, таймаутов клиента и ситуации “запрос выполнился, но клиент не получил ответ”.


9.2. Retry с exponential backoff + jitter

Повторять запросы полезно, но тупой retry “каждую секунду” может убить соседний сервис. Лучше так:

1-я попытка: сразу
2-я: через 0.5 сек
3-я: через 1 сек
4-я: через 2 сек
5-я: через 4 сек

И обязательно добавить jitter — случайный разброс, чтобы все воркеры не ломились одновременно.

import random
import time


def retry_with_backoff(
    func,
    *,
    max_attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 10.0,
    retryable_exceptions: tuple[type[Exception], ...] = (Exception,),
):
    """
    Универсальный retry.

    Подходит для:
    - временных HTTP-ошибок;
    - временной недоступности брокера;
    - deadlock/lock timeout в MySQL;
    - сетевых сбоев.

    Не подходит для бизнес-ошибок:
    - недостаточно денег;
    - нет товара;
    - неправильный токен;
    - validation error.
    """

    last_exc = None

    for attempt in range(1, max_attempts + 1):
        try:
            return func()

        except retryable_exceptions as exc:
            last_exc = exc

            if attempt == max_attempts:
                break

            delay = min(max_delay, base_delay * (2 ** (attempt - 1)))

            # jitter: случайно двигаем задержку,
            # чтобы много воркеров не повторяли запрос одновременно
            delay = delay * random.uniform(0.5, 1.5)

            time.sleep(delay)

    raise last_exc

Пример использования:

def publish_to_broker():
    # kafka/rabbit/nats/http/etc
    publisher.publish(event)


retry_with_backoff(
    publish_to_broker,
    max_attempts=5,
    base_delay=0.3,
    max_delay=5,
)

9.3. Таймауты везде

Очень частая ошибка — делать запросы без таймаута. Тогда один зависший внешний сервис может подвесить воркеры, очередь, API и всё приложение. Весёлый корпоративный домино-эффект, только без веселья.

Для HTTP:

import requests


def call_payment_service(payload: dict) -> dict:
    response = requests.post(
        "https://payment-service.local/payments",
        json=payload,
        timeout=(2, 5),
        #       │  └─ read timeout
        #       └──── connect timeout
    )

    response.raise_for_status()
    return response.json()

Для MySQL стоит задавать таймауты соединения:

mysql.connector.connect(
    host="127.0.0.1",
    user="app",
    password="secret",
    database="appdb",
    connection_timeout=5,
    autocommit=False,
)

И на уровне сессии можно ограничивать ожидание блокировок:

SET SESSION innodb_lock_wait_timeout = 5;

9.4. Retry для deadlock и lock timeout в MySQL

В MySQL нормальная ситуация: две транзакции схватили строки в разном порядке — получили deadlock. Это не всегда “баг”, иногда это просто конкурентная нагрузка.

Типовые коды:

1213 — Deadlock found
1205 — Lock wait timeout exceeded

Пример:

import mysql.connector


MYSQL_DEADLOCK = 1213
MYSQL_LOCK_WAIT_TIMEOUT = 1205


def is_retryable_mysql_error(exc: Exception) -> bool:
    """
    Проверяем, стоит ли повторять MySQL-операцию.

    Deadlock и lock timeout часто можно безопасно повторить,
    если вся операция идемпотентна.
    """

    if not isinstance(exc, mysql.connector.Error):
        return False

    return exc.errno in {
        MYSQL_DEADLOCK,
        MYSQL_LOCK_WAIT_TIMEOUT,
    }


def run_transaction_with_retry(conn_factory, operation, max_attempts: int = 5):
    """
    Повторяем всю транзакцию целиком.

    Важно:
    нельзя повторять только последний SQL-запрос.
    Повторять нужно всю бизнес-операцию, потому что транзакция
    могла быть полностью откатана.
    """

    last_exc = None

    for attempt in range(1, max_attempts + 1):
        conn = conn_factory()

        try:
            result = operation(conn)
            conn.close()
            return result

        except Exception as exc:
            conn.close()
            last_exc = exc

            if not is_retryable_mysql_error(exc):
                raise

            if attempt == max_attempts:
                break

            delay = min(5.0, 0.2 * (2 ** (attempt - 1)))
            delay = delay * random.uniform(0.5, 1.5)

            time.sleep(delay)

    raise last_exc

Использование:

order_id = run_transaction_with_retry(
    make_connection,
    lambda conn: create_order_with_outbox(
        conn,
        customer_id=123,
        items=[
            OrderItem(
                sku="iphone-15",
                qty=1,
                price_cents=100_000,
            )
        ],
    ),
)

9.5. Circuit Breaker

Если внешний сервис лежит, не надо мучить его тысячами ретраев. Лучше временно “открыть цепь” и сразу возвращать ошибку/ставить задачу в очередь.

Простейшая идея:

import time


class CircuitBreaker:
    """
    Упрощённый Circuit Breaker.

    CLOSED:
        всё нормально, запросы идут.

    OPEN:
        внешний сервис считается сломанным,
        запросы временно не отправляем.

    HALF_OPEN:
        пробуем один тестовый запрос.
        Если успешен — возвращаем CLOSED.
        Если нет — снова OPEN.
    """

    def __init__(
        self,
        *,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout

        self.failures = 0
        self.state = "CLOSED"
        self.opened_at = None

    def call(self, func):
        now = time.time()

        if self.state == "OPEN":
            if now - self.opened_at >= self.recovery_timeout:
                self.state = "HALF_OPEN"
            else:
                raise RuntimeError("circuit breaker is OPEN")

        try:
            result = func()

        except Exception:
            self.failures += 1

            if self.failures >= self.failure_threshold:
                self.state = "OPEN"
                self.opened_at = time.time()

            raise

        else:
            self.failures = 0
            self.state = "CLOSED"
            return result

Использование:

payment_breaker = CircuitBreaker(
    failure_threshold=5,
    recovery_timeout=30,
)


def charge_payment_via_service(payload):
    return payment_breaker.call(
        lambda: call_payment_service(payload)
    )

Circuit Breaker хорошо сочетается с outbox/saga: внешний сервис лежит — шаг Saga падает или откладывается, компенсация/ретрай выполняются контролируемо.


9.6. Transactional Inbox для входящих событий

Outbox решает проблему отправки событий. Но на принимающей стороне нужна защита от дублей.

Таблица:

CREATE TABLE inbox_events (
    event_id VARCHAR(255) PRIMARY KEY,
    event_type VARCHAR(128) NOT NULL,
    payload JSON NOT NULL,
    processed_at DATETIME(6) NULL,
    created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6)
) ENGINE=InnoDB;

Обработка:

def process_incoming_event(conn, event_id: str, event_type: str, payload: dict):
    """
    Transactional Inbox.

    Если событие пришло повторно — INSERT упадёт по PRIMARY KEY,
    и мы поймём, что уже обрабатывали его.

    Можно сделать INSERT IGNORE, но явная обработка часто понятнее.
    """

    with transaction(conn):
        cursor = conn.cursor()

        try:
            cursor.execute(
                """
                INSERT INTO inbox_events (
                    event_id,
                    event_type,
                    payload
                )
                VALUES (%s, %s, %s)
                """,
                (event_id, event_type, json_dumps(payload)),
            )

        except mysql.connector.IntegrityError:
            # Событие уже видели.
            # Можно просто проигнорировать.
            cursor.close()
            return

        # Здесь применяем бизнес-изменения.
        # Например: обновляем статус заказа, создаём начисление и т.д.
        cursor.execute(
            """
            UPDATE orders
            SET status = 'PAID'
            WHERE id = %s
            """,
            (payload["order_id"],),
        )

        cursor.execute(
            """
            UPDATE inbox_events
            SET processed_at = NOW(6)
            WHERE event_id = %s
            """,
            (event_id,),
        )

        cursor.close()

9.7. Dead Letter Queue / Dead Letter Table

Если событие не удалось обработать 10, 20, 50 раз — бесконечно ретраить его бессмысленно. Нужно складывать в отдельную таблицу.

CREATE TABLE dead_letter_events (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    source_event_id BIGINT NULL,
    event_type VARCHAR(128) NOT NULL,
    payload JSON NOT NULL,
    error TEXT NOT NULL,
    created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6)
) ENGINE=InnoDB;

В outbox после N попыток:

def move_to_dead_letter(conn, event: OutboxEvent, exc: Exception):
    with transaction(conn):
        cursor = conn.cursor()

        cursor.execute(
            """
            INSERT INTO dead_letter_events (
                source_event_id,
                event_type,
                payload,
                error
            )
            VALUES (%s, %s, %s, %s)
            """,
            (
                event.id,
                event.event_type,
                event.payload,
                str(exc),
            ),
        )

        cursor.execute(
            """
            UPDATE outbox_events
            SET status = 'FAILED',
                last_error = %s,
                locked_by = NULL,
                locked_until = NULL
            WHERE id = %s
            """,
            (str(exc), event.id),
        )

        cursor.close()

И обязательно нужен ручной/админский механизм “переотправить из DLQ”.


9.8. Версионирование строк / Optimistic Locking

Полезно, если несколько процессов могут менять один заказ.

Добавить колонку:

ALTER TABLE orders ADD COLUMN version INT NOT NULL DEFAULT 1;

Обновлять так:

def update_order_status_optimistic(
    conn,
    *,
    order_id: int,
    expected_version: int,
    new_status: str,
):
    """
    Обновляем заказ только если версия совпадает.

    Если rowcount == 0, значит кто-то уже изменил заказ раньше нас.
    """

    with transaction(conn):
        cursor = conn.cursor()

        cursor.execute(
            """
            UPDATE orders
            SET status = %s,
                version = version + 1
            WHERE id = %s
              AND version = %s
            """,
            (new_status, order_id, expected_version),
        )

        if cursor.rowcount == 0:
            raise RuntimeError("order was modified concurrently")

        cursor.close()

Это удобно для защиты от “потерянных обновлений”.


9.9. Правильный порядок блокировок

Если в одной транзакции обновляешь несколько строк, всегда блокируй их в одинаковом порядке.

Плохо:

Транзакция A: lock product 1, потом product 2
Транзакция B: lock product 2, потом product 1

Может быть deadlock.

Лучше:

for item in sorted(payload.items, key=lambda x: x.sku):
    cursor.execute(
        """
        SELECT available
        FROM inventory
        WHERE sku = %s
        FOR UPDATE
        """,
        (item.sku,),
    )

Мелочь, но под нагрузкой прям спасает.


9.10. Ограничение параллелизма / Bulkhead

Не надо давать одному медленному внешнему сервису съесть все воркеры.

Например:

from concurrent.futures import ThreadPoolExecutor


# Отдельный пул для платежей
payment_executor = ThreadPoolExecutor(max_workers=5)

# Отдельный пул для доставки
shipment_executor = ThreadPoolExecutor(max_workers=3)

# Отдельный пул для email/sms
notification_executor = ThreadPoolExecutor(max_workers=10)

Идея: если доставка лежит, платежи и обработка заказов не должны умереть вместе с ней.


9.11. Rate limiting

Если внешний сервис принимает максимум 100 запросов в минуту, надо уважать этот лимит. Иначе начнутся 429 Too Many Requests, ретраи, лавина, боль и графики в Grafana как кардиограмма после кофе.

Простейший локальный лимитер:

import threading
import time


class SimpleRateLimiter:
    """
    Простой token bucket.

    rate_per_second=10 означает максимум примерно 10 операций в секунду.
    """

    def __init__(self, rate_per_second: float):
        self.min_interval = 1.0 / rate_per_second
        self.lock = threading.Lock()
        self.next_allowed_at = 0.0

    def wait(self):
        with self.lock:
            now = time.time()

            if now < self.next_allowed_at:
                time.sleep(self.next_allowed_at - now)

            self.next_allowed_at = time.time() + self.min_interval

Использование:

payment_limiter = SimpleRateLimiter(rate_per_second=10)


def safe_call_payment_service(payload):
    payment_limiter.wait()
    return call_payment_service(payload)

9.12. Очередь команд вместо прямого вызова

Для особенно важных операций можно не делать внешний запрос сразу из API. API только кладёт команду в таблицу/очередь:

CREATE TABLE command_queue (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    command_type VARCHAR(128) NOT NULL,
    payload JSON NOT NULL,
    status ENUM('PENDING', 'PROCESSING', 'DONE', 'FAILED') NOT NULL DEFAULT 'PENDING',
    attempts INT NOT NULL DEFAULT 0,
    locked_by VARCHAR(128) NULL,
    locked_until DATETIME(6) NULL,
    created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
    updated_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6)
        ON UPDATE CURRENT_TIMESTAMP(6)
) ENGINE=InnoDB;

Это полезно, если:

запрос пользователя короткий,
а бизнес-процесс долгий и ненадёжный

Например:

POST /orders
→ создали заказ в CREATED
→ положили команду StartOrderSaga
→ вернули 202 Accepted
→ воркер дальше делает Saga

Так API не зависит напрямую от платежей, доставки, склада и прочего зоопарка.


9.13. Outbox cleaner / retention

Outbox со временем распухнет. Опубликованные события можно чистить или архивировать.

DELETE FROM outbox_events
WHERE status = 'PUBLISHED'
  AND published_at < NOW() - INTERVAL 30 DAY
LIMIT 10000;

Лучше запускать маленькими пачками, чтобы не устроить БД “генеральную уборку бензопилой”.


9.14. Мониторинг и алерты

Без мониторинга вся надёжность — это вера. А вера в проде обычно заканчивается SSH в 3 ночи.

Минимум метрик:

outbox_pending_total
outbox_failed_total
outbox_publish_attempts_total
outbox_oldest_pending_age_seconds

saga_running_total
saga_compensating_total
saga_compensation_failed_total

mysql_deadlocks_total
mysql_lock_timeouts_total

external_request_duration_seconds
external_request_errors_total
external_request_timeouts_total

Особенно важная метрика:

возраст самого старого PENDING outbox-события

Если оно висит 30 минут — воркер, брокер или внешний сервис, скорее всего, страдает.

SQL для проверки:

SELECT
    COUNT(*) AS pending_count,
    TIMESTAMPDIFF(SECOND, MIN(created_at), NOW()) AS oldest_pending_age_seconds
FROM outbox_events
WHERE status IN ('PENDING', 'FAILED');

9.15. Что я бы добавил в первую очередь

Для твоего примера с Transactional Outbox + Saga я бы добавил в таком порядке:

1. Idempotency-Key на входящие POST-запросы.
2. Idempotency на внешние запросы: payment, shipment, inventory.
3. Retry с exponential backoff + jitter.
4. Таймауты на все HTTP/MySQL/broker операции.
5. Retry deadlock/lock timeout для MySQL-транзакций.
6. Transactional Inbox на стороне потребителей событий.
7. Dead Letter Queue/Table.
8. Circuit Breaker для внешних сервисов.
9. Метрики и алерты.
10. Очистку/архивацию outbox.

Самый практичный стек получится такой:

API
 ├─ Idempotency-Key
 ├─ короткая транзакция в MySQL
 ├─ запись бизнес-данных
 ├─ запись outbox event
 └─ быстрый ответ клиенту

Outbox Worker
 ├─ lock событий
 ├─ retry + backoff + jitter
 ├─ circuit breaker
 ├─ publish
 ├─ mark PUBLISHED
 └─ DLQ после лимита попыток

Consumer
 ├─ Transactional Inbox
 ├─ idempotent обработка
 └─ локальная транзакция

Saga Worker
 ├─ шаги с retry
 ├─ idempotency на каждом шаге
 ├─ compensation в обратном порядке
 └─ compensation failed alert

Главное правило: ретраи безопасны только там, где операция идемпотентна. Без идемпотентности retry — это не надёжность, а генератор дублей с уверенным лицом.


10. Пошаговый разбор сценариев

Ниже распишу весь процесс как единый сценарий:

Клиент → API → MySQL → Outbox → Worker → Broker/Consumer → Saga → Compensation

Для примера пусть у нас операция: создать заказ.

Заказ должен пройти шаги:

1. Создать заказ
2. Зарезервировать товар
3. Списать оплату
4. Создать доставку
5. Опубликовать события

10.1. Успешный сценарий

Клиент отправляет запрос

Клиент делает запрос:

POST /orders
Idempotency-Key: abc-123

{
  "customer_id": 123,
  "items": [
    {
      "sku": "iphone-15",
      "qty": 1,
      "price_cents": 100000
    }
  ]
}

Что происходит

API получает запрос и первым делом проверяет Idempotency-Key.

Зачем

Чтобы защититься от повторной отправки запроса.

Например:

Клиент отправил POST /orders
Сервер заказ создал
Но ответ потерялся по сети
Клиент повторил запрос

Без Idempotency-Key можно случайно создать два заказа.

С Idempotency-Key сервер понимает:

ага, такой запрос уже был
надо вернуть старый результат, а не создавать новый заказ

API создаёт запись в idempotency_keys

В таблицу пишется:

idempotency_key = abc-123
request_hash = hash от тела запроса
status = PROCESSING
response_body = NULL

Зачем

Это значит:

запрос принят в обработку
повторный такой же запрос пока не должен создавать новую операцию

Если клиент сразу повторит запрос, API увидит:

abc-123 уже PROCESSING

И может ответить:

409 Conflict
или
202 Accepted
или
"запрос уже выполняется"

API начинает транзакцию в MySQL

Начинается транзакция:

BEGIN

Внутри неё будут выполнены:

INSERT INTO orders
INSERT INTO order_items
INSERT INTO outbox_events
UPDATE idempotency_keys
COMMIT

Зачем

Нам важно, чтобы заказ и событие были сохранены атомарно.

То есть нельзя допустить ситуацию:

заказ создали,
а событие OrderCreated не записали

Или наоборот:

событие записали,
а заказ не создали

Transactional Outbox как раз решает эту проблему.


Создаётся заказ

В orders появляется запись:

id = 1001
customer_id = 123
status = CREATED
total_cents = 100000

В order_items:

order_id = 1001
sku = iphone-15
qty = 1
price_cents = 100000

Зачем

Это основная бизнес-операция. Пока заказ только создан, но ещё не подтверждён.

Статус CREATED означает:

заказ существует,
но товар ещё не зарезервирован,
оплата ещё не списана,
доставка ещё не создана

В этой же транзакции пишется событие в outbox_events

Добавляется запись:

id = 501
aggregate_type = order
aggregate_id = 1001
event_type = OrderCreated
payload = {
  "order_id": 1001,
  "customer_id": 123,
  "items": [...]
}
status = PENDING
attempts = 0
idempotency_key = order:1001:created

Зачем

Мы пока не отправляем событие наружу.

Мы только сохраняем намерение:

после commit надо будет опубликовать событие OrderCreated

Почему не отправлять событие сразу из API?

Потому что может случиться такое:

1. API отправил событие в Kafka/RabbitMQ
2. Потом MySQL transaction rollback
3. В брокере уже есть событие о заказе
4. А заказа в БД нет

Это неприятная классика жанра.

Поэтому сначала:

MySQL commit

А потом отдельный worker публикует событие.


Обновляется idempotency_keys

В той же транзакции:

idempotency_key = abc-123
status = COMPLETED
response_body = {
  "order_id": 1001,
  "status": "CREATED"
}

Зачем

Если клиент повторит запрос с тем же ключом, API вернёт тот же ответ:

{
  "order_id": 1001,
  "status": "CREATED"
}

А не создаст новый заказ.


MySQL делает COMMIT

Транзакция фиксируется.

После commit в БД одновременно есть:

orders.id = 1001
order_items для заказа 1001
outbox_events OrderCreated со status = PENDING
idempotency_keys abc-123 со status = COMPLETED

Зачем

Теперь система находится в согласованном состоянии.

Даже если API упадёт сразу после commit — не страшно.

Почему?

Потому что заказ уже есть, событие уже есть, outbox worker потом его подберёт.


API возвращает ответ клиенту

Клиент получает:

{
  "order_id": 1001,
  "status": "CREATED"
}

Зачем

API не обязан ждать, пока товар зарезервируется, оплата спишется и доставка создастся.

Можно сделать асинхронно:

заказ принят
дальше его обработают фоновые процессы

Для долгих процессов это часто лучше, чем держать HTTP-запрос открытым.


10.2. Outbox worker публикует событие

Теперь отдельно работает outbox worker.

Worker ищет события PENDING

Он выбирает:

outbox_events.status IN ('PENDING', 'FAILED')
attempts < 10
locked_until IS NULL OR locked_until < NOW()

Находит событие:

id = 501
event_type = OrderCreated
status = PENDING

Worker блокирует событие

Для MySQL 8:

SELECT ...
FOR UPDATE SKIP LOCKED

Потом обновляет:

status = PROCESSING
locked_by = worker-1:uuid
locked_until = now + 2 minutes

Зачем

Чтобы два worker-а не отправили одно и то же событие одновременно.

locked_until нужен на случай, если worker умрёт.

Например:

worker взял событие
поставил PROCESSING
упал

Через 2 минуты другой worker увидит:

locked_until истёк
можно забрать событие повторно

Worker публикует событие

Например, отправляет в Kafka/RabbitMQ/NATS/HTTP:

{
  "event_id": 501,
  "event_type": "OrderCreated",
  "aggregate_id": 1001,
  "payload": {
    "order_id": 1001,
    "customer_id": 123
  }
}

Зачем

Другие части системы узнают:

создан заказ 1001
можно запускать обработку заказа

Например, Saga worker может начать процесс:

ReserveStock → ChargePayment → CreateShipment

Worker помечает событие опубликованным

После успешной публикации:

status = PUBLISHED
published_at = now
locked_by = NULL
locked_until = NULL

Зачем

Чтобы это событие больше не отправлялось.


10.3. Consumer получает событие

Допустим, Saga worker получил событие OrderCreated.

Consumer проверяет inbox_events

Он пытается вставить:

event_id = 501
event_type = OrderCreated
payload = ...

Зачем

Это Transactional Inbox.

Он защищает от дублей.

Потому что outbox даёт не “exactly once”, а обычно:

at-least-once delivery

То есть событие может прийти повторно.

Например:

worker отправил событие
но упал до mark_outbox_published
после рестарта отправил ещё раз

Consumer должен уметь сказать:

я event_id=501 уже видел
второй раз применять не буду

Consumer создаёт Saga

Создаётся запись:

saga_instances.id = saga-777
saga_type = CreateOrderSaga
correlation_id = order:1001
status = RUNNING
payload = {
  "order_id": 1001,
  "items": [...]
}

Зачем

Saga — это журнал выполнения долгого бизнес-процесса.

По нему видно:

какая операция выполняется
на каком шаге она сейчас
какие шаги уже прошли
нужна ли компенсация

10.4. Успешное выполнение Saga

Сага состоит из шагов:

1. ReserveStock
2. ChargePayment
3. CreateShipment

Saga начинает ReserveStock

В saga_steps:

saga_id = saga-777
step_order = 1
step_name = ReserveStock
status = RUNNING

Зачем

Мы фиксируем:

начали резервировать товар

Если процесс упадёт, по таблице будет видно, на каком шаге всё остановилось.


Резервируется товар

В транзакции:

SELECT inventory WHERE sku = iphone-15 FOR UPDATE

Допустим сейчас:

available = 10
reserved = 0

После резерва:

available = 9
reserved = 1

Также в outbox пишется событие:

event_type = StockReserved
status = PENDING
idempotency_key = saga:saga-777:stock-reserved

Зачем FOR UPDATE

Чтобы два заказа одновременно не прочитали один и тот же остаток.

Без блокировки могло бы быть так:

Заказ A видит available = 1
Заказ B видит available = 1

Оба думают, что товар есть
Оба резервируют
В итоге продали 2 штуки при наличии 1

FOR UPDATE заставляет второй процесс ждать, пока первый закончит.


Saga помечает шаг успешным

ReserveStock = COMPLETED

Зачем

Теперь если дальше что-то упадёт, Saga знает:

товар был зарезервирован
значит при компенсации надо вызвать release_stock

Saga начинает ChargePayment

step_order = 2
step_name = ChargePayment
status = RUNNING

Выполняется списание оплаты

В примере локально:

payments.order_id = 1001
amount_cents = 100000
status = CHARGED

Также пишется outbox event:

event_type = PaymentCharged
status = PENDING
idempotency_key = saga:saga-777:payment-charged

Зачем idempotency здесь

Если платёжный шаг будет повторён, нельзя списать деньги дважды.

Поэтому у платежа должен быть уникальный ключ:

order_id = 1001
или
payment_idempotency_key = saga:saga-777:charge-payment

Saga помечает оплату успешной

ChargePayment = COMPLETED

Теперь компенсация знает:

если дальше всё сломается, надо сделать refund_payment

Saga начинает CreateShipment

step_order = 3
step_name = CreateShipment
status = RUNNING

Создаётся доставка

В shipments:

order_id = 1001
status = CREATED

Заказ обновляется:

orders.status = CONFIRMED

Пишется outbox event:

event_type = ShipmentCreated
status = PENDING
idempotency_key = saga:saga-777:shipment-created

Saga завершается успешно

CreateShipment = COMPLETED
saga_instances.status = COMPLETED

Итоговое состояние:

orders.status = CONFIRMED

inventory:
  available уменьшен
  reserved увеличен

payments:
  status = CHARGED

shipments:
  status = CREATED

saga_instances:
  status = COMPLETED

outbox_events:
  OrderCreated      → PUBLISHED или PENDING
  StockReserved     → PENDING/PUBLISHED
  PaymentCharged    → PENDING/PUBLISHED
  ShipmentCreated   → PENDING/PUBLISHED

Некоторые outbox events могут ещё быть PENDING, если worker не успел их отправить. Это нормально.

Главное: бизнес-состояние уже сохранено, а события будут доставлены позже.


11. Неуспешные сценарии

Теперь самое вкусное: где может упасть и что тогда происходит.


Сценарий A. Клиент отправил запрос два раза

Что произошло

Клиент отправил:

POST /orders
Idempotency-Key: abc-123

Потом из-за таймаута повторил тот же запрос.


Что делает API

API смотрит в idempotency_keys.

Вариант 1:

status = PROCESSING

Значит первый запрос ещё выполняется.

API может ответить:

409 Conflict
или 202 Accepted

Вариант 2:

status = COMPLETED
response_body = {"order_id": 1001}

API возвращает сохранённый ответ.


Зачем это нужно

Без этого можно создать два заказа.

С этим механизмом:

один Idempotency-Key = один результат

Сценарий B. API упал до COMMIT

Что произошло

API начал транзакцию:

BEGIN
INSERT INTO orders
INSERT INTO outbox_events

Но потом приложение упало до:

COMMIT

Что будет в БД

MySQL откатит незавершённую транзакцию.

Итог:

orders — заказа нет
outbox_events — события нет
idempotency_keys — может быть PROCESSING, если писали вне общей транзакции

Что делать

Лучше писать idempotency_keys и бизнес-данные аккуратно, чтобы не оставить вечный PROCESSING.

Обычно добавляют:

processing_timeout

Например:

если PROCESSING старше 5 минут,
можно считать запрос зависшим и разрешить повтор

Зачем это безопасно

Потому что без commit нет частичного бизнес-состояния.

Не получится так, что:

заказ есть, а outbox события нет

Сценарий C. API сделал COMMIT, но упал до ответа клиенту

Что произошло

API успешно сделал:

COMMIT

В БД уже есть:

orders.id = 1001
outbox_events.OrderCreated = PENDING
idempotency_keys.abc-123 = COMPLETED

Но API упал до отправки HTTP-ответа.

Клиент видит таймаут и повторяет запрос.


Что делает API после повтора

API видит:

Idempotency-Key abc-123 уже COMPLETED

И возвращает:

{
  "order_id": 1001,
  "status": "CREATED"
}

Зачем это важно

Это один из главных сценариев, ради которого нужен Idempotency-Key.

Потому что с точки зрения клиента:

запрос как будто не прошёл

А с точки зрения сервера:

заказ уже создан

Idempotency соединяет эти два мира без дублей.


Сценарий D. Outbox worker не может опубликовать событие

Что произошло

В outbox_events есть:

OrderCreated status = PENDING

Worker взял событие:

status = PROCESSING

Но брокер недоступен.

Например:

Kafka лежит
RabbitMQ недоступен
HTTP endpoint отвечает 500

Что делает worker

Он пробует отправить событие с retry:

попытка 1
пауза 0.5 сек
попытка 2
пауза 1 сек
попытка 3
пауза 2 сек

С jitter, чтобы много worker-ов не долбились одновременно.

Если всё равно не вышло:

attempts = attempts + 1
status = PENDING или FAILED
last_error = "connection refused"
locked_by = NULL
locked_until = NULL

Зачем

Событие не теряется.

Оно остаётся в БД и будет повторено позже.


Что будет после 10 неудач

Например:

attempts >= 10
status = FAILED

Или событие переносится в dead_letter_events.


Зачем Dead Letter

Чтобы ядовитое событие не крутилось бесконечно.

Например payload битый, schema не та, consumer всегда падает.

Тогда лучше:

положить в DLQ
показать алерт
разобраться вручную
переотправить после исправления

Сценарий E. Worker опубликовал событие, но упал до mark PUBLISHED

Что произошло

Worker сделал:

publisher.publish(event)

Событие ушло в брокер.

Но сразу после этого worker упал и не сделал:

UPDATE outbox_events SET status = 'PUBLISHED'

Что будет

В БД событие останется:

status = PROCESSING
locked_until = now + 2 minutes

Через 2 минуты другой worker увидит:

locked_until истёк

И отправит событие повторно.


Это плохо?

Не идеально, но ожидаемо.

Transactional Outbox обычно гарантирует:

at-least-once delivery

То есть:

событие будет доставлено минимум один раз
но может быть доставлено больше одного раза

Как защититься

На стороне consumer должен быть Transactional Inbox.

Consumer видит:

event_id = 501

Если он уже обработал 501, второй раз бизнес-логику не применяет.


Сценарий F. Consumer получил одно событие дважды

Что произошло

Consumer дважды получил:

event_id = 501
event_type = OrderCreated

Что делает Consumer

Первый раз:

INSERT INTO inbox_events(event_id=501)
успешно
обрабатывает событие
processed_at = now

Второй раз:

INSERT INTO inbox_events(event_id=501)
ошибка duplicate key

Consumer понимает:

это дубль
ничего делать не надо

Зачем

Чтобы не запустить две Saga на один заказ.

Или не списать оплату дважды.

Или не создать две доставки.


Сценарий G. Saga упала на ReserveStock

Что произошло

Заказ создан:

orders.status = CREATED

Saga стартовала:

status = RUNNING

Начался шаг:

ReserveStock = RUNNING

Но товара нет.

Например:

available = 0
required = 1

Что делает Saga

Шаг помечается:

ReserveStock = FAILED

Так как до этого не было успешных шагов, компенсировать нечего.

Saga:

status = COMPENSATED или FAILED

Заказ:

orders.status = CANCELLED

Можно записать outbox event:

OrderCancelled

Зачем

Мы не списываем оплату, если товара нет.

И не создаём доставку.

Система останавливается на первом невозможном шаге.


Сценарий H. Saga упала на ChargePayment

Что произошло

Первый шаг прошёл:

ReserveStock = COMPLETED

Товар зарезервирован:

available = 9
reserved = 1

Потом начался шаг оплаты:

ChargePayment = RUNNING

Но платёж не прошёл.

Например:

недостаточно денег
payment service rejected
card declined

Что делает Saga

Шаг оплаты:

ChargePayment = FAILED

Saga переходит:

status = COMPENSATING

Она смотрит список успешных шагов:

ReserveStock

И вызывает компенсацию в обратном порядке:

release_stock

Что делает release_stock

Было:

available = 9
reserved = 1

Станет:

available = 10
reserved = 0

Пишется событие:

StockReleased

Шаг помечается:

ReserveStock = COMPENSATED

Заказ:

orders.status = CANCELLED

Saga:

status = COMPENSATED

Зачем

Если оплату списать не удалось, товар нельзя держать зарезервированным.

Компенсация возвращает систему в нормальное бизнес-состояние.


Сценарий I. Saga упала на CreateShipment

Что произошло

Успешно прошли:

ReserveStock = COMPLETED
ChargePayment = COMPLETED

То есть:

товар зарезервирован
оплата списана

Потом начался шаг:

CreateShipment = RUNNING

Но служба доставки недоступна.


Что делает Saga

CreateShipment = FAILED
saga.status = COMPENSATING

Теперь нужно откатить успешные шаги в обратном порядке:

1. refund_payment
2. release_stock

Почему в обратном порядке

Потому что действия были:

ReserveStock → ChargePayment → CreateShipment

Откатывать надо наоборот:

ChargePayment → ReserveStock

Это как снять носки и ботинки: сначала снимаешь ботинки, потом носки. Обратно — неудобно и подозрительно.


Компенсация 1: refund_payment

Было:

payments.status = CHARGED

Станет:

payments.status = REFUNDED

Пишется событие:

PaymentRefunded

Шаг:

ChargePayment = COMPENSATED

Компенсация 2: release_stock

Было:

available = 9
reserved = 1

Станет:

available = 10
reserved = 0

Пишется событие:

StockReleased

Шаг:

ReserveStock = COMPENSATED

Финальное состояние

orders.status = CANCELLED
payments.status = REFUNDED
shipments — нет записи или status = FAILED/CANCELLED
inventory вернулся назад
saga.status = COMPENSATED

Сценарий J. Компенсация тоже упала

Что произошло

Saga упала на доставке.

Она начала компенсацию:

refund_payment

Но платёжный сервис недоступен.


Что делает Saga

ChargePayment = COMPENSATION_FAILED
saga.status = COMPENSATION_FAILED

Почему нельзя просто забыть

Потому что система в опасном состоянии:

товар зарезервирован
деньги списаны
доставка не создана
возврат денег не выполнен

Это требует внимания.


Что дальше

Нужно:

1. Алерт в мониторинг
2. Запись в dead_letter / failed saga table
3. Ручной или автоматический повтор компенсации

Например отдельный worker ищет:

saga.status = COMPENSATION_FAILED

И пробует снова выполнить компенсацию.


Зачем

Компенсация — это не “best effort и забыли”.

Компенсация — критическая бизнес-операция.

Если не получилось вернуть деньги или освободить товар, это должно гореть красным в мониторинге.


Сценарий K. MySQL deadlock

Что произошло

Две Saga одновременно резервируют товары.

Заказ A:

lock sku-1
потом lock sku-2

Заказ B:

lock sku-2
потом lock sku-1

Получился deadlock.

MySQL убивает одну из транзакций.


Что делает приложение

Если ошибка:

1213 Deadlock found

или:

1205 Lock wait timeout

Мы повторяем всю транзакцию целиком.

Не последний SQL-запрос, а всю операцию:

reserve_stock заново

Зачем

После deadlock транзакция откатилась.

Частичное состояние не применилось.

Повторить операцию безопасно, если она идемпотентна.


Как уменьшить вероятность deadlock

Всегда блокировать строки в одном порядке.

Например товары сортировать по sku:

iphone-15
macbook-pro
watch-ultra

Тогда все транзакции берут блокировки одинаково.


Сценарий L. Внешний сервис долго не отвечает

Что произошло

Payment service завис.

Запрос оплаты висит:

10 секунд
30 секунд
2 минуты

Что должно быть

Таймауты.

Например:

connect timeout = 2s
read timeout = 5s

Если ответа нет:

ошибка timeout

Зачем

Чтобы worker не завис навсегда.

Без таймаута можно получить:

все worker-ы ждут payment service
очередь растёт
API начинает тормозить
всё приложение ползёт в болото

Таймаут — это способ сказать:

мы попробовали, ответа нет, идём по контролируемому failure path

Сценарий M. Внешний сервис лежит долго

Что произошло

Payment service отвечает ошибкой уже 5 минут.


Что делает retry

Сначала несколько повторов:

0.5s
1s
2s
4s

Но если сервис реально лежит, постоянные ретраи только ухудшат ситуацию.


Что делает Circuit Breaker

После N ошибок он открывается:

state = OPEN

И новые вызовы сразу получают ошибку:

payment service unavailable

Через некоторое время:

state = HALF_OPEN

Пробует один тестовый запрос.

Если успешно:

state = CLOSED

Если снова ошибка:

state = OPEN

Зачем

Чтобы не добивать лежащий сервис.

И чтобы свои worker-ы не тратили все ресурсы на бесполезные попытки.


Сценарий N. Outbox забился большим количеством событий

Что произошло

Брокер лежал час.

В outbox_events накопилось:

100000 PENDING событий

Что делают worker-ы

Они берут события пачками:

LIMIT 100

Не пытаются забрать всё сразу.


Зачем

Чтобы не убить MySQL огромной транзакцией.

Лучше обработать:

100
100
100
100
...

чем одним махом:

100000

Что мониторить

Очень важная метрика:

oldest_pending_age_seconds

Например:

SELECT
    COUNT(*) AS pending_count,
    TIMESTAMPDIFF(SECOND, MIN(created_at), NOW()) AS oldest_pending_age_seconds
FROM outbox_events
WHERE status IN ('PENDING', 'FAILED');

Если самое старое событие висит 30 минут — что-то сломалось.


12. Общая карта состояний

Заказ

CREATED
  ↓
CONFIRMED

Либо при ошибке:

CREATED
  ↓
CANCELLED

Outbox event

PENDING
  ↓
PROCESSING
  ↓
PUBLISHED

При временной ошибке:

PENDING
  ↓
PROCESSING
  ↓
PENDING
  ↓
PROCESSING
  ↓
PUBLISHED

При постоянной ошибке:

PENDING
  ↓
PROCESSING
  ↓
FAILED
  ↓
dead_letter_events

Saga

Успешно:

RUNNING
  ↓
COMPLETED

Ошибка с успешной компенсацией:

RUNNING
  ↓
COMPENSATING
  ↓
COMPENSATED

Ошибка компенсации:

RUNNING
  ↓
COMPENSATING
  ↓
COMPENSATION_FAILED

Saga step

Успешно:

PENDING
  ↓
RUNNING
  ↓
COMPLETED

Ошибка:

PENDING
  ↓
RUNNING
  ↓
FAILED

Компенсированный шаг:

COMPLETED
  ↓
COMPENSATING
  ↓
COMPENSATED

Ошибка компенсации:

COMPLETED
  ↓
COMPENSATING
  ↓
COMPENSATION_FAILED

13. Самая важная логика “зачем всё это”

Transactional Outbox нужен, чтобы не потерять событие

Проблема без outbox:

1. Записали заказ в БД
2. Приложение упало до отправки события
3. Другие сервисы не узнали о заказе

С outbox:

1. Записали заказ
2. В той же транзакции записали событие
3. Приложение может падать
4. Worker потом всё равно отправит событие

Idempotency Key нужен, чтобы не создать дубль

Проблема без idempotency:

клиент повторил POST
получили два заказа

С idempotency:

клиент повторил POST
получил старый ответ
новый заказ не создан

Transactional Inbox нужен, чтобы не обработать дубль события

Проблема без inbox:

одно событие пришло дважды
consumer дважды списал деньги

С inbox:

event_id уже есть
повтор игнорируем

Saga нужна, чтобы управлять долгим процессом

Проблема без Saga:

товар зарезервировали
оплату списали
доставка упала
непонятно, кто и что должен откатывать

С Saga:

известно, какие шаги прошли
известно, какие компенсации вызвать
известно, где процесс сломался

Compensation нужна, потому что распределённый rollback невозможен

Нельзя просто сделать:

ROLLBACK всего заказа

если часть действий была во внешних сервисах:

платёжный сервис
служба доставки
склад
уведомления

Поэтому вместо rollback используются бизнес-компенсации:

списали деньги → сделать возврат
зарезервировали товар → снять резерв
создали доставку → отменить доставку

14. Итоговые сценарии коротко

14.1. Итоговый успешный сценарий

1. Клиент отправляет POST /orders с Idempotency-Key.
2. API проверяет, что такого ключа ещё нет.
3. API создаёт idempotency_keys PROCESSING.
4. API начинает MySQL transaction.
5. API создаёт orders + order_items.
6. API пишет OrderCreated в outbox_events.
7. API сохраняет response_body в idempotency_keys.
8. API делает COMMIT.
9. API возвращает order_id клиенту.
10. Outbox worker берёт OrderCreated.
11. Worker публикует событие.
12. Worker ставит PUBLISHED.
13. Consumer получает событие.
14. Consumer пишет event_id в inbox_events.
15. Consumer запускает Saga.
16. Saga резервирует товар.
17. Saga списывает оплату.
18. Saga создаёт доставку.
19. Saga ставит order CONFIRMED.
20. Saga завершается COMPLETED.
21. Outbox worker постепенно публикует StockReserved, PaymentCharged, ShipmentCreated.

14.2. Итоговый неуспешный сценарий

Например доставка упала:

1. Заказ создан.
2. OrderCreated записан в outbox.
3. Saga запущена.
4. ReserveStock успешен.
5. ChargePayment успешен.
6. CreateShipment упал.
7. Saga переходит в COMPENSATING.
8. Вызывает refund_payment.
9. Вызывает release_stock.
10. Заказ переводится в CANCELLED.
11. Saga становится COMPENSATED.
12. В outbox пишутся PaymentRefunded и StockReleased.
13. Worker публикует события.
14. Мониторинг видит, что Saga завершилась компенсированно, а не зависла.

Если компенсация тоже упала:

1. Saga становится COMPENSATION_FAILED.
2. Создаётся алерт.
3. Операция попадает в ручную/автоматическую очередь разбирательства.
4. Компенсацию надо повторить позже.

Самая короткая формула всей схемы:

Idempotency-Key защищает API от повторов.
Transactional Outbox защищает события от потери.
Outbox Worker доставляет события с retry.
Transactional Inbox защищает consumer от дублей.
Saga управляет долгим процессом.
Compensation откатывает уже выполненные бизнес-шаги.
DLQ и мониторинг показывают то, что само не починилось.

И прям главный принцип: каждый шаг должен быть либо атомарным, либо идемпотентным, либо компенсируемым. Лучше, конечно, всё сразу — но это уже архитектурный премиум-пакет.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment