Module server.db
Database interaction
Sub-modules
server.db.models
server.db.typedefs
Functions
def stat_db_errors()
-
Expand source code
@contextmanager
def stat_db_errors():
    """
    Collect metrics on errors thrown

    Context manager that runs the wrapped block and, if it raises a
    `DBAPIError`, increments the `db_exceptions` metric labelled with the
    exception's class name and its `code` attribute. The exception is always
    propagated to the caller; other exception types pass through untouched.
    """
    try:
        yield
    except DBAPIError as e:
        db_exceptions.labels(e.__class__.__name__, e.code).inc()
        # Bare `raise` re-raises the active exception as-is, which is the
        # idiomatic way to propagate after recording the metric.
        raise
Collect metrics on errors thrown
Classes
class AsyncConnection (async_engine: AsyncEngine,
sync_connection: Optional[Connection] = None)-
Expand source code
class AsyncConnection(_AsyncConnection):
    """
    Connection subclass that records DBAPI error metrics around statement
    execution, accepts plain SQL strings, and allows bind parameters to be
    given as keyword arguments.
    """

    async def execute(
        self,
        statement,
        parameters=None,
        execution_options=EMPTY_DICT,
        **kwargs
    ):
        """
        Run `_execute` inside `stat_db_errors` and return its result.
        """
        with stat_db_errors():
            result = await self._execute(
                statement,
                parameters=parameters,
                execution_options=execution_options,
                **kwargs
            )
        return result

    async def _execute(
        self,
        statement,
        parameters=None,
        execution_options=EMPTY_DICT,
        **kwargs
    ):
        """
        Wrap plain strings in the text type and, when no explicit
        `parameters` are given, use `kwargs` as the bind parameters before
        delegating to the parent `execute`.
        """
        if parameters is None and kwargs:
            parameters = kwargs
        if isinstance(statement, str):
            statement = text(statement)

        return await super().execute(
            statement,
            parameters=parameters,
            execution_options=execution_options
        )

    # NOTE(review): as a plain coroutine function this override has to be
    # awaited; `async with conn.stream(...)` as described in the upstream
    # SQLAlchemy docs presumably won't work through it - confirm.
    async def stream(
        self,
        statement,
        parameters=None,
        execution_options=EMPTY_DICT,
        **kwargs
    ):
        """
        Run `_stream` inside `stat_db_errors` and return its result.
        """
        with stat_db_errors():
            result = await self._stream(
                statement,
                parameters=parameters,
                execution_options=execution_options,
                **kwargs
            )
        return result

    async def _stream(
        self,
        statement,
        parameters=None,
        execution_options=EMPTY_DICT,
        **kwargs
    ):
        """
        Wrap plain strings in the text type and, when no explicit
        `parameters` are given, use `kwargs` as the bind parameters before
        delegating to the parent `stream`.
        """
        if parameters is None and kwargs:
            parameters = kwargs
        if isinstance(statement, str):
            statement = text(statement)

        return await super().stream(
            statement,
            parameters=parameters,
            execution_options=execution_options
        )

    async def deadlock_retry_execute(
        self,
        statement,
        parameters=None,
        execution_options=EMPTY_DICT,
        max_attempts=3,
        **kwargs
    ):
        """
        Run `_deadlock_retry_execute` inside `stat_db_errors` and return its
        result.
        """
        with stat_db_errors():
            result = await self._deadlock_retry_execute(
                statement,
                parameters=parameters,
                execution_options=execution_options,
                max_attempts=max_attempts,
                **kwargs
            )
        return result

    async def _deadlock_retry_execute(
        self,
        statement,
        parameters=None,
        execution_options=EMPTY_DICT,
        max_attempts=3,
        **kwargs
    ):
        """
        Execute the statement, retrying when an `OperationalError` mentions a
        deadlock or a lock wait timeout.

        The first `max_attempts - 1` tries sleep with exponential backoff
        after such an error; any other `OperationalError` is re-raised
        immediately. The last try propagates whatever it raises.
        """
        retryable_fragments = (
            "Deadlock found",
            "Lock wait timeout exceeded"
        )

        for attempt in range(max_attempts - 1):
            try:
                return await self._execute(
                    statement,
                    parameters=parameters,
                    execution_options=execution_options,
                    **kwargs
                )
            except OperationalError as exc:
                message = str(exc)
                if not any(part in message for part in retryable_fragments):
                    raise

                logger.warning(
                    "Encountered deadlock during SQL execution. Attempts: %d",
                    attempt + 1
                )
                # Exponential backoff
                await asyncio.sleep(0.3 * 2 ** attempt)

        # On the final attempt we don't do any error handling
        return await self._execute(
            statement,
            parameters=parameters,
            execution_options=execution_options,
            **kwargs
        )
