Module server.asyncio_extensions

Some helper functions for common async tasks

Functions

async def map_suppress(func: Callable[[~T], Coroutine[Any, Any, Any]],
iterable: Iterable[~T],
logger: logging.Logger = <Logger server.asyncio_extensions (WARNING)>,
msg: str = '')
Expand source code
async def map_suppress(
    func: Callable[[T], Coroutine[Any, Any, Any]],
    iterable: Iterable[T],
    logger: logging.Logger = logger,
    msg: str = ""
):
    results = await asyncio.gather(
        *(func(item) for item in iterable),
        return_exceptions=True
    )
    for result, item in zip(results, iterable):
        if isinstance(result, BaseException):
            logger.exception(
                "Unexpected error %s%s",
                msg,
                item,
                exc_info=result
            )
def synchronized(*args)
Expand source code
def synchronized(*args):
    """
    Ensure that a function will only execute in serial.

    # Params
    - `lock`: An instance of asyncio.Lock to use for synchronization.
    """
    # Invoked like @synchronized
    if args and inspect.isfunction(args[0]):
        return _synchronize(args[0])

    # Invoked like @synchronized() or @synchronized(args, ...)
    return _partial(_synchronize, *args)

Ensure that a function will only execute in serial.

Params

  • lock: An instance of asyncio.Lock to use for synchronization.
def synchronizedmethod(*args)
Expand source code
def synchronizedmethod(*args):
    """
    Create a method that will be wrapped with an async lock.

    # Params
    - `attrname`: The name of the lock attribute that will be used. If the
        attribute doesn't exist or is None, a lock will be created. The default
        is to use a value based on the decorated function name.
    """
    # Invoked like @synchronizedmethod
    if args and inspect.isfunction(args[0]):
        return _synchronize_method(args[0])

    # Invoked like @synchronizedmethod() or @synchronizedmethod(args, ...)
    return _partial(_synchronize_method, *args)

Create a method that will be wrapped with an async lock.

Params

  • attrname: The name of the lock attribute that will be used. If the attribute doesn't exist or is None, a lock will be created. The default is to use a value based on the decorated function name.

Classes

class AsyncLock (*args, **kwargs)
Expand source code
class AsyncLock(Protocol, AsyncContextManager["AsyncLock"]):
    def locked(self) -> bool: ...
    async def acquire(self) -> bool: ...
    def release(self) -> None: ...

Base class for protocol classes.

Protocol classes are defined as::

class Proto(Protocol):
    def meth(self) -> int:
        ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing), for example::

class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::

class GenProto(Protocol[T]):
    def meth(self) -> T:
        ...

Ancestors

  • typing.Protocol
  • contextlib.AbstractAsyncContextManager
  • typing.Generic
  • abc.ABC

Methods

async def acquire(self) ‑> bool
Expand source code
async def acquire(self) -> bool: ...
def locked(self) ‑> bool
Expand source code
def locked(self) -> bool: ...
def release(self) ‑> None
Expand source code
def release(self) -> None: ...
class SpinLock (sleep_duration: float = 0.01)
Expand source code
class SpinLock(_ContextManagerMixin):
    """
    An asyncio spinlock. The advantage of using this over asyncio.Lock is that
    it can be called accross multiple event loops at the cost of being less
    performant. As with any spinlock, it's best used in situations where
    concurrent access is unlikely.
    """

    def __init__(self, sleep_duration: float = 0.01):
        self.sleep_duration = sleep_duration
        self._locked = False

    def __repr__(self) -> str:
        res = super().__repr__()
        extra = "locked" if self._locked else "unlocked"
        return f"<{res[1:-1]} [{extra}]>"

    def locked(self) -> bool:
        """Return True if lock is acquired."""
        return self._locked

    async def acquire(self) -> bool:
        """
        Sleeps repeatedly for sleep_duration until the lock is unlocked, then
        sets it to locked and returns True.
        """
        while self._locked:
            await asyncio.sleep(self.sleep_duration)

        self._locked = True
        return True

    def release(self) -> None:
        """
        When invoked on an unlocked lock, a RuntimeError is raised.
        """
        if self._locked:
            self._locked = False
        else:
            raise RuntimeError("Lock is not acquired.")

An asyncio spinlock. The advantage of using this over asyncio.Lock is that it can be called accross multiple event loops at the cost of being less performant. As with any spinlock, it's best used in situations where concurrent access is unlikely.

Ancestors

  • asyncio.locks._ContextManagerMixin

Methods

async def acquire(self) ‑> bool
Expand source code
async def acquire(self) -> bool:
    """
    Sleeps repeatedly for sleep_duration until the lock is unlocked, then
    sets it to locked and returns True.
    """
    while self._locked:
        await asyncio.sleep(self.sleep_duration)

    self._locked = True
    return True

Sleeps repeatedly for sleep_duration until the lock is unlocked, then sets it to locked and returns True.

def locked(self) ‑> bool
Expand source code
def locked(self) -> bool:
    """Return True if lock is acquired."""
    return self._locked

Return True if lock is acquired.

def release(self) ‑> None
Expand source code
def release(self) -> None:
    """
    When invoked on an unlocked lock, a RuntimeError is raised.
    """
    if self._locked:
        self._locked = False
    else:
        raise RuntimeError("Lock is not acquired.")

When invoked on an unlocked lock, a RuntimeError is raised.