Skip to content

Instantly share code, notes, and snippets.

@h4
Last active June 24, 2025 13:37
Show Gist options
  • Save h4/fc9b6d350544ff66491308b535762fee to your computer and use it in GitHub Desktop.
Save h4/fc9b6d350544ff66491308b535762fee to your computer and use it in GitHub Desktop.
Setup alembic to work properly with PostgreSQL schemas
from __future__ import with_statement
from alembic import context
from sqlalchemy import engine_from_config, pool
from logging.config import fileConfig
from models import Base
# Alembic Config object: gives access to the values of the .ini file in use.
config = context.config

# Set up Python logging from the .ini file. ``config_file_name`` is None when
# Alembic is driven without an .ini file, and ``fileConfig(None)`` would
# raise, so the call is guarded.
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# Load the models' metadata. The schema should either be defined on the
# metadata object first, or set explicitly on each model class with
# `__table_args__ = {'schema' : 'test'}`.
target_metadata = Base.metadata
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    The context is configured with just the database URL taken from the
    ``sqlalchemy.url`` option, so no Engine or connection is created here.
    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        # Keep the alembic_version table in the same schema the online path
        # uses; otherwise the two modes disagree on where the version lives.
        version_table_schema=target_metadata.schema,
    )
    with context.begin_transaction():
        context.run_migrations()
def run_migrations_online():
    """Run migrations in 'online' mode, over a live database connection.

    An Engine is built from the ``sqlalchemy.``-prefixed options of the
    active .ini section, and one of its connections is bound to the
    migration context.
    """
    ini_section = config.get_section(config.config_ini_section)
    engine = engine_from_config(
        ini_section,
        prefix='sqlalchemy.',
        poolclass=pool.NullPool,
    )
    with engine.connect() as connection:
        # Configure the migration context:
        #   1. pass our models' metadata
        #   2. set the schema for the alembic_version table
        #   3. load all available schemas
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
            version_table_schema=target_metadata.schema,
            include_schemas=True,
        )
        with context.begin_transaction():
            # By default search_path is set to "$user",public, which is
            # why Alembic can't create foreign keys correctly.
            context.execute('SET search_path TO public')
            context.run_migrations()
# Entry point: pick the runner that matches the mode Alembic was invoked in.
if not context.is_offline_mode():
    run_migrations_online()
else:
    run_migrations_offline()
@Ilyes-git
Copy link

Ilyes-git commented Feb 16, 2024

After spending a lot of time struggling to get my alembic to work, with a schema_name other than "public", I've come up with the following solution:
[The only solution that worked for me on February 16, 2024]

def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.

    The PostgreSQL ``search_path`` is pointed at the configured schema as the
    first statement of the migration transaction, so models do not need a
    per-table schema setting.
    """
    connectable = engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(connection=connection, target_metadata=target_metadata)

        with context.begin_transaction():
            # Issue the SET inside the Alembic-managed transaction. Running it
            # on the connection beforehand opens an implicit transaction under
            # SQLAlchemy 2.x, after which Alembic does not commit and the
            # whole migration can be rolled back when the connection closes.
            context.execute(text('set search_path to "%s"' % settings.postgres_db_schema))
            context.run_migrations()

Hope this helps

@thiagoolsilva
Copy link

After spending a lot of time struggling to get my alembic to work, with a schema_name other than "public", I've come up with the following solution: [The only solution that worked for me on February 16, 2024]

def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.

    The PostgreSQL ``search_path`` is pointed at the configured schema as the
    first statement of the migration transaction, so models do not need a
    per-table schema setting.
    """
    connectable = engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(connection=connection, target_metadata=target_metadata)

        with context.begin_transaction():
            # Issue the SET inside the Alembic-managed transaction. Running it
            # on the connection beforehand opens an implicit transaction under
            # SQLAlchemy 2.x, after which Alembic does not commit and the
            # whole migration can be rolled back when the connection closes.
            context.execute(text('set search_path to "%s"' % settings.postgres_db_schema))
            context.run_migrations()

Hope this helps

This solution works for me. Thanks for sharing it!

@Jamim
Copy link

Jamim commented Jan 18, 2025

        # Point the session's search_path at the configured schema before migrating.
        # NOTE(review): this statement runs before context.begin_transaction();
        # confirm the migration is still committed with your driver and
        # SQLAlchemy version.
        connection.execute(text('set search_path to "%s"' % settings.postgres_db_schema)) #  <-- The magic line

        # Run the migrations inside the transaction managed by Alembic.
        with context.begin_transaction():
            context.run_migrations()

Hope this helps

Thank you, @Ilyes-git! This definitely helps a lot! 🙇🏼

This solution, in case your app uses a single custom schema, allows you to avoid adding schema configuration to each model in the application.

However, there's a minor thing that can be improved here. Although it works perfectly fine with psycopg, when you use asyncpg the whole migration rolls back since, I assume, connection.execute implicitly starts a transaction. So it's worth moving the magic line under context.run_migrations() just like this:

with context.begin_transaction():
    # Set the search path as the first statement of the migration transaction.
    # The schema name is double-quoted so that mixed-case or otherwise special
    # names resolve to exactly that schema instead of being case-folded.
    context.execute(text(f'SET search_path TO "{settings.db_schema}"'))
    context.run_migrations()

And it fixes the issue for asyncpg.

@farzbood
Copy link

farzbood commented May 6, 2025

I have a concern about an odd behavior of alembic revision --autogenerate: it doesn't respect the version_table_schema argument of context.configure() (which I set to db_schema), so it treats the version table as an external table and adds a drop_table command for it to the initial migration script (the genesis)!
I came across some discussion threads, but found no resolution (it looks like a kind of technical debt). Does anyone have an idea for a fix?

@rafay-ceoai
Copy link

A working solution for me with asyncpg:

My base.py

"""This module contains the base class for all models in the application."""

from sqlalchemy.ext.declarative import declarative_base
from app.core.config import settings

# Plain class supplying the table arguments shared by every model: each table
# is placed in the schema named by the application settings.
class _SchemaBoundBase:
    __table_args__ = {"schema": settings.SCHEMA}


# Declarative base that all models inherit from, built on the class above.
Base = declarative_base(cls=_SchemaBoundBase)

My env.py

import asyncio
from logging.config import fileConfig

from sqlalchemy import pool, text
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config

from alembic import context
from app.core.config import settings
from app.db.models import *
from app.db.base import Base


# Database schema for SecurityBot
SCHEMA = settings.SCHEMA  # <<<<<<<< this comes from my config system (config.py)

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = Base.metadata

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.


# Update the database URL from the application settings. Option values go
# through ConfigParser interpolation, so a literal "%" (for example in a
# URL-encoded password) has to be doubled; otherwise the value is rejected
# as invalid interpolation syntax.
config.set_main_option("sqlalchemy.url", settings.DATABASE_URL.replace("%", "%%"))

def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    Only a URL is handed to the context, not an Engine (although an Engine
    would be accepted too). Because no Engine is created, a DBAPI does not
    even have to be available.

    Any context.execute() call made in this mode emits the given string to
    the script output.

    """
    offline_options = {
        "url": config.get_main_option("sqlalchemy.url"),
        "target_metadata": target_metadata,
        "literal_binds": True,
        "dialect_opts": {"paramstyle": "named"},
    }
    context.configure(**offline_options)

    with context.begin_transaction():
        context.run_migrations()


def do_run_migrations(connection: Connection) -> None:
    """Configure the migration context on ``connection`` and run the migrations.

    Inside a single transaction this creates the target schema when it is
    missing, points the search path at it (falling back to ``public``) and
    then runs the migrations.

    Args:
        connection: The SQLAlchemy connection the migrations are run on.
    """
    context.configure(connection=connection, target_metadata=target_metadata)

    with context.begin_transaction():
        # Create the schema if it doesn't exist. The name is double-quoted so
        # mixed-case or otherwise special names refer to exactly that schema
        # instead of being case-folded.
        context.execute(text(f'CREATE SCHEMA IF NOT EXISTS "{SCHEMA}"'))
        # Set the search path to the schema, with public as fallback.
        context.execute(text(f'SET search_path TO "{SCHEMA}", public'))
        context.run_migrations()


async def run_async_migrations() -> None:
    """Create an async Engine and run the migrations over one connection.

    The Engine is built from the ``sqlalchemy.``-prefixed options of the
    active .ini section and is disposed when the run ends, whether the
    migrations succeeded or raised.

    """

    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    try:
        async with connectable.connect() as connection:
            # do_run_migrations is a plain synchronous function, so it is
            # handed to run_sync instead of being awaited directly.
            await connection.run_sync(do_run_migrations)
    finally:
        # Dispose in a finally block so a failing migration does not skip it.
        await connectable.dispose()


def run_migrations_online() -> None:
    """Run migrations in 'online' mode by driving the async runner to completion."""
    migration_coro = run_async_migrations()
    asyncio.run(migration_coro)


# Entry point: pick the runner that matches the mode Alembic was invoked in.
if not context.is_offline_mode():
    run_migrations_online()
else:
    run_migrations_offline()

Cheers, hope it helps!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment