Module server.broadcast_service
Classes
class BroadcastService (server: ServerInstance,
message_queue_service: MessageQueueService,
game_service: GameService,
player_service: PlayerService)-
Expand source code
@with_logger class BroadcastService(Service): """ Broadcast updates about changed entities. """ def __init__( self, server: "ServerInstance", message_queue_service: MessageQueueService, game_service: GameService, player_service: PlayerService, ): self.server = server self.message_queue_service = message_queue_service self.game_service = game_service self.player_service = player_service self._report_dirties_event = None async def initialize(self): # Using a lazy interval timer so that the intervals can be changed # without restarting the server. self._broadcast_dirties_timer = LazyIntervalTimer( lambda: config.DIRTY_REPORT_INTERVAL, self._monitored_report_dirties, start=True ) self._broadcast_ping_timer = LazyIntervalTimer( lambda: config.PING_INTERVAL, self.broadcast_ping, start=True ) async def _monitored_report_dirties(self): event = asyncio.Event() self._report_dirties_event = event try: await self.report_dirties() finally: event.set() async def report_dirties(self): """ Send updates about any dirty (changed) entities to connected players. This function is called at a fixed interval, which guarantees that any given object will not be written out to the clients more than once per interval. """ self.game_service.update_active_game_metrics() dirty_games = self.game_service.pop_dirty_games() dirty_queues = self.game_service.pop_dirty_queues() dirty_players = self.player_service.pop_dirty_players() if dirty_queues: matchmaker_info = { "command": "matchmaker_info", "queues": [queue.to_dict() for queue in dirty_queues] } self.server.write_broadcast(matchmaker_info) if dirty_players: player_info = { "command": "player_info", "players": [player.to_dict() for player in dirty_players] } self.server.write_broadcast(player_info) game_info = { "command": "game_info", "games": [] } # TODO: This spams squillions of messages: we should implement per- # connection message aggregation at the next abstraction layer down :P for game in dirty_games: if game.state == GameState.ENDED: self.game_service.remove_game(game) # So we're going to be broadcasting this to _somebody_... message = game.to_dict() game_info["games"].append(message) self.server.write_broadcast( message, lambda conn: ( conn.authenticated and game.is_visible_to_player(conn.player) ) ) if dirty_queues: await self.message_queue_service.publish( config.MQ_EXCHANGE_NAME, "broadcast.matchmakerInfo.update", matchmaker_info, delivery_mode=DeliveryMode.NOT_PERSISTENT ) if dirty_players: await self.message_queue_service.publish( config.MQ_EXCHANGE_NAME, "broadcast.playerInfo.update", player_info, delivery_mode=DeliveryMode.NOT_PERSISTENT ) if dirty_games: await self.message_queue_service.publish( config.MQ_EXCHANGE_NAME, "broadcast.gameInfo.update", game_info, delivery_mode=DeliveryMode.NOT_PERSISTENT ) def broadcast_ping(self): self.server.write_broadcast({"command": "ping"}) async def wait_report_dirtes(self): """ Wait for the current report_dirties task to complete. """ if self._report_dirties_event is None: return await self._report_dirties_event.wait() async def graceful_shutdown(self): if config.SHUTDOWN_KICK_IDLE_PLAYERS: message = ( "If you're in a game you can continue to play, otherwise you " "will be disconnected. If you aren't reconnected automatically " "please wait a few minutes and try to connect again." ) else: message = ( "If you're in a game you can continue to play, however, you " "will not be able to create any new games until the server has " "been restarted." ) delta = humanize.precisedelta(config.SHUTDOWN_GRACE_PERIOD) self.server.write_broadcast({ "command": "notice", "style": "info", "text": ( f"The server will be shutting down for maintenance in {delta}! " f"{message}" ) }) async def shutdown(self): self.server.write_broadcast({ "command": "notice", "style": "info", "text": ( "The server has been shut down for maintenance " "but should be back online soon. If you experience any " "problems, please restart your client. <br/><br/>" "We apologize for this interruption." ) })
Broadcast updates about changed entities.
Ancestors
Methods
def broadcast_ping(self)
-
Expand source code
def broadcast_ping(self): self.server.write_broadcast({"command": "ping"})
async def report_dirties(self)
-
Expand source code
async def report_dirties(self): """ Send updates about any dirty (changed) entities to connected players. This function is called at a fixed interval, which guarantees that any given object will not be written out to the clients more than once per interval. """ self.game_service.update_active_game_metrics() dirty_games = self.game_service.pop_dirty_games() dirty_queues = self.game_service.pop_dirty_queues() dirty_players = self.player_service.pop_dirty_players() if dirty_queues: matchmaker_info = { "command": "matchmaker_info", "queues": [queue.to_dict() for queue in dirty_queues] } self.server.write_broadcast(matchmaker_info) if dirty_players: player_info = { "command": "player_info", "players": [player.to_dict() for player in dirty_players] } self.server.write_broadcast(player_info) game_info = { "command": "game_info", "games": [] } # TODO: This spams squillions of messages: we should implement per- # connection message aggregation at the next abstraction layer down :P for game in dirty_games: if game.state == GameState.ENDED: self.game_service.remove_game(game) # So we're going to be broadcasting this to _somebody_... message = game.to_dict() game_info["games"].append(message) self.server.write_broadcast( message, lambda conn: ( conn.authenticated and game.is_visible_to_player(conn.player) ) ) if dirty_queues: await self.message_queue_service.publish( config.MQ_EXCHANGE_NAME, "broadcast.matchmakerInfo.update", matchmaker_info, delivery_mode=DeliveryMode.NOT_PERSISTENT ) if dirty_players: await self.message_queue_service.publish( config.MQ_EXCHANGE_NAME, "broadcast.playerInfo.update", player_info, delivery_mode=DeliveryMode.NOT_PERSISTENT ) if dirty_games: await self.message_queue_service.publish( config.MQ_EXCHANGE_NAME, "broadcast.gameInfo.update", game_info, delivery_mode=DeliveryMode.NOT_PERSISTENT )
Send updates about any dirty (changed) entities to connected players. This function is called at a fixed interval, which guarantees that any given object will not be written out to the clients more than once per interval.
async def wait_report_dirtes(self)
-
Expand source code
async def wait_report_dirtes(self): """ Wait for the current report_dirties task to complete. """ if self._report_dirties_event is None: return await self._report_dirties_event.wait()
Wait for the current report_dirties task to complete.
Inherited members