Created
March 5, 2025 07:08
-
-
Save ZediWards/a084c130ecb4269c4df4f86839cf43ea to your computer and use it in GitHub Desktop.
SQLAlchemy notes
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import uuid | |
from io import StringIO | |
import pandas as pd | |
import sqlalchemy as sql | |
import sqlalchemy.dialects.mssql as mssql | |
from sqlalchemy import ( | |
Column, | |
Identity, | |
Integer, | |
MetaData, | |
String, | |
Table, | |
create_engine, | |
text, | |
) | |
# engine = create_engine("sqlite:///:memory") | |
engine = create_engine("sqlite:///database.db") | |
# Create table | |
metadata = MetaData() | |
user_table = Table( | |
'user', | |
metadata, | |
Column( | |
'id', mssql.UNIQUEIDENTIFIER, primary_key=True, default=uuid.uuid4 | |
), # might be overkill? | |
# Column('id', Integer, Identity(start=1, increment=1), primary_key=True), # explicit Identity with controls | |
# Column('id', Integer, primary_key=True, autoincrement=True), # makes Identity | |
Column('name', mssql.NVARCHAR(100), nullable=False), | |
Column('date', mssql.DATE(), nullable=False), | |
Column('age', mssql.INTEGER(), nullable=False), | |
) | |
user_table.drop(engine, checkfirst=True) | |
user_table.create(engine, checkfirst=True) | |
csv_content = """name,date,age | |
John Doe,2023-01-15,35 | |
Jane Smith,2022-11-20,28 | |
Michael Johnson,2023-03-10,42 | |
Emily Brown,2022-09-05,31 | |
David Wilson,2023-02-14,45 | |
Sarah Lee,2022-12-01,27 | |
Robert Taylor,2023-04-22,39 | |
Lisa Martinez,2022-10-18,33 | |
William Anderson,2023-01-30,36 | |
Jennifer Garcia,2022-08-12,29""" | |
df = pd.read_csv(StringIO(csv_content)) | |
df.to_sql(user_table.name, engine, if_exists='fail', index=False) | |
# insert_stmt = insert(user_table).values(df.to_dict('records')) # meh | |
# common operations | |
with engine.connect() as conn: | |
# select | |
query = sql.select(user_table) | |
# filter | |
filter_query = sql.select(user_table).where(user_table.c.age > 30) | |
# ordering | |
ordered_query = sql.select(user_table).order_by(user_table.c.name) | |
# inserting | |
insert_query = user_table.insert().values( | |
[{"name": "Willy Nelson", 'date': '03-05-2025', 'age': 75}] | |
) | |
# limiting | |
count_query = sql.select(sql.func.count()).select_from(user_table) | |
# execute statement | |
results = conn.execute(query).fetchall() | |
# parameterized queries -- text() allows named params and passing dict as values | |
param_query = sql.text("SELECT * FROM user WHERE age > :min_age") | |
# param results and passing params | |
param_results = conn.execute(param_query, {"min_age": 30}) | |
# read sql file into text() | |
with open('writing_files/sqlalchemy/tables.sql', 'r') as file: | |
file_query = text(file.read()) | |
# use file query with named params | |
pd.read_sql(file_query, engine, params={"column": "name"}) | |
# using context manager for connections | |
with engine.connect() as conn: | |
max_date_query = text("SELECT MAX(date) FROM user WHERE age > :min_age;") | |
# scalar returns first col of first row | |
max_date = conn.execute(max_date_query, {"min_age": 50}).scalar() | |
# conn.commit() # if transaction | |
# inserting -- .connect() commit as you go, .begin() auto commit at end if no exceptions (all or nothing) | |
with engine.connect() as conn: | |
insert_query = text( | |
"INSERT INTO user (name, date, age) VALUES (:name, :date, :age)" | |
) | |
result = conn.execute(insert_query, {"name": "John", "date": "03-05-2025"}) | |
# conn.rollback() | |
conn.commit() | |
# getting result | |
with engine.connect() as conn: | |
# fetch all rows as tuples | |
query = sql.select(user_table) | |
result = conn.execute(query) | |
rows_as_tuples = result.fetchall() | |
# fetch single row (expected query result) - returns single tuple | |
# query for single result | |
result = conn.execute(query) | |
row = result.fetchone() | |
# convert to df | |
df = pd.read_sql(query, engine) | |
# iterate over results | |
result = conn.execute(query) | |
for row in result: | |
print(row.name, row.age) # handle data row by row | |
# mapped to dict | |
result = conn.execute(query) | |
result_list = result.mappings().all() | |
# list comprehension instead of mappings | |
result = conn.execute(query) | |
result_list = [dict(row) for row in result] | |
# list of column values | |
query = sql.select(user_table.c.name) | |
result = conn.execute(query) | |
# fetch all values for the 'name' column | |
names = result.scalars().all() | |
# fetchall() - tuples | |
# fetchone() - tuple | |
# scalars() - returns first column | |
# mappings() - rows as list of dict | |
# list comprehensions | |
# iteration |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment