Module server.db

Database interaction

Sub-modules

server.db.models
server.db.typedefs

Functions

def stat_db_errors()
Expand source code
@contextmanager
def stat_db_errors():
    """
    Collect metrics on errors thrown.

    Context manager that runs the wrapped block and, if it raises a
    `DBAPIError`, increments the `db_exceptions` metric labelled with the
    exception's class name and its `code` attribute. The exception is then
    re-raised, so callers still observe the original error. Exceptions of
    any other type pass through without being counted.
    """
    try:
        yield
    except DBAPIError as e:
        db_exceptions.labels(e.__class__.__name__, e.code).inc()
        # A bare `raise` re-raises the active exception as-is, without
        # adding a redundant traceback entry the way `raise e` does.
        raise

Collect metrics on errors thrown

Classes

class AsyncConnection (async_engine: AsyncEngine,
sync_connection: Optional[Connection] = None)
Expand source code
class AsyncConnection(_AsyncConnection):
    """
    Connection subclass whose statement methods report database errors.

    `execute`, `stream` and `deadlock_retry_execute` each run their private
    `_`-prefixed counterpart inside `stat_db_errors()`. The private
    `_execute` and `_stream` methods additionally accept plain SQL strings
    (wrapped with `text`) and bind parameters given as keyword arguments.
    """

    async def execute(
        self,
        statement,
        parameters=None,
        execution_options=EMPTY_DICT,
        **kwargs
    ):
        """
        Run `_execute` inside `stat_db_errors()` and return its result.
        """
        with stat_db_errors():
            result = await self._execute(
                statement,
                parameters=parameters,
                execution_options=execution_options,
                **kwargs
            )
        return result

    async def _execute(
        self,
        statement,
        parameters=None,
        execution_options=EMPTY_DICT,
        **kwargs
    ):
        """
        Execute `statement` through the parent class.

        A `str` statement is wrapped in `text` first. Keyword arguments are
        used as the bind parameters, but only when `parameters` is None;
        otherwise they are ignored.
        """
        clause = text(statement) if isinstance(statement, str) else statement

        bind_params = parameters
        if bind_params is None and kwargs:
            bind_params = kwargs

        return await super().execute(
            clause,
            parameters=bind_params,
            execution_options=execution_options
        )

    async def stream(
        self,
        statement,
        parameters=None,
        execution_options=EMPTY_DICT,
        **kwargs
    ):
        """
        Run `_stream` inside `stat_db_errors()` and return its result.
        """
        with stat_db_errors():
            result = await self._stream(
                statement,
                parameters=parameters,
                execution_options=execution_options,
                **kwargs
            )
        return result

    async def _stream(
        self,
        statement,
        parameters=None,
        execution_options=EMPTY_DICT,
        **kwargs
    ):
        """
        Stream `statement` through the parent class.

        A `str` statement is wrapped in `text` first. Keyword arguments are
        used as the bind parameters, but only when `parameters` is None;
        otherwise they are ignored.
        """
        clause = text(statement) if isinstance(statement, str) else statement

        bind_params = parameters
        if bind_params is None and kwargs:
            bind_params = kwargs

        return await super().stream(
            clause,
            parameters=bind_params,
            execution_options=execution_options
        )

    async def deadlock_retry_execute(
        self,
        statement,
        parameters=None,
        execution_options=EMPTY_DICT,
        max_attempts=3,
        **kwargs
    ):
        """
        Run `_deadlock_retry_execute` inside `stat_db_errors()` and return
        its result.
        """
        with stat_db_errors():
            result = await self._deadlock_retry_execute(
                statement,
                parameters=parameters,
                execution_options=execution_options,
                max_attempts=max_attempts,
                **kwargs
            )
        return result

    async def _deadlock_retry_execute(
        self,
        statement,
        parameters=None,
        execution_options=EMPTY_DICT,
        max_attempts=3,
        **kwargs
    ):
        """
        Execute `statement` via `_execute`, retrying on lock contention.

        The first `max_attempts - 1` attempts catch `OperationalError`. If
        the error text contains "Deadlock found" or "Lock wait timeout
        exceeded", a warning is logged and the call sleeps for
        `0.3 * 2 ** attempt` seconds before trying again; any other
        `OperationalError` is re-raised immediately. The last attempt is
        made without any error handling.
        """
        # Every attempt forwards exactly the same arguments to `_execute`.
        call_args = dict(
            parameters=parameters,
            execution_options=execution_options,
            **kwargs
        )

        guarded_attempts = max_attempts - 1
        for retry_index in range(guarded_attempts):
            try:
                return await self._execute(statement, **call_args)
            except OperationalError as exc:
                message = str(exc)
                if (
                    "Deadlock found" not in message
                    and "Lock wait timeout exceeded" not in message
                ):
                    # Not a lock problem, so retrying would not help.
                    raise

                logger.warning(
                    "Encountered deadlock during SQL execution. Attempts: %d",
                    retry_index + 1
                )
                # Exponential backoff
                await asyncio.sleep(0.3 * 2 ** retry_index)

        # Final attempt: any error propagates to the caller.
        return await self._execute(statement, **call_args)

An asyncio proxy for a :class:_engine.Connection.

:class:_asyncio.AsyncConnection is acquired using the :meth:_asyncio.AsyncEngine.connect method of :class:_asyncio.AsyncEngine::

from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname")

async with engine.connect() as conn:
    result = await conn.execute(select(table))

Added in version: 1.4

Ancestors

  • sqlalchemy.ext.asyncio.engine.AsyncConnection
  • sqlalchemy.ext.asyncio.base.ProxyComparable
  • sqlalchemy.ext.asyncio.base.ReversibleProxy
  • sqlalchemy.ext.asyncio.base.StartableContext
  • collections.abc.Awaitable
  • typing.Generic
  • abc.ABC
  • sqlalchemy.ext.asyncio.engine.AsyncConnectable

Methods

async def deadlock_retry_execute(self,
statement,
parameters=None,
execution_options=immutabledict({}),
max_attempts=3,
**kwargs)
Expand source code
async def deadlock_retry_execute(
    self,
    statement,
    parameters=None,
    execution_options=EMPTY_DICT,
    max_attempts=3,
    **kwargs
):
    """
    Run `_deadlock_retry_execute` inside `stat_db_errors()` and return its
    result. All arguments are forwarded unchanged.
    """
    with stat_db_errors():
        result = await self._deadlock_retry_execute(
            statement,
            parameters=parameters,
            execution_options=execution_options,
            max_attempts=max_attempts,
            **kwargs
        )
    return result
async def execute(self, statement, parameters=None, execution_options=immutabledict({}), **kwargs)
Expand source code
async def execute(
    self,
    statement,
    parameters=None,
    execution_options=EMPTY_DICT,
    **kwargs
):
    """
    Run `_execute` inside `stat_db_errors()` and return its result.
    All arguments are forwarded unchanged.
    """
    with stat_db_errors():
        result = await self._execute(
            statement,
            parameters=parameters,
            execution_options=execution_options,
            **kwargs
        )
    return result

Executes a SQL statement construct and returns a buffered :class:_engine.Result.

:param statement: The statement to be executed. This is always an object that is in both the :class:_expression.ClauseElement and :class:_expression.Executable hierarchies, including:

  • :class:_expression.Select
  • :class:_expression.Insert, :class:_expression.Update, :class:_expression.Delete
  • :class:_expression.TextClause and :class:_expression.TextualSelect
  • :class:_schema.DDL and objects which inherit from :class:_schema.ExecutableDDLElement

:param parameters: parameters which will be bound into the statement. This may be either a dictionary of parameter names to values, or a mutable sequence (e.g. a list) of dictionaries. When a list of dictionaries is passed, the underlying statement execution will make use of the DBAPI cursor.executemany() method. When a single dictionary is passed, the DBAPI cursor.execute() method will be used.

:param execution_options: optional dictionary of execution options, which will be associated with the statement execution. This dictionary can provide a subset of the options that are accepted by :meth:_engine.Connection.execution_options.

:return: a :class:_engine.Result object.

async def stream(self, statement, parameters=None, execution_options=immutabledict({}), **kwargs)
Expand source code
async def stream(
    self,
    statement,
    parameters=None,
    execution_options=EMPTY_DICT,
    **kwargs
):
    """
    Run `_stream` inside `stat_db_errors()` and return its result.
    All arguments are forwarded unchanged.
    """
    with stat_db_errors():
        result = await self._stream(
            statement,
            parameters=parameters,
            execution_options=execution_options,
            **kwargs
        )
    return result

Execute a statement and return an awaitable yielding a :class:_asyncio.AsyncResult object.

E.g.::

result = await conn.stream(stmt)
async for row in result:
    print(f"{row}")

