Skip to content

Instantly share code, notes, and snippets.

@mmautner
Last active July 11, 2023 17:41

Revisions

  1. mmautner revised this gist Aug 19, 2014. 1 changed file with 1 addition and 0 deletions.
    1 change: 1 addition & 0 deletions redshift.py
    Original file line number Diff line number Diff line change
    @@ -1,3 +1,4 @@
    import uuid
    import psycopg2
    from secret import REDSHIFT_CREDS
    from secret import AWS_ACCESS_KEY, AWS_SECRET_KEY
  2. mmautner created this gist Aug 19, 2014.
    53 changes: 53 additions & 0 deletions redshift.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,53 @@
    import psycopg2
    from secret import REDSHIFT_CREDS
    from secret import AWS_ACCESS_KEY, AWS_SECRET_KEY

    def get_primary_keys(tablename, db):
    c = db.cursor()
    sql = "select indexdef from pg_indexes where tablename = '%s';" % tablename
    c.execute(sql)
    result = c.fetchall()[0][0]
    rfields = result.split('(')[1].strip(')').split(',')
    fields = [field.strip().strip('"') for field in rfields]
    return fields

    def load_s3_file_into_redshift(s3path, tablename, delimiter='|'):
    """File must be gzipped:
    s3path on Amazon S3 - (str)
    tablename - (str)
    field delimiter - (str)"""

    temp_tablename = 'temp_%s' % uuid.uuid4().get_hex()

    db = psycopg2.connect(**REDSHIFT_CREDS)

    primary_keys = get_primary_keys(tablename, db)
    equals_clause = '{dest}.%s = {src}.%s'
    join_clause = ' AND '.join([equals_clause % (pk, pk) for pk in primary_keys])
    join_clause = join_clause.format(dest=tablename, src=temp_tablename)

    upsert_qry = """\
    CREATE TEMPORARY TABLE {src} (LIKE {dest});
    COPY {src} FROM '{s3path}'
    CREDENTIALS 'aws_access_key_id={access_key};aws_secret_access_key={secret_key}'
    DELIMITER '{delimiter}' gzip;
    BEGIN;
    LOCK {dest};
    DELETE FROM {dest} USING {src} WHERE {join_clause};
    INSERT INTO {dest} SELECT * FROM {src};
    END;
    """.format(dest=tablename,
    src=temp_tablename,
    s3path=s3path,
    join_clause=join_clause,
    access_key=AWS_ACCESS_KEY,
    secret_key=AWS_SECRET_KEY,
    delimiter=delimiter)

    c = db.cursor()
    c.execute(upsert_qry)
    db.commit()
    return s3path