Skip to content

Instantly share code, notes, and snippets.

@asaf400
Last active May 10, 2026 13:48
Show Gist options
  • Select an option

  • Save asaf400/71f2882fc46bc6dee95ae3bc7d1ed54a to your computer and use it in GitHub Desktop.

Select an option

Save asaf400/71f2882fc46bc6dee95ae3bc7d1ed54a to your computer and use it in GitHub Desktop.
Pulumi Provider Implementation of AWS S3 batch replication: https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-batch-replication-batch.html
"""
S3 Batch Replication Job Provider (Pulumi Dynamic Resource)
This module provides a Pulumi dynamic resource to create and manage AWS S3 Batch Replication
jobs using the S3 Control CreateJob API via Boto3.
While standard "live" replication rules only apply to newly created objects, S3 Batch Replication
allows you to backfill and replicate existing objects.
Reference: https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-batch-replication-batch.html
Note: SSE-KMS or cross-account replication setups may require additional kms:* or bucket-policy
statements beyond the helper functions provided in this module.
"""
from __future__ import annotations
import hashlib
import json
import logging
from typing import Any, Dict, List, Optional
import boto3
import pulumi
import pulumi.dynamic as dynamic
import pulumi_aws as aws
from botocore.exceptions import ClientError
logger = logging.getLogger(__name__)
# -------------------------------------------------------------------------
# Helper Functions & Constants
# -------------------------------------------------------------------------
# Property names treated as user-supplied *inputs* of the resource.
# Only these keys are compared during a Pulumi `diff`, so provider-derived
# outputs (for example job_id or job_status) can never trigger a replacement
# on their own.
_INPUT_KEYS = frozenset(
    (
        "account_id",
        "client_request_token_seed",
        "description",
        "eligible_for_replication",
        "enable_manifest_output",
        "expected_bucket_owner_report",
        "expected_bucket_owner_source",
        "manifest_output_bucket",
        "manifest_output_expected_bucket_owner",
        "manifest_output_format",
        "manifest_output_prefix",
        "manifest_output_sse_kms_key_id",
        "object_replication_statuses",
        "priority",
        "report_bucket",
        "report_enabled",
        "report_format",
        "report_prefix",
        "report_scope",
        "role_arn",
        "source_bucket_arn",
        "source_region",
        "tags",
    )
)
def _control_api_bucket_arn(bucket: str, partition: str = "aws") -> str:
    """
    Return ``bucket`` as a full S3 bucket ARN.

    The S3 Control CreateJob API requires bucket ARNs (not bare bucket names)
    for fields such as JobReport.Bucket and ManifestOutputLocation.Bucket.

    Args:
        bucket: A bare bucket name or an existing ARN. Surrounding whitespace
            is stripped.
        partition: AWS partition used when an ARN has to be built, e.g.
            ``"aws"``, ``"aws-cn"`` or ``"aws-us-gov"``. Ignored when
            ``bucket`` is already an ARN.

    Returns:
        The value unchanged (apart from stripping) if it already starts with
        ``arn:``, otherwise ``arn:<partition>:s3:::<bucket>``.
    """
    name = bucket.strip()
    if name.startswith("arn:"):
        # Already an ARN: keep whatever partition the caller supplied.
        return name
    return f"arn:{partition}:s3:::{name}"
def _client_request_token(seed: str) -> str:
    """
    Derive a deterministic idempotency token for CreateJob from ``seed``.

    The token is the SHA-256 hex digest of the UTF-8 encoded seed, which is
    always exactly 64 hexadecimal characters, so the same seed always maps to
    the same token.
    """
    hasher = hashlib.sha256()
    hasher.update(seed.encode("utf-8"))
    return hasher.hexdigest()
def _s3control_client(region: str):
    """Build a boto3 ``s3control`` client bound to the given AWS region."""
    s3control = boto3.client("s3control", region_name=region)
    return s3control
def _json_normalize(value: Any) -> str:
    """
    Render ``value`` as a canonical JSON string for the Pulumi diff phase.

    Dictionary keys are sorted so equal structures always compare equal, and
    values JSON cannot encode natively are stringified. ``None`` renders as
    ``"null"``. If encoding still fails (for example because dictionary keys
    cannot be ordered or encoded), the ``str()`` of the whole value is encoded
    instead.
    """
    try:
        encoded = json.dumps(value, sort_keys=True, default=str)
    except TypeError:
        # Last resort: compare on the textual representation of the value.
        encoded = json.dumps(str(value))
    return encoded
# -------------------------------------------------------------------------
# IAM Policy Helpers
# -------------------------------------------------------------------------
def batch_operations_trust_policy_json() -> str:
    """
    Return the IAM trust policy JSON for an S3 Batch Operations execution role.

    The policy contains a single statement allowing the
    ``batchoperations.s3.amazonaws.com`` service principal to call
    ``sts:AssumeRole``. The JSON is rendered by ``aws.iam.get_policy_document``.
    """
    assume_role_statement = {
        "effect": "Allow",
        "principals": [{"type": "Service", "identifiers": ["batchoperations.s3.amazonaws.com"]}],
        "actions": ["sts:AssumeRole"],
    }
    document = aws.iam.get_policy_document(statements=[assume_role_statement])
    return document.json
def batch_replication_job_execution_policy_statements(
    source_bucket_name: str,
    destination_bucket_name: str,
    report_bucket_name: str,
    *,
    partition: str = "aws",
) -> List[Dict[str, Any]]:
    """
    Build the inline policy statements for the role that runs the batch replication job.

    The statements cover reading source objects, preparing the manifest,
    replicating into the destination bucket, and writing the completion report.

    Args:
        source_bucket_name: Source bucket, as a bare name or a full ARN.
        destination_bucket_name: Destination bucket, as a bare name or a full ARN.
        report_bucket_name: Completion-report bucket, as a bare name or a full ARN.
        partition: AWS partition used when an ARN has to be built from a bare
            name (``"aws"``, ``"aws-cn"``, ``"aws-us-gov"``). Values that are
            already ARNs keep their own partition.

    Returns:
        A list of statement dicts (``sid``, ``effect``, ``actions``,
        ``resources``) in the shape accepted by ``aws.iam.get_policy_document``.
    """

    def _bucket_arn(bucket: str) -> str:
        # Accept either a bare name or an ARN so callers cannot end up with a
        # doubled "arn:...:s3:::arn:..." resource.
        name = bucket.strip()
        if name.startswith("arn:"):
            return name
        return f"arn:{partition}:s3:::{name}"

    src_arn = _bucket_arn(source_bucket_name)
    dest_arn = _bucket_arn(destination_bucket_name)
    report_arn = _bucket_arn(report_bucket_name)
    return [
        {
            "sid": "BatchReplicationSourceBucket",
            "effect": "Allow",
            "actions": [
                "s3:GetReplicationConfiguration",
                "s3:ListBucket",
                "s3:PutInventoryConfiguration",  # Required for manifest preparation
            ],
            "resources": [src_arn],
        },
        {
            "sid": "BatchReplicationSourceObjects",
            "effect": "Allow",
            "actions": [
                "s3:InitiateReplication",
                "s3:GetObject",
                "s3:GetObjectVersion",
                "s3:GetObjectVersionForReplication",
                "s3:GetObjectVersionAcl",
                "s3:GetObjectVersionTagging",
            ],
            "resources": [f"{src_arn}/*"],
        },
        {
            "sid": "BatchReplicationDestinationBucket",
            "effect": "Allow",
            "actions": ["s3:PutInventoryConfiguration", "s3:ListBucket"],
            "resources": [dest_arn],
        },
        {
            "sid": "BatchReplicationDestinationObjects",
            "effect": "Allow",
            "actions": [
                "s3:ReplicateObject",
                "s3:ReplicateDelete",
                "s3:ReplicateTags",
                "s3:GetObjectVersionTagging",
                "s3:ObjectOwnerOverrideToBucketOwner",
            ],
            "resources": [f"{dest_arn}/*"],
        },
        {
            "sid": "BatchOperationsReport",
            "effect": "Allow",
            "actions": ["s3:PutObject", "s3:GetBucketLocation", "s3:GetBucketVersioning"],
            "resources": [report_arn, f"{report_arn}/*"],
        },
    ]
