Skip to content

Instantly share code, notes, and snippets.

@polyvertex
Last active January 14, 2026 14:26
Show Gist options
  • Select an option

  • Save polyvertex/e5dacc97350910f080fc85c61af20192 to your computer and use it in GitHub Desktop.

Select an option

Save polyvertex/e5dacc97350910f080fc85c61af20192 to your computer and use it in GitHub Desktop.
sqlite.py - sqlite3 with nested transactions for real
# Copyright (c) Jean-Charles Lefebvre
# SPDX-License-Identifier: MIT
"""
An API-compatible module wrapping standard package ``sqlite3``, with full
control over transactions, support of nested transactions in a pythonic way by
the means of context managers, as well as extra convenience features.
See `sqlite_connect()` as a single and most simple entry point.
Class `Sqlite` is a wrapper around `sqlite3.Connection`, which uses
`SqliteCursor` as its default Cursor factory.
"""
import contextlib
import logging
import os
import re
import sqlite3
import threading
__all__ = [
"Sqlite",
"SqliteCursor",
"SqliteProxy",
"sqlite_connect",
"sqlite_iter_script_statements",
"sqlite_statement_repr"]
try:
_pid = os.getpid()
except Exception:
_pid = 0
class SqliteCursor(sqlite3.Cursor):
"""
A wrapper around `sqlite3.Cursor` with extra features, as well as a
re-implementation of method ``executescript()`` so to ensure no
transaction-related SQL statement is emitted either by
`sqlite3.Cursor.executescript()`, or by the provided script itself.
"""
#: Allows to make `sqlite3.Cursor.arraysize` configurable once for all from
#: client code.
#:
#: Defaults to `None`, which hints to keep relying on sqlite3's own default,
#: advertised as ``1`` (one) by documentation.
#:
#: This value is read and applied by the constructor, so it is only useful
#: as a class property.
ARRAY_SIZE = None
#: Define whether a debug note should be added on-the-fly to an exception
#: raised by `execute()`, but not other execute-like methods.
#:
#: Value must be either `None` (add a note only in `__debug__` mode), or
#: strictly a `bool` to manually control this behavior.
#:
#: Added note includes database's URI and the SQL query.
#:
#: This value can be modified at class and instance level, since it is read
#: only when `execute()` raises an exception.
VERBOSE_ERROR = None
#: Optionally specify a `logging.Logger` compatible object to log a message
#: with level defined by `LOGGER_LEVEL`, that contains the SQL statement to
#: be executed.
#:
#: Assign `None` (default) to disable logging.
LOGGER = None
#: The log level of messages emitted with `LOGGER` object if specified.
#:
#: Defaults to `logging.DEBUG`.
LOGGER_LEVEL = logging.DEBUG
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
assert isinstance(self.connection, Sqlite)
if self.ARRAY_SIZE is not None:
if not isinstance(self.ARRAY_SIZE, int) or self.ARRAY_SIZE < 1:
raise ValueError("ARRAY_SIZE")
self.arraysize = self.ARRAY_SIZE
def __del__(self):
self.close()
def __repr__(self):
try:
# assume Sqlite instead of sqlite3.Connection
uri = self.connection.uri
except AttributeError:
uri = f"id:{id(self.connection)}"
return f"<{type(self).__name__} {uri!r}>"
def execute(self, sql, /, *args, **kwargs):
self.log("execute", sql)
return self.execute_impl(
super().execute, "execute", sql, *args, **kwargs)
def executemany(self, sql, /, *args, **kwargs):
self.log("executemany", sql)
return self.execute_impl(
super().executemany, "executemany", sql, *args, **kwargs)
def executescript(self, sql_script, /, *args, **kwargs):
self.log("executescript", sql_script)
# break script into single statements, so we can call execute() instead
# of executescript(), since the later does insert a COMMIT statement,
# resulting in unexpected state of property Connection.in_transaction
# wich in turn breaks the handling of transactions as implemented by
# class Sqlite
for keyword, stmt in sqlite_iter_script_statements(
sql_script, with_keyword=True):
if keyword == "SELECT":
raise sqlite3.ProgrammingError(
"SELECT statements not permitted from executescript() "
"methods")
elif keyword in (
"BEGIN", "COMMIT", "END",
"SAVEPOINT", "RELEASE", "ROLLBACK"):
raise sqlite3.ProgrammingError(
f"transaction-related statements not permitted from "
f"executescript() methods (got a {keyword} statement)")
self.execute_impl(super().execute, "executescript", stmt)
def execute_impl(self, method, method_name, sql, /, *args, **kwargs):
try:
return method(sql, *args, **kwargs)
except Exception as exc:
if self.VERBOSE_ERROR is True or (
__debug__ and self.VERBOSE_ERROR is None):
uri = getattr(self.connection, "uri", None) or "???"
exc.add_note(
f"SQL statement failed; "
# f"error {type(exc)}: {exc}; "
f"db: {uri}; "
f"method: {method_name}; "
f"statement: {sqlite_statement_repr(sql)}")
raise
def log(self, method_name, sql):
if self.LOGGER is not None:
self.LOGGER.log(
self.LOGGER_LEVEL,
f"SQL {method_name.upper()}: {sqlite_statement_repr(sql)}")
class SqliteProxy:
"""
Lightweight proxy to an instance of either `Sqlite` or `SqliteCursor`, so to
avoid caller site keeping a live `Sqlite` object even after leaving a
context manager.
Instantiated with context manager `Sqlite.proxy()`.
This proxy allows to get, set and delete the attributes of the proxified
object as long as the connection is not closed.
"""
__slots__ = ("_proxified_object_", )
def __init__(self, proxified):
if not isinstance(proxified, Sqlite | SqliteCursor):
raise ValueError("proxified")
self._proxified_object_ = proxified
def __del__(self):
self.close()
def __repr__(self):
return f"<{type(self).__name__} {self._proxified_object_!r}>"
def __bool__(self):
return bool(self._proxified_object_)
def __getattr__(self, name):
self.__check_proxified_object()
return getattr(self._proxified_object_, name)
def __setattr__(self, name, value):
if name == "_proxified_object_":
return object.__setattr__(self, name, value)
self.__check_proxified_object()
return setattr(self._proxified_object_, name, value)
def __delattr__(self, name):
self.__check_proxified_object()
return self._proxified_object_.__delattr__(name)
def __dir__(self, name):
self.__check_proxified_object()
return dir(self._proxified_object_)
def __enter__(self):
# feature disabled since this class is meant to be used via
# Sqlite.proxy(), which is a context manager already
raise NotImplementedError
def __exit__(self, *args):
# feature disabled since this class is meant to be used via
# Sqlite.proxy(), which is a context manager already
raise NotImplementedError
def close(self):
if self._proxified_object_ is not None:
try:
self._proxified_object_.close()
finally:
self._proxified_object_ = None
def __check_proxified_object(self):
if self._proxified_object_ is None:
raise RuntimeError(
"trying to operate on a closed database connection")
class Sqlite(sqlite3.Connection):
"""
A wrapper around `sqlite3.Connection` with extra utility features and
support of thread-safe nested `transaction()` with context management by
the means of sqlite's
`savepoints <https://sqlite.org/lang_savepoint.html>`_ using ``SAVEPOINT``,
``RELEASE SAVEPOINT`` and ``ROLLBACK TO SAVEPOINT`` commands.
Recommended usage via `Sqlite.proxy()`. See also `transaction()` and
`SqliteCursor` which is the default Cursor class of this connection wrapper.
.. warning::
Due to this wrapper's nested transactions feature, the following
properties are set by constructor and MUST NOT be changed:
* ``autocommit`` is set to `False`
* ``isolation_level`` is set to `None`
.. seealso::
Python's `issue16958 <https://bugs.python.org/issue16958>`_ about using
`sqlite3.Connection` as a context manager.
"""
IDENTIFIER_MINLEN = 1
IDENTIFIER_MAXLEN = 255 # arbitrary soft-limit
IDENTIFIER_REGEX = re.compile(
rf"\A"
rf"[_a-zA-Z][_a-zA-Z0-9]"
rf"{{{IDENTIFIER_MINLEN - 1},{IDENTIFIER_MAXLEN - 1}}}"
rf"\Z")
# convenience shorthands - exceptions
Warning = sqlite3.Warning
Error = sqlite3.Error
InterfaceError = sqlite3.InterfaceError
DatabaseError = sqlite3.DatabaseError
DataError = sqlite3.DataError
OperationalError = sqlite3.OperationalError
IntegrityError = sqlite3.IntegrityError
InternalError = sqlite3.InternalError
ProgrammingError = sqlite3.ProgrammingError
NotSupportedError = sqlite3.NotSupportedError
# convenience shorthands - standard utility classes
Blob = sqlite3.Blob
Cursor = SqliteCursor
Row = sqlite3.Row
# convenience shorthands - custom utility classes
Proxy = SqliteProxy
def __init__(self, database, **kwargs):
# we want full control over transactions
# no auto-commit, no auto-transaction
autocommit = kwargs.pop("autocommit", sqlite3.LEGACY_TRANSACTION_CONTROL)
isolation_level = kwargs.pop("isolation_level", None)
if (autocommit != sqlite3.LEGACY_TRANSACTION_CONTROL or
isolation_level is not None):
raise ValueError(
"full control over transactions required, autocommit and "
"isolation_level arguments cannot be modified by client code")
try:
database = os.fspath(database)
except TypeError:
pass
if not isinstance(database, str): # i.e. not bytes
raise ValueError(
f"argument database must translate to a str object, got type "
f"{type(database)}")
super().__init__(
database,
autocommit=autocommit,
isolation_level=isolation_level,
**kwargs)
# safety check to ensure underlying layers (sqlite3.Connection and
# sqlite3 backend itself) are NOT in either auto-commit or
# auto-transaction mode
if (super().autocommit != sqlite3.LEGACY_TRANSACTION_CONTROL or
super().isolation_level is not None or
super().in_transaction):
raise RuntimeError("unexpected state of sqlite3.Connection object")
self.__uri = database # cache this locally
self.__savepoint_lock = threading.RLock()
self.__savepoint_id = 0
self.__savepoint_stack = []
def __del__(self):
self.close()
def __bool__(self):
return self.is_open
def __repr__(self):
return f"<{type(self).__name__} {self.__uri!r}>"
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
@property
def uri(self):
"""Cached `str` value of constructor's argument *database*."""
return self.__uri
@property
def is_open(self):
"""
A `bool` to indicate whether connection down to the backend is still up.
Calls `check_open()`.
"""
try:
self.check_open()
return True
except Exception:
return False
@property
def autocommit(self):
return super().autocommit
@autocommit.setter
def autocommit(self, value):
raise NotImplementedError("autocommit setter disabled")
@property
def isolation_level(self):
return super().isolation_level
@isolation_level.setter
def isolation_level(self, value):
raise NotImplementedError("isolation_level setter disabled")
def close(self):
"""
Close connection to database.
Pending transactions are discarded, if any.
"""
with self.__savepoint_lock:
try:
super().close()
finally:
if self.__savepoint_stack:
self.__savepoint_stack = []
def check_open(self):
"""
Raise `sqlite3.ProgrammingError` when connection is closed.
.. note::
`sqlite3.Connection` does not expose an explicit way to check
whether a connection is still up, so this property does a dummy read
of `sqlite3.Connection.autocommit`, which seems to be the most
lightweight way to do so.
Under the hood, `sqlite3.Connection.autocommit` is implemented in C
and only checks current status of its internal structure (function
``pysqlite_check_connection()``) before returning internal value of
*autocommit*.
If you think there is a more lightweight and efficient way to check
whether the connection down to the backend itself is still up,
please contact the developer of this class.
"""
_ = super().autocommit
def cursor(self, factory=None):
"""
Wrapper around `sqlite3.Connection.cursor()` with *factory* defaulting
to `SqliteCursor`.
If specified, *factory* must either be `SqliteCursor` or a sub-class of
it, or a callable that returns an instance of `SqliteCursor`, or derived
from it.
"""
if factory is None:
factory = SqliteCursor
elif isinstance(factory, type) and not issubclass(factory, SqliteCursor):
raise ValueError("factory class not derived from SqliteCursor")
elif not callable(factory):
raise ValueError("factory not a class or a callable")
cursor = super().cursor(factory=factory)
if not isinstance(cursor, SqliteCursor):
raise ValueError("factory returns invalid object type")
return cursor
@contextlib.contextmanager
def cursorctx(self, factory=None):
"""
Like `cursor()` but as a context manager, such that the yielded cursor
object gets automatically ``cursor.close()`` upon exiting, which is a
more pythonic way than implicitly relying on ``Cursor.__del__``.
"""
cursor = self.cursor(factory)
# try:
# yield cursor
# except BaseException:
# try:
# cursor.close()
# except Exception:
# pass
# raise
# else:
# cursor.close()
try:
yield cursor
finally:
cursor.close()
def commit(self):
"""
Commit current `transaction()` if not already commited or released.
This method must be called from a `transaction()` context only. It
raises `sqlite3.OperationalError` otherwise.
"""
with self.__savepoint_lock:
if not self.__savepoint_stack:
raise sqlite3.OperationalError(
"commit() called outside of a transaction context")
else:
self.check_open()
self._savepoint_pop(commit=True, pop=False)
def rollback(self):
"""
Rollback current `transaction()` if not already commited or released.
This method must be called from a `transaction()` context only. It
raises `sqlite3.OperationalError` otherwise.
"""
with self.__savepoint_lock:
if not self.__savepoint_stack:
raise sqlite3.OperationalError(
"rollback() called outside of a transaction context")
else:
self.check_open()
self._savepoint_pop(commit=False, pop=False)
def execute(self, *args, **kwargs):
"""
Override of `sqlite3.Connection.execute()`, so to ensure derived
`cursor()` is used instead of the lower-level one.
API compatibility preserved.
"""
cursor = self.cursor()
try:
cursor.execute(*args, **kwargs)
except Exception:
cursor.close()
raise
return cursor
def executemany(self, *args, **kwargs):
"""
Override of `sqlite3.Connection.executemany()`, so to ensure derived
`cursor()` is used instead of the lower-level one.
API compatibility preserved.
"""
cursor = self.cursor()
try:
cursor.executemany(*args, **kwargs)
except Exception:
cursor.close()
raise
return cursor
def executescript(self, *args, **kwargs):
"""
Override of `sqlite3.Connection.executescript()`, so to ensure derived
`cursor()` is used instead of the lower-level one.
API compatibility preserved.
"""
cursor = self.cursor()
try:
cursor.executescript(*args, **kwargs)
except Exception:
cursor.close()
raise
return cursor
def fetchone(self, sql, parameters=(), /):
"""
Convenience method that wraps a call to ``execute()``, followed by a
call to ``fetchone()`` from which result is returned.
"""
with self.cursorctx() as cursor:
cursor.execute(sql, parameters)
return cursor.fetchone()
def fetchmany(self, sql, parameters=(), /, *, size=None):
"""
Convenience method that wraps a call to ``execute()``, followed by a
**single** call to ``fetchmany()`` from which result is returned.
"""
with self.cursorctx() as cursor:
if not size:
size = cursor.arraysize
cursor.execute(sql, parameters)
return cursor.fetchmany(size)
def fetchall(self, sql, parameters=(), /):
"""
Convenience method that wraps a call to ``execute()``, followed by a
call to ``fetchall()`` from which result is returned.
"""
with self.cursorctx() as cursor:
cursor.execute(sql, parameters)
return cursor.fetchall()
@contextlib.contextmanager
def transaction(self, *, exc_type=BaseException):
"""
Context manager to begin a database transaction that yields object
*self*.
Thread-safety and nested transactions supported.
Transaction is committed with a ``RELEASE`` command only if no
exception instance of type *exc_type* (defaults to `BaseException`) has
been raised from context. Otherwise, a ``ROLLBACK TO`` command is issued
and exception is propagated to the caller.
.. code-block:: python
with Sqlite.proxy(":memory:") as db:
# begin a transaction context
with db.transaction():
db.execute("CREATE TABLE t1 (col1 INTEGER)")
# nested transactions are supported
with db.transaction():
db.execute("CREATE TABLE t2 (col2 INTEGER)")
try:
# in case of an uncaught exception from a transaction()
# context, transaction() automatically rolls it back,
# depending on *exc_type* argument
with db.transaction():
db.execute("CREATE TABLE t3 (col3 INTEGER)")
raise RuntimeError
except Exception:
pass
# here, creation of table t3 has been discarded
# it is also possible to manually either commit or rollback
# current context's transaction
with db.transaction():
try:
db.execute("CREATE TABLE t4 (col4 INTEGER)")
raise RuntimeError
except Exception:
db.rollback() # discards t4
# from now on, emitting new SQL statements with db is
# UNDEFINED BEHAVIOR until this very nested transaction
# context is exited
#
# leaving this very next transaction context is now a
# no-op because rollback() has been called
# table t1 is about to be committed
# here, database is closed and db is a lightweight dummy object
# holding only a single property set to None
"""
if not isinstance(exc_type, type) or not issubclass(exc_type, BaseException):
raise ValueError("exc_type")
self.check_open()
self._savepoint_push()
try:
yield self
except exc_type:
if self.is_open:
self._savepoint_pop(commit=False, pop=True)
raise
else:
self._savepoint_pop(commit=True, pop=True)
def _savepoint_push(self):
with self.__savepoint_lock:
if __debug__:
in_transaction = len(self.__savepoint_stack)
in_transaction = bool(
in_transaction > 1 or
(in_transaction == 1 and self.__savepoint_stack[0]))
assert in_transaction is bool(self.in_transaction)
del in_transaction
try:
tid = threading.get_native_id()
except Exception:
tid = 0
self.__savepoint_id += 1
savepoint = (
f"SqlitePyTx"
f"_pid{_pid:06}"
f"_tid{tid:06}"
f"_id{self.__savepoint_id:06}")
self.execute(f"SAVEPOINT {self.qn(savepoint)}")
self.__savepoint_stack.append(savepoint)
assert self.in_transaction
return savepoint
def _savepoint_pop(self, *, commit, pop):
with self.__savepoint_lock:
if not self.__savepoint_stack:
return None
if not self.is_open:
self.__savepoint_stack = []
return None
else:
if pop:
savepoint = self.__savepoint_stack.pop(-1)
else:
savepoint = self.__savepoint_stack[-1]
if savepoint is not None:
self.__savepoint_stack[-1] = None
# savepoint may be None already due to commit() or rollback()
# method being called from a transaction() context
if savepoint:
assert self.in_transaction
if commit:
self.execute(
f"RELEASE SAVEPOINT {self.qn(savepoint)}")
else:
self.execute(
f"ROLLBACK TO SAVEPOINT {self.qn(savepoint)}")
# Here and despite the emitted "ROLLBACK TO" command,
# property *in_transaction* is still True even if we
# rolled back the only existing transaction so far.
#
# This is because in sqlite, command "ROLLBACK TO" does
# not actually close a transaction, it just moves the
# timeline cursor back to its "SAVEPOINT" counterpart,
# meaning transaction is reset but still active.
#
# In order to ensure consistency, emit a command
# "RELEASE SAVEPOINT" to truly discard this now-empty
# savepoint, only if we got back to the very first
# transaction in place.
assert self.in_transaction
if not self.__savepoint_stack or (
len(self.__savepoint_stack) == 1 and
self.__savepoint_stack[0] is None):
self.execute(
f"RELEASE SAVEPOINT {self.qn(savepoint)}")
return savepoint
def check_ident(self, identifier):
"""
Instance-level wrapper of `check_ident_static()` with instance-level
default values.
"""
return self.check_ident_static(
identifier,
minlen=self.IDENTIFIER_MINLEN,
maxlen=self.IDENTIFIER_MAXLEN,
regex=self.IDENTIFIER_REGEX)
@classmethod
@contextlib.contextmanager
def proxy(cls, database, *args, **kwargs):
"""
A context manager to create and manage a `Sqlite` or `SqliteCursor`
object, or to delegate its ownership.
Yields a `SqliteProxy` object (precisely ``cls.Proxy``). Closes
connection when exiting unless *database* was already an instance of
`SqliteProxy` (or ``cls.Proxy``).
Argument *database* can be:
* A `str` or path-like (`os.PathLike`) to be passed to
`sqlite3.connect()`, as well as *args* and *kwargs* optionally. In
this case, keyword *factory* defaults to *cls* unless overwritten
manually by the caller.
* An instance of `Sqlite` or `SqliteCursor` (*args* and *kwargs* must be
empty).
* A type that is a sub-class `Sqlite`, with *args* and *kwargs* to be
passed to `sqlite3.connect()` and *database* passed as keyword
argument *factory*.
* An instance of `SqliteProxy` or ``cls.Proxy`` (*args* and *kwargs*
must be empty), in which case caller remains the owner and connection
will NOT be closed by this context.
Usage:
.. code-block:: python
# open a connection to an in-memory sqlite database
with Sqlite.proxy(":memory:", ...) as db:
# db is a SqliteProxy object to be used like a Sqlite object
db.execute("CREATE TABLE foo (bar INTEGER)")
# connection is now closed and db is a lightweight object that only
# holds a single property, set to None
"""
if isinstance(database, SqliteProxy | cls.Proxy):
# already a Proxy, caller has and keeps ownership
if args or kwargs:
raise ValueError(
"args or kwargs not accepted when database is a Proxy "
"object")
ownership = False
proxy = database
elif isinstance(database, str | os.PathLike):
# database is a URI, caller trusts *cls* to be right Sqlite class to
# instantiate with *args*, we have full ownership
ownership = True
factory = kwargs.pop("factory", cls)
proxy = sqlite3.connect(database, *args, factory=factory, **kwargs)
proxy = cls.Proxy(proxy)
elif isinstance(database, Sqlite | SqliteCursor):
# Sqlite object already created, caller gives up on its ownership
if args or kwargs:
raise ValueError(
"args or kwargs not accepted when database is a Sqlite "
"object")
ownership = True
proxy = cls.Proxy(database)
elif isinstance(database, type) and issubclass(database, Sqlite):
# database is (a sub-class of) Sqlite object and *args* are to be
# forwarded to its constructor, we have full ownership
ownership = True
factory = kwargs.pop("factory", database)
if database is not database:
raise ValueError("database and factory arguments mismatch")
proxy = sqlite3.connect(*args, factory=factory, **kwargs)
proxy = cls.Proxy(proxy)
else:
raise ValueError("database value type")
try:
yield proxy
finally:
if ownership:
proxy.close()
@classmethod
def check_ident_static(
cls, identifier, *, minlen=None, maxlen=None, regex=None):
"""
Validate *identifier* with stricter rules than SQLite3.
Raise `ValueError` if *identifier* does not match the requirements.
Return `True` otherwise, such that this method can be called in a
conditional statement.
"""
# CAUTION: do not put argument defaults as part of method signature, so
# that a derived class redefine default values at class-level if
# desired
if not isinstance(identifier, str):
raise ValueError("invalid database identifier type")
if minlen is None:
minlen = cls.IDENTIFIER_MINLEN
if maxlen is None:
maxlen = cls.IDENTIFIER_MAXLEN
if not minlen <= len(identifier) <= maxlen:
raise ValueError("database identifier length out of boundaries")
if regex is None:
regex = cls.IDENTIFIER_REGEX
if not regex.fullmatch(identifier):
raise ValueError("invalid database identifier (regex)")
return True
@staticmethod
def quote_name(name):
"""
Utility class-method to quote a *name*.
For convenience, this method is also aliased as ``qn()``.
.. seealso:: `quote_value()`
"""
name = name.replace('"', '""')
return f'"{name}"'
qn = quote_name
@staticmethod
def quote_value(value):
"""
Utility class-method to quote a *value*.
For convenience, this method is also aliased as ``qv()``.
.. seealso:: `quote_name()`
"""
value = value.replace("'", "''")
return f"'{value}'"
qv = quote_value
@staticmethod
def yield_script_statements(*args, **kwargs):
"""Convenience shorthand for `sqlite_iter_script_statements()`."""
yield from sqlite_iter_script_statements(*args, **kwargs)
def sqlite_connect(*args, **kwargs):
"""
Convenience shorthand to `Sqlite.proxy()` (context manager).
Usage:
.. code-block:: python
# open a connection to an in-memory sqlite database
with sqlite_connect(":memory:", ...) as db:
# db is a SqliteProxy object to be used as a Sqlite object
db.execute("CREATE TABLE foo (bar INTEGER)")
# connection is now closed and db is a lightweight object that only
# holds a single property, set to None
"""
return Sqlite.proxy(*args, **kwargs)
def sqlite_iter_script_statements(
script, *, source="<???>", with_keyword=False):
"""
Yield SQL `str` statements from a sqlite3-compatible *script*.
If *with_keyword* is true, each yielded value is a `tuple` containing a pair
of `str` objects: the upper-cased keyword of the statement (i.e. ``CREATE``,
``SELECT``, ...) and the statement itself.
Raise `sqlite3.ProgrammingError` when *script* is not a complete SQL
statement.
"""
def _prepare_yield(st):
st = st.strip()
return st if not with_keyword else (st.split(maxsplit=1)[0].upper(), st)
assert isinstance(source, str)
if not sqlite3.complete_statement(script):
raise sqlite3.ProgrammingError(
f"not a complete SQL statement or script: {source}")
stmt = "" # current statement
for line in script.splitlines(keepends=True):
if ";" not in line:
stmt += line
if sqlite3.complete_statement(stmt):
yield _prepare_yield(stmt)
stmt = ""
else:
parts = line.split(";")
for idx, part in enumerate(parts):
stmt += part
if idx < len(parts) - 1:
stmt += ";"
if sqlite3.complete_statement(stmt):
yield _prepare_yield(stmt)
stmt = ""
# Trailing data, if any, can be safely ignored because the whole script has
# been validated at the beginning of this function so that any remaining
# data is likely to be space characters or comment.
def sqlite_statement_repr(script):
"""
Return a `str` representation of an SQL *script*, suitable for display
purpose.
This function is typically used to avoid cluttering log messages while still
giving a hint of the involved SQL statement.
Result is always a non-empty single line `str` object, and may be a
shortened version of *script*. It may or may not be a valid SQL statement.
"""
short = script.splitlines()
if len(short) == 1:
short = short[0].strip()
elif short:
short = [line.strip() for line in short]
short = [line for line in short if line and line[0] not in ("-", "#")]
if short:
short = short[0]
if short[-1] == "(":
short = short.rstrip("(").rstrip()
short = f"{short} [...]"
if not short:
short = "<EMPTY_SQL_REPR>"
return short
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment