Module server.message_queue_service

Interfaces with RabbitMQ

Classes

class ConnectionAttemptFailed (*args, **kwargs)
Expand source code
class ConnectionAttemptFailed(ConnectionError):
    pass

Connection error.

Ancestors

  • builtins.ConnectionError
  • builtins.OSError
  • builtins.Exception
  • builtins.BaseException
class MessageQueueService
Expand source code
@with_logger
class MessageQueueService(Service):
    """
    Service handling connection to the message queue
    and providing an interface to publish messages.
    """

    def __init__(self) -> None:
        self._connection = None
        self._channel = None
        self._exchanges = {}
        self._exchange_types = {}
        self._is_ready = False

        config.register_callback("MQ_USER", self.reconnect)
        config.register_callback("MQ_PASSWORD", self.reconnect)
        config.register_callback("MQ_VHOST", self.reconnect)
        config.register_callback("MQ_SERVER", self.reconnect)
        config.register_callback("MQ_PORT", self.reconnect)

    @synchronizedmethod("initialization_lock")
    async def initialize(self) -> None:
        if self._is_ready:
            return

        try:
            await self._connect()
        except ConnectionAttemptFailed:
            return
        self._is_ready = True

        await self._declare_exchange(config.MQ_EXCHANGE_NAME, ExchangeType.TOPIC)

    async def _connect(self) -> None:
        try:
            self._connection = await aio_pika.connect_robust(
                "amqp://{user}:{password}@{server}:{port}/{vhost}".format(
                    user=config.MQ_USER,
                    password=config.MQ_PASSWORD,
                    vhost=config.MQ_VHOST,
                    server=config.MQ_SERVER,
                    port=config.MQ_PORT,
                ),
                loop=asyncio.get_running_loop(),
            )
        except ConnectionError as e:
            self._logger.warning(
                "Unable to connect to RabbitMQ. Is it running?", exc_info=True
            )
            raise ConnectionAttemptFailed from e
        except ProbableAuthenticationError as e:
            self._logger.warning(
                "Unable to connect to RabbitMQ. Incorrect credentials?", exc_info=True
            )
            raise ConnectionAttemptFailed from e
        except Exception as e:
            self._logger.warning(
                "Unable to connect to RabbitMQ due to unhandled excpetion %s. Incorrect vhost?",
                e,
                exc_info=True,
            )
            raise ConnectionAttemptFailed from e

        self._channel = await self._connection.channel(publisher_confirms=False)
        self._logger.debug("Connected to RabbitMQ %r", self._connection)

    async def declare_exchange(
        self, exchange_name: str, exchange_type: ExchangeType = ExchangeType.TOPIC, durable: bool = True
    ) -> None:
        await self.initialize()
        if not self._is_ready:
            self._logger.warning(
                "Not connected to RabbitMQ, unable to declare exchange."
            )
            return

        await self._declare_exchange(exchange_name, exchange_type, durable)

    async def _declare_exchange(
        self, exchange_name: str, exchange_type: ExchangeType, durable: bool = True
    ) -> None:
        new_exchange = await self._channel.declare_exchange(
            exchange_name, exchange_type, durable
        )

        self._exchanges[exchange_name] = new_exchange
        self._exchange_types[exchange_name] = exchange_type

    @synchronizedmethod("initialization_lock")
    async def shutdown(self) -> None:
        self._is_ready = False
        await self._shutdown()

    async def _shutdown(self) -> None:
        if self._channel is not None:
            await self._channel.close()
            self._channel = None

        if self._connection is not None:
            await self._connection.close()
            self._connection = None

    async def publish(
        self,
        exchange_name: str,
        routing: str,
        payload: dict,
        mandatory: bool = False,
        delivery_mode: DeliveryMode = DeliveryMode.PERSISTENT,
    ) -> None:
        await self.publish_many(
            exchange_name,
            routing,
            [payload],
            mandatory=mandatory,
            delivery_mode=delivery_mode
        )

    async def publish_many(
        self,
        exchange_name: str,
        routing: str,
        payloads: Iterable[dict],
        mandatory: bool = False,
        delivery_mode: DeliveryMode = DeliveryMode.PERSISTENT,
    ) -> None:
        if not self._is_ready:
            self._logger.warning(
                "Not connected to RabbitMQ, unable to publish message."
            )
            return

        exchange = self._exchanges.get(exchange_name)
        if exchange is None:
            raise KeyError(f"Unknown exchange {exchange_name}.")

        async with self._channel.transaction():
            for payload in payloads:
                message = aio_pika.Message(
                    json.dumps(payload).encode(),
                    delivery_mode=delivery_mode
                )
                await exchange.publish(
                    message,
                    routing_key=routing,
                    mandatory=mandatory
                )
                self._logger.log(
                    TRACE,
                    "Published message %s to %s/%s",
                    payload,
                    exchange_name,
                    routing
                )

    @synchronizedmethod("initialization_lock")
    async def reconnect(self) -> None:
        self._is_ready = False
        await self._shutdown()

        try:
            await self._connect()
        except ConnectionAttemptFailed:
            return

        for exchange_name in list(self._exchanges.keys()):
            await self._declare_exchange(
                exchange_name, self._exchange_types[exchange_name]
            )
        self._is_ready = True

Service handling connection to the message queue and providing an interface to publish messages.

Ancestors

Methods

async def declare_exchange(self,
exchange_name: str,
exchange_type: aio_pika.abc.ExchangeType = ExchangeType.TOPIC,
durable: bool = True) ‑> None
Expand source code
async def declare_exchange(
    self, exchange_name: str, exchange_type: ExchangeType = ExchangeType.TOPIC, durable: bool = True
) -> None:
    await self.initialize()
    if not self._is_ready:
        self._logger.warning(
            "Not connected to RabbitMQ, unable to declare exchange."
        )
        return

    await self._declare_exchange(exchange_name, exchange_type, durable)
async def publish(self,
exchange_name: str,
routing: str,
payload: dict,
mandatory: bool = False,
delivery_mode: aio_pika.abc.DeliveryMode = DeliveryMode.PERSISTENT) ‑> None
Expand source code
async def publish(
    self,
    exchange_name: str,
    routing: str,
    payload: dict,
    mandatory: bool = False,
    delivery_mode: DeliveryMode = DeliveryMode.PERSISTENT,
) -> None:
    await self.publish_many(
        exchange_name,
        routing,
        [payload],
        mandatory=mandatory,
        delivery_mode=delivery_mode
    )
async def publish_many(self,
exchange_name: str,
routing: str,
payloads: Iterable[dict],
mandatory: bool = False,
delivery_mode: aio_pika.abc.DeliveryMode = DeliveryMode.PERSISTENT) ‑> None
Expand source code
async def publish_many(
    self,
    exchange_name: str,
    routing: str,
    payloads: Iterable[dict],
    mandatory: bool = False,
    delivery_mode: DeliveryMode = DeliveryMode.PERSISTENT,
) -> None:
    if not self._is_ready:
        self._logger.warning(
            "Not connected to RabbitMQ, unable to publish message."
        )
        return

    exchange = self._exchanges.get(exchange_name)
    if exchange is None:
        raise KeyError(f"Unknown exchange {exchange_name}.")

    async with self._channel.transaction():
        for payload in payloads:
            message = aio_pika.Message(
                json.dumps(payload).encode(),
                delivery_mode=delivery_mode
            )
            await exchange.publish(
                message,
                routing_key=routing,
                mandatory=mandatory
            )
            self._logger.log(
                TRACE,
                "Published message %s to %s/%s",
                payload,
                exchange_name,
                routing
            )
async def reconnect(self) ‑> None
Expand source code
@synchronizedmethod("initialization_lock")
async def reconnect(self) -> None:
    self._is_ready = False
    await self._shutdown()

    try:
        await self._connect()
    except ConnectionAttemptFailed:
        return

    for exchange_name in list(self._exchanges.keys()):
        await self._declare_exchange(
            exchange_name, self._exchange_types[exchange_name]
        )
    self._is_ready = True

Inherited members