An asyncio proxy for a `Connection`.
An `AsyncConnection` is acquired using the `AsyncEngine.connect()` method of `AsyncEngine`:

    from sqlalchemy.ext.asyncio import create_async_engine

    engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname")

    async with engine.connect() as conn:
        result = await conn.execute(select(table))
Added in version: 1.4
Ancestors
- sqlalchemy.ext.asyncio.engine.AsyncConnection
- sqlalchemy.ext.asyncio.base.ProxyComparable
- sqlalchemy.ext.asyncio.base.ReversibleProxy
- sqlalchemy.ext.asyncio.base.StartableContext
- collections.abc.Awaitable
- typing.Generic
- abc.ABC
- sqlalchemy.ext.asyncio.engine.AsyncConnectable
Methods
async def deadlock_retry_execute(self,
statement,
parameters=None,
execution_options=immutabledict({}),
max_attempts=3,
**kwargs)-
Expand source code
async def deadlock_retry_execute(
    self,
    statement,
    parameters=None,
    execution_options=EMPTY_DICT,
    max_attempts=3,
    **kwargs
):
    """
    Run `_deadlock_retry_execute` inside `stat_db_errors` and return its
    result.
    """
    with stat_db_errors():
        result = await self._deadlock_retry_execute(
            statement,
            parameters=parameters,
            execution_options=execution_options,
            max_attempts=max_attempts,
            **kwargs
        )
    return result
async def execute(self, statement, parameters=None, execution_options=immutabledict({}), **kwargs)
-
Expand source code
async def execute(
    self,
    statement,
    parameters=None,
    execution_options=EMPTY_DICT,
    **kwargs
):
    """
    Run `_execute` inside `stat_db_errors` and return its result.
    """
    with stat_db_errors():
        result = await self._execute(
            statement,
            parameters=parameters,
            execution_options=execution_options,
            **kwargs
        )
    return result
Executes a SQL statement construct and returns a buffered `Result`.

:param statement: The statement to be executed. This is always an object that is in both the `ClauseElement` and `Executable` hierarchies, including:
- `Select`
- `Insert`, `Update`, `Delete`
- `TextClause` and `TextualSelect`
- `DDL` and objects which inherit from `ExecutableDDLElement`

:param parameters: parameters which will be bound into the statement. This may be either a dictionary of parameter names to values, or a mutable sequence (e.g. a list) of dictionaries. When a list of dictionaries is passed, the underlying statement execution will make use of the DBAPI `cursor.executemany()` method. When a single dictionary is passed, the DBAPI `cursor.execute()` method will be used.

:param execution_options: optional dictionary of execution options, which will be associated with the statement execution. This dictionary can provide a subset of the options that are accepted by `Connection.execution_options()`.

:return: a `Result` object.
async def stream(self, statement, parameters=None, execution_options=immutabledict({}), **kwargs)
-
Expand source code
async def stream(
    self,
    statement,
    parameters=None,
    execution_options=EMPTY_DICT,
    **kwargs
):
    """
    Run `_stream` inside `stat_db_errors` and return its result.
    """
    with stat_db_errors():
        result = await self._stream(
            statement,
            parameters=parameters,
            execution_options=execution_options,
            **kwargs
        )
    return result
Execute a statement and return an awaitable yielding an `AsyncResult` object.

E.g.:

    result = await conn.stream(stmt)
    async for row in result:
        print(f"{row}")

The `AsyncConnection.stream()` method supports optional context manager use against the `AsyncResult` object, as in:

    async with conn.stream(stmt) as result:
        async for row in result:
            print(f"{row}")

In the above pattern, the `AsyncResult.close()` method is invoked unconditionally, even if the iterator is interrupted by an exception throw. Context manager use remains optional, however, and the function may be called in either an `async with fn():` or `await fn()` style.

Added in version 2.0.0b3: context manager support.

:return: an awaitable object that will yield an `AsyncResult` object.

See also: `AsyncConnection.stream_scalars()`
class AsyncEngine (sync_engine: Engine)
-
Expand source code
class AsyncEngine(_AsyncEngine):
    """
    Engine subclass that exists to override the connection class used to
    execute statements.

    Setting engine._connection_cls would achieve the same thing, but that
    attribute is undocumented and probably more fragile, so `connect` is
    overridden in a subclass instead.
    """

    def connect(self):
        """
        Return a new `AsyncConnection` constructed from this engine.
        """
        connection = AsyncConnection(self)
        return connection
For overriding the connection class used to execute statements.
This could also be done by changing engine._connection_cls, however this is undocumented and probably more fragile so we subclass instead.
Ancestors
- sqlalchemy.ext.asyncio.engine.AsyncEngine
- sqlalchemy.ext.asyncio.base.ProxyComparable
- sqlalchemy.ext.asyncio.base.ReversibleProxy
- typing.Generic
- sqlalchemy.ext.asyncio.engine.AsyncConnectable
Methods
def connect(self)
-
Expand source code
def connect(self):
    """
    Return a new `AsyncConnection` constructed from this engine.
    """
    connection = AsyncConnection(self)
    return connection
Return an `AsyncConnection` object.

The `AsyncConnection` will procure a database connection from the underlying connection pool when it is entered as an async context manager:

    async with async_engine.connect() as conn:
        result = await conn.execute(select(user_table))

The `AsyncConnection` may also be started outside of a context manager by invoking its `AsyncConnection.start()` method.
class FAFDatabase (host: str = 'localhost',
port: int = 3306,
user: str = 'root',
password: str = '',
db: str = 'faf_test',
**kwargs)-
Expand source code
class FAFDatabase:
    """
    Holds the `AsyncEngine` used to talk to the database and exposes helpers
    to acquire a connection and to shut the engine down.
    """

    def __init__(
        self,
        host: str = "localhost",
        port: int = 3306,
        user: str = "root",
        password: str = "",
        db: str = "faf_test",
        **kwargs
    ):
        """
        Create the engine for the given connection settings.

        :param host: database host name
        :param port: database port
        :param user: user name; percent-encoded before being placed in the
            URL
        :param password: password; percent-encoded before being placed in
            the URL
        :param db: database name
        :param kwargs: extra keyword arguments forwarded to `create_engine`;
            `future` is always overwritten with `True`
        """
        kwargs["future"] = True
        sync_engine = create_engine(
            self._build_url(user, password, host, port, db),
            **kwargs
        )

        self.engine = AsyncEngine(sync_engine)

    @staticmethod
    def _build_url(user, password, host, port, db) -> str:
        """
        Build the `mysql+aiomysql` URL, percent-encoding the credentials.

        Characters such as `@`, `/`, `:` or `%` in the user name or password
        would otherwise be read as URL delimiters and corrupt the parsed
        connection settings.
        """
        from urllib.parse import quote

        # safe="" so that "/" is encoded too; quote() (rather than
        # quote_plus()) keeps spaces as %20 instead of "+".
        quoted_user = quote(str(user), safe="")
        quoted_password = quote(str(password), safe="")
        return (
            f"mysql+aiomysql://{quoted_user}:{quoted_password}"
            f"@{host}:{port}/{db}"
        )

    def acquire(self):
        """
        Return the result of `self.engine.begin()`.
        """
        return self.engine.begin()

    async def close(self):
        """
        Dispose of the underlying engine.
        """
        await self.engine.dispose()
Methods
def acquire(self)
-
Expand source code
def acquire(self):
    """
    Return the result of `self.engine.begin()`.
    """
    engine = self.engine
    return engine.begin()
async def close(self)
-
Expand source code
async def close(self):
    """
    Dispose of the underlying engine.
    """
    engine = self.engine
    await engine.dispose()