Module server.broadcast_service

Classes

class BroadcastService (server: ServerInstance,
message_queue_service: MessageQueueService,
game_service: GameService,
player_service: PlayerService)
Expand source code
@with_logger
class BroadcastService(Service):
    """
    Broadcast updates about changed entities.
    """

    def __init__(
        self,
        server: "ServerInstance",
        message_queue_service: MessageQueueService,
        game_service: GameService,
        player_service: PlayerService,
    ):
        self.server = server
        self.message_queue_service = message_queue_service
        self.game_service = game_service
        self.player_service = player_service
        self._report_dirties_event = None

    async def initialize(self):
        # Using a lazy interval timer so that the intervals can be changed
        # without restarting the server.
        self._broadcast_dirties_timer = LazyIntervalTimer(
            lambda: config.DIRTY_REPORT_INTERVAL,
            self._monitored_report_dirties,
            start=True
        )
        self._broadcast_ping_timer = LazyIntervalTimer(
            lambda: config.PING_INTERVAL,
            self.broadcast_ping,
            start=True
        )

    async def _monitored_report_dirties(self):
        event = asyncio.Event()
        self._report_dirties_event = event
        try:
            await self.report_dirties()
        finally:
            event.set()

    async def report_dirties(self):
        """
        Send updates about any dirty (changed) entities to connected players.
        This function is called at a fixed interval, which guarantees that any
        given object will not be written out to the clients more than once per
        interval.
        """
        self.game_service.update_active_game_metrics()
        dirty_games = self.game_service.pop_dirty_games()
        dirty_queues = self.game_service.pop_dirty_queues()
        dirty_players = self.player_service.pop_dirty_players()

        if dirty_queues:
            matchmaker_info = {
                "command": "matchmaker_info",
                "queues": [queue.to_dict() for queue in dirty_queues]
            }
            self.server.write_broadcast(matchmaker_info)

        if dirty_players:
            player_info = {
                "command": "player_info",
                "players": [player.to_dict() for player in dirty_players]
            }
            self.server.write_broadcast(player_info)

        game_info = {
            "command": "game_info",
            "games": []
        }
        # TODO: This spams squillions of messages: we should implement per-
        # connection message aggregation at the next abstraction layer down :P
        for game in dirty_games:
            if game.state == GameState.ENDED:
                self.game_service.remove_game(game)

            # So we're going to be broadcasting this to _somebody_...
            message = game.to_dict()
            game_info["games"].append(message)

            self.server.write_broadcast(
                message,
                lambda conn: (
                    conn.authenticated
                    and game.is_visible_to_player(conn.player)
                )
            )

        if dirty_queues:
            await self.message_queue_service.publish(
                config.MQ_EXCHANGE_NAME,
                "broadcast.matchmakerInfo.update",
                matchmaker_info,
                delivery_mode=DeliveryMode.NOT_PERSISTENT
            )

        if dirty_players:
            await self.message_queue_service.publish(
                config.MQ_EXCHANGE_NAME,
                "broadcast.playerInfo.update",
                player_info,
                delivery_mode=DeliveryMode.NOT_PERSISTENT
            )

        if dirty_games:
            await self.message_queue_service.publish(
                config.MQ_EXCHANGE_NAME,
                "broadcast.gameInfo.update",
                game_info,
                delivery_mode=DeliveryMode.NOT_PERSISTENT
            )

    def broadcast_ping(self):
        self.server.write_broadcast({"command": "ping"})

    async def wait_report_dirtes(self):
        """
        Wait for the current report_dirties task to complete.
        """
        if self._report_dirties_event is None:
            return

        await self._report_dirties_event.wait()

    async def graceful_shutdown(self):
        if config.SHUTDOWN_KICK_IDLE_PLAYERS:
            message = (
                "If you're in a game you can continue to play, otherwise you "
                "will be disconnected. If you aren't reconnected automatically "
                "please wait a few minutes and try to connect again."
            )
        else:
            message = (
                "If you're in a game you can continue to play, however, you "
                "will not be able to create any new games until the server has "
                "been restarted."
            )

        delta = humanize.precisedelta(config.SHUTDOWN_GRACE_PERIOD)
        self.server.write_broadcast({
            "command": "notice",
            "style": "info",
            "text": (
                f"The server will be shutting down for maintenance in {delta}! "
                f"{message}"
            )
        })

    async def shutdown(self):
        self.server.write_broadcast({
            "command": "notice",
            "style": "info",
            "text": (
                "The server has been shut down for maintenance "
                "but should be back online soon. If you experience any "
                "problems, please restart your client. <br/><br/>"
                "We apologize for this interruption."
            )
        })

Broadcast updates about changed entities.

Ancestors

Methods

def broadcast_ping(self)
Expand source code
def broadcast_ping(self):
    self.server.write_broadcast({"command": "ping"})
async def report_dirties(self)
Expand source code
async def report_dirties(self):
    """
    Send updates about any dirty (changed) entities to connected players.
    This function is called at a fixed interval, which guarantees that any
    given object will not be written out to the clients more than once per
    interval.
    """
    self.game_service.update_active_game_metrics()
    dirty_games = self.game_service.pop_dirty_games()
    dirty_queues = self.game_service.pop_dirty_queues()
    dirty_players = self.player_service.pop_dirty_players()

    if dirty_queues:
        matchmaker_info = {
            "command": "matchmaker_info",
            "queues": [queue.to_dict() for queue in dirty_queues]
        }
        self.server.write_broadcast(matchmaker_info)

    if dirty_players:
        player_info = {
            "command": "player_info",
            "players": [player.to_dict() for player in dirty_players]
        }
        self.server.write_broadcast(player_info)

    game_info = {
        "command": "game_info",
        "games": []
    }
    # TODO: This spams squillions of messages: we should implement per-
    # connection message aggregation at the next abstraction layer down :P
    for game in dirty_games:
        if game.state == GameState.ENDED:
            self.game_service.remove_game(game)

        # So we're going to be broadcasting this to _somebody_...
        message = game.to_dict()
        game_info["games"].append(message)

        self.server.write_broadcast(
            message,
            lambda conn: (
                conn.authenticated
                and game.is_visible_to_player(conn.player)
            )
        )

    if dirty_queues:
        await self.message_queue_service.publish(
            config.MQ_EXCHANGE_NAME,
            "broadcast.matchmakerInfo.update",
            matchmaker_info,
            delivery_mode=DeliveryMode.NOT_PERSISTENT
        )

    if dirty_players:
        await self.message_queue_service.publish(
            config.MQ_EXCHANGE_NAME,
            "broadcast.playerInfo.update",
            player_info,
            delivery_mode=DeliveryMode.NOT_PERSISTENT
        )

    if dirty_games:
        await self.message_queue_service.publish(
            config.MQ_EXCHANGE_NAME,
            "broadcast.gameInfo.update",
            game_info,
            delivery_mode=DeliveryMode.NOT_PERSISTENT
        )

Send updates about any dirty (changed) entities to connected players. This function is called at a fixed interval, which guarantees that any given object will not be written out to the clients more than once per interval.

async def wait_report_dirtes(self)
Expand source code
async def wait_report_dirtes(self):
    """
    Wait for the current report_dirties task to complete.
    """
    if self._report_dirties_event is None:
        return

    await self._report_dirties_event.wait()

Wait for the current report_dirties task to complete.

Inherited members