Skip to content

Instantly share code, notes, and snippets.

@sharkinsspatial
Created December 18, 2023 19:34
Show Gist options
  • Save sharkinsspatial/e7253aa82b0ddee8607a09d0a8134c28 to your computer and use it in GitHub Desktop.
Save sharkinsspatial/e7253aa82b0ddee8607a09d0a8134c28 to your computer and use it in GitHub Desktop.
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.io import fileio
from apache_beam.io.gcp import gcsio
# GCS locations for the copy job. Replace the placeholder values before running.
# The prefixes are concatenated directly with object names when paths are built,
# so each one should end with '/' (as both placeholders do).
source_bucket, destination_bucket = 'your-source-bucket', 'your-destination-bucket'
source_prefix, destination_prefix = 'source-directory/', 'destination-directory/'
# Custom ParDo function to copy objects from source to destination
class CopyObjects(beam.DoFn):
def process(self, element):
# element is the path of an object in the source bucket
source_path = f'gs://{source_bucket}/{element}'
destination_path = f'gs://{destination_bucket}/{destination_prefix}{element[len(source_prefix):]}'
gcs = gcsio.GcsIO()
with gcs.open(source_path) as source_file, gcs.open(destination_path, 'w') as destination_file:
destination_file.write(source_file.read())
# Build and run the pipeline: list every object under the source prefix, then
# copy each one. Apache Beam has no `gcsio.ListObjects` transform, so objects
# are listed with fileio.MatchFiles instead.
bucket_root = f'gs://{source_bucket}/'
with beam.Pipeline() as p:
    (
        p
        # '**' also matches across '/', so objects in nested "folders" are included.
        | "ListSourceObjects" >> fileio.MatchFiles(f'{bucket_root}{source_prefix}**')
        # MatchFiles yields metadata holding full gs:// URLs; CopyObjects expects
        # bucket-relative object names. `root` is bound as a default argument so
        # the lambda does not need the module global on the worker.
        | "ToObjectName" >> beam.Map(lambda metadata, root=bucket_root: metadata.path[len(root):])
        | "CopyObjects" >> beam.ParDo(CopyObjects())
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment