Module server.client_message_queue_service
Forward RabbitMQ messages from trusted microservices to connected clients.
Wire contract
Publishers post to the MQ_EXCHANGE_NAME topic exchange with routing key
request.client.notify. Addressing lives in AMQP message headers:
user-id(int, optional): forward the body to the player with this id, if connected to this lobby instance. If not connected, the message is logged and acked.channel(str, optional, reserved): future per-channel pub/sub. Currently recognised but not yet implemented.- If neither header is set, the message is broadcast to every authenticated client connected to this instance.
The message body is a UTF-8 JSON object and is forwarded to the client verbatim. The lobby server does not validate or rewrap it; producers are trusted because the broker is reachable only from internal services.
Classes
class ClientMessageQueueService (server: ServerInstance,
message_queue_service: MessageQueueService,
player_service: PlayerService)-
Expand source code
@with_logger class ClientMessageQueueService(Service): """Consume `request.client.notify` messages and forward to local clients.""" _logger: ClassVar[logging.Logger] def __init__( self, server: "ServerInstance", message_queue_service: MessageQueueService, player_service: PlayerService, ): """Wire dependencies; consumer is started in `initialize`.""" self.server = server self.message_queue_service = message_queue_service self.player_service = player_service self._queue: Optional[AbstractQueue] = None self._consumer_tag: Optional[str] = None async def initialize(self) -> None: # Queue naming follows `<exchange>.<service>.<routing-key>` plus a # per-instance suffix because each lobby pod has its own queue (vs. # the API's shared queues like `faf-lobby.api.event.update`). On k8s # `socket.gethostname()` resolves to the pod name (e.g. # `faf-lobby-server-6d9c4588ff-lzdcr`); locally it's the dev's host. queue_name = ( f"{config.MQ_EXCHANGE_NAME}.lobby.client.notify" f".{socket.gethostname()}" ) result = await self.message_queue_service.declare_queue_and_consume( exchange_name=config.MQ_EXCHANGE_NAME, routing_key=CLIENT_NOTIFY_ROUTING_KEY, callback=self._on_message, queue_name=queue_name, ) if result is not None: self._queue, self._consumer_tag = result async def shutdown(self) -> None: if self._queue is not None and self._consumer_tag is not None: await self._queue.cancel(self._consumer_tag) self._queue = None self._consumer_tag = None async def _on_message(self, message: AbstractIncomingMessage) -> None: async with message.process(requeue=False): try: payload = json.loads(message.body) except (ValueError, UnicodeDecodeError): self._logger.warning( "Dropping client-notify message with non-JSON body" ) return if not isinstance(payload, dict): self._logger.warning( "Dropping client-notify message: payload is not a JSON object" ) return headers = message.headers or {} user_id = headers.get("user-id") channel = headers.get("channel") if user_id is not None: self._dispatch_to_user(user_id, payload) elif channel is not None: self._logger.info( "client-notify channel %r received but channel routing is " "not yet implemented; dropping", channel, ) else: self.server.write_broadcast(payload) def _dispatch_to_user(self, user_id: Any, payload: dict) -> None: try: player_id = int(user_id) except (TypeError, ValueError): self._logger.warning( "Dropping client-notify message: invalid user-id %r", user_id ) return player = self.player_service[player_id] if player is None: self._logger.warning( "Dropping client-notify message: user %s not connected here", player_id, ) return player.write_message(payload)Consume
request.client.notifymessages and forward to local clients.Wire dependencies; consumer is started in
initialize.Ancestors
Inherited members