# -------------------------------------------------------------------------
# Request Builders
# -------------------------------------------------------------------------
def _build_manifest_generator(props: Dict[str, Any]) -> Dict[str, Any]:
    """
    Build the ``ManifestGenerator`` payload for the S3 Control CreateJob call.

    The generator tells S3 how to assemble the list of objects to replicate.

    Args:
        props: Resource inputs. ``source_bucket_arn`` is required;
            ``report_bucket`` is required when manifest output is enabled and
            ``manifest_output_bucket`` is not set.

    Returns:
        ``{"S3JobManifestGenerator": {...}}`` ready to pass as ``ManifestGenerator``.

    Raises:
        KeyError: If a required property is missing.
    """

    def _text_setting(key: str, default: str) -> str:
        # An explicit None means "not set"; without this guard str(None) would
        # send the literal string "None" to the API.
        value = props.get(key)
        return default if value is None else str(value)

    # By default, only replicate objects that have never been replicated or failed previously.
    statuses = props.get("object_replication_statuses") or ["NONE", "FAILED"]
    job_filter: Dict[str, Any] = {
        "ObjectReplicationStatuses": statuses,
        # 'EligibleForReplication' must be True for S3ReplicateObject operations.
        "EligibleForReplication": True,
    }

    generator: Dict[str, Any] = {
        "SourceBucket": props["source_bucket_arn"],
        "EnableManifestOutput": bool(props.get("enable_manifest_output", False)),
        "Filter": job_filter,
    }
    if expected_owner := props.get("expected_bucket_owner_source"):
        generator["ExpectedBucketOwner"] = str(expected_owner)

    # Configure where the generated manifest should be saved if output is enabled.
    if generator["EnableManifestOutput"]:
        # Fall back to the report bucket when no dedicated manifest bucket is given.
        output_bucket = props.get("manifest_output_bucket") or props["report_bucket"]
        location: Dict[str, Any] = {
            "Bucket": _control_api_bucket_arn(str(output_bucket)),
            "ManifestPrefix": _text_setting("manifest_output_prefix", "batch-manifests/"),
            "ManifestFormat": _text_setting("manifest_output_format", "S3InventoryReport_CSV_20211130"),
        }
        if manifest_owner := props.get("manifest_output_expected_bucket_owner"):
            location["ExpectedManifestBucketOwner"] = str(manifest_owner)
        # Encrypt the manifest with the given KMS key, otherwise with SSE-S3.
        if sse_kms := props.get("manifest_output_sse_kms_key_id"):
            location["ManifestEncryption"] = {"SSEKMS": {"KeyId": str(sse_kms)}}
        else:
            location["ManifestEncryption"] = {"SSES3": {}}
        generator["ManifestOutputLocation"] = location

    return {"S3JobManifestGenerator": generator}
def _build_report(props: Dict[str, Any]) -> Dict[str, Any]:
    """
    Build the ``Report`` payload for the S3 Control CreateJob call.

    The payload configures where and how the job completion report is delivered.

    Args:
        props: Resource inputs. ``report_bucket`` is required; the other
            ``report_*`` keys fall back to defaults when missing or ``None``.

    Returns:
        A dict ready to pass as the ``Report`` argument of ``create_job``.

    Raises:
        KeyError: If ``report_bucket`` is missing.
    """

    def _text_setting(key: str, default: str) -> str:
        # An explicit None means "not set"; without this guard str(None) would
        # send the literal string "None" to the API.
        value = props.get(key)
        return default if value is None else str(value)

    report: Dict[str, Any] = {
        "Bucket": _control_api_bucket_arn(str(props["report_bucket"])),
        "Enabled": bool(props.get("report_enabled", True)),
        "Format": _text_setting("report_format", "Report_CSV_20180820"),
        "Prefix": _text_setting("report_prefix", "s3-batch-operations-reports"),
        "ReportScope": _text_setting("report_scope", "AllTasks"),
    }
    if expected_owner := props.get("expected_bucket_owner_report"):
        report["ExpectedBucketOwner"] = str(expected_owner)
    return report
# -------------------------------------------------------------------------
# Pulumi Dynamic Provider Implementation
# -------------------------------------------------------------------------
class S3BatchReplicationJobProvider(dynamic.ResourceProvider):
    """
    Pulumi dynamic provider implementing the lifecycle of an S3 Batch Replication job.

    S3 Batch Operations jobs cannot be modified after creation, so ``diff``
    reports every input change as a replacement and ``update`` always raises.
    """

    def create(self, props: Dict[str, Any]) -> dynamic.CreateResult:
        """
        Create the batch job through the S3 Control ``CreateJob`` API.

        Args:
            props: Resource inputs (the keys listed in ``_INPUT_KEYS``).

        Returns:
            A ``CreateResult`` whose id is the job id and whose outputs are the
            inputs plus ``client_request_token``, ``job_id`` and ``job_status``.

        Raises:
            RuntimeError: If AWS rejects the ``CreateJob`` request.
            KeyError: If a required input is missing.
        """
        props = dict(props)  # Work on a copy; the caller's dict is left untouched.

        # Idempotency token. Without an explicit seed, derive one from the
        # inputs so that distinct jobs never share a token (a constant default
        # would make every unseeded job collide on the same token).
        seed = props.get("client_request_token_seed")
        if seed is None:
            seed = _json_normalize({key: props.get(key) for key in sorted(_INPUT_KEYS)})
        token = _client_request_token(str(seed))
        props["client_request_token"] = token

        priority = props.get("priority")
        client = _s3control_client(props["source_region"])

        # Construct the API payload
        request: Dict[str, Any] = {
            "AccountId": props["account_id"],
            "ConfirmationRequired": False,  # Start immediately upon creation
            "Operation": {"S3ReplicateObject": {}},
            "ManifestGenerator": _build_manifest_generator(props),
            "Report": _build_report(props),
            "Priority": int(10 if priority is None else priority),
            "RoleArn": props["role_arn"],
            "ClientRequestToken": token,
        }
        if desc := props.get("description"):
            # Descriptions are truncated to 256 characters before sending.
            request["Description"] = str(desc)[:256]
        if tags := props.get("tags"):
            request["Tags"] = tags

        try:
            resp = client.create_job(**request)
        except ClientError as e:
            err = e.response.get("Error", {})
            rid = e.response.get("ResponseMetadata", {}).get("RequestId", "")
            raise RuntimeError(
                f"S3 CreateJob failed: {err.get('Code', '')}: {err.get('Message', e)} (RequestId={rid})"
            ) from e
        job_id = resp["JobId"]

        # Fetch the initial status. The job already exists at this point, so a
        # failing lookup must not fail the create and leave the job untracked.
        try:
            described = client.describe_job(AccountId=props["account_id"], JobId=job_id)
            status = described["Job"]["Status"]
        except ClientError as e:
            logger.warning("S3 Batch describe_job failed right after creating %s: %s", job_id, e)
            status = "Unknown"

        outs = {
            **props,
            "job_id": job_id,
            "job_status": status,
        }
        return dynamic.CreateResult(id_=job_id, outs=outs)

    def diff(self, _id: str, olds: Dict[str, Any], news: Dict[str, Any]) -> dynamic.DiffResult:
        """
        Compare old and new inputs and report which ones force a replacement.

        Only keys in ``_INPUT_KEYS`` are compared (on their normalized JSON
        form). Because jobs are immutable, every changed key is a replacement.
        """
        # Sorted so the reported key order is stable between runs.
        replaces: List[str] = [
            key
            for key in sorted(_INPUT_KEYS)
            if _json_normalize(olds.get(key)) != _json_normalize(news.get(key))
        ]
        return dynamic.DiffResult(changes=bool(replaces), replaces=replaces or None)

    def delete(self, _id: str, props: Dict[str, Any]) -> None:
        """
        Best-effort cancellation of the job when the Pulumi resource is deleted.

        Completed jobs cannot be cancelled, so errors indicating an invalid
        state or a missing job are ignored; any other AWS error is logged as a
        warning and deletion still succeeds.
        """
        client = _s3control_client(props["source_region"])
        try:
            client.update_job_status(
                AccountId=props["account_id"],
                JobId=_id,
                RequestedJobStatus="Cancelled",
                StatusUpdateReason="Cancelled due to Pulumi resource deletion",
            )
        except ClientError as e:
            code = e.response.get("Error", {}).get("Code", "")
            # Ignore errors if the job is already finished, cancelled, or doesn't exist
            if code not in (
                "JobStatusException",
                "InvalidRequestException",
                "InvalidJobState",
                "NoSuchJob",
                "NotFoundException",
            ):
                logger.warning("S3 Batch update_job_status(Cancelled) failed for %s: %s", _id, e)

    def read(self, id_: str, props: Dict[str, Any]) -> dynamic.ReadResult:
        """
        Refresh the job status from AWS.

        Returns:
            A ``ReadResult`` carrying the stored properties plus the current
            ``job_id`` and ``job_status``.

        Raises:
            RuntimeError: If AWS reports that the job does not exist.
            ClientError: For any other AWS error.
        """
        client = _s3control_client(props["source_region"])
        try:
            resp = client.describe_job(AccountId=props["account_id"], JobId=id_)
        except ClientError as e:
            if e.response.get("Error", {}).get("Code") in ("NoSuchJob", "NotFoundException"):
                raise RuntimeError(f"S3 Batch job not found: {id_}") from e
            raise
        status = resp["Job"]["Status"]
        merged = {**props, "job_id": id_, "job_status": status}
        # ReadResult takes its properties through the `outs` keyword.
        return dynamic.ReadResult(id_=id_, outs=merged)

    def update(self, _id: str, _olds: Dict[str, Any], _news: Dict[str, Any]) -> dynamic.UpdateResult:
        """Always raise: S3 Batch jobs cannot be updated in place."""
        raise NotImplementedError("S3 Batch jobs are immutable; modify inputs to trigger a replacement via diff.")
# -------------------------------------------------------------------------
# Pulumi Resource Component
# -------------------------------------------------------------------------
class S3BatchReplicationJob(dynamic.Resource):
    """
    Pulumi resource wrapping an S3 Batch Replication job.

    Required Properties:
    - account_id: AWS Account ID
    - source_region: Region where the API calls should be routed
    - source_bucket_arn: ARN of the bucket containing objects to replicate
    - report_bucket: Bucket to store completion reports
    - role_arn: IAM role assumed by S3 to perform the batch job

    Outputs:
    - job_id: Id of the created S3 Batch Operations job
    - job_status: Job status as last reported by the provider
    """

    # Provider-computed outputs, exposed as attributes on the resource.
    job_id: pulumi.Output[str]
    job_status: pulumi.Output[str]

    def __init__(
        self,
        name: str,
        props: Dict[str, Any],
        opts: Optional[pulumi.ResourceOptions] = None,
    ) -> None:
        """
        Register the job resource with the dynamic provider.

        Args:
            name: Pulumi logical name of the resource.
            props: Job inputs (see the class docstring for required keys).
            opts: Optional Pulumi resource options.
        """
        # A dynamic resource only exposes outputs whose names appear in the
        # props passed to the constructor, so declare them here as None.
        # Caller-supplied values in `props` take precedence.
        declared = {"job_id": None, "job_status": None, **props}
        super().__init__(S3BatchReplicationJobProvider(), name, declared, opts)
def create_s3_batch_replication_job(
    name: str,
    props: Dict[str, Any],
    opts: Optional[pulumi.ResourceOptions] = None,
) -> S3BatchReplicationJob:
    """
    Convenience factory returning a new ``S3BatchReplicationJob``.

    The arguments are forwarded unchanged to the resource constructor.
    """
    job = S3BatchReplicationJob(name=name, props=props, opts=opts)
    return job
@asaf400
Copy link
Copy Markdown
Author

asaf400 commented May 10, 2026

Usage snippet:

# Usage example for the S3 Batch Replication dynamic resource.
# NOTE(review): this is an excerpt, not a standalone script. `aws`, `pulumi`,
# `Any`, `src`, `dest`, `report_bn`, `logical_suffix`, `repl_cfg`,
# `build_batch_props` and `batch_replication_job_ids` are not defined here and
# must come from the surrounding program.
from s3_batch_replication_job import (
    S3BatchReplicationJob,
    batch_operations_trust_policy_json,
    batch_replication_job_execution_policy_statements,
)
# Identity of the caller; its account id is passed to the job below.
_caller_identity = aws.get_caller_identity()
pulumi.log.info(
    f"S3 Batch Replication ({src} -> {dest}): creating dedicated batchoperations IAM Role "
    f"(set batch_operations_role_arn on this replication entry to use an existing role instead)."
)
# Dedicated execution role trusted by the S3 Batch Operations service.
# The physical name is cut to 64 characters — presumably the IAM role-name
# limit; confirm.
batch_role = aws.iam.Role(
    f"s3-batch-repl-{logical_suffix}-role",
    assume_role_policy=batch_operations_trust_policy_json(),
    name=f"s3-batch-repl-{logical_suffix}"[:64],
)
# Inline permissions for the source, destination and report buckets.
exec_policy = aws.iam.get_policy_document(
    statements=batch_replication_job_execution_policy_statements(src, dest, report_bn)
)
aws.iam.RolePolicy(
    f"s3-batch-repl-{logical_suffix}-exec",
    role=batch_role.id,
    policy=exec_policy.json,
)

# Builds the job resource from resolved values. `args` is the list produced by
# `pulumi.Output.all(...)` below: args[0] is the account id, args[1] the role ARN.
# The keyword-only defaults capture the current outer values at definition
# time — presumably because this snippet runs inside a loop; confirm.
def make_managed_job(
    args: list[Any],
    *,
    ls=logical_suffix,
    rc=repl_cfg,
    br=batch_role,
    bpf=build_batch_props,
):
    return S3BatchReplicationJob(
        f"s3-batch-repl-{ls}",
        bpf(args[0], args[1]),
        opts=pulumi.ResourceOptions(depends_on=[rc, br]),
    )

# NOTE(review): the job resource is created inside `apply`, so `batch_job` is
# an Output wrapping the resource. Pulumi generally discourages creating
# resources inside `apply`; consider passing the Outputs directly as inputs —
# confirm against the Pulumi documentation.
batch_job = pulumi.Output.all(_caller_identity.account_id, batch_role.arn).apply(
    make_managed_job
)

# Expose the resource id of the job (the provider uses the job id as the id).
batch_replication_job_ids[logical_suffix] = batch_job.apply(lambda j: j.id)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment