Module server.db
Database interaction
async def get_and_validate_database_version(db: FAFDatabase) ‑> int | None
Expand source code
async def get_and_validate_database_version(db: FAFDatabase) -> Optional[int]:
    """
    Determine the database schema version from the Flyway history table and
    verify that it is recent enough for this server.

    Returns `None` without querying anything when `config.DB_FLYWAY_TABLE`
    is not set. Otherwise returns the highest version among the migrations
    that are marked successful and have a non-null version.

    Raises `RuntimeError` when no such migration exists, or when the highest
    version is below `FLYWAY_MINIMUM_REQUIRED_VERSION`.
    """
    table_name = config.DB_FLYWAY_TABLE
    if not table_name:
        return None

    history = get_flyway_schema_history_table(table_name)
    applied_ok = and_(
        history.c.success == true(),
        history.c.version.is_not(None),
    )
    query = select(history.c.version).where(applied_ok)

    async with db.acquire() as conn:
        rows = await conn.execute(query)
        # NOTE(review): assumes every stored version parses with int() —
        # confirm against the migration naming scheme.
        found_versions = [int(row.version) for row in rows]

    version = max(found_versions, default=None)

    if version is None:
        raise RuntimeError(
            "No successful database migrations found! Unable to determine "
            "database version",
        )

    if version < FLYWAY_MINIMUM_REQUIRED_VERSION:
        raise RuntimeError(
            f"Database version v{version} does not meet minimum requirement "
            f"v{FLYWAY_MINIMUM_REQUIRED_VERSION}",
        )

    return version
def stat_db_errors()
Expand source code
@contextmanager
def stat_db_errors():
    """
    Collect metrics on errors thrown

    Use as a context manager around database calls. When the wrapped block
    raises a `DBAPIError`, the `db_exceptions` metric is incremented with the
    exception's class name and its `code` attribute as labels, and the same
    exception is then re-raised. Any other exception type passes through
    without being counted.
    """
    try:
        yield
    except DBAPIError as e:
        db_exceptions.labels(e.__class__.__name__, e.code).inc()
        # Bare `raise` re-raises the active exception as-is instead of
        # re-raising it by name, which would add this frame to the traceback
        # a second time.
        raise
Collect metrics on errors thrown
class AsyncConnection (async_engine: AsyncEngine,
sync_connection: Optional[Connection] = None)
Expand source code
class AsyncConnection(_AsyncConnection):
    """
    Connection class whose public statement methods run inside
    `stat_db_errors`, so that any `DBAPIError` they raise is counted.

    The private `_execute` / `_stream` helpers additionally accept plain
    strings as statements and bind parameters given as keyword arguments.
    """

    async def execute(
        self,
        statement,
        parameters=None,
        execution_options=EMPTY_DICT,
        **kwargs
    ):
        """
        Execute a statement via `_execute` inside `stat_db_errors`.
        """
        with stat_db_errors():
            outcome = await self._execute(
                statement,
                parameters=parameters,
                execution_options=execution_options,
                **kwargs
            )
        return outcome

    async def _execute(
        self,
        statement,
        parameters=None,
        execution_options=EMPTY_DICT,
        **kwargs
    ):
        """
        Execute a statement on the parent class.

        String statements are wrapped in `text()` first. Keyword arguments
        are used as the bind parameters, but only when `parameters` was not
        given explicitly.
        """
        if parameters is None and kwargs:
            parameters = kwargs

        if isinstance(statement, str):
            statement = text(statement)

        return await super().execute(
            statement,
            parameters=parameters,
            execution_options=execution_options
        )

    async def stream(
        self,
        statement,
        parameters=None,
        execution_options=EMPTY_DICT,
        **kwargs
    ):
        """
        Stream a statement via `_stream` inside `stat_db_errors`.

        This override is a coroutine function, so the call must be awaited.
        """
        with stat_db_errors():
            outcome = await self._stream(
                statement,
                parameters=parameters,
                execution_options=execution_options,
                **kwargs
            )
        return outcome

    async def _stream(
        self,
        statement,
        parameters=None,
        execution_options=EMPTY_DICT,
        **kwargs
    ):
        """
        Stream a statement on the parent class.

        String statements are wrapped in `text()` first. Keyword arguments
        are used as the bind parameters, but only when `parameters` was not
        given explicitly.
        """
        if parameters is None and kwargs:
            parameters = kwargs

        if isinstance(statement, str):
            statement = text(statement)

        return await super().stream(
            statement,
            parameters=parameters,
            execution_options=execution_options
        )

    async def deadlock_retry_execute(
        self,
        statement,
        parameters=None,
        execution_options=EMPTY_DICT,
        max_attempts=3,
        **kwargs
    ):
        """
        Execute a statement via `_deadlock_retry_execute` inside
        `stat_db_errors`.
        """
        with stat_db_errors():
            outcome = await self._deadlock_retry_execute(
                statement,
                parameters=parameters,
                execution_options=execution_options,
                max_attempts=max_attempts,
                **kwargs
            )
        return outcome

    async def _deadlock_retry_execute(
        self,
        statement,
        parameters=None,
        execution_options=EMPTY_DICT,
        max_attempts=3,
        **kwargs
    ):
        """
        Execute a statement, retrying when a deadlock or lock wait timeout
        is reported.

        The first `max_attempts - 1` attempts catch `OperationalError`; if
        its text contains one of the known lock messages, a warning is
        logged and the call sleeps before the next attempt. Any other
        `OperationalError` is re-raised immediately. The last attempt is made
        with no error handling at all.
        """
        retryable_markers = ("Deadlock found", "Lock wait timeout exceeded")

        for attempt in range(max_attempts - 1):
            try:
                return await self._execute(
                    statement,
                    parameters=parameters,
                    execution_options=execution_options,
                    **kwargs
                )
            except OperationalError as exc:
                message = str(exc)
                if not any(marker in message for marker in retryable_markers):
                    raise

                logger.warning(
                    "Encountered deadlock during SQL execution. Attempts: %d",
                    attempt + 1
                )
                # Exponential backoff
                await asyncio.sleep(0.3 * 2 ** attempt)

        # On the final attempt we don't do any error handling
        return await self._execute(
            statement,
            parameters=parameters,
            execution_options=execution_options,
            **kwargs
        )
An asyncio proxy for a :class:`_engine.Connection`.
:class:`_asyncio.AsyncConnection` is acquired using the :meth:`_asyncio.AsyncEngine.connect`
method of :class:`_asyncio.AsyncEngine`::

    from sqlalchemy.ext.asyncio import create_async_engine

    engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname")

    async with engine.connect() as conn:
        result = await conn.execute(select(table))
Added in version: 1.4
- sqlalchemy.ext.asyncio.engine.AsyncConnection
- sqlalchemy.ext.asyncio.base.ProxyComparable
- sqlalchemy.ext.asyncio.base.ReversibleProxy
- sqlalchemy.ext.asyncio.base.StartableContext
- typing.Generic
- abc.ABC
- sqlalchemy.ext.asyncio.engine.AsyncConnectable
async def deadlock_retry_execute(self, statement, parameters=None, execution_options=immutabledict({}), max_attempts=3, **kwargs)
Expand source code
async def deadlock_retry_execute(
    self,
    statement,
    parameters=None,
    execution_options=EMPTY_DICT,
    max_attempts=3,
    **kwargs
):
    """
    Execute a statement via `_deadlock_retry_execute` inside
    `stat_db_errors`, forwarding all arguments unchanged.
    """
    with stat_db_errors():
        outcome = await self._deadlock_retry_execute(
            statement,
            parameters=parameters,
            execution_options=execution_options,
            max_attempts=max_attempts,
            **kwargs
        )
    return outcome
async def execute(self, statement, parameters=None, execution_options=immutabledict({}), **kwargs)
Expand source code
async def execute(
    self,
    statement,
    parameters=None,
    execution_options=EMPTY_DICT,
    **kwargs
):
    """
    Execute a statement via `_execute` inside `stat_db_errors`, forwarding
    all arguments unchanged.
    """
    with stat_db_errors():
        outcome = await self._execute(
            statement,
            parameters=parameters,
            execution_options=execution_options,
            **kwargs
        )
    return outcome
Executes a SQL statement construct and return a buffered :class:`_engine.Result`.

:param object: The statement to be executed. This is always an object that is in both the :class:`_expression.ClauseElement` and :class:`_expression.Executable` hierarchies, including:

- :class:`_expression.Select`
- :class:`_expression.Insert`, :class:`_expression.Update`, :class:`_expression.Delete`
- :class:`_expression.TextClause` and :class:`_expression.TextualSelect`
- :class:`_schema.DDL` and objects which inherit from :class:`_schema.ExecutableDDLElement`

:param parameters: parameters which will be bound into the statement. This may be either a dictionary of parameter names to values, or a mutable sequence (e.g. a list) of dictionaries. When a list of dictionaries is passed, the underlying statement execution will make use of the DBAPI ``cursor.executemany()`` method. When a single dictionary is passed, the DBAPI ``cursor.execute()`` method will be used.

:param execution_options: optional dictionary of execution options, which will be associated with the statement execution. This dictionary can provide a subset of the options that are accepted by :meth:`_engine.Connection.execution_options`.

:return: a :class:`_engine.Result` object.
async def stream(self, statement, parameters=None, execution_options=immutabledict({}), **kwargs)
Expand source code
async def stream(
    self,
    statement,
    parameters=None,
    execution_options=EMPTY_DICT,
    **kwargs
):
    """
    Stream a statement via `_stream` inside `stat_db_errors`, forwarding
    all arguments unchanged.

    This override is a coroutine function, so the call must be awaited.
    """
    with stat_db_errors():
        outcome = await self._stream(
            statement,
            parameters=parameters,
            execution_options=execution_options,
            **kwargs
        )
    return outcome
Execute a statement and return an awaitable yielding a :class:`_asyncio.AsyncResult` object.

E.g.::

    result = await conn.stream(stmt)
    async for row in result:
        print(f"{row}")

The :meth:`.AsyncConnection.stream` method supports optional context manager use against the :class:`.AsyncResult` object, as in::

    async with conn.stream(stmt) as result:
        async for row in result:
            print(f"{row}")

In the above pattern, the :meth:`.AsyncResult.close` method is invoked unconditionally, even if the iterator is interrupted by an exception throw. Context manager use remains optional, however, and the function may be called in either an ``async with fn():`` or ``await fn()`` style.

Added in version: 2.0.0b3 added context manager support

:return: an awaitable object that will yield an :class:`_asyncio.AsyncResult` object.
class AsyncEngine (sync_engine: Engine)
Expand source code
class AsyncEngine(_AsyncEngine):
    """
    Engine subclass that exists only to swap in the connection class used
    to execute statements.

    Setting `engine._connection_cls` would achieve the same thing, but that
    attribute is undocumented and probably more fragile, so `connect` is
    overridden in a subclass instead.
    """

    def connect(self):
        """
        Return a new `AsyncConnection` bound to this engine.
        """
        connection = AsyncConnection(self)
        return connection
For overriding the connection class used to execute statements.
This could also be done by changing engine._connection_cls, however this is undocumented and probably more fragile so we subclass instead.
- sqlalchemy.ext.asyncio.engine.AsyncEngine
- sqlalchemy.ext.asyncio.base.ProxyComparable
- sqlalchemy.ext.asyncio.base.ReversibleProxy
- typing.Generic
- sqlalchemy.ext.asyncio.engine.AsyncConnectable
def connect(self)
Expand source code
def connect(self):
    """
    Return a new `AsyncConnection` bound to this engine.
    """
    connection = AsyncConnection(self)
    return connection
Return an :class:`_asyncio.AsyncConnection` object.

The :class:`_asyncio.AsyncConnection` will procure a database connection from the underlying connection pool when it is entered as an async context manager::

    async with async_engine.connect() as conn:
        result = await conn.execute(select(user_table))

The :class:`_asyncio.AsyncConnection` may also be started outside of a context manager by invoking its :meth:`_asyncio.AsyncConnection.start` method.
class FAFDatabase (host: str = 'localhost',
port: int = 3306,
user: str = 'root',
password: str = '',
db: str = 'faf_test',
**kwargs)
Expand source code
class FAFDatabase:
    """
    Owns the `AsyncEngine` used to talk to the database over the
    `mysql+aiomysql` driver.
    """

    def __init__(
        self,
        host: str = "localhost",
        port: int = 3306,
        user: str = "root",
        password: str = "",
        db: str = "faf_test",
        **kwargs
    ):
        """
        Build the engine from the connection settings.

        Any extra keyword arguments are passed on to `create_engine`;
        `future` is always forced to `True`.
        """
        engine_options = {**kwargs, "future": True}

        # NOTE(review): user and password are interpolated verbatim, so
        # presumably callers must URL-escape special characters — confirm.
        credentials = f"{user}:{password}"
        location = f"{host}:{port}/{db}"
        url = f"mysql+aiomysql://{credentials}@{location}"

        self.engine = AsyncEngine(create_engine(url, **engine_options))

    def acquire(self):
        """
        Return the result of `self.engine.begin()`.
        """
        engine = self.engine
        return engine.begin()

    async def close(self):
        """
        Dispose of the underlying engine.
        """
        engine = self.engine
        await engine.dispose()
def acquire(self)
Expand source code
def acquire(self):
    """
    Return the result of `self.engine.begin()`.
    """
    engine = self.engine
    return engine.begin()
async def close(self)
Expand source code
async def close(self):
    """
    Dispose of the underlying engine.
    """
    engine = self.engine
    await engine.dispose()