The :meth:.AsyncConnection.stream() method supports optional context manager use against the :class:.AsyncResult object, as in::

async with conn.stream(stmt) as result:
    async for row in result:
        print(f"{row}")

In the above pattern, the :meth:.AsyncResult.close method is invoked unconditionally, even if the iterator is interrupted by an exception throw. Context manager use remains optional, however, and the function may be called in either an async with fn(): or await fn() style.

Added in version: 2.0.0b3 added context manager support

:return: an awaitable object that will yield an :class:_asyncio.AsyncResult object.

Seealso

:meth:.AsyncConnection.stream_scalars

class AsyncEngine (sync_engine: Engine)
Expand source code
class AsyncEngine(_AsyncEngine):
    """
    Engine subclass that swaps in this module's connection class.

    Assigning `engine._connection_cls` would have the same effect, but that
    attribute is undocumented and probably more fragile, so `connect` is
    overridden in a subclass instead.
    """

    def connect(self):
        """
        Create this module's `AsyncConnection` for this engine.
        """
        connection = AsyncConnection(self)
        return connection

For overriding the connection class used to execute statements.

This could also be done by changing engine._connection_cls, however this is undocumented and probably more fragile so we subclass instead.

Ancestors

  • sqlalchemy.ext.asyncio.engine.AsyncEngine
  • sqlalchemy.ext.asyncio.base.ProxyComparable
  • sqlalchemy.ext.asyncio.base.ReversibleProxy
  • typing.Generic
  • sqlalchemy.ext.asyncio.engine.AsyncConnectable

Methods

def connect(self)
Expand source code
def connect(self):
    """
    Create this module's `AsyncConnection` for this engine.
    """
    connection = AsyncConnection(self)
    return connection

Return an :class:_asyncio.AsyncConnection object.

The :class:_asyncio.AsyncConnection will procure a database connection from the underlying connection pool when it is entered as an async context manager::

async with async_engine.connect() as conn:
    result = await conn.execute(select(user_table))

The :class:_asyncio.AsyncConnection may also be started outside of a context manager by invoking its :meth:_asyncio.AsyncConnection.start method.

class FAFDatabase (host: str = 'localhost',
port: int = 3306,
user: str = 'root',
password: str = '',
db: str = 'faf_test',
**kwargs)
Expand source code
class FAFDatabase:
    """
    Holds the async engine used to talk to the MySQL database.

    The engine is created from a `mysql+aiomysql` URL and wrapped in
    `AsyncEngine`; it is exposed as the `engine` attribute.
    """

    def __init__(
        self,
        host: str = "localhost",
        port: int = 3306,
        user: str = "root",
        password: str = "",
        db: str = "faf_test",
        **kwargs
    ):
        """
        Build the engine for the given connection settings.

        :param host: database server host name
        :param port: database server port
        :param user: user name, passed raw (not URL-encoded)
        :param password: password, passed raw (not URL-encoded)
        :param db: name of the database to connect to
        :param kwargs: extra keyword arguments forwarded to `create_engine`;
            `future` is always overridden to True
        """
        # Imported locally so the block does not depend on a file-level
        # import that may not exist.
        from urllib.parse import quote

        kwargs["future"] = True

        # Credentials are percent-encoded because characters such as "@",
        # ":", "/" or "%" would otherwise be read as URL delimiters and the
        # URL would be parsed incorrectly. `str()` mirrors the coercion the
        # f-string performed before.
        safe_user = quote(str(user), safe="")
        safe_password = quote(str(password), safe="")

        sync_engine = create_engine(
            f"mysql+aiomysql://{safe_user}:{safe_password}@{host}:{port}/{db}",
            **kwargs
        )

        self.engine = AsyncEngine(sync_engine)

    def acquire(self):
        """
        Return the result of `self.engine.begin()`.

        NOTE(review): presumably meant to be used as
        `async with db.acquire() as conn:` - confirm against callers.
        """
        return self.engine.begin()

    async def close(self):
        """
        Dispose of the underlying engine by awaiting `engine.dispose()`.
        """
        await self.engine.dispose()

Methods

def acquire(self)
Expand source code
def acquire(self):
    """
    Return the result of calling `begin()` on the engine.
    """
    engine = self.engine
    return engine.begin()
async def close(self)
Expand source code
async def close(self):
    """
    Dispose of the underlying engine by awaiting `engine.dispose()`.
    """
    engine = self.engine
    await engine.dispose()