Skip to content

Instantly share code, notes, and snippets.

@jeanphix
Created June 17, 2015 06:20
Show Gist options
  • Save jeanphix/53af6d5912fd6cc1f22f to your computer and use it in GitHub Desktop.
Save jeanphix/53af6d5912fd6cc1f22f to your computer and use it in GitHub Desktop.
alembic check_revision
# -*- coding: utf-8 -*-
import os
import re
import subprocess
from copy import copy
from difflib import unified_diff
from sqlalchemy import create_engine
from alembic.script import ScriptDirectory
from alembic.environment import MigrationContext
from alembic.operations import Operations
# Name of alembic's version table. It is dropped when the databases are
# emptied and excluded (``pg_dump -T``) from the dumps being compared.
alembic_version_table = 'alembic_version'
# Alembic script directory rooted at the folder that contains this module.
script = ScriptDirectory(os.path.dirname(__file__))
# Matches one whole ``CREATE TABLE <name> ( ... );`` statement. The name part
# accepts quoted, schema-qualified (``public.foo``) and digit-bearing
# identifiers so that such tables are normalised as well.
_CREATE_TABLE_RE = re.compile(r'CREATE TABLE ?["\w.$]*? \s*\([^;]+\);')


def order_columns(sql):
    """Sort the body lines of every ``CREATE TABLE`` statement found in `sql`.

    Two dumps of equivalent schemas may list columns in a different order;
    sorting the lines between the opening ``CREATE TABLE ... (`` line and
    the closing ``);`` line makes them comparable line by line.

    A trailing comma is appended to the last body line before sorting so
    that every body line ends the same way wherever it lands. The result is
    therefore meant for diffing, not for execution.

    :param str sql: The SQL dump to normalise.
    :return: The dump with each ``CREATE TABLE`` body sorted. Statements
        that have no body lines (e.g. written on a single line) are left
        untouched.
    """
    def sort_body(match):
        statement = match.group(0)
        lines = statement.splitlines()
        body = lines[1:-1]
        if not body:
            # Nothing between the first and last line: nothing to sort.
            return statement
        body[-1] = "%s," % body[-1]
        body.sort()
        return "\n".join([lines[0]] + body + [lines[-1]])

    return _CREATE_TABLE_RE.sub(sort_body, sql)
def validate_line(line):
    """Tell whether a dumped line should be kept for the comparison.

    A line is rejected when it is exactly one character long, or when it
    starts with ``-``, ``COPY``, a backslash or ``SELECT``.

    :param str line: A single line of the SQL dump.
    :return: ``True`` when the line must be kept, ``False`` otherwise.
    """
    rejected_prefixes = ('-', 'COPY', '\\', 'SELECT')
    return len(line) != 1 and not line.startswith(rejected_prefixes)
def dump_sql_create(engine):
    """Dump the database behind `engine` with ``pg_dump`` and normalise it.

    The dump is taken without privileges (``-x``), without ownership
    (``-O``) and without the alembic version table (``-T``). Column order
    is then normalised with `order_columns` and lines rejected by
    `validate_line` are dropped.

    :param Engine engine: The SQLAlchemy engine; its ``url`` provides the
        database name and, when set, the user, host and port.
    :return: The list of kept lines, each with its line ending.
    :raises subprocess.CalledProcessError: When ``pg_dump`` exits with a
        non-zero status.
    """
    url = engine.url
    command = ['pg_dump', url.database]
    # Connection options missing from the URL are left to pg_dump's own
    # defaults instead of being passed as ``None`` (which subprocess rejects).
    if url.username:
        command += ['-U', url.username]
    if url.host:
        command += ['-h', url.host]
    if url.port:
        command += ['-p', str(url.port)]
    command += ['-x', '-O', '-T', alembic_version_table]

    sql = subprocess.check_output(command)
    if not isinstance(sql, str):
        # Python 3 returns bytes; the helpers below work on text.
        # NOTE(review): assumes the dump is UTF-8 encoded; undecodable
        # bytes are replaced rather than aborting the comparison.
        sql = sql.decode('utf-8', 'replace')

    sql = order_columns(sql)
    return [line for line in sql.splitlines(True) if validate_line(line)]
def get_context(connection, revision='head'):
    """Build a migration context bound to `connection`.

    :param Connection connection: The SQLAlchemy connection.
    :param str revision: The destination revision, ``'head'`` by default.
    :return: The value returned by ``MigrationContext.configure``.
    """
    def upgrade(rev, context):
        # Callback handed to the context as ``fn``: delegates to the script
        # directory with the destination revision and the received `rev`.
        return script._upgrade_revs(revision, rev)

    options = {
        'script': script,
        'destination_rev': revision,
        'fn': upgrade,
    }
    return MigrationContext.configure(connection, opts=options)
def run_migrations(context):
    """Run the migrations of `context` inside ``Operations.context``.

    :param context: The migration context to run, as built by
        ``MigrationContext.configure``.
    """
    operations_scope = Operations.context(context)
    with operations_scope:
        context.run_migrations()
def migrate(connection, revision='head'):
    """Upgrades database to given `revision`.

    :param Connection connection: The SQLAlchemy connection.
    :param str revision: An optional revision, ``'head'`` by default
        (the same default as `get_context`).
    """
    run_migrations(get_context(connection, revision))
def check_revision(
    migrated_database_uri,
    from_meta_database_uri,
    metadata,
    revision,
):
    """Checks if upgrading to given revision produces expected
    schema by comparing to metadata.

    Both databases are emptied first. One is then built from `metadata`,
    the other by running the migrations up to `revision`; the normalised
    ``pg_dump`` outputs of both are finally diffed.

    :param string migrated_database_uri: The database uri where migrations
                                         are run in order to compare schemas.
    :param string from_meta_database_uri: The database uri where tables are
                                          built from metadata.
    :param MetaData metadata: The sqlalchemy metadata.
    :param string revision: The revision to check.
    :return: The unified diff lines between the metadata dump and the
        migrations dump; an empty list when both schemas match.
    """
    migrated_engine = create_engine(migrated_database_uri)
    from_meta_engine = create_engine(from_meta_database_uri)

    try:
        # Empties databases
        metadata.drop_all(migrated_engine)
        # Every connection is closed as soon as it has served its purpose
        # so that none is left open while pg_dump runs.
        with migrated_engine.connect() as connection:
            connection.execute(
                'drop table if exists %s;' % alembic_version_table)
        metadata.drop_all(from_meta_engine)

        # Builds from meta
        with from_meta_engine.connect() as connection:
            connection.execute(
                'create extension if not exists hstore; commit'
            )
        metadata.create_all(from_meta_engine)
        from_meta_sql = dump_sql_create(from_meta_engine)

        # Builds from migrations
        with migrated_engine.connect() as connection:
            migrate(connection, revision)
        migrated_sql = dump_sql_create(migrated_engine)
    finally:
        # Release pooled connections whatever the outcome.
        migrated_engine.dispose()
        from_meta_engine.dispose()

    return list(unified_diff(
        from_meta_sql,
        migrated_sql,
        fromfile='metadata',
        tofile='migrations',
    ))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment