Module server.asyncio_extensions
Some helper functions for common async tasks
Functions
async def map_suppress(func: Callable[[~T], Coroutine[Any, Any, Any]],
iterable: Iterable[~T],
logger: logging.Logger = <Logger server.asyncio_extensions (WARNING)>,
msg: str = '')-
Expand source code
async def map_suppress( func: Callable[[T], Coroutine[Any, Any, Any]], iterable: Iterable[T], logger: logging.Logger = logger, msg: str = "" ): results = await asyncio.gather( *(func(item) for item in iterable), return_exceptions=True ) for result, item in zip(results, iterable): if isinstance(result, BaseException): logger.exception( "Unexpected error %s%s", msg, item, exc_info=result )
def synchronized(*args)
-
Expand source code
def synchronized(*args): """ Ensure that a function will only execute in serial. # Params - `lock`: An instance of asyncio.Lock to use for synchronization. """ # Invoked like @synchronized if args and inspect.isfunction(args[0]): return _synchronize(args[0]) # Invoked like @synchronized() or @synchronized(args, ...) return _partial(_synchronize, *args)
Ensure that a function will only execute in serial.
Params
lock
: An instance of asyncio.Lock to use for synchronization.
def synchronizedmethod(*args)
-
Expand source code
def synchronizedmethod(*args): """ Create a method that will be wrapped with an async lock. # Params - `attrname`: The name of the lock attribute that will be used. If the attribute doesn't exist or is None, a lock will be created. The default is to use a value based on the decorated function name. """ # Invoked like @synchronizedmethod if args and inspect.isfunction(args[0]): return _synchronize_method(args[0]) # Invoked like @synchronizedmethod() or @synchronizedmethod(args, ...) return _partial(_synchronize_method, *args)
Create a method that will be wrapped with an async lock.
Params
attrname
: The name of the lock attribute that will be used. If the attribute doesn't exist or is None, a lock will be created. The default is to use a value based on the decorated function name.
Classes
class AsyncLock (*args, **kwargs)
-
Expand source code
class AsyncLock(Protocol, AsyncContextManager["AsyncLock"]): def locked(self) -> bool: ... async def acquire(self) -> bool: ... def release(self) -> None: ...
Base class for protocol classes.
Protocol classes are defined as::
class Proto(Protocol): def meth(self) -> int: ...
Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing), for example::
class C: def meth(self) -> int: return 0 def func(x: Proto) -> int: return x.meth() func(C()) # Passes static type check
See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::
class GenProto(Protocol[T]): def meth(self) -> T: ...
Ancestors
- typing.Protocol
- contextlib.AbstractAsyncContextManager
- typing.Generic
- abc.ABC
Methods
async def acquire(self) ‑> bool
-
Expand source code
async def acquire(self) -> bool: ...
def locked(self) ‑> bool
-
Expand source code
def locked(self) -> bool: ...
def release(self) ‑> None
-
Expand source code
def release(self) -> None: ...
class SpinLock (sleep_duration: float = 0.01)
-
Expand source code
class SpinLock(_ContextManagerMixin): """ An asyncio spinlock. The advantage of using this over asyncio.Lock is that it can be called accross multiple event loops at the cost of being less performant. As with any spinlock, it's best used in situations where concurrent access is unlikely. """ def __init__(self, sleep_duration: float = 0.01): self.sleep_duration = sleep_duration self._locked = False def __repr__(self) -> str: res = super().__repr__() extra = "locked" if self._locked else "unlocked" return f"<{res[1:-1]} [{extra}]>" def locked(self) -> bool: """Return True if lock is acquired.""" return self._locked async def acquire(self) -> bool: """ Sleeps repeatedly for sleep_duration until the lock is unlocked, then sets it to locked and returns True. """ while self._locked: await asyncio.sleep(self.sleep_duration) self._locked = True return True def release(self) -> None: """ When invoked on an unlocked lock, a RuntimeError is raised. """ if self._locked: self._locked = False else: raise RuntimeError("Lock is not acquired.")
An asyncio spinlock. The advantage of using this over asyncio.Lock is that it can be called accross multiple event loops at the cost of being less performant. As with any spinlock, it's best used in situations where concurrent access is unlikely.
Ancestors
- asyncio.locks._ContextManagerMixin
Methods
async def acquire(self) ‑> bool
-
Expand source code
async def acquire(self) -> bool: """ Sleeps repeatedly for sleep_duration until the lock is unlocked, then sets it to locked and returns True. """ while self._locked: await asyncio.sleep(self.sleep_duration) self._locked = True return True
Sleeps repeatedly for sleep_duration until the lock is unlocked, then sets it to locked and returns True.
def locked(self) ‑> bool
-
Expand source code
def locked(self) -> bool: """Return True if lock is acquired.""" return self._locked
Return True if lock is acquired.
def release(self) ‑> None
-
Expand source code
def release(self) -> None: """ When invoked on an unlocked lock, a RuntimeError is raised. """ if self._locked: self._locked = False else: raise RuntimeError("Lock is not acquired.")
When invoked on an unlocked lock, a RuntimeError is raised.