ENG-1492 Direct Parquet Upload — Implementation Plan (13-task TDD)

ENG-1492 Direct Parquet Upload — Plan 1 Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Wire the PR-448 multipart upload endpoints into the full dataset pipeline (S3 combine → Parquet schema inference → Glue + collection dataset registration) as a 202-Accepted background pipeline, backed by a completion_status state machine with TTL-based DDB row retention.

Architecture: POST /complete returns 202 immediately. Work runs post-response via BackgroundTasks as a three-step chain (combine → infer → register) with per-step try/except rollback. Status polled via GET /multipart/{upload_id}. Terminal multipart_uploads_v1 rows expire via DynamoDB TTL (30d success/abort, 90d failure, 7d orphan). Terraform PR adds S3 AbortIncompleteMultipartUpload lifecycle rule + enables DDB TTL.
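The three-step chain with per-step rollback can be sketched in plain Python, independent of FastAPI. This is a minimal sketch under stated assumptions. `Step`, `run_pipeline`, and the `set_status` callback are hypothetical names, not the helpers listed below. Each step returns an optional undo callable. On the first failure, already-completed steps are undone newest-first before the failure is recorded. A failing step is assumed to clean up its own partial writes.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional

# Hypothetical types for illustration only; the real chain is
# _run_multipart_completion_pipeline() in src/routes/uploads.py.
Undo = Callable[[], None]


@dataclass
class Step:
    name: str                          # e.g. "combine", "infer", "register"
    run: Callable[[], Optional[Undo]]  # returns an undo callable, or None


def run_pipeline(steps: List[Step], set_status: Callable[[str, str], None]) -> str:
    """Run steps in order; on the first failure undo prior steps, newest first."""
    undos: List[Undo] = []
    for step in steps:
        set_status(step.name, "start")
        try:
            undo = step.run()
        except Exception as exc:
            # Best-effort rollback: a failing undo must not stop the others.
            for prior_undo in reversed(undos):
                try:
                    prior_undo()
                except Exception:
                    pass
            set_status(step.name, f"failed: {exc}")
            return "failed"
        if undo is not None:
            undos.append(undo)
        set_status(step.name, "end")
    return "registered"
```

Because the endpoint has already returned 202, this function's return value is only observable through the status row that `set_status` writes. That row is what GET /multipart/{upload_id} reads back.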

Tech Stack: FastAPI, DynamoDB (via moto in tests), PyArrow, boto3 S3 + Glue, pytest, Typer CLI, httpx.

Spec: docs/superpowers/specs/2026-05-09-eng-1492-direct-parquet-upload.md


File Structure

Modified (platform-api):

  • src/routes/uploads.py: complete_multipart_upload() becomes 202-returning; new _run_multipart_completion_pipeline(), _set_completion_status(), _record_completion_failure(), _append_dataset_to_collection(), _rollback_s3_object(), _rollback_glue_and_schema() helpers; get_multipart_upload_status() returns the new fields.
  • src/dynamodb_client/multipart_uploads.py — add COMPLETION_STATUS_* constants, set_completion_status(), append_completion_step_log(), record_failure_reason(), set_ttl() helpers. Existing create_session() gains ttl attribute. mark_completed() / mark_aborted() / mark_failed() rewrite ttl per §6a.
  • src/utils/dataset_parquet_sync.py — extend register_parquet_source() to return glue_table_name/glue_database_name via GlueCatalogManager.register_parquet_table(), accept already-computed preview to avoid double-read.
  • src/cli.py: the upload-parquet command polls completion_status instead of only parquet_schema_preview.
  • src/utils/multipart_uploader.py: rename poll_schema_preview() to poll_completion_status() with wider status semantics (keep poll_schema_preview as a backward-compatible alias).
  • docs/structured-collections/multipart-upload-runbook.md (new doc).
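The renamed poller only has to stop on terminal states and give up after a deadline. Below is a minimal sketch, assuming `fetch_status` is a zero-argument callable that performs GET /multipart/{upload_id} (for example with httpx) and returns the decoded JSON body. The keyword arguments and the injectable `sleep`/`clock` are assumptions for testability, not the current signature in multipart_uploader.py. The alias is only backward-compatible if the existing callers' arguments line up.

```python
import time
from typing import Any, Callable, Dict

# Mirrors TERMINAL_COMPLETION_STATUSES added to multipart_uploads.py in Task 4.
TERMINAL_STATUSES = {"registered", "failed"}


def poll_completion_status(
    fetch_status: Callable[[], Dict[str, Any]],
    *,
    interval_s: float = 2.0,
    timeout_s: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Dict[str, Any]:
    """Poll until completion_status is terminal; raise TimeoutError otherwise."""
    deadline = clock() + timeout_s
    while True:
        body = fetch_status()
        if body.get("completion_status") in TERMINAL_STATUSES:
            return body
        if clock() >= deadline:
            raise TimeoutError(
                f"upload still {body.get('completion_status')!r} after {timeout_s}s"
            )
        sleep(interval_s)


# Hypothetical backward-compatible alias for existing callers.
poll_schema_preview = poll_completion_status
```

Callers then branch on `body["completion_status"]`. "registered" carries dataset_id and the Glue names, and "failed" carries failure_reason.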

Modified (cap/terraform, separate PR):

  • platform-ingestion-pipeline/s3.tf — add abort_incomplete_multipart_upload rule.
  • platform-ingestion-pipeline/dynamodb.tf — add ttl block to multipart_uploads_v1 resource.

Modified (cap/platform-frontend, minimal compat shim):

  • src/services/data-sources/multipart-upload.ts — handle 202, poll completion_status until terminal.

New tests:

  • src/tests/test_routes_multipart_pipeline.py — new file, 8 unit tests for the pipeline + status contract.
  • src/tests/test_cli_upload_parquet.py — new file, CLI integration test.

Task 1: TF — S3 abort-incomplete lifecycle rule

Repo: cap/terraform (separate from platform-api). Same branch name feature/ENG-1492-s3-multipart.

Files:

  • Modify: platform-ingestion-pipeline/s3.tf:143-159 — extend the aws_s3_bucket_lifecycle_configuration.client_data_bucket resource.

  • Step 1: Checkout the branch in terraform repo

cd /Users/admin/dev/cap/terraform
git fetch origin
git checkout -B feature/ENG-1492-s3-multipart origin/main
  • Step 2: Add the abort-incomplete rule

Open platform-ingestion-pipeline/s3.tf and, inside the same resource, after the existing rule block that ends at line ~158, add:

  rule {
    id     = "abort-incomplete-multipart-uploads"
    status = "Enabled"

    abort_incomplete_multipart_upload {
      days_after_initiation = 7
    }
  }
  • Step 3: Plan it
cd platform-ingestion-pipeline
terraform workspace select staging
terraform plan -target=aws_s3_bucket_lifecycle_configuration.client_data_bucket

Expected: one in-place update on aws_s3_bucket_lifecycle_configuration.client_data_bucket adding the new rule. No replacement.
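After apply, the rule can also be confirmed from the API side. This sketch assumes boto3 credentials for the target account. `has_abort_incomplete_rule` is a hypothetical helper that inspects the response shape of S3's GetBucketLifecycleConfiguration (a top-level `Rules` list whose entries may carry `AbortIncompleteMultipartUpload.DaysAfterInitiation`).

```python
from typing import Any, Dict


def has_abort_incomplete_rule(lifecycle: Dict[str, Any], days: int = 7) -> bool:
    """True if an enabled rule aborts incomplete multipart uploads after `days`."""
    for rule in lifecycle.get("Rules", []):
        abort = rule.get("AbortIncompleteMultipartUpload") or {}
        if rule.get("Status") == "Enabled" and abort.get("DaysAfterInitiation") == days:
            return True
    return False
```

Usage: pass the response of `boto3.client("s3").get_bucket_lifecycle_configuration(Bucket=...)`. The bucket name depends on the workspace and is deliberately not filled in here.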

  • Step 4: Commit
cd /Users/admin/dev/cap/terraform
git add platform-ingestion-pipeline/s3.tf
git commit -m "feat(ENG-1492): add abort-incomplete-multipart lifecycle rule

Orphaned multipart parts expire 7 days after initiation. Matches the S3
MAX_OBJECT_SIZE upload window ceiling and complements the platform-api
session TTL + best-effort DELETE abort."

Task 2: TF — DynamoDB TTL on multipart_uploads_v1

Repo: cap/terraform, same branch as Task 1.

Files:

  • Modify: platform-ingestion-pipeline/dynamodb.tf — find the aws_dynamodb_table.multipart_uploads resource (or equivalent name).

  • Step 1: Locate the resource

cd /Users/admin/dev/cap/terraform/platform-ingestion-pipeline
grep -n "multipart_uploads" dynamodb.tf

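If the resource does not exist yet (i.e. the table is currently auto-created by platform-api in local dev only), create it now; otherwise patch it in place. If a deployed account already has the table but Terraform does not manage it, terraform import it first; otherwise the create fails because the table name is already in use.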

  • Step 2: Add ttl block

Inside the aws_dynamodb_table.multipart_uploads resource, append:

  ttl {
    attribute_name = "ttl"
    enabled        = true
  }

Full resource (if creating new):

resource "aws_dynamodb_table" "multipart_uploads" {
  name         = "${local.global_prefix}-multipart-uploads_v1"
  billing_mode = "PAY_PER_REQUEST"
  hash_key     = "orgid"
  range_key    = "upload_id"

  attribute {
    name = "orgid"
    type = "S"
  }
  attribute {
    name = "upload_id"
    type = "S"
  }

  ttl {
    attribute_name = "ttl"
    enabled        = true
  }

  tags = {
    Environment = terraform.workspace
    Purpose     = "S3 multipart upload session tracking"
  }
}
  • Step 3: Plan
terraform plan -target=aws_dynamodb_table.multipart_uploads

Expected: in-place update (+ ttl) or new resource creation depending on Step 1. No destroy.
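Two checks are useful once this applies. First, DescribeTimeToLive should report TTL enabled on the `ttl` attribute. Second, DynamoDB deletes expired items asynchronously, not at the expiry instant. Expired-but-undeleted rows can still come back from GetItem, Query, and Scan, so readers that care should compare `ttl` against the current time themselves. Both helpers below are hypothetical sketches, not existing platform-api functions.

```python
from typing import Any, Mapping


def ttl_is_enabled(describe_resp: Mapping[str, Any], attribute: str = "ttl") -> bool:
    """Check a DescribeTimeToLive response for TTL enabled on `attribute`."""
    desc = describe_resp.get("TimeToLiveDescription") or {}
    return desc.get("TimeToLiveStatus") == "ENABLED" and desc.get("AttributeName") == attribute


def is_expired(row: Mapping[str, Any], now_epoch: int) -> bool:
    """A row is expired once the current epoch second is past its ttl.

    int() also accepts the Decimal values boto3's resource API returns.
    """
    ttl = row.get("ttl")
    return ttl is not None and now_epoch > int(ttl)
```

Usage: `ttl_is_enabled(boto3.client("dynamodb").describe_time_to_live(TableName=...))`. Right after apply the status may briefly read "ENABLING".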

  • Step 4: Commit + open PR
cd /Users/admin/dev/cap/terraform
git add platform-ingestion-pipeline/dynamodb.tf
git commit -m "feat(ENG-1492): enable TTL on multipart_uploads_v1

Pure progress-tracker rows expire via DDB TTL so scans don't slow over
time. Platform-api writes epoch-seconds to the \`ttl\` attribute per the
spec retention policy (30d success/abort, 90d failure, 7d orphan)."
git push -u origin feature/ENG-1492-s3-multipart
gh pr create --title "ENG-1492: multipart lifecycle rule + DDB TTL" --body "See platform-api spec docs/superpowers/specs/2026-05-09-eng-1492-direct-parquet-upload.md §8 / §6a."

GATE: TF PR merges + applies to every workspace before platform-api PR merges.


Task 3: Add TTL support to MultipartUploadsClient (failing test)

Repo: cap/platform-api from now on.

Files:

  • Test: src/tests/test_routes_multipart.py — add to existing file.

  • Modify later: src/dynamodb_client/multipart_uploads.py.

  • Step 1: Write the failing test

Append to src/tests/test_routes_multipart.py:

class TestMultipartTTL:
    def test_create_session_writes_orphan_ttl(self, multipart_table):
        client = MultipartUploadsClient()
        client.create_session(
            orgid=ORG_ID,
            upload_id="u-ttl-1",
            collection_id=COLLECTION_ID,
            s3_key="k",
            s3_bucket="b",
            filename="f.parquet",
            content_type="application/vnd.apache.parquet",
            part_size=8 * 1024 * 1024,
            part_count=1,
            total_size=1,
        )
        row = client.get_session(orgid=ORG_ID, upload_id="u-ttl-1")
        now = int(arrow.utcnow().timestamp())
        assert int(row["ttl"]) >= now + 6 * 24 * 3600
        assert int(row["ttl"]) <= now + 8 * 24 * 3600

    def test_mark_completed_extends_ttl_to_30_days(self, multipart_table):
        client = MultipartUploadsClient()
        client.create_session(
            orgid=ORG_ID, upload_id="u-ttl-2", collection_id=COLLECTION_ID,
            s3_key="k", s3_bucket="b", filename="f.parquet",
            content_type="application/vnd.apache.parquet",
            part_size=8 * 1024 * 1024, part_count=1, total_size=1,
        )
        client.mark_completed(orgid=ORG_ID, upload_id="u-ttl-2")
        row = client.get_session(orgid=ORG_ID, upload_id="u-ttl-2")
        now = int(arrow.utcnow().timestamp())
        assert int(row["ttl"]) >= now + 29 * 24 * 3600
        assert int(row["ttl"]) <= now + 31 * 24 * 3600

    def test_mark_failed_extends_ttl_to_90_days(self, multipart_table):
        client = MultipartUploadsClient()
        client.create_session(
            orgid=ORG_ID, upload_id="u-ttl-3", collection_id=COLLECTION_ID,
            s3_key="k", s3_bucket="b", filename="f.parquet",
            content_type="application/vnd.apache.parquet",
            part_size=8 * 1024 * 1024, part_count=1, total_size=1,
        )
        client.mark_failed(orgid=ORG_ID, upload_id="u-ttl-3", error="boom")
        row = client.get_session(orgid=ORG_ID, upload_id="u-ttl-3")
        now = int(arrow.utcnow().timestamp())
        assert int(row["ttl"]) >= now + 89 * 24 * 3600
        assert int(row["ttl"]) <= now + 91 * 24 * 3600

    def test_mark_aborted_extends_ttl_to_30_days(self, multipart_table):
        client = MultipartUploadsClient()
        client.create_session(
            orgid=ORG_ID, upload_id="u-ttl-4", collection_id=COLLECTION_ID,
            s3_key="k", s3_bucket="b", filename="f.parquet",
            content_type="application/vnd.apache.parquet",
            part_size=8 * 1024 * 1024, part_count=1, total_size=1,
        )
        client.mark_aborted(orgid=ORG_ID, upload_id="u-ttl-4")
        row = client.get_session(orgid=ORG_ID, upload_id="u-ttl-4")
        now = int(arrow.utcnow().timestamp())
        assert int(row["ttl"]) >= now + 29 * 24 * 3600
        assert int(row["ttl"]) <= now + 31 * 24 * 3600
  • Step 2: Run tests — expect failure
just test tests/test_routes_multipart.py::TestMultipartTTL -v

Expected: all four tests FAIL with KeyError: 'ttl' (attribute not written).

  • Step 3: Implement TTL writes

Open src/dynamodb_client/multipart_uploads.py. Add retention constants near the top after existing status constants:

TTL_SECONDS_PRE_TERMINAL = 7 * 24 * 3600
TTL_SECONDS_REGISTERED = 30 * 24 * 3600
TTL_SECONDS_ABORTED = 30 * 24 * 3600
TTL_SECONDS_FAILED = 90 * 24 * 3600
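One way to keep the four write sites consistent is to compute the TTL value in a single place. DynamoDB TTL only acts on a Number attribute holding Unix epoch seconds, which is why every site writes `int(...timestamp())` rather than an ISO string. A sketch, assuming a hypothetical `ttl_for()` helper (not part of this plan). It uses `time.time()` instead of arrow so it runs standalone, and its state keys are illustrative rather than the real MULTIPART_STATUS_* values.

```python
import time
from typing import Optional

DAY = 24 * 3600
# Same values as the constants above (spec §6a retention policy).
TTL_SECONDS_PRE_TERMINAL = 7 * DAY
TTL_SECONDS_REGISTERED = 30 * DAY
TTL_SECONDS_ABORTED = 30 * DAY
TTL_SECONDS_FAILED = 90 * DAY

# Illustrative keys for the terminal state being written.
_RETENTION = {
    "registered": TTL_SECONDS_REGISTERED,
    "aborted": TTL_SECONDS_ABORTED,
    "failed": TTL_SECONDS_FAILED,
}


def ttl_for(state: Optional[str], now_epoch: Optional[int] = None) -> int:
    """Epoch-seconds TTL; unknown or non-terminal states get the 7d orphan window."""
    now = int(time.time()) if now_epoch is None else now_epoch
    return now + _RETENTION.get(state or "", TTL_SECONDS_PRE_TERMINAL)
```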

In create_session(), after item["updated_at"] = now and before self.table.put_item(Item=item):

        item["ttl"] = int(arrow.utcnow().timestamp()) + TTL_SECONDS_PRE_TERMINAL

In mark_completed(), extend expr_values + update_expr_parts:

        update_expr_parts.append("#ttl = :ttl")
        expr_values[":ttl"] = int(arrow.utcnow().timestamp()) + TTL_SECONDS_REGISTERED

And the ExpressionAttributeNames dict now also carries "#ttl": "ttl":

        self.table.update_item(
            Key={"orgid": orgid, "upload_id": upload_id},
            UpdateExpression="SET " + ", ".join(update_expr_parts),
            ExpressionAttributeNames={"#status": "status", "#ttl": "ttl"},
            ExpressionAttributeValues=expr_values,
        )

In mark_aborted(), extend the update expression:

        expr += ", #ttl = :ttl"
        values[":ttl"] = int(arrow.utcnow().timestamp()) + TTL_SECONDS_ABORTED

And add "#ttl" to its ExpressionAttributeNames:

        ExpressionAttributeNames={"#status": "status", "#ttl": "ttl"},

In mark_failed(), replace the update_item call with:

        self.table.update_item(
            Key={"orgid": orgid, "upload_id": upload_id},
            UpdateExpression="SET #status = :status, error = :error, updated_at = :updated_at, #ttl = :ttl",
            ExpressionAttributeNames={"#status": "status", "#ttl": "ttl"},
            ExpressionAttributeValues={
                ":status": MULTIPART_STATUS_FAILED,
                ":error": error[:1024],
                ":updated_at": now,
                ":ttl": int(arrow.utcnow().timestamp()) + TTL_SECONDS_FAILED,
            },
        )
  • Step 4: Run tests — expect pass
just test tests/test_routes_multipart.py::TestMultipartTTL -v

Expected: 4 PASS.

  • Step 5: Commit
git add src/dynamodb_client/multipart_uploads.py src/tests/test_routes_multipart.py
git commit -m "feat(ENG-1492): write DDB TTL on multipart_uploads_v1 rows

create_session writes 7d orphan TTL; mark_completed bumps to 30d;
mark_aborted 30d; mark_failed 90d. Retention policy per spec §6a."

Task 4: Add completion_status state machine to MultipartUploadsClient

Files:

  • Test: src/tests/test_routes_multipart.py.

  • Modify: src/dynamodb_client/multipart_uploads.py.

  • Step 1: Write the failing tests

Append to src/tests/test_routes_multipart.py:

from dynamodb_client.multipart_uploads import (
    COMPLETION_STATUS_COMBINING,
    COMPLETION_STATUS_COMBINED,
    COMPLETION_STATUS_INFERRING,
    COMPLETION_STATUS_INFERRED,
    COMPLETION_STATUS_REGISTERING,
    COMPLETION_STATUS_REGISTERED,
    COMPLETION_STATUS_FAILED,
)


class TestCompletionStatus:
    def test_set_completion_status_updates_row_and_appends_log(self, multipart_table):
        client = MultipartUploadsClient()
        client.create_session(
            orgid=ORG_ID, upload_id="c-1", collection_id=COLLECTION_ID,
            s3_key="k", s3_bucket="b", filename="f.parquet",
            content_type="application/vnd.apache.parquet",
            part_size=8 * 1024 * 1024, part_count=1, total_size=1,
        )
        client.set_completion_status(
            orgid=ORG_ID, upload_id="c-1",
            status=COMPLETION_STATUS_COMBINING, step="combine", phase="start",
        )
        row = client.get_session(orgid=ORG_ID, upload_id="c-1")
        assert row["completion_status"] == COMPLETION_STATUS_COMBINING
        assert len(row["completion_steps_log"]) == 1
        assert row["completion_steps_log"][0]["step"] == "combine"
        assert row["completion_steps_log"][0]["phase"] == "start"
        assert row["completion_steps_log"][0]["started_at"]

    def test_set_completion_status_registered_sets_success_fields(self, multipart_table):
        client = MultipartUploadsClient()
        client.create_session(
            orgid=ORG_ID, upload_id="c-2", collection_id=COLLECTION_ID,
            s3_key="k", s3_bucket="b", filename="f.parquet",
            content_type="application/vnd.apache.parquet",
            part_size=8 * 1024 * 1024, part_count=1, total_size=1,
        )
        client.set_completion_status(
            orgid=ORG_ID, upload_id="c-2",
            status=COMPLETION_STATUS_REGISTERED, step="register", phase="end",
            extra={
                "dataset_id": "ds-abc",
                "glue_table_name": "t_abc",
                "glue_database_name": "capitol_datasets",
            },
        )
        row = client.get_session(orgid=ORG_ID, upload_id="c-2")
        assert row["completion_status"] == COMPLETION_STATUS_REGISTERED
        assert row["dataset_id"] == "ds-abc"
        assert row["glue_table_name"] == "t_abc"
        assert row["glue_database_name"] == "capitol_datasets"
        # registered also writes 30d TTL
        now = int(arrow.utcnow().timestamp())
        assert int(row["ttl"]) >= now + 29 * 24 * 3600

    def test_record_failure_reason_writes_failed_with_step(self, multipart_table):
        client = MultipartUploadsClient()
        client.create_session(
            orgid=ORG_ID, upload_id="c-3", collection_id=COLLECTION_ID,
            s3_key="k", s3_bucket="b", filename="f.parquet",
            content_type="application/vnd.apache.parquet",
            part_size=8 * 1024 * 1024, part_count=1, total_size=1,
        )
        client.record_failure_reason(
            orgid=ORG_ID, upload_id="c-3", step="infer",
            exc_str="ValueError: footer unreadable",
        )
        row = client.get_session(orgid=ORG_ID, upload_id="c-3")
        assert row["completion_status"] == COMPLETION_STATUS_FAILED
        assert row["failure_reason"]["step"] == "infer"
        assert "footer" in row["failure_reason"]["exc_str"]
        now = int(arrow.utcnow().timestamp())
        assert int(row["ttl"]) >= now + 89 * 24 * 3600  # 90d retention
  • Step 2: Run — expect fail
just test tests/test_routes_multipart.py::TestCompletionStatus -v

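Expected: a collection error, ImportError: cannot import name 'COMPLETION_STATUS_COMBINING'. Because the new import sits at module level, every test in the file errors out until Step 3 lands.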

  • Step 3: Implement

In src/dynamodb_client/multipart_uploads.py, below the existing status constants:

COMPLETION_STATUS_COMBINING = "combining"
COMPLETION_STATUS_COMBINED = "combined"
COMPLETION_STATUS_INFERRING = "inferring"
COMPLETION_STATUS_INFERRED = "inferred"
COMPLETION_STATUS_REGISTERING = "registering"
COMPLETION_STATUS_REGISTERED = "registered"
COMPLETION_STATUS_FAILED = "failed"

TERMINAL_COMPLETION_STATUSES = {
    COMPLETION_STATUS_REGISTERED,
    COMPLETION_STATUS_FAILED,
}
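set_completion_status() as written does not validate transitions; the Task 4 test writes registered straight onto a fresh row. If a guard is wanted later, the intended order can be expressed as an allowed-transition table. This is a hypothetical sketch only: happy-path edges go combining → combined → inferring → inferred → registering → registered, failed is reachable from any non-terminal state, and terminal states accept nothing further.

```python
from typing import Dict, FrozenSet, Optional

# Values copied from the COMPLETION_STATUS_* constants above.
COMBINING, COMBINED = "combining", "combined"
INFERRING, INFERRED = "inferring", "inferred"
REGISTERING, REGISTERED = "registering", "registered"
FAILED = "failed"

# Hypothetical happy-path edges; None means "no completion_status written yet".
_NEXT: Dict[Optional[str], FrozenSet[str]] = {
    None: frozenset({COMBINING}),
    COMBINING: frozenset({COMBINED}),
    COMBINED: frozenset({INFERRING}),
    INFERRING: frozenset({INFERRED}),
    INFERRED: frozenset({REGISTERING}),
    REGISTERING: frozenset({REGISTERED}),
}
TERMINAL = frozenset({REGISTERED, FAILED})


def can_transition(current: Optional[str], new: str) -> bool:
    """Allow the next happy-path state, or FAILED from any non-terminal state."""
    if current in TERMINAL:
        return False
    if new == FAILED:
        return True
    return new in _NEXT.get(current, frozenset())
```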

Add two methods on MultipartUploadsClient:

    def set_completion_status(
        self,
        *,
        orgid: str,
        upload_id: str,
        status: str,
        step: str,
        phase: str,  # "start" | "end"
        extra: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Transition completion_status, append to completion_steps_log, optionally write extra fields.

        Writes 30d TTL once status becomes REGISTERED.
        """
        now = arrow.utcnow().isoformat()
        log_entry: Dict[str, Any] = {
            "step": step,
            "phase": phase,
            ("started_at" if phase == "start" else "finished_at"): now,
            "status": status,
        }
        set_parts = [
            "completion_status = :cs",
            "updated_at = :now",
            "completion_steps_log = list_append(if_not_exists(completion_steps_log, :empty), :log)",
        ]
        values: Dict[str, Any] = {
            ":cs": status,
            ":now": now,
            ":empty": [],
            ":log": [log_entry],
        }
        names: Dict[str, str] = {}
        if status == COMPLETION_STATUS_REGISTERED:
            set_parts.append("#ttl = :ttl")
            values[":ttl"] = int(arrow.utcnow().timestamp()) + TTL_SECONDS_REGISTERED
            names["#ttl"] = "ttl"
        if extra:
            for idx, (key, val) in enumerate(extra.items()):
                placeholder = f":x{idx}"
                name_ph = f"#x{idx}"
                set_parts.append(f"{name_ph} = {placeholder}")
                values[placeholder] = val
                names[name_ph] = key
        kwargs = {
            "Key": {"orgid": orgid, "upload_id": upload_id},
            "UpdateExpression": "SET " + ", ".join(set_parts),
            "ExpressionAttributeValues": values,
        }
        if names:
            kwargs["ExpressionAttributeNames"] = names
        self.table.update_item(**kwargs)

    def record_failure_reason(
        self,
        *,
        orgid: str,
        upload_id: str,
        step: str,
        exc_str: str,
    ) -> None:
        """Write completion_status=failed + failure_reason payload + 90d TTL."""
        now_iso = arrow.utcnow().isoformat()
        log_entry = {
            "step": step,
            "phase": "end",
            "finished_at": now_iso,
            "status": COMPLETION_STATUS_FAILED,
            "error": exc_str[:1024],
        }
        self.table.update_item(
            Key={"orgid": orgid, "upload_id": upload_id},
            UpdateExpression=(
                "SET completion_status = :cs, failure_reason = :fr, "
                "updated_at = :now, #ttl = :ttl, "
                "completion_steps_log = list_append(if_not_exists(completion_steps_log, :empty), :log)"
            ),
            ExpressionAttributeNames={"#ttl": "ttl"},
            ExpressionAttributeValues={
                ":cs": COMPLETION_STATUS_FAILED,
                ":fr": {"step": step, "exc_str": exc_str[:1024], "at": now_iso},
                ":now": now_iso,
                ":ttl": int(arrow.utcnow().timestamp()) + TTL_SECONDS_FAILED,
                ":empty": [],
                ":log": [log_entry],
            },
        )
  • Step 4: Run — expect pass
just test tests/test_routes_multipart.py::TestCompletionStatus -v

Expected: 3 PASS.

  • Step 5: Commit
git add src/dynamodb_client/multipart_uploads.py src/tests/test_routes_multipart.py
git commit -m "feat(ENG-1492): completion_status state machine on multipart sessions

Adds set_completion_status() + record_failure_reason() on
MultipartUploadsClient. Appends per-step log entries and writes
dataset_id/glue_table_name/ttl on terminal transitions."

Task 5: Extend register_parquet_source to accept a precomputed preview + return Glue names

Currently register_parquet_source() re-reads the Parquet footer. The completion pipeline already has the preview from the infer step, so we thread it through. The function must also return the Glue database/table names, which it obtains by calling GlueCatalogManager.register_parquet_table().

Files:

  • Test: src/tests/test_utils_dataset_parquet_sync.py (new).

  • Modify: src/utils/dataset_parquet_sync.py:307-381.

  • Step 1: Write the failing test

Create src/tests/test_utils_dataset_parquet_sync.py:

"""Tests for register_parquet_source — direct Parquet upload path."""
from __future__ import annotations

from unittest.mock import MagicMock, patch

import pytest

from utils.dataset_parquet_sync import register_parquet_source


@pytest.fixture
def preview():
    return {
        "columns": ["Foo Bar", "Foo-Bar"],  # collides case-insensitively after sanitization
        "schema_fields": [
            {"name": "Foo Bar", "type": "string"},
            {"name": "Foo-Bar", "type": "string"},
        ],
        "row_count": 42,
        "num_row_groups": 1,
        "size_bytes": 1024,
    }


def test_register_parquet_source_uses_preview_when_supplied(preview):
    schemas_client = MagicMock()
    glue_manager = MagicMock()
    glue_manager.register_parquet_table.return_value = {
        "glue_database_name": "capitol_datasets",
        "glue_table_name": "t_test",
        "action": "created",
    }
    with patch("utils.dataset_parquet_sync.extract_parquet_schema_from_s3") as extractor:
        result = register_parquet_source(
            dataset_id="ds-1",
            parquet_object_key="org/coll/file.parquet",
            schemas_client=schemas_client,
            glue_manager=glue_manager,
            s3_client=MagicMock(bucket="b"),
            preview=preview,
        )
    extractor.assert_not_called()
    schemas_client.put_schema.assert_called_once()
    glue_manager.register_parquet_table.assert_called_once_with(
        dataset_id="ds-1", csv_object_key="org/coll/file.parquet",
    )
    assert result["glue_table_name"] == "t_test"
    assert result["glue_database_name"] == "capitol_datasets"
    assert result["row_count"] == 42


def test_register_parquet_source_reads_footer_when_preview_absent(preview):
    schemas_client = MagicMock()
    glue_manager = MagicMock()
    glue_manager.register_parquet_table.return_value = {
        "glue_database_name": "capitol_datasets",
        "glue_table_name": "t_test2",
    }
    with patch(
        "utils.dataset_parquet_sync.extract_parquet_schema_from_s3",
        return_value=preview,
    ) as extractor:
        result = register_parquet_source(
            dataset_id="ds-2",
            parquet_object_key="org/coll/file2.parquet",
            schemas_client=schemas_client,
            glue_manager=glue_manager,
            s3_client=MagicMock(bucket="b"),
        )
    extractor.assert_called_once()
    assert result["glue_table_name"] == "t_test2"
  • Step 2: Run — expect fail
just test tests/test_utils_dataset_parquet_sync.py -v

Expected: TypeError: register_parquet_source() got an unexpected keyword argument 'preview' (or 'glue_manager', if the current signature does not accept that parameter either).

  • Step 3: Update register_parquet_source signature + body

Replace the function in src/utils/dataset_parquet_sync.py starting at line 307:

def register_parquet_source(
    *,
    dataset_id: str,
    parquet_object_key: str,
    s3_client: Optional[s3Client] = None,
    schemas_client: Optional[CollectionDatasetSchemasClient] = None,
    glue_manager: Optional[GlueCatalogManager] = None,
    preview: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Register an already-uploaded Parquet file as a dataset source.

    If ``preview`` is supplied (from an upstream footer read), the footer is
    not re-read. Otherwise the footer is read inline. Glue registration runs
    after schema persistence and the resulting database/table names are
    returned.

    Raises:
        ValueError: if the object is missing, unreadable, or contains column
            names that cannot be normalized to valid Glue identifiers.
    """
    s3_client = s3_client or s3Client(bucket=settings.S3_UPLOADS_BUCKET)
    schemas_client = schemas_client or CollectionDatasetSchemasClient()
    glue_manager = glue_manager or GlueCatalogManager(schemas_client=schemas_client)
    bucket_name = getattr(s3_client, "bucket", settings.S3_UPLOADS_BUCKET)

    if preview is None:
        preview = extract_parquet_schema_from_s3(
            s3_client=s3_client.s3,
            bucket=bucket_name,
            key=parquet_object_key,
        )

    raw_columns: List[str] = list(preview.get("columns") or [])
    if not raw_columns:
        raise ValueError(f"Parquet file has no columns: s3://{bucket_name}/{parquet_object_key}")

    sanitized_columns = _sanitize_and_dedupe_glue_columns(raw_columns)

    schema = {
        "columns": sanitized_columns,
        "row_count": preview["row_count"],
        "column_count": len(sanitized_columns),
        "primary_key": [],
        "foreign_keys": [],
    }

    metadata = {
        "parquet_object_key": parquet_object_key,
        "parquet_row_count": preview["row_count"],
        "parquet_synced_at": arrow.utcnow().isoformat(),
        "parquet_size_bytes": preview.get("size_bytes"),
        "parquet_num_row_groups": preview.get("num_row_groups"),
        "schema_fields": sanitized_columns,
        "schema_types": [field["type"] for field in preview.get("schema_fields") or []],
        "column_name_mapping": {
            "original": raw_columns,
            "resolved": sanitized_columns,
        },
        "source": "multipart_parquet_upload",
    }

    schemas_client.put_schema(
        dataset_id=dataset_id,
        csv_object_key=parquet_object_key,
        schema=schema,
        metadata=metadata,
    )

    glue_result = glue_manager.register_parquet_table(
        dataset_id=dataset_id,
        csv_object_key=parquet_object_key,
    )

    return {
        "dataset_id": dataset_id,
        "csv_object_key": parquet_object_key,
        "parquet_object_key": parquet_object_key,
        "row_count": preview["row_count"],
        "glue_table_name": glue_result.get("glue_table_name"),
        "glue_database_name": glue_result.get("glue_database_name"),
        "schema": schema,
        "preview": preview,
    }
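_sanitize_and_dedupe_glue_columns() is assumed to exist already, and its exact rules are not shown in this plan. Glue and Athena treat column names as lowercase, so names that differ only in case or punctuation collide after sanitization. The fixture's "Foo Bar"/"Foo-Bar" pair exists to exercise that case. The function below is a hypothetical equivalent for intuition only. The real helper's replacement character, leading-digit handling, and suffix scheme may differ.

```python
import re
from typing import List


def sanitize_and_dedupe(columns: List[str]) -> List[str]:
    """Lowercase, map invalid chars to '_', and suffix duplicates with _2, _3, ..."""
    seen = set()
    out: List[str] = []
    for raw in columns:
        # Anything outside [a-z0-9_] becomes '_'; empty names get a placeholder.
        name = re.sub(r"[^a-z0-9_]", "_", raw.strip().lower()) or "col"
        if name[0].isdigit():
            name = f"_{name}"
        candidate, n = name, 2
        while candidate in seen:
            candidate = f"{name}_{n}"
            n += 1
        seen.add(candidate)
        out.append(candidate)
    return out
```

Whatever the real rules are, the "original"/"resolved" pair in column_name_mapping is what lets a reader map a Glue column back to the uploaded name.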
  • Step 4: Run — expect pass
just test tests/test_utils_dataset_parquet_sync.py -v

Expected: 2 PASS.

  • Step 5: Commit
git add src/utils/dataset_parquet_sync.py src/tests/test_utils_dataset_parquet_sync.py
git commit -m "refactor(ENG-1492): register_parquet_source accepts precomputed preview + returns Glue names

Avoids re-reading the Parquet footer when the completion pipeline already
has the preview. Runs Glue registration and returns database/table names
so the pipeline can persist them on the multipart session row."

Task 6: Pipeline helpers — _append_dataset_to_collection, _rollback_s3_object, _rollback_glue_and_schema

Files:

  • Modify: src/routes/uploads.py (append helpers near _run_multipart_schema_inference).

  • Test: src/tests/test_routes_multipart_pipeline.py (new).

  • Step 1: Create the new test file with helper tests

Write src/tests/test_routes_multipart_pipeline.py:

"""Unit tests for the multipart completion pipeline helpers.

These tests cover the small units wired together by
``_run_multipart_completion_pipeline``. The pipeline itself is tested in
test_routes_multipart_pipeline_flow.py (later tasks).
"""
from __future__ import annotations

from unittest.mock import MagicMock, patch

import pytest
from botocore.exceptions import ClientError

from routes.uploads import (
    _append_dataset_to_collection,
    _rollback_glue_and_schema,
    _rollback_s3_object,
)


@pytest.fixture
def collection_client():
    with patch("routes.uploads.CollectionsDynamoDbClient") as cls:
        instance = MagicMock()
        cls.return_value = instance
        yield instance


def _collection_with_datasets(datasets):
    coll = MagicMock()
    coll.config = {"datasets": list(datasets)}
    return coll


def test_append_dataset_to_collection_writes_new_entry(collection_client):
    collection_client.get_org_collection_by_id.return_value = _collection_with_datasets([])
    entry = {"id": "ds-new", "source_type": "multipart_parquet_upload"}
    _append_dataset_to_collection(orgid="o", collection_id="c", dataset=entry)
    collection_client.update_collection.assert_called_once()
    kwargs = collection_client.update_collection.call_args.kwargs
    assert kwargs["orgid"] == "o"
    assert kwargs["collection_id"] == "c"
    # update_data.config.datasets contains the new entry
    update_data = kwargs["update_data"]
    assert update_data.config["datasets"] == [entry]


def test_append_dataset_preserves_existing_datasets(collection_client):
    existing = [{"id": "ds-old"}]
    collection_client.get_org_collection_by_id.return_value = _collection_with_datasets(existing)
    new_entry = {"id": "ds-new"}
    _append_dataset_to_collection(orgid="o", collection_id="c", dataset=new_entry)
    update_data = collection_client.update_collection.call_args.kwargs["update_data"]
    assert update_data.config["datasets"] == [{"id": "ds-old"}, {"id": "ds-new"}]


def test_rollback_s3_object_is_best_effort():
    s3 = MagicMock()
    s3.s3.delete_object.side_effect = ClientError({"Error": {"Code": "NoSuchKey"}}, "DeleteObject")
    # Must not raise
    _rollback_s3_object(s3_client=s3, bucket="b", key="k")
    s3.s3.delete_object.assert_called_once_with(Bucket="b", Key="k")


def test_rollback_glue_and_schema_runs_all_substeps_even_if_one_fails(collection_client):
    glue = MagicMock()
    glue.drop_table.side_effect = RuntimeError("glue exploded")
    schemas = MagicMock()
    collection_client.get_org_collection_by_id.return_value = _collection_with_datasets(
        [{"id": "ds-x", "source_type": "multipart_parquet_upload"}]
    )
    _rollback_glue_and_schema(
        glue_manager=glue,
        schemas_client=schemas,
        dataset_id="ds-x",
        parquet_object_key="k",
        glue_table_name="t_x",
        glue_database_name="capitol_datasets",
        orgid="o",
        collection_id="c",
    )
    glue.drop_table.assert_called_once_with(table_name="t_x", database_name="capitol_datasets")
    schemas.delete_schema.assert_called_once_with(dataset_id="ds-x", csv_object_key="k")
    # collection write should have removed the entry
    update_data = collection_client.update_collection.call_args.kwargs["update_data"]
    assert update_data.config["datasets"] == []
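The last test pins down the rollback contract. Every sub-step runs even if an earlier one raises, and nothing is re-raised, so the primary pipeline exception stays the one that gets recorded. The same pattern can be factored into a small runner. This is a sketch only: `run_best_effort` is a hypothetical helper, and _rollback_glue_and_schema() in Step 3 inlines the equivalent with one try/except per sub-step.

```python
import logging
from typing import Callable, List, Tuple

logger = logging.getLogger(__name__)


def run_best_effort(substeps: List[Tuple[str, Callable[[], None]]]) -> List[str]:
    """Run every sub-step in order; log and collect failures instead of raising."""
    failed: List[str] = []
    for name, fn in substeps:
        try:
            fn()
        except Exception as exc:  # rollback must never mask the primary error
            logger.warning("Rollback %s failed: %s", name, exc)
            failed.append(name)
    return failed
```

Returning the failed sub-step names, rather than swallowing them silently, gives the caller something to append to failure_reason for the runbook.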
  • Step 2: Run — expect fail
just test tests/test_routes_multipart_pipeline.py -v

Expected: ImportError: cannot import name '_append_dataset_to_collection' from 'routes.uploads'.

  • Step 3: Implement helpers in src/routes/uploads.py

Near the existing _run_multipart_schema_inference function (around line 1939), add after it:

def _append_dataset_to_collection(
    *,
    orgid: str,
    collection_id: str,
    dataset: Dict[str, Any],
) -> None:
    """Append a dataset entry onto collection.config['datasets'] idempotently (by id)."""
    client = CollectionsDynamoDbClient()
    collection = client.get_org_collection_by_id(orgid, collection_id)
    if not collection:
        raise RuntimeError(
            f"Collection {collection_id} not found for org {orgid} during dataset append"
        )
    config = dict(collection.config or {})
    datasets = list(config.get("datasets") or [])
    # Idempotent replace if same id exists (supports pipeline resume).
    datasets = [d for d in datasets if d.get("id") != dataset["id"]]
    datasets.append(dataset)
    config["datasets"] = datasets
    client.update_collection(
        orgid=orgid,
        collection_id=collection_id,
        update_data=CollectionUpdate(config=config),
    )


def _rollback_s3_object(*, s3_client: Any, bucket: str, key: str) -> None:
    """Best-effort S3 delete; never raises."""
    try:
        s3_client.s3.delete_object(Bucket=bucket, Key=key)
    except Exception as exc:
        logger.warning("Rollback S3 delete failed for s3://%s/%s: %s", bucket, key, exc)


def _rollback_glue_and_schema(
    *,
    glue_manager: Any,
    schemas_client: Any,
    dataset_id: str,
    parquet_object_key: str,
    glue_table_name: Optional[str],
    glue_database_name: Optional[str],
    orgid: str,
    collection_id: str,
) -> None:
    """Best-effort rollback of Glue + schema + dataset-entry writes.

    Each sub-step wrapped individually — later sub-steps still run if an
    earlier one fails. Errors are logged, never re-raised (the caller has
    already handled the primary exception).
    """
    if glue_table_name:
        try:
            glue_manager.drop_table(
                table_name=glue_table_name,
                database_name=glue_database_name,
            )
        except Exception as exc:
            logger.warning("Rollback Glue drop_table failed: %s", exc)
    try:
        schemas_client.delete_schema(dataset_id=dataset_id, csv_object_key=parquet_object_key)
    except Exception as exc:
        logger.warning("Rollback schema delete failed: %s", exc)
    try:
        client = CollectionsDynamoDbClient()
        collection = client.get_org_collection_by_id(orgid, collection_id)
        if collection:
            config = dict(collection.config or {})
            datasets = [d for d in (config.get("datasets") or []) if d.get("id") != dataset_id]
            config["datasets"] = datasets
            client.update_collection(
                orgid=orgid,
                collection_id=collection_id,
                update_data=CollectionUpdate(config=config),
            )
    except Exception as exc:
        logger.warning("Rollback collection dataset-entry removal failed: %s", exc)

Check the imports at the top of src/routes/uploads.py and make sure CollectionUpdate is imported; if it is not, add:

from schemas.collections import CollectionUpdate  # add if missing
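Because _append_dataset_to_collection drops any entry with the same id before appending, re-running the register step after a partial failure still leaves exactly one entry per dataset, and _rollback_glue_and_schema applies the inverse filter. A minimal sketch of those two list transforms as pure functions, so the idempotency can be checked without DynamoDB; upsert_dataset_entry and remove_dataset_entry are hypothetical names, not helpers the plan defines:

```python
from typing import Any, Dict, List, Optional


def upsert_dataset_entry(
    datasets: Optional[List[Dict[str, Any]]], entry: Dict[str, Any]
) -> List[Dict[str, Any]]:
    """Return a new list with any same-id entry replaced by `entry` (appended last)."""
    kept = [d for d in (datasets or []) if d.get("id") != entry["id"]]
    kept.append(entry)
    return kept


def remove_dataset_entry(
    datasets: Optional[List[Dict[str, Any]]], dataset_id: str
) -> List[Dict[str, Any]]:
    """Inverse used during rollback: drop every entry whose id matches."""
    return [d for d in (datasets or []) if d.get("id") != dataset_id]


existing = [{"id": "ds-a"}, {"id": "ds-x", "row_count": 5}]
once = upsert_dataset_entry(existing, {"id": "ds-x", "row_count": 10})
twice = upsert_dataset_entry(once, {"id": "ds-x", "row_count": 10})
print([d["id"] for d in twice])             # ['ds-a', 'ds-x']
print(remove_dataset_entry(twice, "ds-x"))  # [{'id': 'ds-a'}]
```

Neither function mutates its input list, which matches the helpers above copying config and datasets before writing them back.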
  • Step 4: Run — expect pass
just test tests/test_routes_multipart_pipeline.py -v

Expected: 4 PASS.

  • Step 5: Commit
git add src/routes/uploads.py src/tests/test_routes_multipart_pipeline.py
git commit -m "feat(ENG-1492): pipeline rollback + dataset-append helpers

Small building blocks for the background completion pipeline:
_append_dataset_to_collection, _rollback_s3_object,
_rollback_glue_and_schema. Each rollback is best-effort and never
re-raises."

Task 7: Implement _run_multipart_completion_pipeline

Files:

  • Modify: src/routes/uploads.py.

  • Test: src/tests/test_routes_multipart_pipeline.py (append).

  • Step 1: Write the failing tests for the pipeline

Append to src/tests/test_routes_multipart_pipeline.py:

from botocore.exceptions import ClientError  # used by the combine-failure test; skip if already imported

from dynamodb_client.multipart_uploads import (
    COMPLETION_STATUS_COMBINED,
    COMPLETION_STATUS_INFERRED,
    COMPLETION_STATUS_REGISTERED,
)


class _FakeSession(dict):
    """dict that mimics the minimal multipart session shape used by the pipeline."""


@pytest.fixture
def sessions_client():
    with patch("routes.uploads.MultipartUploadsClient") as cls:
        instance = MagicMock()
        cls.return_value = instance
        instance.get_session.return_value = _FakeSession(
            orgid="o", upload_id="u",
            collection_id="c",
            s3_bucket="b", s3_key="k",
            filename="f.parquet",
            total_size=1024,
        )
        yield instance


def _preview():
    return {
        "columns": ["a", "b"],
        "schema_fields": [{"name": "a", "type": "string"}, {"name": "b", "type": "string"}],
        "row_count": 10,
        "num_row_groups": 1,
        "size_bytes": 1024,
    }


def test_pipeline_happy_path(sessions_client, collection_client):
    from routes.uploads import _run_multipart_completion_pipeline

    collection_client.get_org_collection_by_id.return_value = _collection_with_datasets([])

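    with (
        patch("routes.uploads.s3Client") as s3_cls,
        patch("routes.uploads.extract_parquet_schema_from_s3", return_value=_preview()),
        patch("routes.uploads.register_parquet_source") as reg,
        # Keep the real Glue/schema clients out of the happy path too; the
        # pipeline constructs both before calling register_parquet_source.
        patch("routes.uploads.GlueCatalogManager"),
        patch("routes.uploads.CollectionDatasetSchemasClient"),
    ):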
        s3_cls.return_value.s3.complete_multipart_upload.return_value = {"ETag": "etag1"}
        s3_cls.return_value.s3.head_object.return_value = {"ContentLength": 1024}
        reg.return_value = {
            "dataset_id": "will-be-overridden",
            "glue_table_name": "t_ok",
            "glue_database_name": "capitol_datasets",
            "row_count": 10,
            "preview": _preview(),
            "schema": {"columns": ["a", "b"]},
        }
        _run_multipart_completion_pipeline(
            orgid="o", collection_id="c", upload_id="u",
            parts=[{"ETag": "e1", "PartNumber": 1}],
        )

    # combine -> combined, infer -> inferred, register -> registered
    statuses = [c.kwargs["status"] for c in sessions_client.set_completion_status.call_args_list]
    assert "combining" in statuses and COMPLETION_STATUS_COMBINED in statuses
    assert "inferring" in statuses and COMPLETION_STATUS_INFERRED in statuses
    assert "registering" in statuses and COMPLETION_STATUS_REGISTERED in statuses
    # dataset was appended
    update_data = collection_client.update_collection.call_args.kwargs["update_data"]
    datasets = update_data.config["datasets"]
    assert len(datasets) == 1
    assert datasets[0]["source_type"] == "multipart_parquet_upload"
    assert datasets[0]["parquet_object_keys"] == ["k"]
    assert datasets[0]["glue_table_name"] == "t_ok"


def test_pipeline_combine_failure_marks_failed(sessions_client, collection_client):
    from routes.uploads import _run_multipart_completion_pipeline

    with patch("routes.uploads.s3Client") as s3_cls:
        s3_cls.return_value.s3.complete_multipart_upload.side_effect = ClientError(
            {"Error": {"Code": "InvalidPart"}}, "CompleteMultipartUpload",
        )
        _run_multipart_completion_pipeline(
            orgid="o", collection_id="c", upload_id="u",
            parts=[{"ETag": "e1", "PartNumber": 1}],
        )

    assert sessions_client.record_failure_reason.called
    kwargs = sessions_client.record_failure_reason.call_args.kwargs
    assert kwargs["step"] == "combine"


def test_pipeline_infer_failure_deletes_s3_object(sessions_client, collection_client):
    from routes.uploads import _run_multipart_completion_pipeline

    with (
        patch("routes.uploads.s3Client") as s3_cls,
        patch("routes.uploads.extract_parquet_schema_from_s3", side_effect=ValueError("bad footer")),
    ):
        s3_cls.return_value.s3.complete_multipart_upload.return_value = {"ETag": "e"}
        s3_cls.return_value.s3.head_object.return_value = {"ContentLength": 1024}
        _run_multipart_completion_pipeline(
            orgid="o", collection_id="c", upload_id="u",
            parts=[{"ETag": "e1", "PartNumber": 1}],
        )

    # infer step recorded failure
    assert sessions_client.record_failure_reason.call_args.kwargs["step"] == "infer"
    # rolled back S3
    s3_cls.return_value.s3.delete_object.assert_called_once_with(Bucket="b", Key="k")


def test_pipeline_register_failure_rolls_back_glue(sessions_client, collection_client):
    from routes.uploads import _run_multipart_completion_pipeline

    collection_client.get_org_collection_by_id.return_value = _collection_with_datasets([])
    with (
        patch("routes.uploads.s3Client") as s3_cls,
        patch("routes.uploads.extract_parquet_schema_from_s3", return_value=_preview()),
        patch("routes.uploads.register_parquet_source", side_effect=RuntimeError("glue 500")),
        patch("routes.uploads.GlueCatalogManager") as glue_cls,
        patch("routes.uploads.CollectionDatasetSchemasClient"),
    ):
        s3_cls.return_value.s3.complete_multipart_upload.return_value = {"ETag": "e"}
        s3_cls.return_value.s3.head_object.return_value = {"ContentLength": 1024}
        _run_multipart_completion_pipeline(
            orgid="o", collection_id="c", upload_id="u",
            parts=[{"ETag": "e1", "PartNumber": 1}],
        )

    assert sessions_client.record_failure_reason.call_args.kwargs["step"] == "register"
    # S3 object stays per spec §7 rule 2; no delete called
    s3_cls.return_value.s3.delete_object.assert_not_called()
  • Step 2: Run — expect fail
just test tests/test_routes_multipart_pipeline.py -v

Expected: ImportError: cannot import name '_run_multipart_completion_pipeline'.

  • Step 3: Implement _run_multipart_completion_pipeline

In src/routes/uploads.py, replace the existing _run_multipart_schema_inference with a single pipeline entry point. Add these imports at the top of the file if they are not already present:

from utils.glue_catalog import GlueCatalogManager
from utils.dataset_parquet_sync import register_parquet_source
from dynamodb_client.collection_dataset_schemas import CollectionDatasetSchemasClient
from dynamodb_client.multipart_uploads import (
    COMPLETION_STATUS_COMBINED,
    COMPLETION_STATUS_COMBINING,
    COMPLETION_STATUS_FAILED,
    COMPLETION_STATUS_INFERRED,
    COMPLETION_STATUS_INFERRING,
    COMPLETION_STATUS_REGISTERED,
    COMPLETION_STATUS_REGISTERING,
    MultipartUploadsClient,
    MULTIPART_STATUS_INITIATED,
)

Add the pipeline function below the helpers from Task 6:

def _run_multipart_completion_pipeline(
    *,
    orgid: str,
    collection_id: str,
    upload_id: str,
    parts: List[Dict[str, Any]],
) -> None:
    """Run the combine → infer → register chain for a completed multipart upload.

    Each step updates completion_status + completion_steps_log. Step failures
    are terminal: write failure_reason, run the targeted rollback, and return.
    Exceptions raised by a step are caught here and not re-raised.
    """
    sessions = MultipartUploadsClient()
    session = sessions.get_session(orgid=orgid, upload_id=upload_id)
    if not session:
        logger.error(
            "Multipart session not found during completion pipeline",
            extra={"orgid": orgid, "upload_id": upload_id},
        )
        return
    bucket = session["s3_bucket"]
    s3_key = session["s3_key"]
    s3 = s3Client(bucket=bucket)

    # Step 1: combine
    try:
        sessions.set_completion_status(
            orgid=orgid, upload_id=upload_id,
            status=COMPLETION_STATUS_COMBINING, step="combine", phase="start",
        )
        complete_resp = s3.s3.complete_multipart_upload(
            Bucket=bucket,
            Key=s3_key,
            UploadId=upload_id,
            MultipartUpload={"Parts": sorted(parts, key=lambda p: p["PartNumber"])},
        )
        etag = complete_resp.get("ETag")
        try:
            head = s3.s3.head_object(Bucket=bucket, Key=s3_key)
            total_size = int(head.get("ContentLength") or session.get("total_size") or 0)
        except ClientError:
            total_size = int(session.get("total_size") or 0)
        sessions.set_completion_status(
            orgid=orgid, upload_id=upload_id,
            status=COMPLETION_STATUS_COMBINED, step="combine", phase="end",
            extra={"final_etag": etag, "total_size": total_size},
        )
    except Exception as exc:
        capture_exception(exc)
        sessions.record_failure_reason(
            orgid=orgid, upload_id=upload_id, step="combine", exc_str=repr(exc),
        )
        return

    # Step 2: infer
    try:
        sessions.set_completion_status(
            orgid=orgid, upload_id=upload_id,
            status=COMPLETION_STATUS_INFERRING, step="infer", phase="start",
        )
        preview = extract_parquet_schema_from_s3(
            s3_client=s3.s3,
            bucket=bucket,
            key=s3_key,
        )
        sessions.set_schema_preview(
            orgid=orgid, upload_id=upload_id, schema_preview=preview,
        )
        sessions.set_completion_status(
            orgid=orgid, upload_id=upload_id,
            status=COMPLETION_STATUS_INFERRED, step="infer", phase="end",
        )
    except Exception as exc:
        capture_exception(exc)
        sessions.record_failure_reason(
            orgid=orgid, upload_id=upload_id, step="infer", exc_str=repr(exc),
        )
        _rollback_s3_object(s3_client=s3, bucket=bucket, key=s3_key)
        return

    # Step 3: register
    dataset_id = str(uuid.uuid4())
    schemas_client = CollectionDatasetSchemasClient()
    glue_manager = GlueCatalogManager(schemas_client=schemas_client)
    glue_table_name: Optional[str] = None
    glue_database_name: Optional[str] = None
    dataset_appended = False
    try:
        sessions.set_completion_status(
            orgid=orgid, upload_id=upload_id,
            status=COMPLETION_STATUS_REGISTERING, step="register", phase="start",
        )
        reg_result = register_parquet_source(
            dataset_id=dataset_id,
            parquet_object_key=s3_key,
            s3_client=s3,
            schemas_client=schemas_client,
            glue_manager=glue_manager,
            preview=preview,
        )
        glue_table_name = reg_result.get("glue_table_name")
        glue_database_name = reg_result.get("glue_database_name")
        now_iso = arrow.utcnow().isoformat()
        dataset_entry = {
            "id": dataset_id,
            "source_type": "multipart_parquet_upload",
            "parquet_object_keys": [s3_key],
            "csv_object_keys": [],
            "glue_database_name": glue_database_name,
            "glue_table_name": glue_table_name,
            "schema": {
                "columns": preview.get("columns"),
                "schema_fields": preview.get("schema_fields"),
            },
            "row_count": preview.get("row_count"),
            "size_bytes": preview.get("size_bytes"),
            "upload_id": upload_id,
            "filename": session.get("filename"),
            "created_at": now_iso,
            "updated_at": now_iso,
            "deleted_at": None,
            "deletion_requested_at": None,
        }
        _append_dataset_to_collection(
            orgid=orgid, collection_id=collection_id, dataset=dataset_entry,
        )
        dataset_appended = True
        sessions.set_completion_status(
            orgid=orgid, upload_id=upload_id,
            status=COMPLETION_STATUS_REGISTERED, step="register", phase="end",
            extra={
                "dataset_id": dataset_id,
                "glue_table_name": glue_table_name,
                "glue_database_name": glue_database_name,
            },
        )
    except Exception as exc:
        capture_exception(exc)
        sessions.record_failure_reason(
            orgid=orgid, upload_id=upload_id, step="register", exc_str=repr(exc),
        )
        _rollback_glue_and_schema(
            glue_manager=glue_manager,
            schemas_client=schemas_client,
            dataset_id=dataset_id,
            parquet_object_key=s3_key,
            glue_table_name=glue_table_name,
            glue_database_name=glue_database_name,
            orgid=orgid,
            collection_id=collection_id,  # entry removal is a no-op if the append never ran
        )
        return

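Delete the old _run_multipart_schema_inference function; the infer step above replaces it. Update any remaining references to use _run_multipart_completion_pipeline, including test fixtures that patch the old name (for example no_schema_inference in src/tests/test_routes_multipart.py, if it patches routes.uploads._run_multipart_schema_inference, since patching a deleted attribute raises AttributeError).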
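The control flow of _run_multipart_completion_pipeline follows one rule: a failing step records itself, undoes only what that failure leaves behind (nothing for combine, the S3 object for infer, Glue/schema/dataset entry for register), and stops the chain. A minimal, dependency-free sketch of that shape, using hypothetical Step and run_chain names that are not part of the plan, so the ordering rules can be unit-tested without S3 or DynamoDB:

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class Step:
    name: str
    run: Callable[[], None]
    # Undo work left behind when *this* step fails (None = nothing to undo).
    rollback: Optional[Callable[[], None]] = None


def run_chain(steps: List[Step], log: List[str]) -> Optional[str]:
    """Run steps in order; return the failed step's name, or None on success.

    Step errors are never re-raised; rollback errors are logged and swallowed,
    mirroring the best-effort rollback helpers from Task 6.
    """
    for step in steps:
        log.append(f"{step.name}:start")
        try:
            step.run()
        except Exception as exc:
            log.append(f"{step.name}:failed:{exc!r}")
            if step.rollback is not None:
                try:
                    step.rollback()
                except Exception as rb_exc:
                    log.append(f"{step.name}:rollback_failed:{rb_exc!r}")
            return step.name
        log.append(f"{step.name}:end")
    return None


def _bad_footer() -> None:
    raise ValueError("bad footer")


log: List[str] = []
deleted: List[str] = []
failed = run_chain(
    [
        Step("combine", run=lambda: None),
        Step("infer", run=_bad_footer, rollback=lambda: deleted.append("s3://b/k")),
        Step("register", run=lambda: log.append("should not run")),
    ],
    log,
)
print(failed, deleted)  # infer ['s3://b/k']
```

The example mirrors test_pipeline_infer_failure_deletes_s3_object: the infer failure triggers only the S3 rollback, and register never starts.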

  • Step 4: Run — expect pass
just test tests/test_routes_multipart_pipeline.py -v

Expected: 8 PASS (4 helper tests from Task 6 + 4 pipeline-flow tests).

  • Step 5: Commit
git add src/routes/uploads.py src/tests/test_routes_multipart_pipeline.py
git commit -m "feat(ENG-1492): background completion pipeline (combine -> infer -> register)

Replaces _run_multipart_schema_inference with a three-step chain that
takes a multipart upload from uploaded parts to a first-class dataset.
Per-step status transitions on the multipart row; per-step rollback on
failure per spec §7."

Task 8: Convert /complete endpoint to return 202 + enqueue pipeline

Files:

  • Modify: src/routes/uploads.py:2131-2208.

  • Test: src/tests/test_routes_multipart_pipeline.py (append).

  • Step 1: Write the failing test

Append to src/tests/test_routes_multipart_pipeline.py:

class TestCompleteEndpoint:
    def test_complete_returns_202_and_schedules_pipeline(
        self, test_client, mock_s3_setup, mock_dynamodb_client, multipart_table, no_schema_inference
    ):
        # init a session first
        payload = {"filename": "big.parquet", "total_size": 12 * 1024 * 1024}
        init = test_client.post(
            f"/public/uploads/{ORG_ID}/{COLLECTION_ID}/multipart/init",
            json=payload, headers={"X-User-Token": "test-token"},
        )
        assert init.status_code == 200
        upload_id = init.json()["upload_id"]

        with patch("routes.uploads._run_multipart_completion_pipeline") as pipeline:
            response = test_client.post(
                f"/public/uploads/{ORG_ID}/{COLLECTION_ID}/multipart/{upload_id}/complete",
                json={"parts": [{"part_number": 1, "etag": "e1"}, {"part_number": 2, "etag": "e2"}]},
                headers={"X-User-Token": "test-token"},
            )

        assert response.status_code == 202, response.text
        body = response.json()
        assert body["upload_id"] == upload_id
        assert body["completion_status"] == "combining"
        assert body["s3_bucket"]
        assert body["s3_key"]
        assert body["part_count"] == 2
        assert pipeline.called

The import lines at the top of the new test file must bring in what it uses from test_routes_multipart:

from tests.test_routes_multipart import (
    ORG_ID, COLLECTION_ID, MULTIPART_DEFAULT_TABLE,
    structured_collection, unstructured_collection, mock_dynamodb_client,
    multipart_table, test_client, mock_s3_setup, no_schema_inference,
)

(pytest discovers fixtures that are imported into a test module's namespace, so re-importing the fixtures already defined at module scope in test_routes_multipart.py makes them available here. If the re-imports cause fixture collisions, move the shared fixtures into tests/conftest.py instead.)

  • Step 2: Run — expect fail
just test tests/test_routes_multipart_pipeline.py::TestCompleteEndpoint -v

Expected: status 200 (not 202), body missing completion_status.

  • Step 3: Update the endpoint

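Replace complete_multipart_upload() in src/routes/uploads.py:2131-2208 (decorator, signature, and body). The new signature takes a BackgroundTasks parameter, so make sure BackgroundTasks is imported from fastapi alongside the existing FastAPI imports: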

@uploads_router.post(
    "/{orgid}/{collection_id}/multipart/{upload_id}/complete",
    tags=["Uploads", "Multipart"],
    status_code=202,
)
async def complete_multipart_upload(
    payload: MultipartCompleteRequest,
    background_tasks: BackgroundTasks,
    orgid: str = orgid_path_param,
    collection_id: str = collection_id_path_param,
    upload_id: str = Path(..., description="S3 multipart UploadId from /init"),
    _access=Depends(require_collection_permission("edit")),
) -> dict:
    """Finalize a multipart upload asynchronously.

    Returns 202 immediately. Background pipeline combines parts, infers the
    Parquet schema, and registers the dataset. Poll GET /multipart/{upload_id}
    until ``completion_status`` reaches ``registered`` or ``failed``.
    """
    try:
        uuid.UUID(orgid)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid organization ID format. Must be a valid UUID.")

    sessions = MultipartUploadsClient()
    session = sessions.get_session(orgid=orgid, upload_id=upload_id)
    if not session:
        raise HTTPException(status_code=404, detail="Multipart upload session not found.")
    if session.get("collection_id") != collection_id:
        raise HTTPException(status_code=403, detail="Multipart session does not belong to this collection.")
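    if session.get("status") != MULTIPART_STATUS_INITIATED or session.get("completion_status"):
        # A non-empty completion_status means /complete was already accepted;
        # accepting it again would enqueue a duplicate pipeline run.
        raise HTTPException(
            status_code=409,
            detail=(
                f"Multipart session is in status '{session.get('status')}' "
                f"(completion_status={session.get('completion_status')!r}); cannot complete."
            ),
        )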

    parts_payload = [
        {"ETag": ref.etag, "PartNumber": ref.part_number}
        for ref in sorted(payload.parts, key=lambda p: p.part_number)
    ]

    # Hand off to the background pipeline. The pipeline writes all further
    # state transitions (combining → combined → ... → registered|failed).
    sessions.set_completion_status(
        orgid=orgid, upload_id=upload_id,
        status=COMPLETION_STATUS_COMBINING, step="combine", phase="start",
    )
    background_tasks.add_task(
        _run_multipart_completion_pipeline,
        orgid=orgid,
        collection_id=collection_id,
        upload_id=upload_id,
        parts=parts_payload,
    )

    return {
        "upload_id": upload_id,
        "completion_status": COMPLETION_STATUS_COMBINING,
        "s3_bucket": session["s3_bucket"],
        "s3_key": session["s3_key"],
        "part_count": int(session.get("part_count") or 0),
        "message": "Combining parts. Poll GET /multipart/{upload_id} for progress.",
    }
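The endpoint translates the client's {part_number, etag} pairs into the {"ETag", "PartNumber"} shape that S3 CompleteMultipartUpload expects, sorted ascending (S3 rejects unordered part lists with InvalidPartOrder). Since the 202 response is sent before S3 ever sees the list, a malformed list would only surface later as a combine failure. A sketch of optional synchronous validation, assuming the session's part_count is the expected count; build_parts_payload, PartRef, and these checks are hypothetical additions, not part of the plan:

```python
from dataclasses import dataclass
from typing import Dict, List, Union

MAX_PART_NUMBER = 10_000  # S3 multipart uploads allow part numbers 1..10,000


@dataclass(frozen=True)
class PartRef:
    part_number: int
    etag: str


def build_parts_payload(
    parts: List[PartRef], expected_count: int
) -> List[Dict[str, Union[str, int]]]:
    """Validate client part refs and return them in S3's ascending shape.

    Raises ValueError (which a route could map to HTTP 400) on empty,
    duplicate, out-of-range, or wrong-count part lists. An expected_count
    of 0 means "unknown" and skips the count check.
    """
    if not parts:
        raise ValueError("parts must not be empty")
    numbers = [p.part_number for p in parts]
    if len(set(numbers)) != len(numbers):
        raise ValueError("duplicate part_number in parts")
    if min(numbers) < 1 or max(numbers) > MAX_PART_NUMBER:
        raise ValueError("part_number out of range 1..10000")
    if expected_count and len(parts) != expected_count:
        raise ValueError(f"expected {expected_count} parts, got {len(parts)}")
    return [
        {"ETag": p.etag, "PartNumber": p.part_number}
        for p in sorted(parts, key=lambda p: p.part_number)
    ]


payload = build_parts_payload([PartRef(2, "e2"), PartRef(1, "e1")], expected_count=2)
print(payload)  # [{'ETag': 'e1', 'PartNumber': 1}, {'ETag': 'e2', 'PartNumber': 2}]
```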
  • Step 4: Run — expect pass
just test tests/test_routes_multipart_pipeline.py::TestCompleteEndpoint -v

Expected: PASS.

  • Step 5: Commit
git add src/routes/uploads.py src/tests/test_routes_multipart_pipeline.py
git commit -m "feat(ENG-1492): /complete returns 202 and hands off to background pipeline

Endpoint writes completion_status=combining, enqueues the pipeline, and
returns immediately. Avoids Cloudflare 60s cap on the synchronous
CompleteMultipartUpload call for 3 GB+ objects."

Task 9: Expand status endpoint response

Files:

  • Modify: src/routes/uploads.py:2264-2303.

  • Test: src/tests/test_routes_multipart_pipeline.py (append).

  • Step 1: Failing test

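Append to src/tests/test_routes_multipart_pipeline.py (this test also needs MultipartUploadsClient, so add it to the dynamodb_client.multipart_uploads import at the top of the file):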

class TestStatusEndpoint:
    def test_status_exposes_new_fields_after_registered(
        self, test_client, mock_s3_setup, mock_dynamodb_client, multipart_table
    ):
        # seed a row manually via the client
        sessions = MultipartUploadsClient()
        sessions.create_session(
            orgid=ORG_ID, upload_id="status-1", collection_id=COLLECTION_ID,
            s3_key="k", s3_bucket="b", filename="f.parquet",
            content_type="application/vnd.apache.parquet",
            part_size=8 * 1024 * 1024, part_count=1, total_size=1,
        )
        sessions.set_completion_status(
            orgid=ORG_ID, upload_id="status-1",
            status=COMPLETION_STATUS_REGISTERED, step="register", phase="end",
            extra={
                "dataset_id": "ds-xyz",
                "glue_table_name": "t_xyz",
                "glue_database_name": "capitol_datasets",
            },
        )
        resp = test_client.get(
            f"/public/uploads/{ORG_ID}/{COLLECTION_ID}/multipart/status-1",
            headers={"X-User-Token": "test-token"},
        )
        assert resp.status_code == 200
        body = resp.json()
        assert body["completion_status"] == "registered"
        assert body["dataset_id"] == "ds-xyz"
        assert body["glue_table_name"] == "t_xyz"
        assert body["glue_database_name"] == "capitol_datasets"
        assert isinstance(body["completion_steps_log"], list)
        assert body["completion_steps_log"][0]["step"] == "register"
  • Step 2: Run — expect fail
just test tests/test_routes_multipart_pipeline.py::TestStatusEndpoint -v

Expected: missing completion_status, dataset_id, etc.

  • Step 3: Update the handler

Replace the return dict in get_multipart_upload_status():

    return {
        "orgid": orgid,
        "collection_id": collection_id,
        "upload_id": upload_id,
        "status": session.get("status"),
        "completion_status": session.get("completion_status"),
        "s3_key": session.get("s3_key"),
        "s3_bucket": session.get("s3_bucket"),
        "filename": session.get("filename"),
        "part_count": session.get("part_count"),
        "part_size": session.get("part_size"),
        "total_size": session.get("total_size"),
        "created_at": session.get("created_at"),
        "updated_at": session.get("updated_at"),
        "completed_at": session.get("completed_at"),
        "parquet_schema_preview": session.get("parquet_schema_preview"),
        "schema_inferred_at": session.get("schema_inferred_at"),
        "dataset_id": session.get("dataset_id"),
        "glue_table_name": session.get("glue_table_name"),
        "glue_database_name": session.get("glue_database_name"),
        "completion_steps_log": session.get("completion_steps_log") or [],
        "failure_reason": session.get("failure_reason"),
        "error": session.get("error"),
    }
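Clients branch on completion_status from this payload. A hypothetical transition table makes the happy path and the terminal states explicit, and could back a guard inside set_completion_status or a test that replays completion_steps_log. It assumes the COMPLETION_STATUS_* constants hold the lowercase names used in the tests (combining, combined, inferring, inferred, registering, registered, failed); the names ALLOWED, can_transition, and validate_history are illustrative only:

```python
from typing import Dict, FrozenSet, Iterable, Optional

# Assumed string values; the plan's COMPLETION_STATUS_* constants may differ.
_HAPPY_PATH = ("combining", "combined", "inferring", "inferred", "registering", "registered")
TERMINAL: FrozenSet[str] = frozenset({"registered", "failed"})

# From "no status yet", the only legal first write is "combining".
ALLOWED: Dict[Optional[str], FrozenSet[str]] = {None: frozenset({"combining"})}
for _cur, _nxt in zip(_HAPPY_PATH, _HAPPY_PATH[1:]):
    # Same-state rewrites are allowed: /complete and the pipeline's first step
    # both write "combining". Any in-progress state may also move to "failed".
    ALLOWED[_cur] = frozenset({_cur, _nxt, "failed"})
for _state in TERMINAL:
    ALLOWED[_state] = frozenset()  # terminal: no further transitions


def can_transition(current: Optional[str], new: str) -> bool:
    return new in ALLOWED.get(current, frozenset())


def validate_history(states: Iterable[str]) -> bool:
    """True if the sequence of states, starting from no status, is legal."""
    current: Optional[str] = None
    for state in states:
        if not can_transition(current, state):
            return False
        current = state
    return True


print(validate_history(["combining", "combining", "combined", "inferring",
                        "inferred", "registering", "registered"]))  # True
print(validate_history(["combining", "registered"]))  # False
```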
  • Step 4: Run — expect pass
just test tests/test_routes_multipart_pipeline.py::TestStatusEndpoint -v

Expected: PASS.

  • Step 5: Commit
git add src/routes/uploads.py src/tests/test_routes_multipart_pipeline.py
git commit -m "feat(ENG-1492): status endpoint exposes completion pipeline state

Adds completion_status, dataset_id, glue_table_name, glue_database_name,
completion_steps_log, failure_reason to GET /multipart/{upload_id}."

Task 10: CLI — poll completion_status instead of preview-only

Files:

  • Modify: src/utils/multipart_uploader.py:229-263.

  • Modify: src/cli.py — the upload-parquet command (around line 2144+).

  • Test: src/tests/test_cli_upload_parquet.py (new).

  • Step 1: Failing test

Create src/tests/test_cli_upload_parquet.py:

"""Integration test for the upload-parquet CLI command."""
from __future__ import annotations

import os
import tempfile
from unittest.mock import MagicMock, patch

import pyarrow as pa
import pyarrow.parquet as pq
import pytest
from typer.testing import CliRunner

from cli import app


def _make_tiny_parquet(path: str) -> None:
    table = pa.table({"a": [1, 2, 3], "b": ["x", "y", "z"]})
    pq.write_table(table, path)


def test_upload_parquet_cli_polls_until_registered():
    runner = CliRunner()
    with tempfile.TemporaryDirectory() as tmp:
        parquet_path = os.path.join(tmp, "tiny.parquet")
        _make_tiny_parquet(parquet_path)

        init_response = {
            "upload_id": "cli-u-1",
            "s3_bucket": "b",
            "s3_key": "k",
            "part_size": 8 * 1024 * 1024,
            "part_count": 1,
            "total_size": os.path.getsize(parquet_path),
            "expires_in": 900,
            "part_urls": [{"part_number": 1, "url": "https://fake", "expires_in": 900}],
        }
        complete_response = {
            "upload_id": "cli-u-1",
            "completion_status": "combining",
            "s3_bucket": "b",
            "s3_key": "k",
            "part_count": 1,
        }
        status_sequence = [
            {"completion_status": "combining", "completion_steps_log": []},
            {"completion_status": "combined", "completion_steps_log": []},
            {
                "completion_status": "registered",
                "dataset_id": "ds-1",
                "glue_table_name": "t_1",
                "glue_database_name": "capitol_datasets",
                "completion_steps_log": [],
            },
        ]

        # Patch time.sleep too so the status polling loop does not wait for real.
        with patch("utils.multipart_uploader.httpx.Client") as client_cls, patch("time.sleep"):
            c = MagicMock()
            client_cls.return_value.__enter__.return_value = c
            # POST /init, PUT part, POST /complete, then N GETs
            c.post.side_effect = [
                MagicMock(status_code=200, json=lambda: init_response),
                MagicMock(status_code=202, json=lambda: complete_response),
            ]
            c.put.return_value = MagicMock(status_code=200, headers={"ETag": "e1"})
            c.get.side_effect = [
                MagicMock(status_code=200, json=lambda s=s: s) for s in status_sequence
            ]
            result = runner.invoke(
                app, [
                    "upload-parquet",
                    "--file", parquet_path,
                    "--api-base-url", "http://fake",
                    "--orgid", "550e8400-e29b-41d4-a716-446655440000",
                    "--collection-id", "coll-1",
                    "--token", "t",
                ],
            )
        assert result.exit_code == 0, result.stdout
        assert "registered" in result.stdout
        assert "ds-1" in result.stdout
  • Step 2: Run — expect fail
just test tests/test_cli_upload_parquet.py -v

Expected: failure because the CLI does not yet poll completion_status, or because the --file flag is missing (depending on the current CLI shape).

  • Step 3: Update the uploader helper

Open src/utils/multipart_uploader.py. Replace poll_schema_preview with poll_completion_status:

def poll_completion_status(
    *,
    api_base_url: str,
    orgid: str,
    collection_id: str,
    upload_id: str,
    token: str,
    token_header: str = "X-User-Token",
    timeout_seconds: float = 600.0,
    poll_interval_seconds: float = 2.0,
    on_status: Optional[Callable[[Dict[str, Any]], None]] = None,
) -> Dict[str, Any]:
    """Poll the status endpoint until completion_status is terminal.

    Returns the final status payload. Raises MultipartUploadError if the
    terminal state is "failed" or if the deadline elapses.
    """
    import time

    api_base_url = api_base_url.rstrip("/")
    url = f"{api_base_url}/public/uploads/{orgid}/{collection_id}/multipart/{upload_id}"
    headers = _auth_headers(token, token_header)
    deadline = time.monotonic() + timeout_seconds
    terminal = {"registered", "failed"}

    with httpx.Client(timeout=timeout_seconds) as client:
        last: Dict[str, Any] = {}
        while time.monotonic() < deadline:
            response = client.get(url, headers=headers)
            if response.status_code >= 400:
                raise MultipartUploadError(
                    f"GET {url} failed ({response.status_code}): {response.text}"
                )
            last = response.json()
            if on_status:
                on_status(last)
            if last.get("completion_status") in terminal:
                if last.get("completion_status") == "failed":
                    reason = last.get("failure_reason") or {}
                    raise MultipartUploadError(
                        f"Upload {upload_id} failed at step={reason.get('step')!r}: {reason.get('exc_str')}"
                    )
                return last
            time.sleep(poll_interval_seconds)
    raise MultipartUploadError(
        f"Upload {upload_id} did not reach terminal state within {timeout_seconds}s "
        f"(last completion_status={last.get('completion_status')!r})"
    )


# Backwards-compatible wrapper for callers that only want the preview. It now
# waits for a terminal completion_status and returns None on failure or timeout.
def poll_schema_preview(
    *,
    api_base_url: str,
    orgid: str,
    collection_id: str,
    upload_id: str,
    token: str,
    token_header: str = "X-User-Token",
    timeout_seconds: float = 60.0,
    poll_interval_seconds: float = 1.5,
) -> Optional[Dict[str, Any]]:
    try:
        payload = poll_completion_status(
            api_base_url=api_base_url,
            orgid=orgid, collection_id=collection_id, upload_id=upload_id,
            token=token, token_header=token_header,
            timeout_seconds=timeout_seconds,
            poll_interval_seconds=poll_interval_seconds,
        )
    except MultipartUploadError:
        return None
    return payload.get("parquet_schema_preview")
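poll_completion_status couples its loop to httpx and wall-clock time, so tests must either mock httpx.Client and time.sleep or wait out real poll intervals. One option, sketched here with hypothetical names (poll_until_terminal, PollError, FakeClock), is to factor the loop into a pure function with injectable fetch, clock, and sleep callables; the production function would pass a closure over client.get(...), time.monotonic, and time.sleep. The terminal, failed, and timeout branches match the function above:

```python
from typing import Any, Callable, Dict

TERMINAL = {"registered", "failed"}


class PollError(RuntimeError):
    """Stand-in for MultipartUploadError in this sketch."""


def poll_until_terminal(
    fetch: Callable[[], Dict[str, Any]],
    *,
    timeout_seconds: float,
    poll_interval_seconds: float,
    clock: Callable[[], float],
    sleep: Callable[[float], None],
) -> Dict[str, Any]:
    deadline = clock() + timeout_seconds
    last: Dict[str, Any] = {}
    while clock() < deadline:
        last = fetch()
        status = last.get("completion_status")
        if status == "failed":
            reason = last.get("failure_reason") or {}
            raise PollError(f"failed at step={reason.get('step')!r}: {reason.get('exc_str')}")
        if status in TERMINAL:
            return last
        sleep(poll_interval_seconds)
    raise PollError(f"timed out (last completion_status={last.get('completion_status')!r})")


class FakeClock:
    """Deterministic clock: sleep() advances time instead of blocking."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


responses = iter([
    {"completion_status": "combining"},
    {"completion_status": "inferred"},
    {"completion_status": "registered", "dataset_id": "ds-1"},
])
clock = FakeClock()
final = poll_until_terminal(
    lambda: next(responses), timeout_seconds=600, poll_interval_seconds=2.0,
    clock=clock, sleep=clock.sleep,
)
print(final["dataset_id"], clock.now)  # ds-1 4.0
```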

Open src/cli.py and find the upload-parquet command. After the existing upload_parquet_multipart() call returns the UploadResult, replace the polling block with:

    from utils.multipart_uploader import poll_completion_status

    def _print_status(payload: Dict[str, Any]) -> None:
        typer.echo(f"  completion_status={payload.get('completion_status')}")

    final = poll_completion_status(
        api_base_url=api_base_url,
        orgid=orgid,
        collection_id=collection_id,
        upload_id=result.upload_id,
        token=token,
        on_status=_print_status,
        timeout_seconds=600,
    )
    typer.echo(f"Dataset registered: id={final.get('dataset_id')} table={final.get('glue_table_name')}")
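`_print_status` echoes on every poll, so a long wait prints the same status many times. One possible refinement echoes only when `completion_status` changes. It is sketched here as a hypothetical helper; `make_transition_printer` is not part of the plan. It accepts any `echo` callable, so the CLI could pass `typer.echo` and a test could pass `list.append`.

```python
from typing import Any, Callable, Dict, Optional


def make_transition_printer(
    echo: Callable[[str], None],
) -> Callable[[Dict[str, Any]], None]:
    """Return an on_status callback that echoes only on status transitions."""
    last_status: Optional[str] = None

    def _on_status(payload: Dict[str, Any]) -> None:
        nonlocal last_status
        status = payload.get("completion_status")
        if status != last_status:
            echo(f"  completion_status={status}")
            last_status = status

    return _on_status
```

In the command body, you would replace `on_status=_print_status` with `on_status=make_transition_printer(typer.echo)`.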
  • Step 4: Run — expect pass
just test tests/test_cli_upload_parquet.py -v

Expected: PASS.

  • Step 5: Commit
git add src/utils/multipart_uploader.py src/cli.py src/tests/test_cli_upload_parquet.py
git commit -m "feat(ENG-1492): CLI polls completion_status until registered

poll_completion_status waits for terminal state (registered|failed) and
raises on failure with the step + exception string. Old
poll_schema_preview retained as compat alias."

Task 11: FE compat shim — handle 202 + poll

Repo: cap/platform-frontend.

Files:

  • Modify: src/services/data-sources/multipart-upload.ts.

  • Step 1: Update completeMultipart

Open the file and find the existing completeMultipart function. Replace its body:

async function completeMultipart(params: {
  orgid: string;
  collectionId: string;
  uploadId: string;
  parts: Array<{ part_number: number; etag: string }>;
  signal?: AbortSignal;
}): Promise<MultipartCompletionResult> {
  const res = await apiRequest(
    `/public/uploads/${params.orgid}/${params.collectionId}/multipart/${params.uploadId}/complete`,
    { method: "POST", body: JSON.stringify({ parts: params.parts }) },
  );
  if (res.status !== 202 && res.status !== 200) {
    throw new MultipartUploadError(`complete failed: ${res.status} ${await res.text()}`);
  }
  // The /complete response body is not needed: poll the status endpoint
  // for the terminal completion_status instead.
  return await pollCompletionStatus({
    orgid: params.orgid,
    collectionId: params.collectionId,
    uploadId: params.uploadId,
    signal: params.signal,
  });
}

async function pollCompletionStatus(params: {
  orgid: string;
  collectionId: string;
  uploadId: string;
  signal?: AbortSignal;
  timeoutMs?: number;
  intervalMs?: number;
}): Promise<MultipartCompletionResult> {
  const deadline = Date.now() + (params.timeoutMs ?? 10 * 60 * 1000);
  const interval = params.intervalMs ?? 2000;
  while (Date.now() < deadline) {
    if (params.signal?.aborted) throw new DOMException("aborted", "AbortError");
    const res = await apiRequest(
      `/public/uploads/${params.orgid}/${params.collectionId}/multipart/${params.uploadId}`,
      { method: "GET" },
    );
    if (!res.ok) throw new MultipartUploadError(`status fetch failed: ${res.status}`);
    const body = await res.json();
    if (body.completion_status === "registered") {
      return {
        uploadId: params.uploadId,
        datasetId: body.dataset_id,
        glueTableName: body.glue_table_name,
        glueDatabaseName: body.glue_database_name,
        schemaPreview: body.parquet_schema_preview,
      };
    }
    if (body.completion_status === "failed") {
      const reason = body.failure_reason ?? {};
      throw new MultipartUploadError(
        `upload failed at step=${reason.step}: ${reason.exc_str}`,
      );
    }
    await new Promise((r) => setTimeout(r, interval));
  }
  throw new MultipartUploadError(
    "upload completed but processing took longer than expected; refresh to see the dataset when it appears.",
  );
}

Add the new type next to other exports:

export interface MultipartCompletionResult {
  uploadId: string;
  datasetId?: string;
  glueTableName?: string;
  glueDatabaseName?: string;
  schemaPreview?: unknown;
}
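The shim relies on the status payload using the snake_case keys read in `pollCompletionStatus`. Below is a hypothetical Python mirror of that mapping; `to_completion_result` and `missing_fe_fields` are illustrations, not plan code. It could back a platform-api contract test asserting that a `registered` payload carries every field the FE reads. Absent keys are omitted here, whereas the TypeScript version sets them to `undefined`.

```python
from typing import Any, Dict, List

# snake_case keys read by pollCompletionStatus -> MultipartCompletionResult fields
FIELD_MAP = {
    "dataset_id": "datasetId",
    "glue_table_name": "glueTableName",
    "glue_database_name": "glueDatabaseName",
    "parquet_schema_preview": "schemaPreview",
}


def to_completion_result(upload_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Map a 'registered' status payload to the FE result shape."""
    status = payload.get("completion_status")
    if status != "registered":
        raise ValueError(f"expected 'registered', got {status!r}")
    result: Dict[str, Any] = {"uploadId": upload_id}
    for src, dst in FIELD_MAP.items():
        if src in payload:  # absent keys are omitted rather than set to None
            result[dst] = payload[src]
    return result


def missing_fe_fields(payload: Dict[str, Any]) -> List[str]:
    """List FE-read keys that are absent or null in the payload."""
    return sorted(k for k in FIELD_MAP if payload.get(k) is None)
```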
  • Step 2: Build and lint
cd /Users/admin/dev/cap/platform-frontend
just build || yarn build

Expected: compiles green.

  • Step 3: Commit
git add src/services/data-sources/multipart-upload.ts
git commit -m "feat(ENG-1492): handle 202 Accepted + poll completion_status

Compat shim for the async /complete endpoint. No new UI — Plan 2 adds
byte-progress, parallel parts, cancel, and a step-aware progress modal."

Task 12: Runbook documentation

Files:

  • Create: docs/structured-collections/multipart-upload-runbook.md.

  • Step 1: Write the runbook

# Runbook: Direct Parquet Upload (ENG-1492)

Staging QA procedure for validating the direct Parquet upload path end-to-end.

## Prereqs

- Staging frontend + platform-api deployed from the feature branch.
- Access to the structured collection "QA Medicaid" (create via UI if absent).
- Local copy of `medicaid-provider-spending.parquet` (3.1 GB, 238M rows).
- AWS credentials with Athena and Glue read access, plus DynamoDB read on `multipart_uploads_v1` (for step 7).

## Steps

1. Open the collection in the UI: `https://app-staging.capitol.ai/collections/{id}`.
2. Drag `medicaid-provider-spending.parquet` into the upload zone.
3. Browser DevTools → Network. Confirm:
   - One POST to `/public/uploads/{orgid}/{coll}/multipart/init` (200).
   - ~387 PUTs to `https://<bucket>.s3.amazonaws.com/...` (each 200).
   - One POST to `/multipart/{upload_id}/complete` — status code **202**.
4. Observe polling GETs to `/multipart/{upload_id}`. Statuses should appear in order:
   `combining` → `combined` → `inferring` → `inferred` → `registering` → `registered`.
5. On `registered`, refresh the collection detail page. Dataset shows 7 columns and row_count ≈ 238,015,729.
6. Run the following Athena query (workgroup: `staging`):

   ```sql
   SELECT COUNT(*) FROM capitol_datasets.t_<first 8 chars of dataset id>_medicaid_provider_spending;
   ```

   Expected: 238,015,729.
7. Verify the `multipart_uploads_v1` row has `completion_status=registered`, `dataset_id` set, `completion_steps_log` with 3 entries, and `ttl` roughly 30 days in the future.
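You can check steps 4 and 7 mechanically from a recorded list of polled statuses and the DynamoDB item. This is a minimal sketch, assuming the item has already been fetched and deserialized into plain Python values. The attribute names come from step 7; `check_status_order` and `check_registered_row` are hypothetical helpers. A poll can miss a short-lived state, so the order check allows gaps and repeats but never a step backwards.

```python
import time
from typing import Any, Dict, List, Optional

# Happy-path order from step 4 of this runbook.
EXPECTED_ORDER = ["combining", "combined", "inferring", "inferred", "registering", "registered"]
DAY = 86400


def check_status_order(observed: List[str]) -> bool:
    """True if statuses never move backwards and end at 'registered'.

    Repeats and skipped states are allowed, since a poll can miss a fast step.
    """
    if not observed or observed[-1] != "registered":
        return False
    if any(s not in EXPECTED_ORDER for s in observed):
        return False
    positions = [EXPECTED_ORDER.index(s) for s in observed]
    return all(a <= b for a, b in zip(positions, positions[1:]))


def check_registered_row(
    item: Dict[str, Any], now: Optional[float] = None, tolerance_days: float = 1.0
) -> List[str]:
    """Return a list of problems with a registered row (empty list means it passes)."""
    now = time.time() if now is None else now
    problems: List[str] = []
    if item.get("completion_status") != "registered":
        problems.append("completion_status is not 'registered'")
    if not item.get("dataset_id"):
        problems.append("dataset_id not set")
    if len(item.get("completion_steps_log") or []) != 3:
        problems.append("completion_steps_log does not have 3 entries")
    ttl = item.get("ttl")
    # float() also accepts the Decimal values boto3's resource API returns.
    if ttl is None or abs(float(ttl) - (now + 30 * DAY)) > tolerance_days * DAY:
        problems.append("ttl is not roughly 30 days out")
    return problems
```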

## Rollback / cleanup

- Abort an in-flight upload: `DELETE /multipart/{upload_id}`. The row status becomes `aborted` with a 30-day TTL.
- Orphan parts: the S3 lifecycle rule `abort-incomplete-multipart-uploads` aborts incomplete multipart uploads 7 days after initiation, which deletes their stored parts.
- Failed row: inspect `failure_reason.step` and `failure_reason.exc_str` before retrying with a fresh `/init`.
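The TTL policy sets three retention windows: 30 days for registered or aborted rows, 90 days for failed rows, and 7 days for orphaned sessions. Each reduces to a single timestamp per row. DynamoDB TTL expects a Number attribute holding Unix epoch seconds. Below is a hypothetical helper; `ttl_for_status` and the `"orphan"` key are illustrations, not the plan's `set_ttl()` signature.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Retention windows from the TTL policy. "orphan" is a hypothetical label for a
# session that never reaches a terminal state.
RETENTION_DAYS = {"registered": 30, "aborted": 30, "failed": 90, "orphan": 7}


def ttl_for_status(status: str, now: Optional[datetime] = None) -> int:
    """Return a DynamoDB TTL value (integer Unix epoch seconds) for a status."""
    now = now or datetime.now(timezone.utc)
    try:
        days = RETENTION_DAYS[status]
    except KeyError:
        raise ValueError(f"no retention window for status {status!r}") from None
    return int((now + timedelta(days=days)).timestamp())
```

DynamoDB deletes expired items asynchronously, so a row may remain readable for some time after its `ttl` passes.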

## Known issues

- The FE does not yet show byte-level progress during the PUT phase (Plan 2).
- No in-UI cancel during part upload (Plan 2; use the Network tab "Stop request" as a workaround).

  • Step 2: Commit
git add docs/structured-collections/multipart-upload-runbook.md
git commit -m "docs(ENG-1492): add multipart upload runbook for staging QA"

Task 13: Full regression run + PR

  • Step 1: Run full test suite
just test-all

Expected: all tests PASS.

  • Step 2: Push branch
git push -u origin feature/ENG-1492-s3-multipart
  • Step 3: Open PR
gh pr create --title "ENG-1492: direct Parquet upload — full dataset integration" --body "$(cat <<'EOF'
## Summary

Extends PR #448 so multipart Parquet uploads become first-class datasets.

- `/complete` returns 202 + background pipeline (combine → infer → register)
- `completion_status` state machine on `multipart_uploads_v1`
- DDB TTL retention (30d registered/aborted, 90d failed, 7d orphan)
- `register_parquet_source` now runs Glue registration + returns table names
- `collection.config.datasets[]` gains `source_type: "multipart_parquet_upload"` entry on success
- CLI + FE compat shim poll `completion_status` until terminal

Spec: `docs/superpowers/specs/2026-05-09-eng-1492-direct-parquet-upload.md`
Runbook: `docs/structured-collections/multipart-upload-runbook.md`

Requires terraform PR (same branch name in cap/terraform) merged + applied first.

## Test plan

- [ ] `just test-all` green on CI
- [ ] Staging QA per runbook (medicaid 3.1 GB file)
- [ ] Sentry during QA: no new issues
EOF
)"
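The PR summary's `collection.config.datasets[]` change could be sketched as an idempotent append, so that re-running the register step does not add a duplicate entry. This is a hypothetical illustration, not `_append_dataset_to_collection()` itself. Only `source_type: "multipart_parquet_upload"` comes from the plan; the function name, the `id` key, and the Glue fields in the entry are assumptions.

```python
import copy
from typing import Any, Dict


def append_dataset_entry(
    config: Dict[str, Any],
    dataset_id: str,
    glue_database_name: str,
    glue_table_name: str,
) -> Dict[str, Any]:
    """Return a copy of config with one multipart-upload dataset entry (idempotent by id)."""
    new_config = copy.deepcopy(config)  # leave the caller's config untouched
    datasets = new_config.setdefault("datasets", [])
    if any(d.get("id") == dataset_id for d in datasets):
        return new_config  # already present; avoid a duplicate entry
    datasets.append({
        "id": dataset_id,  # hypothetical key name
        "source_type": "multipart_parquet_upload",
        "glue_database_name": glue_database_name,  # hypothetical field
        "glue_table_name": glue_table_name,  # hypothetical field
    })
    return new_config
```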

Self-Review Notes

  • Spec §1 problem, §2 goals/non-goals, §3 approach, §4 endpoint contract, §5 pipeline, §6 data model, §6a TTL, §7 error handling, §8 terraform, §9 testing, §10 CLI + FE shim, §11 out-of-scope, §12 rollout — each has a task (Tasks 1–13).
  • No placeholders in code blocks (checked).
  • Names consistent: _run_multipart_completion_pipeline, set_completion_status, record_failure_reason, _append_dataset_to_collection, _rollback_s3_object, _rollback_glue_and_schema, poll_completion_status. Status enum strings match spec §4 table.