Package server
Forged Alliance Forever lobby server.
Overview
The lobby server handles live state information for the FAF ecosystem. This includes maintaining a list of online players, a list of hosted and ongoing games, and a number of matchmakers. It also performs certain post-game actions like computing and persisting rating changes and updating achievements. Every online player maintains an active TCP connection to the server through which the server syncronizes the current state.
Social
The social components of the lobby server are relatively limited, as the primary social element, chat, is handled by a separate server. The social features handled by the lobby server are therefore limited to:
- Syncronizing online player state
- Enforcing global bans
- Modifying a list of friends and a list of foes
- Modifying the currently selected avatar
Games
The server supports two ways of discovering games with other players: custom lobbies and matchmakers. Ultimately however, the lobby server is only able to help players discover eachother, and maintain certain meta information about games. Game simulation happens entirely on the client side, and is completely un-controlled by the server. Certain messages sent between clients throughout the course of a game will also be relayed to the server. These can be used to determine if clients were able to connect to eachother, and what the outcome of the game was.
Custom Games
Historically, the standard way to play FAF has been for one player to host a game lobby, setup the desired map and game settings, and for other players to voluntarily join that lobby until the host is satisfied with the players and launches the game. The lobby server facilitates sending certain information about these custom lobbies to all online players (subject to friend/foe rules) as well as managing a game id that can be used to join a specific lobby. This information includes, but is not necessarily limited to:
- Auto generated game uid
- Host specified game name
- Host selected map
- List of connected players (non-AI only) and their team setup
Matchmaker games
Players may also choose to join a matchmaker queue, instead of hosting a game and finding people to play with manually. The matchmaker will attempt to create balanced games using players TrueSkill rating, and choose a game host for hosting an automatch lobby. From the server perspective, automatch games behave virtually identical to custom games, the exception being that players may not request to join them by game id. The exchange of game messages and connectivity establishment happens identically to custom games.
Connectivity Establishment
When a player requests to join a game, the lobby server initiates connection establishment between the joining player and the host, and then the joining player and all other players in the match. Connections are then established using the Interactive Connectivity Establishment (ICE) protocol, using the lobby server as a medium of exchanging candidate addresses between clients. If clients require a relay in order to connect to eachother, they will authenticate with a separate sturn or turn server using credentials supplied by a separate API service.
Achievements
When a game ends, each client will report a summary of the game in the form of a stat report. These stats are then parsed to extract information about events that occurred during the game, like units built, units killed, etc. and used to unlock or progress achievements for the players.
Technical Overview
This section is intended for developers and will outline technical details of how to interact with the server. It will remain relatively high level and implementation agnostic, instead linking to other sections of the documentation that go into more detail.
Protocol
TODO
Legal
- Copyright © 2012-2014 Gael Honorez
- Copyright © 2015-2016 Michael Søndergaard sheeo@faforever.com
- Copyright © 2021 Forged Alliance Forever
Distributed under GPLv3, see license.txt
Sub-modules
server.asyncio_extensions
-
Some helper functions for common async tasks
server.broadcast_service
server.config
-
Server config variables
server.configuration_service
-
Manages periodic reloading of config variables
server.control
-
Tiny http server for introspecting state
server.core
-
Server framework …
server.db
-
Database interaction
server.decorators
-
Helper decorators
server.exceptions
-
Common exception definitions
server.factions
-
Supreme Commander known faction definitions
server.game_service
-
Manages the lifecycle of active games
server.gameconnection
-
Game communication over GpgNet
server.games
-
Type definitions for game objects
server.geoip_service
-
Manages the GeoIP database
server.health
-
Kubernetes compatible HTTP health check server.
server.info
-
Static meta information about the container/process
server.ladder_service
server.lobbyconnection
-
Handles requests from connected clients
server.matchmaker
-
The matchmaker system …
server.message_queue_service
-
Interfaces with RabbitMQ
server.metrics
-
Prometheus metric definitions
server.oauth_service
server.party_service
-
Manages interactions between players and parties
server.player_service
-
Manages connected and authenticated players
server.players
-
Player type definitions
server.profiler
-
Analysis of application performance
server.protocol
-
Protocol format definitions
server.rating
-
Type definitions for player ratings
server.rating_service
-
Post-game rating functionality
server.servercontext
-
Manages a group of connections using the same protocol over the same port
server.stats
-
Achievements and events
server.team_matchmaker
-
The team matchmaking system …
server.timing
-
Helpers for executing async functions on a timer
server.types
-
General type definitions
server.weakattr
-
Helpers for non-owned object attributes
Classes
class BroadcastService (server: ServerInstance,
message_queue_service: MessageQueueService,
game_service: GameService,
player_service: PlayerService)-
Expand source code
@with_logger class BroadcastService(Service): """ Broadcast updates about changed entities. """ def __init__( self, server: "ServerInstance", message_queue_service: MessageQueueService, game_service: GameService, player_service: PlayerService, ): self.server = server self.message_queue_service = message_queue_service self.game_service = game_service self.player_service = player_service self._report_dirties_event = None async def initialize(self): # Using a lazy interval timer so that the intervals can be changed # without restarting the server. self._broadcast_dirties_timer = LazyIntervalTimer( lambda: config.DIRTY_REPORT_INTERVAL, self._monitored_report_dirties, start=True ) self._broadcast_ping_timer = LazyIntervalTimer( lambda: config.PING_INTERVAL, self.broadcast_ping, start=True ) async def _monitored_report_dirties(self): event = asyncio.Event() self._report_dirties_event = event try: await self.report_dirties() finally: event.set() async def report_dirties(self): """ Send updates about any dirty (changed) entities to connected players. This function is called at a fixed interval, which guarantees that any given object will not be written out to the clients more than once per interval. """ self.game_service.update_active_game_metrics() dirty_games = self.game_service.pop_dirty_games() dirty_queues = self.game_service.pop_dirty_queues() dirty_players = self.player_service.pop_dirty_players() if dirty_queues: matchmaker_info = { "command": "matchmaker_info", "queues": [queue.to_dict() for queue in dirty_queues] } self.server.write_broadcast(matchmaker_info) if dirty_players: player_info = { "command": "player_info", "players": [player.to_dict() for player in dirty_players] } self.server.write_broadcast(player_info) game_info = { "command": "game_info", "games": [] } # TODO: This spams squillions of messages: we should implement per- # connection message aggregation at the next abstraction layer down :P for game in dirty_games: if game.state == GameState.ENDED: self.game_service.remove_game(game) # So we're going to be broadcasting this to _somebody_... message = game.to_dict() game_info["games"].append(message) self.server.write_broadcast( message, lambda conn: ( conn.authenticated and game.is_visible_to_player(conn.player) ) ) if dirty_queues: await self.message_queue_service.publish( config.MQ_EXCHANGE_NAME, "broadcast.matchmakerInfo.update", matchmaker_info, delivery_mode=DeliveryMode.NOT_PERSISTENT ) if dirty_players: await self.message_queue_service.publish( config.MQ_EXCHANGE_NAME, "broadcast.playerInfo.update", player_info, delivery_mode=DeliveryMode.NOT_PERSISTENT ) if dirty_games: await self.message_queue_service.publish( config.MQ_EXCHANGE_NAME, "broadcast.gameInfo.update", game_info, delivery_mode=DeliveryMode.NOT_PERSISTENT ) def broadcast_ping(self): self.server.write_broadcast({"command": "ping"}) async def wait_report_dirtes(self): """ Wait for the current report_dirties task to complete. """ if self._report_dirties_event is None: return await self._report_dirties_event.wait() async def graceful_shutdown(self): if config.SHUTDOWN_KICK_IDLE_PLAYERS: message = ( "If you're in a game you can continue to play, otherwise you " "will be disconnected. If you aren't reconnected automatically " "please wait a few minutes and try to connect again." ) else: message = ( "If you're in a game you can continue to play, however, you " "will not be able to create any new games until the server has " "been restarted." ) delta = humanize.precisedelta(config.SHUTDOWN_GRACE_PERIOD) self.server.write_broadcast({ "command": "notice", "style": "info", "text": ( f"The server will be shutting down for maintenance in {delta}! " f"{message}" ) }) async def shutdown(self): self.server.write_broadcast({ "command": "notice", "style": "info", "text": ( "The server has been shut down for maintenance " "but should be back online soon. If you experience any " "problems, please restart your client. <br/><br/>" "We apologize for this interruption." ) })
Broadcast updates about changed entities.
Ancestors
Methods
def broadcast_ping(self)
-
Expand source code
def broadcast_ping(self): self.server.write_broadcast({"command": "ping"})
async def report_dirties(self)
-
Expand source code
async def report_dirties(self): """ Send updates about any dirty (changed) entities to connected players. This function is called at a fixed interval, which guarantees that any given object will not be written out to the clients more than once per interval. """ self.game_service.update_active_game_metrics() dirty_games = self.game_service.pop_dirty_games() dirty_queues = self.game_service.pop_dirty_queues() dirty_players = self.player_service.pop_dirty_players() if dirty_queues: matchmaker_info = { "command": "matchmaker_info", "queues": [queue.to_dict() for queue in dirty_queues] } self.server.write_broadcast(matchmaker_info) if dirty_players: player_info = { "command": "player_info", "players": [player.to_dict() for player in dirty_players] } self.server.write_broadcast(player_info) game_info = { "command": "game_info", "games": [] } # TODO: This spams squillions of messages: we should implement per- # connection message aggregation at the next abstraction layer down :P for game in dirty_games: if game.state == GameState.ENDED: self.game_service.remove_game(game) # So we're going to be broadcasting this to _somebody_... message = game.to_dict() game_info["games"].append(message) self.server.write_broadcast( message, lambda conn: ( conn.authenticated and game.is_visible_to_player(conn.player) ) ) if dirty_queues: await self.message_queue_service.publish( config.MQ_EXCHANGE_NAME, "broadcast.matchmakerInfo.update", matchmaker_info, delivery_mode=DeliveryMode.NOT_PERSISTENT ) if dirty_players: await self.message_queue_service.publish( config.MQ_EXCHANGE_NAME, "broadcast.playerInfo.update", player_info, delivery_mode=DeliveryMode.NOT_PERSISTENT ) if dirty_games: await self.message_queue_service.publish( config.MQ_EXCHANGE_NAME, "broadcast.gameInfo.update", game_info, delivery_mode=DeliveryMode.NOT_PERSISTENT )
Send updates about any dirty (changed) entities to connected players. This function is called at a fixed interval, which guarantees that any given object will not be written out to the clients more than once per interval.
async def wait_report_dirtes(self)
-
Expand source code
async def wait_report_dirtes(self): """ Wait for the current report_dirties task to complete. """ if self._report_dirties_event is None: return await self._report_dirties_event.wait()
Wait for the current report_dirties task to complete.
Inherited members
class ConfigurationService
-
Expand source code
@with_logger class ConfigurationService(Service): def __init__(self) -> None: self._store = config self._task = None async def initialize(self) -> None: self._task = asyncio.create_task(self._worker_loop()) self._logger.info("Configuration service initialized") async def _worker_loop(self) -> None: while True: try: self._logger.debug("Refreshing configuration variables") self._store.refresh() await asyncio.sleep(self._store.CONFIGURATION_REFRESH_TIME) except Exception: self._logger.exception("Error while refreshing config") # To prevent a busy loop await asyncio.sleep(60) async def shutdown(self) -> None: if self._task is not None: self._logger.info("Configuration service stopping.") self._task.cancel() self._task = None
All services should inherit from this class.
Services are singleton objects which manage some server task.
Ancestors
Inherited members
class GameConnection (database: FAFDatabase,
game: Game,
player: Player,
protocol: Protocol,
player_service: PlayerService,
games: GameService,
state: GameConnectionState = GameConnectionState.INITIALIZING,
setup_timeout: int = 60)-
Expand source code
class GameConnection(GpgNetServerProtocol): """ Responsible for connections to the game, using the GPGNet protocol """ def __init__( self, database: FAFDatabase, game: Game, player: Player, protocol: Protocol, player_service: PlayerService, games: GameService, state: GameConnectionState = GameConnectionState.INITIALIZING, setup_timeout: int = 60, ): """ Construct a new GameConnection """ super().__init__() self._logger = logging.getLogger( f"{self.__class__.__qualname__}.{game.id}" ) self._db = database self.protocol = protocol self._state = state self.game_service = games self.player_service = player_service self.player = player player.game_connection = self # Set up weak reference to self self.game = game self.setup_timeout = setup_timeout self.finished_sim = False self._logger.debug("GameConnection initializing") if self.state is GameConnectionState.INITIALIZING: asyncio.get_event_loop().create_task( self.timeout_game_connection(setup_timeout) ) async def timeout_game_connection(self, timeout): await asyncio.sleep(timeout) if self.state is GameConnectionState.INITIALIZING: self._logger.debug("GameConection timed out...") await self.abort("Player took too long to start the game") @property def state(self) -> GameConnectionState: return self._state def is_host(self) -> bool: if not self.game or not self.player: return False return ( self.player.state == PlayerState.HOSTING and self.player == self.game.host ) async def send(self, message): """ Send a game message to the client. # Errors May raise `DisconnectedError` NOTE: When calling this on a connection other than `self` make sure to handle `DisconnectedError`, otherwise failure to send the message will cause the caller to be disconnected as well. """ message["target"] = "game" self._logger.log(TRACE, ">> %s: %s", self.player.login, message) await self.protocol.send_message(message) async def _handle_idle_state(self): """ This message is sent by FA when it doesn't know what to do. """ assert self.game if self.player == self.game.host: self.game.state = GameState.LOBBY self._state = GameConnectionState.CONNECTED_TO_HOST self.game.add_game_connection(self) self.player.state = PlayerState.HOSTING else: self._state = GameConnectionState.INITIALIZED self.player.state = PlayerState.JOINING async def _handle_lobby_state(self): """ The game has told us it is ready and listening on self.player.game_port for UDP. We determine the connectivity of the peer and respond appropriately """ player_state = self.player.state if player_state == PlayerState.HOSTING: await self.send_HostGame(self.game.map.folder_name) self.game.set_hosted() # If the player is joining, we connect him to host # followed by the rest of the players. elif player_state == PlayerState.JOINING: await self.connect_to_host(self.game.host.game_connection) if self._state is GameConnectionState.ENDED: # We aborted while trying to connect return self._state = GameConnectionState.CONNECTED_TO_HOST try: self.game.add_game_connection(self) except GameError as e: await self.abort(f"GameError while joining {self.game.id}: {e}") return tasks = [] for peer in self.game.connections: if peer != self and peer.player != self.game.host: self._logger.debug("%s connecting to %s", self.player, peer) tasks.append(self.connect_to_peer(peer)) await asyncio.gather(*tasks) async def connect_to_host(self, peer: "GameConnection"): """ Connect self to a given peer (host) """ if not peer or peer.player.state != PlayerState.HOSTING: await self.abort("The host left the lobby") return await self.send_JoinGame(peer.player.login, peer.player.id) if not peer: await self.abort("The host left the lobby") return await peer.send_ConnectToPeer( player_name=self.player.login, player_uid=self.player.id, offer=True ) async def connect_to_peer(self, peer: "GameConnection"): """ Connect two peers """ if peer is not None: await self.send_ConnectToPeer( player_name=peer.player.login, player_uid=peer.player.id, offer=True ) if peer is not None: with contextlib.suppress(DisconnectedError): await peer.send_ConnectToPeer( player_name=self.player.login, player_uid=self.player.id, offer=False ) async def handle_action(self, command: str, args: list[Any]): """ Handle GpgNetSend messages, wrapped in the JSON protocol """ try: await COMMAND_HANDLERS[command](self, *args) except KeyError: self._logger.warning( "Unrecognized command %s: %s from player %s", command, args, self.player ) except (TypeError, ValueError): self._logger.exception("Bad command arguments") except ConnectionError as e: raise e except Exception as e: # pragma: no cover self._logger.exception( "Something awful happened in a game thread! %s", e ) await self.abort() async def handle_desync(self, *_args): # pragma: no cover self.game.desyncs += 1 async def handle_game_option(self, key: str, value: Any): if not self.is_host(): return await self.game.game_options.set_option(key, value) self._mark_dirty() async def handle_game_mods(self, mode: Any, args: list[Any]): if not self.is_host(): return if mode == "activated": # In this case args is the number of mods if int(args) == 0: self.game.mods = {} elif mode == "uids": uids = str(args).split() self.game.mods = {uid: "Unknown sim mod" for uid in uids} async with self._db.acquire() as conn: rows = await conn.execute( "SELECT `uid`, `name` from `table_mod` WHERE `uid` in :ids", ids=tuple(uids) ) for row in rows: self.game.mods[row.uid] = row.name else: self._logger.warning("Ignoring game mod: %s, %s", mode, args) return self._mark_dirty() async def handle_player_option( self, player_id: Any, key: Any, value: Any ): if not self.is_host(): return self.game.set_player_option(int(player_id), key, value) self._mark_dirty() async def handle_ai_option(self, name: Any, key: Any, value: Any): if not self.is_host(): return self.game.set_ai_option(str(name), key, value) self._mark_dirty() async def handle_clear_slot(self, slot: Any): if not self.is_host(): return self.game.clear_slot(int(slot)) self._mark_dirty() async def handle_game_result(self, army: Any, result: Any): army = int(army) result = str(result).lower() try: *metadata, result_type, score = result.split() except ValueError: self._logger.warning("Invalid result for %s reported: %s", army, result) else: await self.game.add_result( self.player.id, army, result_type, int(score), frozenset(metadata), ) async def handle_operation_complete( self, primary: Any, secondary: Any, delta: str ): """ # Params - `primary`: are primary mission objectives complete? - `secondary`: are secondary mission objectives complete? - `delta`: the time it took to complete the mission """ primary = FA.ENABLED == primary secondary = FA.ENABLED == secondary if not primary: return if not isinstance(self.game, CoopGame): self._logger.warning( "OperationComplete called for non-coop game: %s", self.game.id ) return if self.game.validity != ValidityState.COOP_NOT_RANKED: return secondary, delta = secondary, str(delta) async with self._db.acquire() as conn: result = await conn.execute( select(coop_map.c.id).where( coop_map.c.filename == self.game.map.file_path ) ) row = result.fetchone() if not row: self._logger.debug( "can't find coop map: %s", self.game.map.file_path ) return mission = row.id # Each player in a co-op game will send the OperationComplete # message but we only need to perform this insert once async with self.game.leaderboard_lock: if not self.game.leaderboard_saved: await conn.execute( coop_leaderboard.insert().values( mission=mission, gameuid=self.game.id, secondary=secondary, time=delta, player_count=len(self.game.players), ) ) self.game.leaderboard_saved = True async def handle_json_stats(self, stats: str): try: self.game.report_army_stats(stats) except json.JSONDecodeError as e: self._logger.warning( "Malformed game stats reported by %s: '...%s...'", self.player.login, stats[e.pos-20:e.pos+20] ) async def handle_enforce_rating(self): self.game.enforce_rating = True async def handle_teamkill_report( self, gametime: Any, reporter_id: Any, reporter_name: str, teamkiller_id: Any, teamkiller_name: str, ): """ Sent when a player is teamkilled and clicks the 'Report' button. # Params - `gametime`: seconds of gametime when kill happened - `reporter_id`: reporter id - `reporter_name`: reporter nickname (for debug purpose only) - `teamkiller_id`: teamkiller id - `teamkiller_name`: teamkiller nickname (for debug purpose only) """ pass async def handle_teamkill_happened( self, gametime: Any, victim_id: Any, victim_name: str, teamkiller_id: Any, teamkiller_name: str, ): """ Send automatically by the game whenever a teamkill happens. Takes the same parameters as TeamkillReport. # Params - `gametime`: seconds of gametime when kill happened - `victim_id`: victim id - `victim_name`: victim nickname (for debug purpose only) - `teamkiller_id`: teamkiller id - `teamkiller_name`: teamkiller nickname (for debug purpose only) """ victim_id = int(victim_id) teamkiller_id = int(teamkiller_id) if 0 in (victim_id, teamkiller_id): self._logger.debug("Ignoring teamkill for AI player") return async with self._db.acquire() as conn: await conn.execute( teamkills.insert().values( teamkiller=teamkiller_id, victim=victim_id, game_id=self.game.id, gametime=gametime, ) ) async def handle_ice_message(self, receiver_id: Any, ice_msg: str): receiver_id = int(receiver_id) peer = self.player_service.get_player(receiver_id) if not peer: self._logger.debug( "Ignoring ICE message for unknown player: %s", receiver_id ) return game_connection = peer.game_connection if not game_connection: self._logger.debug( "Ignoring ICE message for player without game connection: %s", receiver_id ) return try: await game_connection.send({ "command": "IceMsg", "args": [int(self.player.id), ice_msg] }) except DisconnectedError: self._logger.debug( "Failed to send ICE message to player due to a disconnect: %s", receiver_id ) async def handle_game_state(self, state: str): """ Changes in game state """ if state == "Idle": await self._handle_idle_state() # Don't mark as dirty return elif state == "Lobby": # TODO: Do we still need to schedule with `ensure_future`? # # We do not yield from the task, since we # need to keep processing other commands while it runs await self._handle_lobby_state() elif state == "Launching": if self.player.state != PlayerState.HOSTING: return if self.game.state is not GameState.LOBBY: self._logger.warning( "Trying to launch game %s in invalid state %s", self.game, self.game.state ) return self._logger.info("Launching game %s", self.game) await self.game.launch() if len(self.game.mods.keys()) > 0: async with self._db.acquire() as conn: uids = list(self.game.mods.keys()) await conn.execute( "UPDATE mod_stats s JOIN mod_version v ON " "v.mod_id = s.mod_id " "SET s.times_played = s.times_played + 1 " "WHERE v.uid in :ids", ids=tuple(uids) ) # Signals that the FA executable has been closed elif state == "Ended": await self.on_connection_closed() self._mark_dirty() async def handle_game_ended(self, *args: list[Any]): """ Signals that the simulation has ended. """ self.finished_sim = True await self.game.check_game_finish(self.player) async def handle_rehost(self, *args: list[Any]): """ Signals that the user has rehosted the game. This is currently unused but included for documentation purposes. """ pass async def handle_launch_status(self, status: str): """ Currently is sent with status `Rejected` if a matchmaker game failed to start due to players using differing game settings. """ pass async def handle_bottleneck(self, *args: list[Any]): """ Not sure what this command means. This is currently unused but included for documentation purposes. """ pass async def handle_bottleneck_cleared(self, *args: list[Any]): """ Not sure what this command means. This is currently unused but included for documentation purposes. """ pass async def handle_disconnected(self, *args: list[Any]): """ Not sure what this command means. This is currently unused but included for documentation purposes. """ pass async def handle_chat(self, message: str): """ Whenever the player sends a chat message during the game lobby. """ pass async def handle_game_full(self): """ Sent when all game slots are full """ pass def _mark_dirty(self): if self.game: self.game_service.mark_dirty(self.game) async def abort(self, log_message: str = ""): """ Abort the connection Removes the GameConnection object from the any associated Game object, and deletes references to Player and Game held by this object. """ try: if self._state is GameConnectionState.ENDED: return self._logger.debug("%s.abort(%s)", self, log_message) if self.game.state is GameState.LOBBY: await self.disconnect_all_peers() self._state = GameConnectionState.ENDED await self.game.remove_game_connection(self) self._mark_dirty() self.player.state = PlayerState.IDLE if self.player.lobby_connection: self.player.lobby_connection.game_connection = None del self.player.game del self.player.game_connection except Exception as ex: # pragma: no cover self._logger.debug("Exception in abort(): %s", ex) async def disconnect_all_peers(self): tasks = [] for peer in self.game.connections: if peer == self: continue tasks.append(peer.send_DisconnectFromPeer(self.player.id)) for fut in asyncio.as_completed(tasks): try: await fut except Exception: self._logger.debug( "peer_sendDisconnectFromPeer failed for player %i", self.player.id, exc_info=True ) async def on_connection_closed(self): """ The connection is closed by the player. """ try: await self.game.disconnect_player(self.player) except Exception as e: # pragma: no cover self._logger.exception(e) finally: await self.abort() async def on_connection_lost(self): """ The connection is lost due to a disconnect from the lobby server. """ try: await self.game.remove_game_connection(self) except Exception as e: # pragma: no cover self._logger.exception(e) finally: await self.abort() def __str__(self): return f"GameConnection({self.player}, {self.game})"
Responsible for connections to the game, using the GPGNet protocol
Construct a new GameConnection
Ancestors
Instance variables
prop state : GameConnectionState
-
Expand source code
@property def state(self) -> GameConnectionState: return self._state
Methods
async def abort(self, log_message: str = '')
-
Expand source code
async def abort(self, log_message: str = ""): """ Abort the connection Removes the GameConnection object from the any associated Game object, and deletes references to Player and Game held by this object. """ try: if self._state is GameConnectionState.ENDED: return self._logger.debug("%s.abort(%s)", self, log_message) if self.game.state is GameState.LOBBY: await self.disconnect_all_peers() self._state = GameConnectionState.ENDED await self.game.remove_game_connection(self) self._mark_dirty() self.player.state = PlayerState.IDLE if self.player.lobby_connection: self.player.lobby_connection.game_connection = None del self.player.game del self.player.game_connection except Exception as ex: # pragma: no cover self._logger.debug("Exception in abort(): %s", ex)
Abort the connection
Removes the GameConnection object from the any associated Game object, and deletes references to Player and Game held by this object.
async def connect_to_host(self,
peer: GameConnection)-
Expand source code
async def connect_to_host(self, peer: "GameConnection"): """ Connect self to a given peer (host) """ if not peer or peer.player.state != PlayerState.HOSTING: await self.abort("The host left the lobby") return await self.send_JoinGame(peer.player.login, peer.player.id) if not peer: await self.abort("The host left the lobby") return await peer.send_ConnectToPeer( player_name=self.player.login, player_uid=self.player.id, offer=True )
Connect self to a given peer (host)
async def connect_to_peer(self,
peer: GameConnection)-
Expand source code
async def connect_to_peer(self, peer: "GameConnection"): """ Connect two peers """ if peer is not None: await self.send_ConnectToPeer( player_name=peer.player.login, player_uid=peer.player.id, offer=True ) if peer is not None: with contextlib.suppress(DisconnectedError): await peer.send_ConnectToPeer( player_name=self.player.login, player_uid=self.player.id, offer=False )
Connect two peers
async def disconnect_all_peers(self)
-
Expand source code
async def disconnect_all_peers(self): tasks = [] for peer in self.game.connections: if peer == self: continue tasks.append(peer.send_DisconnectFromPeer(self.player.id)) for fut in asyncio.as_completed(tasks): try: await fut except Exception: self._logger.debug( "peer_sendDisconnectFromPeer failed for player %i", self.player.id, exc_info=True )
async def handle_action(self, command: str, args: list[typing.Any])
-
Expand source code
async def handle_action(self, command: str, args: list[Any]): """ Handle GpgNetSend messages, wrapped in the JSON protocol """ try: await COMMAND_HANDLERS[command](self, *args) except KeyError: self._logger.warning( "Unrecognized command %s: %s from player %s", command, args, self.player ) except (TypeError, ValueError): self._logger.exception("Bad command arguments") except ConnectionError as e: raise e except Exception as e: # pragma: no cover self._logger.exception( "Something awful happened in a game thread! %s", e ) await self.abort()
Handle GpgNetSend messages, wrapped in the JSON protocol
async def handle_ai_option(self, name: Any, key: Any, value: Any)
-
Expand source code
async def handle_ai_option(self, name: Any, key: Any, value: Any): if not self.is_host(): return self.game.set_ai_option(str(name), key, value) self._mark_dirty()
async def handle_bottleneck(self, *args: list[typing.Any])
-
Expand source code
async def handle_bottleneck(self, *args: list[Any]): """ Not sure what this command means. This is currently unused but included for documentation purposes. """ pass
Not sure what this command means. This is currently unused but included for documentation purposes.
async def handle_bottleneck_cleared(self, *args: list[typing.Any])
-
Expand source code
async def handle_bottleneck_cleared(self, *args: list[Any]): """ Not sure what this command means. This is currently unused but included for documentation purposes. """ pass
Not sure what this command means. This is currently unused but included for documentation purposes.
async def handle_chat(self, message: str)
-
Expand source code
async def handle_chat(self, message: str): """ Whenever the player sends a chat message during the game lobby. """ pass
Whenever the player sends a chat message during the game lobby.
async def handle_clear_slot(self, slot: Any)
-
Expand source code
async def handle_clear_slot(self, slot: Any): if not self.is_host(): return self.game.clear_slot(int(slot)) self._mark_dirty()
async def handle_desync(self, *_args)
-
Expand source code
async def handle_desync(self, *_args): # pragma: no cover self.game.desyncs += 1
async def handle_disconnected(self, *args: list[typing.Any])
-
Expand source code
async def handle_disconnected(self, *args: list[Any]): """ Not sure what this command means. This is currently unused but included for documentation purposes. """ pass
Not sure what this command means. This is currently unused but included for documentation purposes.
async def handle_enforce_rating(self)
-
Expand source code
async def handle_enforce_rating(self): self.game.enforce_rating = True
async def handle_game_ended(self, *args: list[typing.Any])
-
Expand source code
async def handle_game_ended(self, *args: list[Any]): """ Signals that the simulation has ended. """ self.finished_sim = True await self.game.check_game_finish(self.player)
Signals that the simulation has ended.
async def handle_game_full(self)
-
Expand source code
async def handle_game_full(self): """ Sent when all game slots are full """ pass
Sent when all game slots are full
async def handle_game_mods(self, mode: Any, args: list[typing.Any])
-
Expand source code
async def handle_game_mods(self, mode: Any, args: list[Any]): if not self.is_host(): return if mode == "activated": # In this case args is the number of mods if int(args) == 0: self.game.mods = {} elif mode == "uids": uids = str(args).split() self.game.mods = {uid: "Unknown sim mod" for uid in uids} async with self._db.acquire() as conn: rows = await conn.execute( "SELECT `uid`, `name` from `table_mod` WHERE `uid` in :ids", ids=tuple(uids) ) for row in rows: self.game.mods[row.uid] = row.name else: self._logger.warning("Ignoring game mod: %s, %s", mode, args) return self._mark_dirty()
async def handle_game_option(self, key: str, value: Any)
-
Expand source code
async def handle_game_option(self, key: str, value: Any): if not self.is_host(): return await self.game.game_options.set_option(key, value) self._mark_dirty()
async def handle_game_result(self, army: Any, result: Any)
-
Expand source code
async def handle_game_result(self, army: Any, result: Any): army = int(army) result = str(result).lower() try: *metadata, result_type, score = result.split() except ValueError: self._logger.warning("Invalid result for %s reported: %s", army, result) else: await self.game.add_result( self.player.id, army, result_type, int(score), frozenset(metadata), )
async def handle_game_state(self, state: str)
-
Expand source code
async def handle_game_state(self, state: str): """ Changes in game state """ if state == "Idle": await self._handle_idle_state() # Don't mark as dirty return elif state == "Lobby": # TODO: Do we still need to schedule with `ensure_future`? # # We do not yield from the task, since we # need to keep processing other commands while it runs await self._handle_lobby_state() elif state == "Launching": if self.player.state != PlayerState.HOSTING: return if self.game.state is not GameState.LOBBY: self._logger.warning( "Trying to launch game %s in invalid state %s", self.game, self.game.state ) return self._logger.info("Launching game %s", self.game) await self.game.launch() if len(self.game.mods.keys()) > 0: async with self._db.acquire() as conn: uids = list(self.game.mods.keys()) await conn.execute( "UPDATE mod_stats s JOIN mod_version v ON " "v.mod_id = s.mod_id " "SET s.times_played = s.times_played + 1 " "WHERE v.uid in :ids", ids=tuple(uids) ) # Signals that the FA executable has been closed elif state == "Ended": await self.on_connection_closed() self._mark_dirty()
Changes in game state
async def handle_ice_message(self, receiver_id: Any, ice_msg: str)
-
Expand source code
async def handle_ice_message(self, receiver_id: Any, ice_msg: str): receiver_id = int(receiver_id) peer = self.player_service.get_player(receiver_id) if not peer: self._logger.debug( "Ignoring ICE message for unknown player: %s", receiver_id ) return game_connection = peer.game_connection if not game_connection: self._logger.debug( "Ignoring ICE message for player without game connection: %s", receiver_id ) return try: await game_connection.send({ "command": "IceMsg", "args": [int(self.player.id), ice_msg] }) except DisconnectedError: self._logger.debug( "Failed to send ICE message to player due to a disconnect: %s", receiver_id )
async def handle_json_stats(self, stats: str)
-
Expand source code
async def handle_json_stats(self, stats: str): try: self.game.report_army_stats(stats) except json.JSONDecodeError as e: self._logger.warning( "Malformed game stats reported by %s: '...%s...'", self.player.login, stats[e.pos-20:e.pos+20] )
async def handle_launch_status(self, status: str)
-
Expand source code
async def handle_launch_status(self, status: str): """ Currently is sent with status `Rejected` if a matchmaker game failed to start due to players using differing game settings. """ pass
Currently is sent with status
Rejected
if a matchmaker game failed to start due to players using differing game settings. async def handle_operation_complete(self, primary: Any, secondary: Any, delta: str)
-
Expand source code
async def handle_operation_complete( self, primary: Any, secondary: Any, delta: str ): """ # Params - `primary`: are primary mission objectives complete? - `secondary`: are secondary mission objectives complete? - `delta`: the time it took to complete the mission """ primary = FA.ENABLED == primary secondary = FA.ENABLED == secondary if not primary: return if not isinstance(self.game, CoopGame): self._logger.warning( "OperationComplete called for non-coop game: %s", self.game.id ) return if self.game.validity != ValidityState.COOP_NOT_RANKED: return secondary, delta = secondary, str(delta) async with self._db.acquire() as conn: result = await conn.execute( select(coop_map.c.id).where( coop_map.c.filename == self.game.map.file_path ) ) row = result.fetchone() if not row: self._logger.debug( "can't find coop map: %s", self.game.map.file_path ) return mission = row.id # Each player in a co-op game will send the OperationComplete # message but we only need to perform this insert once async with self.game.leaderboard_lock: if not self.game.leaderboard_saved: await conn.execute( coop_leaderboard.insert().values( mission=mission, gameuid=self.game.id, secondary=secondary, time=delta, player_count=len(self.game.players), ) ) self.game.leaderboard_saved = True
Params
primary
: are primary mission objectives complete?secondary
: are secondary mission objectives complete?delta
: the time it took to complete the mission
async def handle_player_option(self, player_id: Any, key: Any, value: Any)
-
Expand source code
async def handle_player_option( self, player_id: Any, key: Any, value: Any ): if not self.is_host(): return self.game.set_player_option(int(player_id), key, value) self._mark_dirty()
async def handle_rehost(self, *args: list[typing.Any])
-
Expand source code
async def handle_rehost(self, *args: list[Any]): """ Signals that the user has rehosted the game. This is currently unused but included for documentation purposes. """ pass
Signals that the user has rehosted the game. This is currently unused but included for documentation purposes.
async def handle_teamkill_happened(self,
gametime: Any,
victim_id: Any,
victim_name: str,
teamkiller_id: Any,
teamkiller_name: str)-
Expand source code
async def handle_teamkill_happened( self, gametime: Any, victim_id: Any, victim_name: str, teamkiller_id: Any, teamkiller_name: str, ): """ Send automatically by the game whenever a teamkill happens. Takes the same parameters as TeamkillReport. # Params - `gametime`: seconds of gametime when kill happened - `victim_id`: victim id - `victim_name`: victim nickname (for debug purpose only) - `teamkiller_id`: teamkiller id - `teamkiller_name`: teamkiller nickname (for debug purpose only) """ victim_id = int(victim_id) teamkiller_id = int(teamkiller_id) if 0 in (victim_id, teamkiller_id): self._logger.debug("Ignoring teamkill for AI player") return async with self._db.acquire() as conn: await conn.execute( teamkills.insert().values( teamkiller=teamkiller_id, victim=victim_id, game_id=self.game.id, gametime=gametime, ) )
Send automatically by the game whenever a teamkill happens. Takes the same parameters as TeamkillReport.
Params
gametime
: seconds of gametime when kill happenedvictim_id
: victim idvictim_name
: victim nickname (for debug purpose only)teamkiller_id
: teamkiller idteamkiller_name
: teamkiller nickname (for debug purpose only)
async def handle_teamkill_report(self,
gametime: Any,
reporter_id: Any,
reporter_name: str,
teamkiller_id: Any,
teamkiller_name: str)-
Expand source code
async def handle_teamkill_report( self, gametime: Any, reporter_id: Any, reporter_name: str, teamkiller_id: Any, teamkiller_name: str, ): """ Sent when a player is teamkilled and clicks the 'Report' button. # Params - `gametime`: seconds of gametime when kill happened - `reporter_id`: reporter id - `reporter_name`: reporter nickname (for debug purpose only) - `teamkiller_id`: teamkiller id - `teamkiller_name`: teamkiller nickname (for debug purpose only) """ pass
Sent when a player is teamkilled and clicks the 'Report' button.
Params
gametime
: seconds of gametime when kill happenedreporter_id
: reporter idreporter_name
: reporter nickname (for debug purpose only)teamkiller_id
: teamkiller idteamkiller_name
: teamkiller nickname (for debug purpose only)
def is_host(self) ‑> bool
-
Expand source code
def is_host(self) -> bool: if not self.game or not self.player: return False return ( self.player.state == PlayerState.HOSTING and self.player == self.game.host )
async def on_connection_closed(self)
-
Expand source code
async def on_connection_closed(self): """ The connection is closed by the player. """ try: await self.game.disconnect_player(self.player) except Exception as e: # pragma: no cover self._logger.exception(e) finally: await self.abort()
The connection is closed by the player.
async def on_connection_lost(self)
-
Expand source code
async def on_connection_lost(self): """ The connection is lost due to a disconnect from the lobby server. """ try: await self.game.remove_game_connection(self) except Exception as e: # pragma: no cover self._logger.exception(e) finally: await self.abort()
The connection is lost due to a disconnect from the lobby server.
async def send(self, message)
-
Expand source code
async def send(self, message): """ Send a game message to the client. # Errors May raise `DisconnectedError` NOTE: When calling this on a connection other than `self` make sure to handle `DisconnectedError`, otherwise failure to send the message will cause the caller to be disconnected as well. """ message["target"] = "game" self._logger.log(TRACE, ">> %s: %s", self.player.login, message) await self.protocol.send_message(message)
Send a game message to the client.
Errors
May raise
DisconnectedError
NOTE: When calling this on a connection other than
self
make sure to handleDisconnectedError
, otherwise failure to send the message will cause the caller to be disconnected as well. async def timeout_game_connection(self, timeout)
-
Expand source code
async def timeout_game_connection(self, timeout): await asyncio.sleep(timeout) if self.state is GameConnectionState.INITIALIZING: self._logger.debug("GameConection timed out...") await self.abort("Player took too long to start the game")
Inherited members
class GameService (database: FAFDatabase,
player_service,
game_stats_service,
rating_service: RatingService,
message_queue_service: MessageQueueService)-
Expand source code
@with_logger class GameService(Service): """ Utility class for maintaining lifecycle of games """ def __init__( self, database: FAFDatabase, player_service, game_stats_service, rating_service: RatingService, message_queue_service: MessageQueueService ): self._db = database self._dirty_games: set[Game] = set() self._dirty_queues: set[MatchmakerQueue] = set() self.player_service = player_service self.game_stats_service = game_stats_service self._rating_service = rating_service self._message_queue_service = message_queue_service self.game_id_counter = 0 self._allow_new_games = False self._drain_event = None # Populated below in update_data. self.featured_mods = dict() # A set of mod ids that are allowed in ranked games self.ranked_mods: set[str] = set() # A cache of map_version info needed by Game self.map_info_cache = LRUCache(maxsize=256) # The set of active games self._games: dict[int, Game] = dict() async def initialize(self) -> None: await self.initialise_game_counter() await self.update_data() self._update_cron = aiocron.crontab( "*/10 * * * *", func=self.update_data ) self._allow_new_games = True async def initialise_game_counter(self): async with self._db.acquire() as conn: # InnoDB, unusually, doesn't allow insertion of values greater than the next expected # value into an auto_increment field. We'd like to do that, because we no longer insert # games into the database when they don't start, so game ids aren't contiguous (as # unstarted games consume ids that never get written out). # So, id has to just be an integer primary key, no auto-increment: we handle its # incrementing here in game service, but have to do this slightly expensive query on # startup (though the primary key index probably makes it super fast anyway). # This is definitely a better choice than inserting useless rows when games are created, # doing LAST_UPDATE_ID to get the id number, and then doing an UPDATE when the actual # data to go into the row becomes available: we now only do a single insert for each # game, and don't end up with 800,000 junk rows in the database. sql = "SELECT MAX(id) FROM game_stats" self.game_id_counter = await conn.scalar(sql) or 0 async def update_data(self): """ Loads from the database the mostly-constant things that it doesn't make sense to query every time we need, but which can in principle change over time. """ async with self._db.acquire() as conn: rows = await conn.execute( select( game_featuredMods.c.id, game_featuredMods.c.gamemod, game_featuredMods.c.name, game_featuredMods.c.description, game_featuredMods.c.publish, game_featuredMods.c.order ) ) for row in rows: self.featured_mods[row.gamemod] = FeaturedMod( row.id, row.gamemod, row.name, row.description, row.publish, row.order ) result = await conn.execute( "SELECT uid FROM table_mod WHERE ranked = 1" ) # Turn resultset into a list of uids self.ranked_mods = {row.uid for row in result} async def get_map(self, folder_name: str) -> Map: folder_name = folder_name.lower() filename = f"maps/{folder_name}.zip" map = self.map_info_cache.get(filename) if map is not None: return map async with self._db.acquire() as conn: result = await conn.execute( select( map_version.c.id, map_version.c.filename, map_version.c.ranked, ) .where( func.lower(map_version.c.filename) == filename ) ) row = result.fetchone() if not row: # The map requested is not in the database. This is fine as # players may be using privately shared or generated maps that # are not in the vault. return Map( id=None, folder_name=folder_name, ranked=NeroxisGeneratedMap.is_neroxis_map(folder_name), ) map = Map( id=row.id, folder_name=folder_name, ranked=row.ranked ) self.map_info_cache[filename] = map return map def mark_dirty(self, obj: Union[Game, MatchmakerQueue]): if isinstance(obj, Game): self._dirty_games.add(obj) elif isinstance(obj, MatchmakerQueue): self._dirty_queues.add(obj) def pop_dirty_games(self) -> set[Game]: dirty_games = self._dirty_games self._dirty_games = set() return dirty_games def pop_dirty_queues(self) -> set[MatchmakerQueue]: dirty_queues = self._dirty_queues self._dirty_queues = set() return dirty_queues def create_uid(self) -> int: self.game_id_counter += 1 return self.game_id_counter def create_game( self, game_mode: str, game_class: type[Game] = CustomGame, visibility=VisibilityState.PUBLIC, host: Optional[Player] = None, name: Optional[str] = None, map: Map = MAP_DEFAULT, password: Optional[str] = None, matchmaker_queue_id: Optional[int] = None, **kwargs ): """ Main entrypoint for creating new games """ if not self._allow_new_games: raise DisabledError() game_id = self.create_uid() game_args = { "database": self._db, "id": game_id, "host": host, "name": name, "map": map, "game_mode": game_mode, "game_service": self, "game_stats_service": self.game_stats_service, "matchmaker_queue_id": matchmaker_queue_id, } game_args.update(kwargs) game = game_class(**game_args) self._games[game_id] = game game.visibility = visibility game.password = password self.mark_dirty(game) return game def update_active_game_metrics(self): modes = list(self.featured_mods.keys()) game_counter = Counter( ( game.game_mode if game.game_mode in modes else "other", game.state ) for game in self._games.values() ) for state in GameState: for mode in modes + ["other"]: metrics.active_games.labels(mode, state.name).set( game_counter[(mode, state)] ) rating_type_counter = Counter( ( game.rating_type, game.state ) for game in self._games.values() ) for state in GameState: for rating_type in rating_type_counter.keys(): metrics.active_games_by_rating_type.labels(rating_type, state.name).set( rating_type_counter[(rating_type, state)] ) @property def all_games(self) -> ValuesView[Game]: return self._games.values() @property def live_games(self) -> list[Game]: return [ game for game in self.all_games if game.state is GameState.LIVE ] @property def open_games(self) -> list[Game]: """ Return all games that meet the client's definition of "not closed". Server game states are mapped to client game states as follows: GameState.LOBBY: "open", GameState.LIVE: "playing", GameState.ENDED: "closed", GameState.INITIALIZING: "closed", The client ignores everything "closed". This property fetches all such not-closed games. """ return [ game for game in self.all_games if game.state in (GameState.LOBBY, GameState.LIVE) ] @property def pending_games(self) -> list[Game]: return [ game for game in self.all_games if game.state in (GameState.LOBBY, GameState.INITIALIZING) ] def remove_game(self, game: Game): if game.id in self._games: self._logger.debug("Removing game %s", game) del self._games[game.id] if ( self._drain_event is not None and not self._drain_event.is_set() and not self._games ): self._drain_event.set() def __getitem__(self, item: int) -> Game: return self._games[item] def __contains__(self, item): return item in self._games async def publish_game_results(self, game_results: EndedGameInfo): result_dict = game_results.to_dict() await self._message_queue_service.publish( config.MQ_EXCHANGE_NAME, "success.gameResults.create", result_dict, ) if ( game_results.validity is ValidityState.VALID and game_results.rating_type is not None ): metrics.rated_games.labels(game_results.rating_type).inc() # TODO: Remove when rating service starts listening to message queue await self._rating_service.enqueue(result_dict) async def drain_games(self): """ Wait for all games to finish. """ if not self._games: return if not self._drain_event: self._drain_event = asyncio.Event() await self._drain_event.wait() async def graceful_shutdown(self): self._allow_new_games = False await self.close_lobby_games() async def close_lobby_games(self): self._logger.info("Closing all games currently in lobby") for game in self.pending_games: for game_connection in list(game.connections): # Tell the client to kill the FA process game_connection.player.write_message({ "command": "notice", "style": "kill" }) await game_connection.abort()
Utility class for maintaining lifecycle of games
Ancestors
Instance variables
prop all_games : ValuesView[Game]
-
Expand source code
@property def all_games(self) -> ValuesView[Game]: return self._games.values()
prop live_games : list[Game]
-
Expand source code
@property def live_games(self) -> list[Game]: return [ game for game in self.all_games if game.state is GameState.LIVE ]
prop open_games : list[Game]
-
Expand source code
@property def open_games(self) -> list[Game]: """ Return all games that meet the client's definition of "not closed". Server game states are mapped to client game states as follows: GameState.LOBBY: "open", GameState.LIVE: "playing", GameState.ENDED: "closed", GameState.INITIALIZING: "closed", The client ignores everything "closed". This property fetches all such not-closed games. """ return [ game for game in self.all_games if game.state in (GameState.LOBBY, GameState.LIVE) ]
Return all games that meet the client's definition of "not closed". Server game states are mapped to client game states as follows:
GameState.LOBBY: "open", GameState.LIVE: "playing", GameState.ENDED: "closed", GameState.INITIALIZING: "closed",
The client ignores everything "closed". This property fetches all such not-closed games.
prop pending_games : list[Game]
-
Expand source code
@property def pending_games(self) -> list[Game]: return [ game for game in self.all_games if game.state in (GameState.LOBBY, GameState.INITIALIZING) ]
Methods
async def close_lobby_games(self)
-
Expand source code
async def close_lobby_games(self): self._logger.info("Closing all games currently in lobby") for game in self.pending_games: for game_connection in list(game.connections): # Tell the client to kill the FA process game_connection.player.write_message({ "command": "notice", "style": "kill" }) await game_connection.abort()
def create_game(self,
game_mode: str,
game_class: type[Game] = server.games.custom_game.CustomGame,
visibility=VisibilityState.PUBLIC,
host: Player | None = None,
name: str | None = None,
map: Map = Map(id=None, folder_name='scmp_007', ranked=False, weight=1),
password: str | None = None,
matchmaker_queue_id: int | None = None,
**kwargs)-
Expand source code
def create_game( self, game_mode: str, game_class: type[Game] = CustomGame, visibility=VisibilityState.PUBLIC, host: Optional[Player] = None, name: Optional[str] = None, map: Map = MAP_DEFAULT, password: Optional[str] = None, matchmaker_queue_id: Optional[int] = None, **kwargs ): """ Main entrypoint for creating new games """ if not self._allow_new_games: raise DisabledError() game_id = self.create_uid() game_args = { "database": self._db, "id": game_id, "host": host, "name": name, "map": map, "game_mode": game_mode, "game_service": self, "game_stats_service": self.game_stats_service, "matchmaker_queue_id": matchmaker_queue_id, } game_args.update(kwargs) game = game_class(**game_args) self._games[game_id] = game game.visibility = visibility game.password = password self.mark_dirty(game) return game
Main entrypoint for creating new games
def create_uid(self) ‑> int
-
Expand source code
def create_uid(self) -> int: self.game_id_counter += 1 return self.game_id_counter
async def drain_games(self)
-
Expand source code
async def drain_games(self): """ Wait for all games to finish. """ if not self._games: return if not self._drain_event: self._drain_event = asyncio.Event() await self._drain_event.wait()
Wait for all games to finish.
async def get_map(self, folder_name: str) ‑> Map
-
Expand source code
async def get_map(self, folder_name: str) -> Map: folder_name = folder_name.lower() filename = f"maps/{folder_name}.zip" map = self.map_info_cache.get(filename) if map is not None: return map async with self._db.acquire() as conn: result = await conn.execute( select( map_version.c.id, map_version.c.filename, map_version.c.ranked, ) .where( func.lower(map_version.c.filename) == filename ) ) row = result.fetchone() if not row: # The map requested is not in the database. This is fine as # players may be using privately shared or generated maps that # are not in the vault. return Map( id=None, folder_name=folder_name, ranked=NeroxisGeneratedMap.is_neroxis_map(folder_name), ) map = Map( id=row.id, folder_name=folder_name, ranked=row.ranked ) self.map_info_cache[filename] = map return map
async def initialise_game_counter(self)
-
Expand source code
async def initialise_game_counter(self): async with self._db.acquire() as conn: # InnoDB, unusually, doesn't allow insertion of values greater than the next expected # value into an auto_increment field. We'd like to do that, because we no longer insert # games into the database when they don't start, so game ids aren't contiguous (as # unstarted games consume ids that never get written out). # So, id has to just be an integer primary key, no auto-increment: we handle its # incrementing here in game service, but have to do this slightly expensive query on # startup (though the primary key index probably makes it super fast anyway). # This is definitely a better choice than inserting useless rows when games are created, # doing LAST_UPDATE_ID to get the id number, and then doing an UPDATE when the actual # data to go into the row becomes available: we now only do a single insert for each # game, and don't end up with 800,000 junk rows in the database. sql = "SELECT MAX(id) FROM game_stats" self.game_id_counter = await conn.scalar(sql) or 0
def mark_dirty(self,
obj: Game | MatchmakerQueue)-
Expand source code
def mark_dirty(self, obj: Union[Game, MatchmakerQueue]): if isinstance(obj, Game): self._dirty_games.add(obj) elif isinstance(obj, MatchmakerQueue): self._dirty_queues.add(obj)
def pop_dirty_games(self) ‑> set[Game]
-
Expand source code
def pop_dirty_games(self) -> set[Game]: dirty_games = self._dirty_games self._dirty_games = set() return dirty_games
def pop_dirty_queues(self) ‑> set[MatchmakerQueue]
-
Expand source code
def pop_dirty_queues(self) -> set[MatchmakerQueue]: dirty_queues = self._dirty_queues self._dirty_queues = set() return dirty_queues
async def publish_game_results(self,
game_results: EndedGameInfo)-
Expand source code
async def publish_game_results(self, game_results: EndedGameInfo): result_dict = game_results.to_dict() await self._message_queue_service.publish( config.MQ_EXCHANGE_NAME, "success.gameResults.create", result_dict, ) if ( game_results.validity is ValidityState.VALID and game_results.rating_type is not None ): metrics.rated_games.labels(game_results.rating_type).inc() # TODO: Remove when rating service starts listening to message queue await self._rating_service.enqueue(result_dict)
def remove_game(self,
game: Game)-
Expand source code
def remove_game(self, game: Game): if game.id in self._games: self._logger.debug("Removing game %s", game) del self._games[game.id] if ( self._drain_event is not None and not self._drain_event.is_set() and not self._games ): self._drain_event.set()
def update_active_game_metrics(self)
-
Expand source code
def update_active_game_metrics(self): modes = list(self.featured_mods.keys()) game_counter = Counter( ( game.game_mode if game.game_mode in modes else "other", game.state ) for game in self._games.values() ) for state in GameState: for mode in modes + ["other"]: metrics.active_games.labels(mode, state.name).set( game_counter[(mode, state)] ) rating_type_counter = Counter( ( game.rating_type, game.state ) for game in self._games.values() ) for state in GameState: for rating_type in rating_type_counter.keys(): metrics.active_games_by_rating_type.labels(rating_type, state.name).set( rating_type_counter[(rating_type, state)] )
async def update_data(self)
-
Expand source code
async def update_data(self): """ Loads from the database the mostly-constant things that it doesn't make sense to query every time we need, but which can in principle change over time. """ async with self._db.acquire() as conn: rows = await conn.execute( select( game_featuredMods.c.id, game_featuredMods.c.gamemod, game_featuredMods.c.name, game_featuredMods.c.description, game_featuredMods.c.publish, game_featuredMods.c.order ) ) for row in rows: self.featured_mods[row.gamemod] = FeaturedMod( row.id, row.gamemod, row.name, row.description, row.publish, row.order ) result = await conn.execute( "SELECT uid FROM table_mod WHERE ranked = 1" ) # Turn resultset into a list of uids self.ranked_mods = {row.uid for row in result}
Loads from the database the mostly-constant things that it doesn't make sense to query every time we need, but which can in principle change over time.
Inherited members
class GameStatsService (event_service: EventService,
achievement_service: AchievementService)-
Expand source code
@with_logger class GameStatsService(Service): def __init__( self, event_service: EventService, achievement_service: AchievementService ): self._event_service = event_service self._achievement_service = achievement_service async def process_game_stats( self, player: Player, game: Game, army_stats_list: list ): try: await self._process_game_stats(player, game, army_stats_list) except KeyError as e: self._logger.info("Malformed game stats. KeyError: %s", e) except Exception: self._logger.exception( "Error processing game stats for %s in game %d", player.login, game.id ) async def _process_game_stats( self, player: Player, game: Game, army_stats_list: list ): stats = None number_of_humans = 0 highest_score = 0 highest_scorer = None for army_stats in army_stats_list: if army_stats["type"] == "AI" and army_stats["name"] != "civilian": self._logger.debug("Ignoring AI game reported by %s", player.login) return if army_stats["type"] == "Human": number_of_humans += 1 if highest_score < army_stats["general"]["score"]: highest_score = army_stats["general"]["score"] highest_scorer = army_stats["name"] if army_stats["name"] == player.login: stats = army_stats if number_of_humans < 2: self._logger.debug("Ignoring single player game reported by %s", player.login) return if stats is None: self._logger.warning("Player %s reported stats of a game he was not part of", player.login) return army_result = game.get_player_outcome(player) if army_result is ArmyOutcome.UNKNOWN: self._logger.warning("No army result available for player %s", player.login) return self._logger.debug("Processing game stats for player: %s", player.login) faction = stats["faction"] # Stores achievements to batch update a_queue = [] # Stores events to batch update e_queue = [] self._logger.debug("Army result for %s => %s ", player, army_result) survived = army_result is ArmyOutcome.VICTORY blueprint_stats = stats["blueprints"] unit_stats = stats["units"] scored_highest = highest_scorer == player.login if survived and game.rating_type == RatingType.LADDER_1V1: self._unlock(ACH_FIRST_SUCCESS, a_queue) self._increment(ACH_NOVICE, 1, a_queue) self._increment(ACH_JUNIOR, 1, a_queue) self._increment(ACH_SENIOR, 1, a_queue) self._increment(ACH_VETERAN, 1, a_queue) self._increment(ACH_ADDICT, 1, a_queue) self._faction_played(faction, survived, a_queue, e_queue) self._category_stats(unit_stats, survived, a_queue, e_queue) self._killed_acus(unit_stats, survived, a_queue) self._built_mercies(_count_built_units(blueprint_stats, Unit.MERCY), a_queue) self._built_fire_beetles(_count_built_units(blueprint_stats, Unit.FIRE_BEETLE), a_queue) self._built_salvations(_count_built_units(blueprint_stats, Unit.SALVATION), survived, a_queue) self._built_yolona_oss(_count_built_units(blueprint_stats, Unit.YOLONA_OSS), survived, a_queue) self._built_paragons(_count_built_units(blueprint_stats, Unit.PARAGON), survived, a_queue) self._built_atlantis(_count_built_units(blueprint_stats, Unit.ATLANTIS), a_queue) self._built_tempests(_count_built_units(blueprint_stats, Unit.TEMPEST), a_queue) self._built_scathis(_count_built_units(blueprint_stats, Unit.SCATHIS), survived, a_queue) self._built_mavors(_count_built_units(blueprint_stats, Unit.MAVOR), survived, a_queue) self._built_czars(_count_built_units(blueprint_stats, Unit.CZAR), a_queue) self._built_ahwassas(_count_built_units(blueprint_stats, Unit.AHWASSA), a_queue) self._built_ythothas(_count_built_units(blueprint_stats, Unit.YTHOTHA), a_queue) self._built_fatboys(_count_built_units(blueprint_stats, Unit.FATBOY), a_queue) self._built_monkeylords(_count_built_units(blueprint_stats, Unit.MONKEYLORD), a_queue) self._built_galactic_colossus(_count_built_units(blueprint_stats, Unit.GALACTIC_COLOSSUS), a_queue) self._built_soul_rippers(_count_built_units(blueprint_stats, Unit.SOUL_RIPPER), a_queue) self._built_megaliths(_count_built_units(blueprint_stats, Unit.MEGALITH), a_queue) self._built_asfs(_count_built_units(blueprint_stats, *ASFS), a_queue) self._built_transports(unit_stats["transportation"].get("built", 0), a_queue) self._built_sacus(unit_stats["sacu"].get("built", 0), a_queue) self._lowest_acu_health(_count(blueprint_stats, lambda x: x.get("lowest_health", 0), *ACUS), survived, a_queue) self._highscore(scored_highest, number_of_humans, a_queue) await self._achievement_service.execute_batch_update(player.id, a_queue) await self._event_service.execute_batch_update(player.id, e_queue) def _category_stats(self, unit_stats, survived, achievements_queue, events_queue): built_air = unit_stats["air"].get("built", 0) built_land = unit_stats["land"].get("built", 0) built_naval = unit_stats["naval"].get("built", 0) built_experimentals = unit_stats["experimental"].get("built", 0) self._record_event(EVENT_BUILT_AIR_UNITS, built_air, events_queue) self._record_event(EVENT_LOST_AIR_UNITS, unit_stats["air"].get("lost", 0), events_queue) self._record_event(EVENT_BUILT_LAND_UNITS, built_land, events_queue) self._record_event(EVENT_LOST_LAND_UNITS, unit_stats["land"].get("lost", 0), events_queue) self._record_event(EVENT_BUILT_NAVAL_UNITS, built_naval, events_queue) self._record_event(EVENT_LOST_NAVAL_UNITS, unit_stats["naval"].get("lost", 0), events_queue) self._record_event(EVENT_LOST_ACUS, unit_stats["cdr"].get("lost", 0), events_queue) self._record_event(EVENT_BUILT_TECH_1_UNITS, unit_stats["tech1"].get("built", 0), events_queue) self._record_event(EVENT_LOST_TECH_1_UNITS, unit_stats["tech1"].get("lost", 0), events_queue) self._record_event(EVENT_BUILT_TECH_2_UNITS, unit_stats["tech2"].get("built", 0), events_queue) self._record_event(EVENT_LOST_TECH_2_UNITS, unit_stats["tech2"].get("lost", 0), events_queue) self._record_event(EVENT_BUILT_TECH_3_UNITS, unit_stats["tech3"].get("built", 0), events_queue) self._record_event(EVENT_LOST_TECH_3_UNITS, unit_stats["tech3"].get("lost", 0), events_queue) self._record_event(EVENT_BUILT_EXPERIMENTALS, built_experimentals, events_queue) self._record_event(EVENT_LOST_EXPERIMENTALS, unit_stats["experimental"].get("lost", 0), events_queue) self._record_event(EVENT_BUILT_ENGINEERS, unit_stats["engineer"].get("built", 0), events_queue) self._record_event(EVENT_LOST_ENGINEERS, unit_stats["engineer"].get("lost", 0), events_queue) if survived: if built_air > built_land and built_air > built_naval: self._increment(ACH_WRIGHT_BROTHER, 1, achievements_queue) self._increment(ACH_WINGMAN, 1, achievements_queue) self._increment(ACH_KING_OF_THE_SKIES, 1, achievements_queue) elif built_land > built_air and built_land > built_naval: self._increment(ACH_MILITIAMAN, 1, achievements_queue) self._increment(ACH_GRENADIER, 1, achievements_queue) self._increment(ACH_FIELD_MARSHAL, 1, achievements_queue) elif built_naval > built_land and built_naval > built_air: self._increment(ACH_LANDLUBBER, 1, achievements_queue) self._increment(ACH_SEAMAN, 1, achievements_queue) self._increment(ACH_ADMIRAL_OF_THE_FLEET, 1, achievements_queue) if built_experimentals > 0: self._increment(ACH_DR_EVIL, built_experimentals, achievements_queue) if built_experimentals >= 3: self._increment(ACH_TECHIE, 1, achievements_queue) self._increment(ACH_I_LOVE_BIG_TOYS, 1, achievements_queue) self._increment(ACH_EXPERIMENTALIST, 1, achievements_queue) def _faction_played(self, faction, survived, achievements_queue, events_queue): if faction == Faction.aeon: self._record_event(EVENT_AEON_PLAYS, 1, events_queue) if survived: self._record_event(EVENT_AEON_WINS, 1, events_queue) self._increment(ACH_AURORA, 1, achievements_queue) self._increment(ACH_BLAZE, 1, achievements_queue) self._increment(ACH_SERENITY, 1, achievements_queue) elif faction == Faction.cybran: self._record_event(EVENT_CYBRAN_PLAYS, 1, events_queue) if survived: self._record_event(EVENT_CYBRAN_WINS, 1, events_queue) self._increment(ACH_MANTIS, 1, achievements_queue) self._increment(ACH_WAGNER, 1, achievements_queue) self._increment(ACH_TREBUCHET, 1, achievements_queue) elif faction == Faction.uef: self._record_event(EVENT_UEF_PLAYS, 1, events_queue) if survived: self._record_event(EVENT_UEF_WINS, 1, events_queue) self._increment(ACH_MA12_STRIKER, 1, achievements_queue) self._increment(ACH_RIPTIDE, 1, achievements_queue) self._increment(ACH_DEMOLISHER, 1, achievements_queue) elif faction == Faction.seraphim: self._record_event(EVENT_SERAPHIM_PLAYS, 1, events_queue) if survived: self._record_event(EVENT_SERAPHIM_WINS, 1, events_queue) self._increment(ACH_THAAM, 1, achievements_queue) self._increment(ACH_YENZYNE, 1, achievements_queue) self._increment(ACH_SUTHANUS, 1, achievements_queue) def _killed_acus(self, unit_stats, survived, achievements_queue): killed_acus = unit_stats["cdr"].get("kills", 0) if killed_acus > 0: self._increment(ACH_DONT_MESS_WITH_ME, killed_acus, achievements_queue) if killed_acus >= 3 and survived: self._unlock(ACH_HATTRICK, achievements_queue) def _built_mercies(self, count, achievements_queue): self._increment(ACH_NO_MERCY, count, achievements_queue) def _built_fire_beetles(self, count, achievements_queue): self._increment(ACH_DEADLY_BUGS, count, achievements_queue) def _built_salvations(self, count, survived, achievements_queue): if survived and count > 0: self._unlock(ACH_RAINMAKER, achievements_queue) def _built_yolona_oss(self, count, survived, achievements_queue): if survived and count > 0: self._unlock(ACH_NUCLEAR_WAR, achievements_queue) def _built_paragons(self, count, survived, achievements_queue): if survived and count > 0: self._unlock(ACH_SO_MUCH_RESOURCES, achievements_queue) def _built_atlantis(self, count, achievements_queue): self._increment(ACH_IT_AINT_A_CITY, count, achievements_queue) def _built_tempests(self, count, achievements_queue): self._increment(ACH_STORMY_SEA, count, achievements_queue) def _built_scathis(self, count, survived, achievements_queue): if survived and count > 0: self._unlock(ACH_MAKE_IT_HAIL, achievements_queue) def _built_mavors(self, count, survived, achievements_queue): if survived and count > 0: self._unlock(ACH_I_HAVE_A_CANON, achievements_queue) def _built_czars(self, count, achievements_queue): self._increment(ACH_DEATH_FROM_ABOVE, count, achievements_queue) def _built_ahwassas(self, count, achievements_queue): self._increment(ACH_ASS_WASHER, count, achievements_queue) def _built_ythothas(self, count, achievements_queue): self._increment(ACH_ALIEN_INVASION, count, achievements_queue) def _built_fatboys(self, count, achievements_queue): self._increment(ACH_FATTER_IS_BETTER, count, achievements_queue) def _built_monkeylords(self, count, achievements_queue): self._increment(ACH_ARACHNOLOGIST, count, achievements_queue) def _built_galactic_colossus(self, count, achievements_queue): self._increment(ACH_INCOMING_ROBOTS, count, achievements_queue) def _built_soul_rippers(self, count, achievements_queue): self._increment(ACH_FLYING_DEATH, count, achievements_queue) def _built_megaliths(self, count, achievements_queue): self._increment(ACH_HOLY_CRAB, count, achievements_queue) def _built_transports(self, count, achievements_queue): self._increment(ACH_THE_TRANSPORTER, count, achievements_queue) def _built_sacus(self, count, achievements_queue): self._set_steps_at_least(ACH_WHO_NEEDS_SUPPORT, count, achievements_queue) def _built_asfs(self, count, achievements_queue): self._set_steps_at_least(ACH_WHAT_A_SWARM, count, achievements_queue) def _lowest_acu_health(self, health, survived, achievements_queue): if 0 < health < 500 and survived: self._unlock(ACH_THAT_WAS_CLOSE, achievements_queue) def _highscore(self, scored_highest, number_of_humans, achievements_queue): if scored_highest and number_of_humans >= 8: self._unlock(ACH_TOP_SCORE, achievements_queue) self._increment(ACH_UNBEATABLE, 1, achievements_queue) def _unlock(self, achievement_id, achievements_queue): self._achievement_service.unlock(achievement_id, achievements_queue) def _increment(self, achievement_id, steps, achievements_queue): self._achievement_service.increment(achievement_id, steps, achievements_queue) def _set_steps_at_least(self, achievement_id, steps, achievements_queue): self._achievement_service.set_steps_at_least(achievement_id, steps, achievements_queue) def _record_event(self, event_id, count, events_queue): self._event_service.record_event(event_id, count, events_queue)
All services should inherit from this class.
Services are singleton objects which manage some server task.
Ancestors
Methods
async def process_game_stats(self,
player: Player,
game: Game,
army_stats_list: list)-
Expand source code
async def process_game_stats( self, player: Player, game: Game, army_stats_list: list ): try: await self._process_game_stats(player, game, army_stats_list) except KeyError as e: self._logger.info("Malformed game stats. KeyError: %s", e) except Exception: self._logger.exception( "Error processing game stats for %s in game %d", player.login, game.id )
Inherited members
class GeoIpService
-
Expand source code
@with_logger class GeoIpService(Service): """ Service for managing the GeoIp database. This includes an asyncio crontab which periodically checks if the current file is out of date. If it is, then the service will try to download a new file from tue url in `server.config`. Provides an interface for getting data out of the database. """ def __init__(self): self.refresh_file_path() config.register_callback("GEO_IP_DATABASE_PATH", self.refresh_file_path) self.db = None self.db_update_time = None def refresh_file_path(self): self.file_path = config.GEO_IP_DATABASE_PATH async def initialize(self) -> None: self.check_geoip_db_file_updated() await self.check_update_geoip_db() # crontab: min hour day month day_of_week # Run every Wednesday because GeoLite2 is updated every first Tuesday # of the month. self._update_cron = aiocron.crontab( "0 0 * * 3", func=self.check_update_geoip_db ) self._check_file_timer = Timer( 60 * 10, self.check_geoip_db_file_updated, start=True ) def check_geoip_db_file_updated(self): """ Checks if the local database file has been updated by a server admin and loads it if it has. """ if not os.path.isfile(self.file_path): return if self.db is None: # We haven't loaded the file before self.load_db() else: assert self.db_update_time is not None # We have loaded the file, so check if it has been updated date_modified = datetime.fromtimestamp( os.path.getmtime(self.file_path) ) if date_modified > self.db_update_time: self.load_db() async def check_update_geoip_db(self) -> None: """ Check if the geoip database is old and update it if so. """ if not config.GEO_IP_LICENSE_KEY: self._logger.warning( "GEO_IP_LICENSE_KEY not set! Unable to download GeoIP database!" ) return self._logger.debug("Checking if geoip database needs updating") try: date_modified = datetime.fromtimestamp( os.path.getmtime(self.file_path) ) delta = datetime.now() - date_modified if delta.days > config.GEO_IP_DATABASE_MAX_AGE_DAYS: self._logger.info("Geoip database is out of date") await self.download_geoip_db() except FileNotFoundError: # pragma: no cover self._logger.warning("Geoip database is missing...") await self.download_geoip_db() except asyncio.TimeoutError: # pragma: no cover self._logger.warning( "Failed to download database file! " "Check the network connection and try again" ) except Exception as e: # pragma: no cover self._logger.exception(e) raise e self.load_db() async def download_geoip_db(self) -> None: """ Download the geoip database to a file. If the downloaded file is not a valid gzip file, then it does NOT overwrite the old file. """ assert config.GEO_IP_LICENSE_KEY is not None self._logger.info("Downloading new geoip database") # Download new file to a temp location with TemporaryFile() as temp_file: await self._download_file( config.GEO_IP_DATABASE_URL, config.GEO_IP_LICENSE_KEY, temp_file ) temp_file.seek(0) # Unzip the archive and overwrite the old file try: with tarfile.open(fileobj=temp_file, mode="r:gz") as tar: with open(self.file_path, "wb") as f_out: f_in = extract_file(tar, "GeoLite2-Country.mmdb") shutil.copyfileobj(f_in, f_out) except (tarfile.TarError) as e: # pragma: no cover self._logger.warning("Failed to extract downloaded file!") raise e self._logger.info("New database download complete") async def _download_file( self, url: str, license_key: str, fileobj: IO[bytes] ) -> None: """ Download a file using aiohttp and save it to a file. # Params - `url`: The url to download from - `file_path`: Path to save the file at """ chunk_size = 1024 params = { "edition_id": "GeoLite2-Country", "license_key": license_key, "suffix": "tar.gz" } async def get_checksum(session): async with session.get(url, params={ **params, "suffix": params["suffix"] + ".md5" }, timeout=60 * 20) as resp: return await resp.text() async def get_db_file_with_checksum(session): hasher = hashlib.md5() async with session.get(url, params=params, timeout=60 * 20) as resp: while True: chunk = await resp.content.read(chunk_size) if not chunk: break fileobj.write(chunk) hasher.update(chunk) return hasher.hexdigest() async with aiohttp.ClientSession(raise_for_status=True) as session: checksum, our_hash = await asyncio.gather( get_checksum(session), get_db_file_with_checksum(session) ) if checksum != our_hash: raise Exception( f"Hashes did not match! Expected {checksum} got {our_hash}" ) def load_db(self) -> None: """ Loads the database into memory. """ # Set the time first, if the file is corrupted we don't need to try # loading it again anyways self.db_update_time = datetime.now() try: new_db = maxminddb.open_database(self.file_path) except (InvalidDatabaseError, OSError, ValueError): self._logger.exception( "Failed to load maxmind db! Maybe the download was interrupted" ) else: if self.db is not None: self.db.close() self.db = new_db self._logger.info( "File loaded successfully from %s", self.file_path ) def country(self, address: str) -> str: """ Look up an ip address in the db and return it's country code. """ default_value = "" if self.db is None: return default_value entry = self.db.get(address) if entry is None: return default_value return str(entry.get("country", {}).get("iso_code", default_value)) async def shutdown(self): if self.db is not None: self.db.close()
Service for managing the GeoIp database. This includes an asyncio crontab which periodically checks if the current file is out of date. If it is, then the service will try to download a new file from tue url in
server.config
.Provides an interface for getting data out of the database.
Ancestors
Methods
def check_geoip_db_file_updated(self)
-
Expand source code
def check_geoip_db_file_updated(self): """ Checks if the local database file has been updated by a server admin and loads it if it has. """ if not os.path.isfile(self.file_path): return if self.db is None: # We haven't loaded the file before self.load_db() else: assert self.db_update_time is not None # We have loaded the file, so check if it has been updated date_modified = datetime.fromtimestamp( os.path.getmtime(self.file_path) ) if date_modified > self.db_update_time: self.load_db()
Checks if the local database file has been updated by a server admin and loads it if it has.
async def check_update_geoip_db(self) ‑> None
-
Expand source code
async def check_update_geoip_db(self) -> None: """ Check if the geoip database is old and update it if so. """ if not config.GEO_IP_LICENSE_KEY: self._logger.warning( "GEO_IP_LICENSE_KEY not set! Unable to download GeoIP database!" ) return self._logger.debug("Checking if geoip database needs updating") try: date_modified = datetime.fromtimestamp( os.path.getmtime(self.file_path) ) delta = datetime.now() - date_modified if delta.days > config.GEO_IP_DATABASE_MAX_AGE_DAYS: self._logger.info("Geoip database is out of date") await self.download_geoip_db() except FileNotFoundError: # pragma: no cover self._logger.warning("Geoip database is missing...") await self.download_geoip_db() except asyncio.TimeoutError: # pragma: no cover self._logger.warning( "Failed to download database file! " "Check the network connection and try again" ) except Exception as e: # pragma: no cover self._logger.exception(e) raise e self.load_db()
Check if the geoip database is old and update it if so.
def country(self, address: str) ‑> str
-
Expand source code
def country(self, address: str) -> str: """ Look up an ip address in the db and return it's country code. """ default_value = "" if self.db is None: return default_value entry = self.db.get(address) if entry is None: return default_value return str(entry.get("country", {}).get("iso_code", default_value))
Look up an ip address in the db and return it's country code.
async def download_geoip_db(self) ‑> None
-
Expand source code
async def download_geoip_db(self) -> None: """ Download the geoip database to a file. If the downloaded file is not a valid gzip file, then it does NOT overwrite the old file. """ assert config.GEO_IP_LICENSE_KEY is not None self._logger.info("Downloading new geoip database") # Download new file to a temp location with TemporaryFile() as temp_file: await self._download_file( config.GEO_IP_DATABASE_URL, config.GEO_IP_LICENSE_KEY, temp_file ) temp_file.seek(0) # Unzip the archive and overwrite the old file try: with tarfile.open(fileobj=temp_file, mode="r:gz") as tar: with open(self.file_path, "wb") as f_out: f_in = extract_file(tar, "GeoLite2-Country.mmdb") shutil.copyfileobj(f_in, f_out) except (tarfile.TarError) as e: # pragma: no cover self._logger.warning("Failed to extract downloaded file!") raise e self._logger.info("New database download complete")
Download the geoip database to a file. If the downloaded file is not a valid gzip file, then it does NOT overwrite the old file.
def load_db(self) ‑> None
-
Expand source code
def load_db(self) -> None: """ Loads the database into memory. """ # Set the time first, if the file is corrupted we don't need to try # loading it again anyways self.db_update_time = datetime.now() try: new_db = maxminddb.open_database(self.file_path) except (InvalidDatabaseError, OSError, ValueError): self._logger.exception( "Failed to load maxmind db! Maybe the download was interrupted" ) else: if self.db is not None: self.db.close() self.db = new_db self._logger.info( "File loaded successfully from %s", self.file_path )
Loads the database into memory.
def refresh_file_path(self)
-
Expand source code
def refresh_file_path(self): self.file_path = config.GEO_IP_DATABASE_PATH
Inherited members
class LadderService (database: FAFDatabase,
game_service: GameService,
violation_service: ViolationService)-
Expand source code
@with_logger class LadderService(Service): """ Service responsible for managing the automatches. Does matchmaking, updates statistics, and launches the games. """ def __init__( self, database: FAFDatabase, game_service: GameService, violation_service: ViolationService, ): self._db = database self._informed_players: set[Player] = set() self.game_service = game_service self.queues = {} self.violation_service = violation_service self._searches: dict[Player, dict[str, Search]] = defaultdict(dict) self._allow_new_searches = True async def initialize(self) -> None: await self.update_data() self._update_cron = aiocron.crontab("*/10 * * * *", func=self.update_data) async def update_data(self) -> None: async with self._db.acquire() as conn: map_pool_maps = await self.fetch_map_pools(conn) db_queues = await self.fetch_matchmaker_queues(conn) for name, info in db_queues.items(): if name not in self.queues: queue = MatchmakerQueue( self.game_service, self.on_match_found, name=name, queue_id=info["id"], featured_mod=info["mod"], rating_type=info["rating_type"], team_size=info["team_size"], params=info.get("params") ) self.queues[name] = queue queue.initialize() else: queue = self.queues[name] queue.featured_mod = info["mod"] queue.rating_type = info["rating_type"] queue.team_size = info["team_size"] queue.rating_peak = await self.fetch_rating_peak(info["rating_type"]) queue.map_pools.clear() for map_pool_id, min_rating, max_rating in info["map_pools"]: map_pool_name, map_list = map_pool_maps[map_pool_id] if not map_list: self._logger.warning( "Map pool '%s' is empty! Some %s games will " "likely fail to start!", map_pool_name, name ) queue.add_map_pool( MapPool(map_pool_id, map_pool_name, map_list), min_rating, max_rating ) # Remove queues that don't exist anymore for queue_name in list(self.queues.keys()): if queue_name not in db_queues: self.queues[queue_name].shutdown() del self.queues[queue_name] async def fetch_map_pools(self, conn) -> dict[int, tuple[str, list[Map]]]: result = await conn.execute( select( map_pool.c.id, map_pool.c.name, map_pool_map_version.c.weight, map_pool_map_version.c.map_params, map_version.c.id.label("map_id"), map_version.c.filename, map_version.c.ranked, ).select_from( map_pool.outerjoin(map_pool_map_version) .outerjoin(map_version) ) ) map_pool_maps = {} for row in result: id_ = row.id name = row.name if id_ not in map_pool_maps: map_pool_maps[id_] = (name, list()) _, map_list = map_pool_maps[id_] if row.map_id is not None: # Database filenames contain the maps/ prefix and .zip suffix. # This comes from the content server which hosts the files at # https://content.faforever.com/maps/name.zip folder_name = re.match(r"maps/(.+)\.zip", row.filename).group(1) map_list.append( Map( id=row.map_id, folder_name=folder_name, ranked=row.ranked, weight=row.weight, ) ) elif row.map_params is not None: try: params = json.loads(row.map_params) map_type = params["type"] if map_type == "neroxis": map_list.append( NeroxisGeneratedMap.of(params, row.weight) ) else: self._logger.warning( "Unsupported map type %s in pool %s", map_type, row.id ) except Exception: self._logger.warning( "Failed to load map in map pool %d. " "Parameters are '%s'", row.id, row.map_params, exc_info=True ) return map_pool_maps async def fetch_matchmaker_queues(self, conn): result = await conn.execute( select( matchmaker_queue.c.id, matchmaker_queue.c.technical_name, matchmaker_queue.c.team_size, matchmaker_queue.c.params, matchmaker_queue_map_pool.c.map_pool_id, matchmaker_queue_map_pool.c.min_rating, matchmaker_queue_map_pool.c.max_rating, game_featuredMods.c.gamemod, leaderboard.c.technical_name.label("rating_type") ) .select_from( matchmaker_queue .join(matchmaker_queue_map_pool) .join(game_featuredMods) .join(leaderboard) ).where(matchmaker_queue.c.enabled == true()) ) # So we don't log the same error multiple times when a queue has several # map pools errored = set() matchmaker_queues = defaultdict(lambda: defaultdict(list)) for row in result: name = row.technical_name if name in errored: continue info = matchmaker_queues[name] try: info["id"] = row.id info["mod"] = row.gamemod info["rating_type"] = row.rating_type info["team_size"] = row.team_size info["params"] = json.loads(row.params) if row.params else None info["map_pools"].append(( row.map_pool_id, row.min_rating, row.max_rating )) except Exception: self._logger.warning( "Unable to load queue '%s'!", name, exc_info=True ) del matchmaker_queues[name] errored.add(name) return matchmaker_queues async def fetch_rating_peak(self, rating_type): async with self._db.acquire() as conn: result = await conn.execute( select( leaderboard_rating_journal.c.rating_mean_before, leaderboard_rating_journal.c.rating_deviation_before ) .select_from(leaderboard_rating_journal.join(leaderboard)) .where(leaderboard.c.technical_name == rating_type) .order_by(leaderboard_rating_journal.c.id.desc()) .limit(1000) ) rows = result.fetchall() rowcount = len(rows) rating_peak = 1000.0 if rowcount > 0: rating_peak = statistics.mean( row.rating_mean_before - 3 * row.rating_deviation_before for row in rows ) metrics.leaderboard_rating_peak.labels(rating_type).set(rating_peak) if rowcount < 100: self._logger.warning( "Could only fetch %s ratings for %s queue.", rowcount, rating_type ) if rating_peak < 600 or rating_peak > 1200: self._logger.warning( "Estimated rating peak for %s is %s. This could lead to issues with matchmaking.", rating_type, rating_peak ) else: self._logger.info( "Estimated rating peak for %s is %s.", rating_type, rating_peak ) return rating_peak def start_search( self, players: list[Player], queue_name: str, on_matched: OnMatchedCallback = lambda _1, _2: None ): if not self._allow_new_searches: raise DisabledError() timeouts = self.violation_service.get_violations(players) if timeouts: self._logger.debug("timeouts: %s", timeouts) times = [ { "player": p.id, "expires_at": violation.get_ban_expiration().isoformat() } for p, violation in timeouts.items() ] for player in players: player.write_message({ "command": "search_timeout", "timeouts": times }) # TODO: Do we need this or is `search_timeout` enough? player.write_message({ "command": "search_info", "queue_name": queue_name, "state": "stop" }) # For compatibility with clients that don't understand # `search_timeout` only. This may be removed at any time. if len(times) == 1: s = "" are = "is" else: s = "s" are = "are" names = ", ".join(p.login for p in timeouts) max_time = humanize.naturaldelta( max( timeouts.values(), key=lambda v: v.get_ban_expiration() ).get_remaining() ) player.write_message({ "command": "notice", "style": "info", "text": f"Player{s} {names} {are} timed out for {max_time}" }) return # Cancel any existing searches that players have for this queue for player in players: if queue_name in self._searches[player]: self._cancel_search(player, queue_name) queue = self.queues[queue_name] search = Search( players, rating_type=queue.rating_type, on_matched=on_matched ) for player in players: player.state = PlayerState.SEARCHING_LADDER self.write_rating_progress(player, queue.rating_type) player.write_message({ "command": "search_info", "queue_name": queue_name, "state": "start" }) self._searches[player][queue_name] = search self._logger.info("%s started searching for %s", search, queue_name) asyncio.create_task(queue.search(search)) def cancel_search( self, initiator: Player, queue_name: Optional[str] = None ) -> None: if queue_name is None: queue_names = list(self._searches[initiator].keys()) else: queue_names = [queue_name] for queue_name in queue_names: self._cancel_search(initiator, queue_name) def _cancel_search(self, initiator: Player, queue_name: str) -> None: """ Cancel search for a specific player/queue. """ cancelled_search = self._clear_search(initiator, queue_name) if cancelled_search is None: self._logger.debug( "Ignoring request to cancel a search that does not exist: " "%s, %s", initiator, queue_name ) return cancelled_search.cancel() for player in cancelled_search.players: player.write_message({ "command": "search_info", "queue_name": queue_name, "state": "stop" }) if ( not self._searches[player] and player.state == PlayerState.SEARCHING_LADDER ): player.state = PlayerState.IDLE self._logger.info( "%s stopped searching for %s", cancelled_search, queue_name ) def _clear_search( self, initiator: Player, queue_name: str ) -> Optional[Search]: """ Remove a search from the searches dictionary. Does NOT cancel the search. """ search = self._searches[initiator].get(queue_name) if search is not None: for player in search.players: del self._searches[player][queue_name] return search def write_rating_progress(self, player: Player, rating_type: str) -> None: if player not in self._informed_players: self._informed_players.add(player) _, deviation = player.ratings[rating_type] if deviation > 490: player.write_message({ "command": "notice", "style": "info", "text": ( "<i>Welcome to the matchmaker</i><br><br><b>The " "matchmaking system needs to calibrate your skill level; " "your first few games may be more imbalanced as the " "system attempts to learn your capability as a player." "</b><br><b>" "Afterwards, you'll be more reliably matched up with " "people of your skill level: so don't worry if your " "first few games are uneven. This will improve as you " "play!</b>" ) }) def on_match_found( self, s1: Search, s2: Search, queue: MatchmakerQueue ) -> None: """ Callback for when a match is generated by a matchmaker queue. NOTE: This function is called while the matchmaker search lock is held, so it should only perform fast operations. """ try: msg = {"command": "match_found", "queue_name": queue.name} for player in s1.players + s2.players: player.state = PlayerState.STARTING_AUTOMATCH player.write_message(msg) # Cancel any other searches queue_names = list( name for name in self._searches[player].keys() if name != queue.name ) for queue_name in queue_names: self._cancel_search(player, queue_name) self._clear_search(player, queue.name) asyncio.create_task(self.start_game(s1.players, s2.players, queue)) except Exception: self._logger.exception( "Error processing match between searches %s, and %s", s1, s2 ) def start_game( self, team1: list[Player], team2: list[Player], queue: MatchmakerQueue ) -> Awaitable[None]: # We want assertion errors to trigger when the caller attempts to # create the async function, not when the function starts executing. assert len(team1) == len(team2) return self._start_game(team1, team2, queue) async def _start_game( self, team1: list[Player], team2: list[Player], queue: MatchmakerQueue ) -> None: self._logger.debug( "Starting %s game between %s and %s", queue.name, [p.login for p in team1], [p.login for p in team2] ) game = None try: host = team1[0] all_players = team1 + team2 all_guests = all_players[1:] played_map_ids = await self.get_game_history( all_players, queue.id, limit=config.LADDER_ANTI_REPETITION_LIMIT ) def get_displayed_rating(player: Player) -> float: return player.ratings[queue.rating_type].displayed() ratings = (get_displayed_rating(player) for player in all_players) func = MAP_POOL_RATING_SELECTION_FUNCTIONS.get( config.MAP_POOL_RATING_SELECTION, statistics.mean ) rating = func(ratings) pool = queue.get_map_pool_for_rating(rating) if not pool: raise RuntimeError(f"No map pool available for rating {rating}!") game_map = pool.choose_map(played_map_ids) game = self.game_service.create_game( game_class=LadderGame, game_mode=queue.featured_mod, host=host, name="Matchmaker Game", map=game_map, matchmaker_queue_id=queue.id, rating_type=queue.rating_type, max_players=len(all_players), ) game.init_mode = InitMode.AUTO_LOBBY game.set_name_unchecked(game_name(team1, team2)) team1 = sorted(team1, key=get_displayed_rating) team2 = sorted(team2, key=get_displayed_rating) # Shuffle the teams such that direct opponents remain the same zipped_teams = list(zip(team1, team2)) random.shuffle(zipped_teams) for i, player in enumerate( player for pair in zipped_teams for player in pair ): # FA uses lua and lua arrays are 1-indexed slot = i + 1 # 2 if even, 3 if odd team = (i % 2) + 2 player.game = game # Set player options without triggering the logic for # determining that players have actually connected to the game. game._player_options[player.id]["Faction"] = player.faction.value game._player_options[player.id]["Team"] = team game._player_options[player.id]["StartSpot"] = slot game._player_options[player.id]["Army"] = slot game._player_options[player.id]["Color"] = slot game_options = queue.get_game_options() if game_options: game.game_options.update(game_options) self._logger.debug("Starting ladder game: %s", game) def make_game_options(player: Player) -> GameLaunchOptions: return GameLaunchOptions( mapname=game_map.folder_name, expected_players=len(all_players), game_options=game_options, team=game.get_player_option(player.id, "Team"), faction=game.get_player_option(player.id, "Faction"), map_position=game.get_player_option(player.id, "StartSpot") ) await self.launch_match(game, host, all_guests, make_game_options) self._logger.debug("Ladder game launched successfully %s", game) metrics.matches.labels(queue.name, MatchLaunch.SUCCESSFUL).inc() except Exception as e: abandoning_players = [] if isinstance(e, NotConnectedError): self._logger.info( "Ladder game failed to start! %s setup timed out", game ) metrics.matches.labels(queue.name, MatchLaunch.TIMED_OUT).inc() abandoning_players = e.players elif isinstance(e, GameClosedError): self._logger.info( "Ladder game %s failed to start! " "Player %s closed their game instance", game, e.player ) metrics.matches.labels(queue.name, MatchLaunch.ABORTED_BY_PLAYER).inc() abandoning_players = [e.player] else: # All timeout errors should be transformed by the match starter. assert not isinstance(e, asyncio.TimeoutError) self._logger.exception("Ladder game failed to start %s", game) metrics.matches.labels(queue.name, MatchLaunch.ERRORED).inc() if game: await game.on_game_finish() game_id = game.id if game else None msg = {"command": "match_cancelled", "game_id": game_id} for player in all_players: player.write_message(msg) if abandoning_players: self._logger.info( "Players failed to connect: %s", abandoning_players ) self.violation_service.register_violations(abandoning_players) async def launch_match( self, game: LadderGame, host: Player, guests: list[Player], make_game_options: Callable[[Player], GameLaunchOptions] ): # Launch the host if host.lobby_connection is None: raise NotConnectedError([host]) host.lobby_connection.write_launch_game( game, is_host=True, options=make_game_options(host) ) try: await game.wait_hosted(60) except asyncio.TimeoutError: raise NotConnectedError([host]) finally: # TODO: Once the client supports `match_cancelled`, don't # send `launch_game` to the client if the host timed out. Until # then, failing to send `launch_game` will cause the client to # think it is searching for ladder, even though the server has # already removed it from the queue. # Launch the guests not_connected_guests = [ player for player in guests if player.lobby_connection is None ] if not_connected_guests: raise NotConnectedError(not_connected_guests) for guest in guests: assert guest.lobby_connection is not None guest.lobby_connection.write_launch_game( game, is_host=False, options=make_game_options(guest) ) try: await game.wait_launched(60 + 10 * len(guests)) except asyncio.TimeoutError: connected_players = game.get_connected_players() raise NotConnectedError([ player for player in guests if player not in connected_players ]) async def get_game_history( self, players: list[Player], queue_id: int, limit: int = 3 ) -> list[int]: async with self._db.acquire() as conn: result = [] for player in players: query = select( game_stats.c.mapId, ).select_from( game_player_stats .join(game_stats) .join(matchmaker_queue_game) ).where( and_( game_player_stats.c.playerId == player.id, game_stats.c.startTime >= func.DATE_SUB( func.now(), text("interval 1 day") ), matchmaker_queue_game.c.matchmaker_queue_id == queue_id ) ).order_by( game_stats.c.startTime.desc(), # Timestamps only have second resolution, so for this to # work correctly in the unit tests we also need id game_stats.c.id.desc() ).limit(limit) result.extend([ row.mapId for row in await conn.execute(query) ]) return result def on_connection_lost(self, conn: "LobbyConnection") -> None: if not conn.player: return player = conn.player self.cancel_search(player) del self._searches[player] if player in self._informed_players: self._informed_players.remove(player) async def graceful_shutdown(self): self._allow_new_searches = False for queue in self.queues.values(): queue.shutdown() for player, searches in self._searches.items(): for queue_name in list(searches.keys()): self._cancel_search(player, queue_name)
Service responsible for managing the automatches. Does matchmaking, updates statistics, and launches the games.
Ancestors
Methods
def cancel_search(self,
initiator: Player,
queue_name: str | None = None) ‑> None-
Expand source code
def cancel_search( self, initiator: Player, queue_name: Optional[str] = None ) -> None: if queue_name is None: queue_names = list(self._searches[initiator].keys()) else: queue_names = [queue_name] for queue_name in queue_names: self._cancel_search(initiator, queue_name)
async def fetch_map_pools(self, conn) ‑> dict[int, tuple[str, list[Map]]]
-
Expand source code
async def fetch_map_pools(self, conn) -> dict[int, tuple[str, list[Map]]]: result = await conn.execute( select( map_pool.c.id, map_pool.c.name, map_pool_map_version.c.weight, map_pool_map_version.c.map_params, map_version.c.id.label("map_id"), map_version.c.filename, map_version.c.ranked, ).select_from( map_pool.outerjoin(map_pool_map_version) .outerjoin(map_version) ) ) map_pool_maps = {} for row in result: id_ = row.id name = row.name if id_ not in map_pool_maps: map_pool_maps[id_] = (name, list()) _, map_list = map_pool_maps[id_] if row.map_id is not None: # Database filenames contain the maps/ prefix and .zip suffix. # This comes from the content server which hosts the files at # https://content.faforever.com/maps/name.zip folder_name = re.match(r"maps/(.+)\.zip", row.filename).group(1) map_list.append( Map( id=row.map_id, folder_name=folder_name, ranked=row.ranked, weight=row.weight, ) ) elif row.map_params is not None: try: params = json.loads(row.map_params) map_type = params["type"] if map_type == "neroxis": map_list.append( NeroxisGeneratedMap.of(params, row.weight) ) else: self._logger.warning( "Unsupported map type %s in pool %s", map_type, row.id ) except Exception: self._logger.warning( "Failed to load map in map pool %d. " "Parameters are '%s'", row.id, row.map_params, exc_info=True ) return map_pool_maps
async def fetch_matchmaker_queues(self, conn)
-
Expand source code
async def fetch_matchmaker_queues(self, conn): result = await conn.execute( select( matchmaker_queue.c.id, matchmaker_queue.c.technical_name, matchmaker_queue.c.team_size, matchmaker_queue.c.params, matchmaker_queue_map_pool.c.map_pool_id, matchmaker_queue_map_pool.c.min_rating, matchmaker_queue_map_pool.c.max_rating, game_featuredMods.c.gamemod, leaderboard.c.technical_name.label("rating_type") ) .select_from( matchmaker_queue .join(matchmaker_queue_map_pool) .join(game_featuredMods) .join(leaderboard) ).where(matchmaker_queue.c.enabled == true()) ) # So we don't log the same error multiple times when a queue has several # map pools errored = set() matchmaker_queues = defaultdict(lambda: defaultdict(list)) for row in result: name = row.technical_name if name in errored: continue info = matchmaker_queues[name] try: info["id"] = row.id info["mod"] = row.gamemod info["rating_type"] = row.rating_type info["team_size"] = row.team_size info["params"] = json.loads(row.params) if row.params else None info["map_pools"].append(( row.map_pool_id, row.min_rating, row.max_rating )) except Exception: self._logger.warning( "Unable to load queue '%s'!", name, exc_info=True ) del matchmaker_queues[name] errored.add(name) return matchmaker_queues
async def fetch_rating_peak(self, rating_type)
-
Expand source code
async def fetch_rating_peak(self, rating_type): async with self._db.acquire() as conn: result = await conn.execute( select( leaderboard_rating_journal.c.rating_mean_before, leaderboard_rating_journal.c.rating_deviation_before ) .select_from(leaderboard_rating_journal.join(leaderboard)) .where(leaderboard.c.technical_name == rating_type) .order_by(leaderboard_rating_journal.c.id.desc()) .limit(1000) ) rows = result.fetchall() rowcount = len(rows) rating_peak = 1000.0 if rowcount > 0: rating_peak = statistics.mean( row.rating_mean_before - 3 * row.rating_deviation_before for row in rows ) metrics.leaderboard_rating_peak.labels(rating_type).set(rating_peak) if rowcount < 100: self._logger.warning( "Could only fetch %s ratings for %s queue.", rowcount, rating_type ) if rating_peak < 600 or rating_peak > 1200: self._logger.warning( "Estimated rating peak for %s is %s. This could lead to issues with matchmaking.", rating_type, rating_peak ) else: self._logger.info( "Estimated rating peak for %s is %s.", rating_type, rating_peak ) return rating_peak
async def get_game_history(self,
players: list[Player],
queue_id: int,
limit: int = 3) ‑> list[int]-
Expand source code
async def get_game_history( self, players: list[Player], queue_id: int, limit: int = 3 ) -> list[int]: async with self._db.acquire() as conn: result = [] for player in players: query = select( game_stats.c.mapId, ).select_from( game_player_stats .join(game_stats) .join(matchmaker_queue_game) ).where( and_( game_player_stats.c.playerId == player.id, game_stats.c.startTime >= func.DATE_SUB( func.now(), text("interval 1 day") ), matchmaker_queue_game.c.matchmaker_queue_id == queue_id ) ).order_by( game_stats.c.startTime.desc(), # Timestamps only have second resolution, so for this to # work correctly in the unit tests we also need id game_stats.c.id.desc() ).limit(limit) result.extend([ row.mapId for row in await conn.execute(query) ]) return result
async def launch_match(self,
game: LadderGame,
host: Player,
guests: list[Player],
make_game_options: Callable[[Player], GameLaunchOptions])-
Expand source code
async def launch_match( self, game: LadderGame, host: Player, guests: list[Player], make_game_options: Callable[[Player], GameLaunchOptions] ): # Launch the host if host.lobby_connection is None: raise NotConnectedError([host]) host.lobby_connection.write_launch_game( game, is_host=True, options=make_game_options(host) ) try: await game.wait_hosted(60) except asyncio.TimeoutError: raise NotConnectedError([host]) finally: # TODO: Once the client supports `match_cancelled`, don't # send `launch_game` to the client if the host timed out. Until # then, failing to send `launch_game` will cause the client to # think it is searching for ladder, even though the server has # already removed it from the queue. # Launch the guests not_connected_guests = [ player for player in guests if player.lobby_connection is None ] if not_connected_guests: raise NotConnectedError(not_connected_guests) for guest in guests: assert guest.lobby_connection is not None guest.lobby_connection.write_launch_game( game, is_host=False, options=make_game_options(guest) ) try: await game.wait_launched(60 + 10 * len(guests)) except asyncio.TimeoutError: connected_players = game.get_connected_players() raise NotConnectedError([ player for player in guests if player not in connected_players ])
def on_match_found(self,
s1: Search,
s2: Search,
queue: MatchmakerQueue) ‑> None-
Expand source code
def on_match_found( self, s1: Search, s2: Search, queue: MatchmakerQueue ) -> None: """ Callback for when a match is generated by a matchmaker queue. NOTE: This function is called while the matchmaker search lock is held, so it should only perform fast operations. """ try: msg = {"command": "match_found", "queue_name": queue.name} for player in s1.players + s2.players: player.state = PlayerState.STARTING_AUTOMATCH player.write_message(msg) # Cancel any other searches queue_names = list( name for name in self._searches[player].keys() if name != queue.name ) for queue_name in queue_names: self._cancel_search(player, queue_name) self._clear_search(player, queue.name) asyncio.create_task(self.start_game(s1.players, s2.players, queue)) except Exception: self._logger.exception( "Error processing match between searches %s, and %s", s1, s2 )
Callback for when a match is generated by a matchmaker queue.
NOTE: This function is called while the matchmaker search lock is held, so it should only perform fast operations.
def start_game(self,
team1: list[Player],
team2: list[Player],
queue: MatchmakerQueue) ‑> Awaitable[None]-
Expand source code
def start_game( self, team1: list[Player], team2: list[Player], queue: MatchmakerQueue ) -> Awaitable[None]: # We want assertion errors to trigger when the caller attempts to # create the async function, not when the function starts executing. assert len(team1) == len(team2) return self._start_game(team1, team2, queue)
def start_search(self,
players: list[Player],
queue_name: str,
on_matched: Callable[[ForwardRef('Search'), ForwardRef('Search')], Any] = <function LadderService.<lambda>>)-
Expand source code
def start_search( self, players: list[Player], queue_name: str, on_matched: OnMatchedCallback = lambda _1, _2: None ): if not self._allow_new_searches: raise DisabledError() timeouts = self.violation_service.get_violations(players) if timeouts: self._logger.debug("timeouts: %s", timeouts) times = [ { "player": p.id, "expires_at": violation.get_ban_expiration().isoformat() } for p, violation in timeouts.items() ] for player in players: player.write_message({ "command": "search_timeout", "timeouts": times }) # TODO: Do we need this or is `search_timeout` enough? player.write_message({ "command": "search_info", "queue_name": queue_name, "state": "stop" }) # For compatibility with clients that don't understand # `search_timeout` only. This may be removed at any time. if len(times) == 1: s = "" are = "is" else: s = "s" are = "are" names = ", ".join(p.login for p in timeouts) max_time = humanize.naturaldelta( max( timeouts.values(), key=lambda v: v.get_ban_expiration() ).get_remaining() ) player.write_message({ "command": "notice", "style": "info", "text": f"Player{s} {names} {are} timed out for {max_time}" }) return # Cancel any existing searches that players have for this queue for player in players: if queue_name in self._searches[player]: self._cancel_search(player, queue_name) queue = self.queues[queue_name] search = Search( players, rating_type=queue.rating_type, on_matched=on_matched ) for player in players: player.state = PlayerState.SEARCHING_LADDER self.write_rating_progress(player, queue.rating_type) player.write_message({ "command": "search_info", "queue_name": queue_name, "state": "start" }) self._searches[player][queue_name] = search self._logger.info("%s started searching for %s", search, queue_name) asyncio.create_task(queue.search(search))
async def update_data(self) ‑> None
-
Expand source code
async def update_data(self) -> None: async with self._db.acquire() as conn: map_pool_maps = await self.fetch_map_pools(conn) db_queues = await self.fetch_matchmaker_queues(conn) for name, info in db_queues.items(): if name not in self.queues: queue = MatchmakerQueue( self.game_service, self.on_match_found, name=name, queue_id=info["id"], featured_mod=info["mod"], rating_type=info["rating_type"], team_size=info["team_size"], params=info.get("params") ) self.queues[name] = queue queue.initialize() else: queue = self.queues[name] queue.featured_mod = info["mod"] queue.rating_type = info["rating_type"] queue.team_size = info["team_size"] queue.rating_peak = await self.fetch_rating_peak(info["rating_type"]) queue.map_pools.clear() for map_pool_id, min_rating, max_rating in info["map_pools"]: map_pool_name, map_list = map_pool_maps[map_pool_id] if not map_list: self._logger.warning( "Map pool '%s' is empty! Some %s games will " "likely fail to start!", map_pool_name, name ) queue.add_map_pool( MapPool(map_pool_id, map_pool_name, map_list), min_rating, max_rating ) # Remove queues that don't exist anymore for queue_name in list(self.queues.keys()): if queue_name not in db_queues: self.queues[queue_name].shutdown() del self.queues[queue_name]
def write_rating_progress(self,
player: Player,
rating_type: str) ‑> None-
Expand source code
def write_rating_progress(self, player: Player, rating_type: str) -> None: if player not in self._informed_players: self._informed_players.add(player) _, deviation = player.ratings[rating_type] if deviation > 490: player.write_message({ "command": "notice", "style": "info", "text": ( "<i>Welcome to the matchmaker</i><br><br><b>The " "matchmaking system needs to calibrate your skill level; " "your first few games may be more imbalanced as the " "system attempts to learn your capability as a player." "</b><br><b>" "Afterwards, you'll be more reliably matched up with " "people of your skill level: so don't worry if your " "first few games are uneven. This will improve as you " "play!</b>" ) })
Inherited members
class MessageQueueService
-
Expand source code
@with_logger class MessageQueueService(Service): """ Service handling connection to the message queue and providing an interface to publish messages. """ def __init__(self) -> None: self._connection = None self._channel = None self._exchanges = {} self._exchange_types = {} self._is_ready = False config.register_callback("MQ_USER", self.reconnect) config.register_callback("MQ_PASSWORD", self.reconnect) config.register_callback("MQ_VHOST", self.reconnect) config.register_callback("MQ_SERVER", self.reconnect) config.register_callback("MQ_PORT", self.reconnect) @synchronizedmethod("initialization_lock") async def initialize(self) -> None: if self._is_ready: return try: await self._connect() except ConnectionAttemptFailed: return self._is_ready = True await self._declare_exchange(config.MQ_EXCHANGE_NAME, ExchangeType.TOPIC) async def _connect(self) -> None: try: self._connection = await aio_pika.connect_robust( "amqp://{user}:{password}@{server}:{port}/{vhost}".format( user=config.MQ_USER, password=config.MQ_PASSWORD, vhost=config.MQ_VHOST, server=config.MQ_SERVER, port=config.MQ_PORT, ), loop=asyncio.get_running_loop(), ) except ConnectionError as e: self._logger.warning( "Unable to connect to RabbitMQ. Is it running?", exc_info=True ) raise ConnectionAttemptFailed from e except ProbableAuthenticationError as e: self._logger.warning( "Unable to connect to RabbitMQ. Incorrect credentials?", exc_info=True ) raise ConnectionAttemptFailed from e except Exception as e: self._logger.warning( "Unable to connect to RabbitMQ due to unhandled excpetion %s. Incorrect vhost?", e, exc_info=True, ) raise ConnectionAttemptFailed from e self._channel = await self._connection.channel(publisher_confirms=False) self._logger.debug("Connected to RabbitMQ %r", self._connection) async def declare_exchange( self, exchange_name: str, exchange_type: ExchangeType = ExchangeType.TOPIC, durable: bool = True ) -> None: await self.initialize() if not self._is_ready: self._logger.warning( "Not connected to RabbitMQ, unable to declare exchange." ) return await self._declare_exchange(exchange_name, exchange_type, durable) async def _declare_exchange( self, exchange_name: str, exchange_type: ExchangeType, durable: bool = True ) -> None: new_exchange = await self._channel.declare_exchange( exchange_name, exchange_type, durable ) self._exchanges[exchange_name] = new_exchange self._exchange_types[exchange_name] = exchange_type @synchronizedmethod("initialization_lock") async def shutdown(self) -> None: self._is_ready = False await self._shutdown() async def _shutdown(self) -> None: if self._channel is not None: await self._channel.close() self._channel = None if self._connection is not None: await self._connection.close() self._connection = None async def publish( self, exchange_name: str, routing: str, payload: dict, mandatory: bool = False, delivery_mode: DeliveryMode = DeliveryMode.PERSISTENT, ) -> None: await self.publish_many( exchange_name, routing, [payload], mandatory=mandatory, delivery_mode=delivery_mode ) async def publish_many( self, exchange_name: str, routing: str, payloads: Iterable[dict], mandatory: bool = False, delivery_mode: DeliveryMode = DeliveryMode.PERSISTENT, ) -> None: if not self._is_ready: self._logger.warning( "Not connected to RabbitMQ, unable to publish message." ) return exchange = self._exchanges.get(exchange_name) if exchange is None: raise KeyError(f"Unknown exchange {exchange_name}.") async with self._channel.transaction(): for payload in payloads: message = aio_pika.Message( json.dumps(payload).encode(), delivery_mode=delivery_mode ) await exchange.publish( message, routing_key=routing, mandatory=mandatory ) self._logger.log( TRACE, "Published message %s to %s/%s", payload, exchange_name, routing ) @synchronizedmethod("initialization_lock") async def reconnect(self) -> None: self._is_ready = False await self._shutdown() try: await self._connect() except ConnectionAttemptFailed: return for exchange_name in list(self._exchanges.keys()): await self._declare_exchange( exchange_name, self._exchange_types[exchange_name] ) self._is_ready = True
Service handling connection to the message queue and providing an interface to publish messages.
Ancestors
Methods
async def declare_exchange(self,
exchange_name: str,
exchange_type: aio_pika.abc.ExchangeType = ExchangeType.TOPIC,
durable: bool = True) ‑> None-
Expand source code
async def declare_exchange( self, exchange_name: str, exchange_type: ExchangeType = ExchangeType.TOPIC, durable: bool = True ) -> None: await self.initialize() if not self._is_ready: self._logger.warning( "Not connected to RabbitMQ, unable to declare exchange." ) return await self._declare_exchange(exchange_name, exchange_type, durable)
async def publish(self,
exchange_name: str,
routing: str,
payload: dict,
mandatory: bool = False,
delivery_mode: aio_pika.abc.DeliveryMode = DeliveryMode.PERSISTENT) ‑> None-
Expand source code
async def publish( self, exchange_name: str, routing: str, payload: dict, mandatory: bool = False, delivery_mode: DeliveryMode = DeliveryMode.PERSISTENT, ) -> None: await self.publish_many( exchange_name, routing, [payload], mandatory=mandatory, delivery_mode=delivery_mode )
async def publish_many(self,
exchange_name: str,
routing: str,
payloads: Iterable[dict],
mandatory: bool = False,
delivery_mode: aio_pika.abc.DeliveryMode = DeliveryMode.PERSISTENT) ‑> None-
Expand source code
async def publish_many( self, exchange_name: str, routing: str, payloads: Iterable[dict], mandatory: bool = False, delivery_mode: DeliveryMode = DeliveryMode.PERSISTENT, ) -> None: if not self._is_ready: self._logger.warning( "Not connected to RabbitMQ, unable to publish message." ) return exchange = self._exchanges.get(exchange_name) if exchange is None: raise KeyError(f"Unknown exchange {exchange_name}.") async with self._channel.transaction(): for payload in payloads: message = aio_pika.Message( json.dumps(payload).encode(), delivery_mode=delivery_mode ) await exchange.publish( message, routing_key=routing, mandatory=mandatory ) self._logger.log( TRACE, "Published message %s to %s/%s", payload, exchange_name, routing )
async def reconnect(self) ‑> None
-
Expand source code
@synchronizedmethod("initialization_lock") async def reconnect(self) -> None: self._is_ready = False await self._shutdown() try: await self._connect() except ConnectionAttemptFailed: return for exchange_name in list(self._exchanges.keys()): await self._declare_exchange( exchange_name, self._exchange_types[exchange_name] ) self._is_ready = True
Inherited members
class OAuthService
-
Expand source code
@with_logger class OAuthService(Service, name="oauth_service"): """ Service for managing the OAuth token logins and verification. """ def __init__(self): self.public_keys = {} self._last_key_fetch_time = None async def initialize(self) -> None: await self.retrieve_public_keys() # crontab: min hour day month day_of_week # Run every 10 minutes to update public keys. self._update_cron = aiocron.crontab( "*/10 * * * *", func=self.retrieve_public_keys ) @synchronizedmethod async def get_public_keys(self) -> dict: """ Return cached keys, or fetch them if they're missing """ if not self.public_keys: # Rate limit requests so we don't spam the endpoint when it's down if ( not self._last_key_fetch_time or time.monotonic() - self._last_key_fetch_time > 5 ): await self.retrieve_public_keys() if not self.public_keys: raise RuntimeError("jwks could not be retrieved") return self.public_keys async def retrieve_public_keys(self) -> None: """ Get the latest jwks from the hydra endpoint """ self._last_key_fetch_time = time.monotonic() try: async with aiohttp.ClientSession(raise_for_status=True) as session: async with session.get(config.HYDRA_JWKS_URI) as resp: jwks = await resp.json() self.public_keys = { jwk["kid"]: RSAAlgorithm.from_jwk(jwk) for jwk in jwks["keys"] } self._logger.info("Got public keys from %s", config.HYDRA_JWKS_URI) except Exception: self._logger.exception( "Unable to retrieve jwks, token login will be unavailable!" ) async def get_player_id_from_token(self, token: str) -> int: """ Decode the JWT to get the player_id """ # Ensures that if we're missing the jwks we will try to fetch them on # each new login request. This way our login functionality will be # restored as soon as possible keys = await self.get_public_keys() try: kid = jwt.get_unverified_header(token)["kid"] key = keys[kid] decoded = jwt.decode( token, key=key, algorithms="RS256", options={"verify_aud": False} ) if "lobby" not in decoded["scp"]: raise AuthenticationError( "Token does not have permission to login to the lobby server", "token" ) return int(decoded["sub"]) except (InvalidTokenError, KeyError, ValueError): raise AuthenticationError("Token signature was invalid", "token")
Service for managing the OAuth token logins and verification.
Ancestors
Methods
async def get_player_id_from_token(self, token: str) ‑> int
-
Expand source code
async def get_player_id_from_token(self, token: str) -> int: """ Decode the JWT to get the player_id """ # Ensures that if we're missing the jwks we will try to fetch them on # each new login request. This way our login functionality will be # restored as soon as possible keys = await self.get_public_keys() try: kid = jwt.get_unverified_header(token)["kid"] key = keys[kid] decoded = jwt.decode( token, key=key, algorithms="RS256", options={"verify_aud": False} ) if "lobby" not in decoded["scp"]: raise AuthenticationError( "Token does not have permission to login to the lobby server", "token" ) return int(decoded["sub"]) except (InvalidTokenError, KeyError, ValueError): raise AuthenticationError("Token signature was invalid", "token")
Decode the JWT to get the player_id
async def get_public_keys(self) ‑> dict
-
Expand source code
@synchronizedmethod async def get_public_keys(self) -> dict: """ Return cached keys, or fetch them if they're missing """ if not self.public_keys: # Rate limit requests so we don't spam the endpoint when it's down if ( not self._last_key_fetch_time or time.monotonic() - self._last_key_fetch_time > 5 ): await self.retrieve_public_keys() if not self.public_keys: raise RuntimeError("jwks could not be retrieved") return self.public_keys
Return cached keys, or fetch them if they're missing
async def retrieve_public_keys(self) ‑> None
-
Expand source code
async def retrieve_public_keys(self) -> None: """ Get the latest jwks from the hydra endpoint """ self._last_key_fetch_time = time.monotonic() try: async with aiohttp.ClientSession(raise_for_status=True) as session: async with session.get(config.HYDRA_JWKS_URI) as resp: jwks = await resp.json() self.public_keys = { jwk["kid"]: RSAAlgorithm.from_jwk(jwk) for jwk in jwks["keys"] } self._logger.info("Got public keys from %s", config.HYDRA_JWKS_URI) except Exception: self._logger.exception( "Unable to retrieve jwks, token login will be unavailable!" )
Get the latest jwks from the hydra endpoint
Inherited members
class PartyService (game_service: GameService)
-
Expand source code
@with_logger class PartyService(Service): """ Service responsible for managing the player parties. Logically, we consider players to always be in a party, either alone, or with other players. """ def __init__(self, game_service: GameService): self.game_service = game_service self.player_parties: dict[Player, PlayerParty] = {} self._dirty_parties: set[PlayerParty] = set() async def initialize(self): self._update_task = at_interval(1, self.update_dirties) async def shutdown(self): self._update_task.stop() async def update_dirties(self): if not self._dirty_parties: return dirty_parties = self._dirty_parties self._dirty_parties = set() for party in dirty_parties: try: self.write_broadcast_party(party) except Exception: # pragma: no cover self._logger.exception( "Unexpected exception while sending party updates!" ) def write_broadcast_party(self, party, members=None): """ Send a party update to all players in the party """ if not members: members = iter(party) msg = { "command": "update_party", **party.to_dict() } for member in members: # Will re-encode the message for each player member.player.write_message(msg) def get_party(self, owner: Player) -> PlayerParty: party = self.player_parties.get(owner) if not party: party = PlayerParty(owner) self.player_parties[owner] = party return party def mark_dirty(self, party: PlayerParty): self._dirty_parties.add(party) def invite_player_to_party(self, sender: Player, recipient: Player): """ Creates a new party for `sender` if one doesn't exist, and invites `recipient` to that party. """ if sender not in self.player_parties: self.player_parties[sender] = PlayerParty(sender) party = self.player_parties[sender] if party.owner != sender: raise ClientError("You do not own this party.", recoverable=True) party.add_invited_player(recipient) recipient.write_message({ "command": "party_invite", "sender": sender.id }) async def accept_invite(self, recipient: Player, sender: Player): party = self.player_parties.get(sender) if ( not party or recipient not in party.invited_players or party.invited_players[recipient].is_expired() ): # TODO: Localize with a proper message raise ClientError("You are not invited to that party (anymore)", recoverable=True) if sender.state is PlayerState.SEARCHING_LADDER: # TODO: Localize with a proper message raise ClientError("That party is already in queue", recoverable=True) old_party = self.player_parties.get(recipient) if old_party is not None: # Preserve state (like faction selection) from the old party member = old_party.get_member_by_player(recipient) assert member is not None await self.leave_party(recipient) party.add_member(member) else: party.add_player(recipient) self.player_parties[recipient] = party self.mark_dirty(party) async def kick_player_from_party(self, owner: Player, kicked_player: Player): if owner not in self.player_parties: raise ClientError("You are not in a party.", recoverable=True) party = self.player_parties[owner] if party.owner != owner: raise ClientError("You do not own that party.", recoverable=True) if kicked_player not in party: # Client state appears to be out of date await party.send_party(owner) return party.remove_player(kicked_player) del self.player_parties[kicked_player] kicked_player.write_message({"command": "kicked_from_party"}) self.mark_dirty(party) async def leave_party(self, player: Player): if player not in self.player_parties: raise ClientError("You are not in a party.", recoverable=True) party = self.player_parties[player] self._remove_player_from_party(player, party) # TODO: Remove? await party.send_party(player) def _remove_player_from_party(self, player, party): party.remove_player(player) del self.player_parties[player] if party.is_disbanded(): self.remove_party(party) return self.mark_dirty(party) def set_factions(self, player: Player, factions: list[Faction]): if player not in self.player_parties: self.player_parties[player] = PlayerParty(player) party = self.player_parties[player] party.set_factions(player, factions) self.mark_dirty(party) def remove_party(self, party): # Remove all players who were in the party for member in party: self._logger.info("Removing party for player %s", member.player) if party == self.player_parties.get(member.player): del self.player_parties[member.player] else: self._logger.warning( "Player %s was in two parties at once!", member.player ) members = party.members party.clear() # TODO: Send a special "disbanded" command? self.write_broadcast_party(party, members=members) def on_connection_lost(self, conn: "LobbyConnection") -> None: if not conn.player or conn.player not in self.player_parties: return self._remove_player_from_party( conn.player, self.player_parties[conn.player] )
Service responsible for managing the player parties.
Logically, we consider players to always be in a party, either alone, or with other players.
Ancestors
Methods
async def accept_invite(self,
recipient: Player,
sender: Player)-
Expand source code
async def accept_invite(self, recipient: Player, sender: Player): party = self.player_parties.get(sender) if ( not party or recipient not in party.invited_players or party.invited_players[recipient].is_expired() ): # TODO: Localize with a proper message raise ClientError("You are not invited to that party (anymore)", recoverable=True) if sender.state is PlayerState.SEARCHING_LADDER: # TODO: Localize with a proper message raise ClientError("That party is already in queue", recoverable=True) old_party = self.player_parties.get(recipient) if old_party is not None: # Preserve state (like faction selection) from the old party member = old_party.get_member_by_player(recipient) assert member is not None await self.leave_party(recipient) party.add_member(member) else: party.add_player(recipient) self.player_parties[recipient] = party self.mark_dirty(party)
def get_party(self,
owner: Player) ‑> PlayerParty-
Expand source code
def get_party(self, owner: Player) -> PlayerParty: party = self.player_parties.get(owner) if not party: party = PlayerParty(owner) self.player_parties[owner] = party return party
def invite_player_to_party(self,
sender: Player,
recipient: Player)-
Expand source code
def invite_player_to_party(self, sender: Player, recipient: Player): """ Creates a new party for `sender` if one doesn't exist, and invites `recipient` to that party. """ if sender not in self.player_parties: self.player_parties[sender] = PlayerParty(sender) party = self.player_parties[sender] if party.owner != sender: raise ClientError("You do not own this party.", recoverable=True) party.add_invited_player(recipient) recipient.write_message({ "command": "party_invite", "sender": sender.id })
Creates a new party for
sender
if one doesn't exist, and invitesrecipient
to that party. async def kick_player_from_party(self,
owner: Player,
kicked_player: Player)-
Expand source code
async def kick_player_from_party(self, owner: Player, kicked_player: Player): if owner not in self.player_parties: raise ClientError("You are not in a party.", recoverable=True) party = self.player_parties[owner] if party.owner != owner: raise ClientError("You do not own that party.", recoverable=True) if kicked_player not in party: # Client state appears to be out of date await party.send_party(owner) return party.remove_player(kicked_player) del self.player_parties[kicked_player] kicked_player.write_message({"command": "kicked_from_party"}) self.mark_dirty(party)
async def leave_party(self,
player: Player)-
Expand source code
async def leave_party(self, player: Player): if player not in self.player_parties: raise ClientError("You are not in a party.", recoverable=True) party = self.player_parties[player] self._remove_player_from_party(player, party) # TODO: Remove? await party.send_party(player)
def mark_dirty(self,
party: PlayerParty)-
Expand source code
def mark_dirty(self, party: PlayerParty): self._dirty_parties.add(party)
def remove_party(self, party)
-
Expand source code
def remove_party(self, party): # Remove all players who were in the party for member in party: self._logger.info("Removing party for player %s", member.player) if party == self.player_parties.get(member.player): del self.player_parties[member.player] else: self._logger.warning( "Player %s was in two parties at once!", member.player ) members = party.members party.clear() # TODO: Send a special "disbanded" command? self.write_broadcast_party(party, members=members)
def set_factions(self,
player: Player,
factions: list[Faction])-
Expand source code
def set_factions(self, player: Player, factions: list[Faction]): if player not in self.player_parties: self.player_parties[player] = PlayerParty(player) party = self.player_parties[player] party.set_factions(player, factions) self.mark_dirty(party)
async def update_dirties(self)
-
Expand source code
async def update_dirties(self): if not self._dirty_parties: return dirty_parties = self._dirty_parties self._dirty_parties = set() for party in dirty_parties: try: self.write_broadcast_party(party) except Exception: # pragma: no cover self._logger.exception( "Unexpected exception while sending party updates!" )
def write_broadcast_party(self, party, members=None)
-
Expand source code
def write_broadcast_party(self, party, members=None): """ Send a party update to all players in the party """ if not members: members = iter(party) msg = { "command": "update_party", **party.to_dict() } for member in members: # Will re-encode the message for each player member.player.write_message(msg)
Send a party update to all players in the party
Inherited members
class PlayerService (database: FAFDatabase)
-
Expand source code
@with_logger class PlayerService(Service): def __init__(self, database: FAFDatabase): self._db = database self._players = dict() # Static-ish data fields. self.uniqueid_exempt = {} self._dirty_players = set() async def initialize(self) -> None: await self.update_data() self._update_cron = aiocron.crontab( "*/10 * * * *", func=self.update_data ) def __len__(self): return len(self._players) def __iter__(self): return self._players.values().__iter__() def __getitem__(self, player_id: int) -> Optional[Player]: return self._players.get(player_id) def __setitem__(self, player_id: int, player: Player): self._players[player_id] = player metrics.players_online.set(len(self._players)) @property def all_players(self) -> ValuesView[Player]: return self._players.values() def mark_dirty(self, player: Player): self._dirty_players.add(player) def pop_dirty_players(self) -> set[Player]: dirty_players = self._dirty_players self._dirty_players = set() return dirty_players async def fetch_player_data(self, player): async with self._db.acquire() as conn: result = await conn.execute( select(user_group.c.technical_name) .select_from(user_group_assignment.join(user_group)) .where(user_group_assignment.c.user_id == player.id) ) player.user_groups = {row.technical_name for row in result} sql = select( avatars_list.c.url, avatars_list.c.tooltip, clan.c.tag ).select_from( login .outerjoin(clan_membership) .outerjoin(clan) .outerjoin( avatars, onclause=and_( avatars.c.idUser == login.c.id, avatars.c.selected == 1 ) ) .outerjoin(avatars_list) ).where(login.c.id == player.id) # yapf: disable result = await conn.execute(sql) row = result.fetchone() if not row: self._logger.warning( "Did not find data for player with id %i", player.id ) return row = row._mapping player.clan = row.get(clan.c.tag) url, tooltip = ( row.get(avatars_list.c.url), row.get(avatars_list.c.tooltip) ) if url and tooltip: player.avatar = {"url": url, "tooltip": tooltip} await self._fetch_player_ratings(player, conn) async def _fetch_player_ratings(self, player, conn): sql = select( leaderboard_rating.c.mean, leaderboard_rating.c.deviation, leaderboard_rating.c.total_games, leaderboard.c.technical_name, ).select_from( leaderboard.join(leaderboard_rating) ).where( leaderboard_rating.c.login_id == player.id ) result = await conn.execute(sql) retrieved_ratings = { row.technical_name: ( (row.mean, row.deviation), row.total_games ) for row in result } for rating_type, (rating, total_games) in retrieved_ratings.items(): player.ratings[rating_type] = rating player.game_count[rating_type] = total_games def remove_player(self, player: Player): if player.id in self._players: # This signals that the player is now disconnected del player.lobby_connection del self._players[player.id] metrics.players_online.set(len(self._players)) self.mark_dirty(player) async def has_permission_role(self, player: Player, role_name: str) -> bool: async with self._db.acquire() as conn: result = await conn.execute( select(group_permission.c.id) .select_from( user_group_assignment .join(group_permission_assignment, onclause=( user_group_assignment.c.group_id == group_permission_assignment.c.group_id )) .join(group_permission) ) .where( and_( user_group_assignment.c.user_id == player.id, group_permission.c.technical_name == role_name ) ) ) row = result.fetchone() return row is not None def is_uniqueid_exempt(self, user_id: int) -> bool: return user_id in self.uniqueid_exempt def get_player(self, player_id: int) -> Optional[Player]: return self._players.get(player_id) def signal_player_rating_change( self, player_id: int, rating_type: str, new_rating: Rating ) -> None: player = self.get_player(player_id) if player is None: self._logger.debug( "Received rating change for player with id %i not in PlayerService.", player_id ) return self._logger.debug( "Received rating change for player %s.", player ) player.ratings[rating_type] = new_rating player.game_count[rating_type] += 1 self.mark_dirty(player) async def update_data(self): """ Update rarely-changing data, such as the admin list and the list of users exempt from the uniqueid check. """ async with self._db.acquire() as conn: # UniqueID-exempt users. result = await conn.execute( "SELECT `user_id` FROM uniqueid_exempt" ) self.uniqueid_exempt = frozenset(map(lambda x: x[0], result)) async def kick_idle_players(self): for fut in asyncio.as_completed([ player.lobby_connection.abort("Graceful shutdown.") for player in self.all_players if player.state == PlayerState.IDLE if player.lobby_connection is not None ]): try: await fut except Exception: self._logger.debug( "Error while aborting connection", exc_info=True ) def on_connection_lost(self, conn: "LobbyConnection") -> None: if not conn.player: return self.remove_player(conn.player) self._logger.debug( "Removed player %d, %s, %d", conn.player.id, conn.player.login, conn.session ) async def graceful_shutdown(self): if config.SHUTDOWN_KICK_IDLE_PLAYERS: self._kick_idle_task = at_interval(1, self.kick_idle_players)
All services should inherit from this class.
Services are singleton objects which manage some server task.
Ancestors
Instance variables
prop all_players : ValuesView[Player]
-
Expand source code
@property def all_players(self) -> ValuesView[Player]: return self._players.values()
Methods
async def fetch_player_data(self, player)
-
Expand source code
async def fetch_player_data(self, player): async with self._db.acquire() as conn: result = await conn.execute( select(user_group.c.technical_name) .select_from(user_group_assignment.join(user_group)) .where(user_group_assignment.c.user_id == player.id) ) player.user_groups = {row.technical_name for row in result} sql = select( avatars_list.c.url, avatars_list.c.tooltip, clan.c.tag ).select_from( login .outerjoin(clan_membership) .outerjoin(clan) .outerjoin( avatars, onclause=and_( avatars.c.idUser == login.c.id, avatars.c.selected == 1 ) ) .outerjoin(avatars_list) ).where(login.c.id == player.id) # yapf: disable result = await conn.execute(sql) row = result.fetchone() if not row: self._logger.warning( "Did not find data for player with id %i", player.id ) return row = row._mapping player.clan = row.get(clan.c.tag) url, tooltip = ( row.get(avatars_list.c.url), row.get(avatars_list.c.tooltip) ) if url and tooltip: player.avatar = {"url": url, "tooltip": tooltip} await self._fetch_player_ratings(player, conn)
def get_player(self, player_id: int) ‑> Player | None
-
Expand source code
def get_player(self, player_id: int) -> Optional[Player]: return self._players.get(player_id)
async def has_permission_role(self,
player: Player,
role_name: str) ‑> bool-
Expand source code
async def has_permission_role(self, player: Player, role_name: str) -> bool: async with self._db.acquire() as conn: result = await conn.execute( select(group_permission.c.id) .select_from( user_group_assignment .join(group_permission_assignment, onclause=( user_group_assignment.c.group_id == group_permission_assignment.c.group_id )) .join(group_permission) ) .where( and_( user_group_assignment.c.user_id == player.id, group_permission.c.technical_name == role_name ) ) ) row = result.fetchone() return row is not None
def is_uniqueid_exempt(self, user_id: int) ‑> bool
-
Expand source code
def is_uniqueid_exempt(self, user_id: int) -> bool: return user_id in self.uniqueid_exempt
async def kick_idle_players(self)
-
Expand source code
async def kick_idle_players(self): for fut in asyncio.as_completed([ player.lobby_connection.abort("Graceful shutdown.") for player in self.all_players if player.state == PlayerState.IDLE if player.lobby_connection is not None ]): try: await fut except Exception: self._logger.debug( "Error while aborting connection", exc_info=True )
def mark_dirty(self,
player: Player)-
Expand source code
def mark_dirty(self, player: Player): self._dirty_players.add(player)
def pop_dirty_players(self) ‑> set[Player]
-
Expand source code
def pop_dirty_players(self) -> set[Player]: dirty_players = self._dirty_players self._dirty_players = set() return dirty_players
def remove_player(self,
player: Player)-
Expand source code
def remove_player(self, player: Player): if player.id in self._players: # This signals that the player is now disconnected del player.lobby_connection del self._players[player.id] metrics.players_online.set(len(self._players)) self.mark_dirty(player)
def signal_player_rating_change(self, player_id: int, rating_type: str, new_rating: trueskill.Rating) ‑> None
-
Expand source code
def signal_player_rating_change( self, player_id: int, rating_type: str, new_rating: Rating ) -> None: player = self.get_player(player_id) if player is None: self._logger.debug( "Received rating change for player with id %i not in PlayerService.", player_id ) return self._logger.debug( "Received rating change for player %s.", player ) player.ratings[rating_type] = new_rating player.game_count[rating_type] += 1 self.mark_dirty(player)
async def update_data(self)
-
Expand source code
async def update_data(self): """ Update rarely-changing data, such as the admin list and the list of users exempt from the uniqueid check. """ async with self._db.acquire() as conn: # UniqueID-exempt users. result = await conn.execute( "SELECT `user_id` FROM uniqueid_exempt" ) self.uniqueid_exempt = frozenset(map(lambda x: x[0], result))
Update rarely-changing data, such as the admin list and the list of users exempt from the uniqueid check.
Inherited members
class RatingService (database: FAFDatabase,
player_service: PlayerService,
message_queue_service: MessageQueueService)-
Expand source code
@with_logger class RatingService(Service): """ Service responsible for calculating and saving trueskill rating updates. To avoid race conditions, rating updates from a single game ought to be atomic. """ def __init__( self, database: FAFDatabase, player_service: PlayerService, message_queue_service: MessageQueueService ): self._db = database self._player_service_callback = player_service.signal_player_rating_change self._accept_input = False self._queue = asyncio.Queue() self._task = None self._rating_type_ids: Optional[dict[str, int]] = None self.leaderboards: dict[str, Leaderboard] = {} self._message_queue_service = message_queue_service async def initialize(self) -> None: if self._task is not None: self._logger.error("Service already runnning or not properly shut down.") return await self.update_data() self._update_cron = aiocron.crontab("*/10 * * * *", func=self.update_data) self._accept_input = True self._task = asyncio.create_task(self._handle_rating_queue()) async def update_data(self): async with self._db.acquire() as conn: initializer = leaderboard.alias() sql = select( leaderboard.c.id, leaderboard.c.technical_name, initializer.c.technical_name.label("initializer") ).select_from( leaderboard.outerjoin( initializer, leaderboard.c.initializer_id == initializer.c.id ) ) result = await conn.execute(sql) rows = result.fetchall() self.leaderboards.clear() self._rating_type_ids = {} for row in rows: self.leaderboards[row.technical_name] = Leaderboard( row.id, row.technical_name ) self._rating_type_ids[row.technical_name] = row.id # Link the initializers for row in rows: current = self.leaderboards[row.technical_name] init = self.leaderboards.get(row.initializer) if init: current.initializer = init async def enqueue(self, game_info: dict[str]) -> None: if not self._accept_input: self._logger.warning("Dropped rating request %s", game_info) raise ServiceNotReadyError( "RatingService not yet initialized or shutting down." ) summary = GameRatingSummary.from_game_info_dict(game_info) self._logger.debug("Queued up rating request %s", summary) await self._queue.put(summary) rating_service_backlog.set(self._queue.qsize()) async def _handle_rating_queue(self) -> None: self._logger.debug("RatingService started!") try: while self._accept_input or not self._queue.empty(): summary = await self._queue.get() self._logger.debug("Now rating request %s", summary) try: # Make sure we finish writing rating changes even if the # server is shutting down await asyncio.shield(self._rate(summary)) except GameRatingError: self._logger.warning("Error rating game %s", summary) except Exception: # pragma: no cover self._logger.exception("Failed rating request %s", summary) else: self._logger.debug("Done rating request.") finally: self._queue.task_done() rating_service_backlog.set(self._queue.qsize()) except asyncio.CancelledError: pass except Exception: # pragma: no cover self._logger.critical( "Unexpected exception while handling rating queue.", exc_info=True ) self._logger.debug("RatingService stopped.") async def _rate(self, summary: GameRatingSummary) -> None: assert self._rating_type_ids is not None if summary.rating_type not in self._rating_type_ids: raise GameRatingError(f"Unknown rating type {summary.rating_type}.") rater = GameRater(summary) rating_results = [] async with self._db.acquire() as conn: # Fetch all players rating info from the database player_ratings = await self._get_all_player_ratings( conn, rater.player_ids ) rating_result = await self._rate_for_leaderboard( conn, summary.game_id, summary.rating_type, player_ratings, rater ) assert rating_result is not None rating_results.append(rating_result) # TODO: If we add hidden ratings, make sure to check for them here. # Hidden ratings should not affect global. # TODO: Use game_type == "matchmaker" instead? if summary.rating_type != RatingType.GLOBAL: self._logger.debug( "Performing global rating adjustment for players: %s", rater.player_ids ) adjustment_rater = AdjustmentGameRater( rater, rating_result.old_ratings ) global_rating_result = await self._rate_for_leaderboard( conn, summary.game_id, RatingType.GLOBAL, player_ratings, adjustment_rater, update_game_player_stats=False ) if global_rating_result: rating_results.append(global_rating_result) for rating_result in rating_results: await self._publish_rating_changes( rating_result.game_id, rating_result.rating_type, rating_result.old_ratings, rating_result.new_ratings, rating_result.outcome_map ) async def _rate_for_leaderboard( self, conn, game_id: int, rating_type: str, player_ratings: dict[PlayerID, PlayerRatings], rater: GameRater, update_game_player_stats: bool = True ) -> Optional[GameRatingResult]: """ Rates a game using a particular rating_type and GameRater. """ uninitialized_ratings = { # Querying the key will create the value using rating # initialization, sort of like a defaultdict. player_id: Rating(*player_ratings[player_id][rating_type]) for player_id in player_ratings.keys() if rating_type not in player_ratings[player_id] } # Initialize the ratings we need old_ratings = { player_id: Rating(*player_ratings[player_id][rating_type]) for player_id in player_ratings.keys() } new_ratings = rater.compute_rating(old_ratings) if not new_ratings: return None need_initial_ratings = { player_id: rating for player_id, rating in uninitialized_ratings.items() if player_id in new_ratings } if need_initial_ratings: # Ensure that leaderboard entries exist before calling persist. await self._create_initial_ratings( conn, rating_type, need_initial_ratings ) outcome_map = rater.get_outcome_map() # Now persist the changes for all players that get the adjustment. await self._persist_rating_changes( conn, game_id, rating_type, old_ratings, new_ratings, outcome_map, update_game_player_stats=update_game_player_stats ) return GameRatingResult( game_id, rating_type, old_ratings, new_ratings, outcome_map ) async def _create_initial_ratings( self, conn, rating_type: str, ratings: RatingDict ): assert self._rating_type_ids is not None leaderboard_id = self._rating_type_ids[rating_type] values = [ dict( login_id=player_id, mean=rating.mean, deviation=rating.dev, total_games=0, won_games=0, leaderboard_id=leaderboard_id, ) for player_id, rating in ratings.items() ] if values: await conn.execute( leaderboard_rating.insert(), values ) async def _get_all_player_ratings( self, conn, player_ids: list[PlayerID] ) -> dict[PlayerID, PlayerRatings]: sql = select( leaderboard_rating.c.login_id, leaderboard.c.technical_name, leaderboard_rating.c.mean, leaderboard_rating.c.deviation ).join(leaderboard).where( leaderboard_rating.c.login_id.in_(player_ids) ) result = await conn.execute(sql) player_ratings = { player_id: PlayerRatings(self.leaderboards, init=False) for player_id in player_ids } for row in result: player_id, rating_type = row.login_id, row.technical_name player_ratings[player_id][rating_type] = (row.mean, row.deviation) return player_ratings async def _persist_rating_changes( self, conn, game_id: int, rating_type: str, old_ratings: RatingDict, new_ratings: RatingDict, outcomes: dict[PlayerID, GameOutcome], update_game_player_stats: bool = True ) -> None: """ Persist computed ratings to the respective players' selected rating """ assert self._rating_type_ids is not None self._logger.debug("Saving rating change stats for game %i", game_id) ratings = [ (player_id, old_ratings[player_id], new_ratings[player_id]) for player_id in new_ratings.keys() ] for player_id, old_rating, new_rating in ratings: self._logger.debug( "New %s rating for player with id %s: %s -> %s", rating_type, player_id, old_rating, new_rating, ) if update_game_player_stats: # DEPRECATED: game_player_stats table contains rating data. # Use leaderboard_rating_journal instead gps_update_sql = ( game_player_stats.update() .where( and_( game_player_stats.c.playerId == bindparam("player_id"), game_player_stats.c.gameId == game_id, ) ) .values( after_mean=bindparam("after_mean"), after_deviation=bindparam("after_deviation"), mean=bindparam("mean"), deviation=bindparam("deviation"), scoreTime=func.now() ) ) try: result = await conn.execute(gps_update_sql, [ dict( player_id=player_id, after_mean=new_rating.mean, after_deviation=new_rating.dev, mean=old_rating.mean, deviation=old_rating.dev, ) for player_id, old_rating, new_rating in ratings ]) if result.rowcount != len(ratings): self._logger.warning( "gps_update_sql only updated %d out of %d rows for game_id %d", result.rowcount, len(ratings), game_id ) return except pymysql.OperationalError: # Could happen if we drop the rating columns from game_player_stats self._logger.warning( "gps_update_sql failed for game %d, ignoring...", game_id, exc_info=True ) leaderboard_id = self._rating_type_ids[rating_type] journal_insert_sql = leaderboard_rating_journal.insert().values( leaderboard_id=leaderboard_id, rating_mean_before=bindparam("rating_mean_before"), rating_deviation_before=bindparam("rating_deviation_before"), rating_mean_after=bindparam("rating_mean_after"), rating_deviation_after=bindparam("rating_deviation_after"), game_player_stats_id=select(game_player_stats.c.id).where( and_( game_player_stats.c.playerId == bindparam("player_id"), game_player_stats.c.gameId == game_id, ) ).scalar_subquery(), ) await conn.execute(journal_insert_sql, [ dict( player_id=player_id, rating_mean_before=old_rating.mean, rating_deviation_before=old_rating.dev, rating_mean_after=new_rating.mean, rating_deviation_after=new_rating.dev, ) for player_id, old_rating, new_rating in ratings ]) rating_update_sql = ( leaderboard_rating.update() .where( and_( leaderboard_rating.c.login_id == bindparam("player_id"), leaderboard_rating.c.leaderboard_id == leaderboard_id, ) ) .values( mean=bindparam("mean"), deviation=bindparam("deviation"), total_games=leaderboard_rating.c.total_games + 1, won_games=leaderboard_rating.c.won_games + bindparam("increment"), ) ) await conn.execute(rating_update_sql, [ dict( player_id=player_id, mean=new_rating.mean, deviation=new_rating.dev, increment=( 1 if outcomes[player_id] is GameOutcome.VICTORY else 0 ) ) for player_id, _, new_rating in ratings ]) for player_id, new_rating in new_ratings.items(): self._update_player_object(player_id, rating_type, new_rating) def _update_player_object( self, player_id: PlayerID, rating_type: str, new_rating: Rating ) -> None: if self._player_service_callback is None: self._logger.warning( "Tried to send rating change to player service, " "but no service was registered." ) return self._logger.debug( "Sending player rating update for player with id %i", player_id ) self._player_service_callback(player_id, rating_type, new_rating) async def _publish_rating_changes( self, game_id: int, rating_type: str, old_ratings: RatingDict, new_ratings: RatingDict, outcomes: dict[PlayerID, GameOutcome], ): for player_id, new_rating in new_ratings.items(): if player_id not in outcomes: self._logger.error("Missing outcome for player %i", player_id) continue if player_id not in old_ratings: self._logger.error("Missing old rating for player %i", player_id) continue old_rating = old_ratings[player_id] rating_change_dict = { "game_id": game_id, "player_id": player_id, "rating_type": rating_type, "new_rating_mean": new_rating.mean, "new_rating_deviation": new_rating.dev, "old_rating_mean": old_rating.mean, "old_rating_deviation": old_rating.dev, "outcome": outcomes[player_id].value } await self._message_queue_service.publish( config.MQ_EXCHANGE_NAME, "success.rating.update", rating_change_dict, ) async def _join_rating_queue(self) -> None: """ Offers a call that is blocking until the rating queue has been emptied. Mostly for testing purposes. """ await self._queue.join() async def shutdown(self) -> None: """ Finish rating all remaining games, then exit. """ self._accept_input = False self._logger.debug( "Shutdown initiated. Waiting on current queue: %s", self._queue ) if self._queue.empty() and self._task: self._task.cancel() await self._queue.join() self._task = None self._logger.debug("Queue emptied: %s", self._queue) def kill(self) -> None: """ Exit without waiting for the queue to join. """ self._accept_input = False if self._task is not None: self._task.cancel() self._task = None
Service responsible for calculating and saving trueskill rating updates. To avoid race conditions, rating updates from a single game ought to be atomic.
Ancestors
Methods
async def enqueue(self, game_info: dict[str]) ‑> None
-
Expand source code
async def enqueue(self, game_info: dict[str]) -> None: if not self._accept_input: self._logger.warning("Dropped rating request %s", game_info) raise ServiceNotReadyError( "RatingService not yet initialized or shutting down." ) summary = GameRatingSummary.from_game_info_dict(game_info) self._logger.debug("Queued up rating request %s", summary) await self._queue.put(summary) rating_service_backlog.set(self._queue.qsize())
def kill(self) ‑> None
-
Expand source code
def kill(self) -> None: """ Exit without waiting for the queue to join. """ self._accept_input = False if self._task is not None: self._task.cancel() self._task = None
Exit without waiting for the queue to join.
async def shutdown(self) ‑> None
-
Expand source code
async def shutdown(self) -> None: """ Finish rating all remaining games, then exit. """ self._accept_input = False self._logger.debug( "Shutdown initiated. Waiting on current queue: %s", self._queue ) if self._queue.empty() and self._task: self._task.cancel() await self._queue.join() self._task = None self._logger.debug("Queue emptied: %s", self._queue)
Finish rating all remaining games, then exit.
async def update_data(self)
-
Expand source code
async def update_data(self): async with self._db.acquire() as conn: initializer = leaderboard.alias() sql = select( leaderboard.c.id, leaderboard.c.technical_name, initializer.c.technical_name.label("initializer") ).select_from( leaderboard.outerjoin( initializer, leaderboard.c.initializer_id == initializer.c.id ) ) result = await conn.execute(sql) rows = result.fetchall() self.leaderboards.clear() self._rating_type_ids = {} for row in rows: self.leaderboards[row.technical_name] = Leaderboard( row.id, row.technical_name ) self._rating_type_ids[row.technical_name] = row.id # Link the initializers for row in rows: current = self.leaderboards[row.technical_name] init = self.leaderboards.get(row.initializer) if init: current.initializer = init
Inherited members
class ServerInstance (name: str,
database: FAFDatabase,
loop: asyncio.base_events.BaseEventLoop)-
Expand source code
class ServerInstance(object): """ A class representing a shared server state. Each `ServerInstance` may be exposed on multiple ports, but each port will share the same internal server state, i.e. the same players, games, etc. """ def __init__( self, name: str, database: FAFDatabase, loop: asyncio.BaseEventLoop, # For testing _override_services: Optional[dict[str, Service]] = None ): self.name = name self._logger = logging.getLogger(self.name) self.database = database self.loop = loop self.started = False self.contexts: set[ServerContext] = set() self.services = _override_services or create_services({ "server": self, "database": self.database, "loop": self.loop, }) self.connection_factory = lambda: LobbyConnection( database=database, geoip=self.services["geo_ip_service"], game_service=self.services["game_service"], players=self.services["player_service"], ladder_service=self.services["ladder_service"], party_service=self.services["party_service"], rating_service=self.services["rating_service"], oauth_service=self.services["oauth_service"], ) def write_broadcast( self, message, predicate=lambda conn: conn.authenticated ): """ Queue a message to be sent to all connected clients. """ self._logger.log(TRACE, "]]: %s", message) metrics.server_broadcasts.inc() for ctx in self.contexts: try: ctx.write_broadcast(message, predicate) except Exception: self._logger.exception( "Error writing '%s'", message.get("command", message) ) @synchronizedmethod async def start_services(self) -> None: if self.started: return num_services = len(self.services) self._logger.debug("Initializing %s services", num_services) async def initialize(service): start = time.perf_counter() await service.initialize() service._logger.debug( "%s initialized in %0.2f seconds", service.__class__.__name__, time.perf_counter() - start ) await asyncio.gather(*[ initialize(service) for service in self.services.values() ]) self._logger.debug("Initialized %s services", num_services) self.started = True async def listen( self, address: tuple[str, int], name: Optional[str] = None, protocol_class: type[Protocol] = QDataStreamProtocol, proxy: bool = False, ) -> ServerContext: """ Start listening on a new address. # Params - `address`: Tuple indicating the host, port to listen on. - `name`: String used to identify this context in log messages. The default is to use the `protocol_class` name. - `protocol_class`: The protocol class implementation to use. - `proxy`: Boolean indicating whether or not to use the PROXY protocol. See: https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt """ if not self.started: await self.start_services() ctx = ServerContext( f"{self.name}[{name or protocol_class.__name__}]", self.connection_factory, list(self.services.values()), protocol_class ) await ctx.listen(*address, proxy=proxy) self.contexts.add(ctx) return ctx async def graceful_shutdown(self): """ Start a graceful shut down of the server. 1. Notify all services of graceful shutdown """ self._logger.info("Initiating graceful shutdown") await map_suppress( lambda service: service.graceful_shutdown(), self.services.values(), logger=self._logger, msg="when starting graceful shutdown of service " ) async def shutdown(self): """ Immediately shutdown the server. 1. Stop accepting new connections 2. Stop all services 3. Close all existing connections """ self._logger.info("Initiating full shutdown") await self._stop_contexts() await self._shutdown_services() await self._shutdown_contexts() self.contexts.clear() self.started = False async def drain(self): """ Wait for all games to end. """ game_service: GameService = self.services["game_service"] broadcast_service: BroadcastService = self.services["broadcast_service"] try: await asyncio.wait_for( game_service.drain_games(), timeout=config.SHUTDOWN_GRACE_PERIOD ) except asyncio.CancelledError: self._logger.debug( "Stopped waiting for games to end due to forced shutdown" ) except asyncio.TimeoutError: self._logger.warning( "Graceful shutdown period ended! %s games are still live!", len(game_service.live_games) ) finally: # The report_dirties loop is responsible for clearing dirty games # and broadcasting the update messages to players and to RabbitMQ. # We need to wait here for that loop to complete otherwise it is # possible for the services to be shut down inbetween clearing the # games and posting the messages, causing the posts to fail. await broadcast_service.wait_report_dirtes() async def _shutdown_services(self): await map_suppress( lambda service: service.shutdown(), self.services.values(), logger=self._logger, msg="when shutting down service " ) async def _stop_contexts(self): await map_suppress( lambda ctx: ctx.stop(), self.contexts, logger=self._logger, msg="when stopping context " ) async def _shutdown_contexts(self): await map_suppress( lambda ctx: ctx.shutdown(), self.contexts, logger=self._logger, msg="when shutting down context " )
A class representing a shared server state. Each
ServerInstance
may be exposed on multiple ports, but each port will share the same internal server state, i.e. the same players, games, etc.Methods
async def drain(self)
-
Expand source code
async def drain(self): """ Wait for all games to end. """ game_service: GameService = self.services["game_service"] broadcast_service: BroadcastService = self.services["broadcast_service"] try: await asyncio.wait_for( game_service.drain_games(), timeout=config.SHUTDOWN_GRACE_PERIOD ) except asyncio.CancelledError: self._logger.debug( "Stopped waiting for games to end due to forced shutdown" ) except asyncio.TimeoutError: self._logger.warning( "Graceful shutdown period ended! %s games are still live!", len(game_service.live_games) ) finally: # The report_dirties loop is responsible for clearing dirty games # and broadcasting the update messages to players and to RabbitMQ. # We need to wait here for that loop to complete otherwise it is # possible for the services to be shut down inbetween clearing the # games and posting the messages, causing the posts to fail. await broadcast_service.wait_report_dirtes()
Wait for all games to end.
async def graceful_shutdown(self)
-
Expand source code
async def graceful_shutdown(self): """ Start a graceful shut down of the server. 1. Notify all services of graceful shutdown """ self._logger.info("Initiating graceful shutdown") await map_suppress( lambda service: service.graceful_shutdown(), self.services.values(), logger=self._logger, msg="when starting graceful shutdown of service " )
Start a graceful shut down of the server.
- Notify all services of graceful shutdown
async def listen(self,
address: tuple[str, int],
name: str | None = None,
protocol_class: type[Protocol] = server.protocol.qdatastream.QDataStreamProtocol,
proxy: bool = False) ‑> ServerContext-
Expand source code
async def listen( self, address: tuple[str, int], name: Optional[str] = None, protocol_class: type[Protocol] = QDataStreamProtocol, proxy: bool = False, ) -> ServerContext: """ Start listening on a new address. # Params - `address`: Tuple indicating the host, port to listen on. - `name`: String used to identify this context in log messages. The default is to use the `protocol_class` name. - `protocol_class`: The protocol class implementation to use. - `proxy`: Boolean indicating whether or not to use the PROXY protocol. See: https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt """ if not self.started: await self.start_services() ctx = ServerContext( f"{self.name}[{name or protocol_class.__name__}]", self.connection_factory, list(self.services.values()), protocol_class ) await ctx.listen(*address, proxy=proxy) self.contexts.add(ctx) return ctx
Start listening on a new address.
Params
address
: Tuple indicating the host, port to listen on.name
: String used to identify this context in log messages. The default is to use theprotocol_class
name.protocol_class
: The protocol class implementation to use.proxy
: Boolean indicating whether or not to use the PROXY protocol. See: https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
async def shutdown(self)
-
Expand source code
async def shutdown(self): """ Immediately shutdown the server. 1. Stop accepting new connections 2. Stop all services 3. Close all existing connections """ self._logger.info("Initiating full shutdown") await self._stop_contexts() await self._shutdown_services() await self._shutdown_contexts() self.contexts.clear() self.started = False
Immediately shutdown the server.
- Stop accepting new connections
- Stop all services
- Close all existing connections
async def start_services(self) ‑> None
-
Expand source code
@synchronizedmethod async def start_services(self) -> None: if self.started: return num_services = len(self.services) self._logger.debug("Initializing %s services", num_services) async def initialize(service): start = time.perf_counter() await service.initialize() service._logger.debug( "%s initialized in %0.2f seconds", service.__class__.__name__, time.perf_counter() - start ) await asyncio.gather(*[ initialize(service) for service in self.services.values() ]) self._logger.debug("Initialized %s services", num_services) self.started = True
def write_broadcast(self, message, predicate=<function ServerInstance.<lambda>>)
-
Expand source code
def write_broadcast( self, message, predicate=lambda conn: conn.authenticated ): """ Queue a message to be sent to all connected clients. """ self._logger.log(TRACE, "]]: %s", message) metrics.server_broadcasts.inc() for ctx in self.contexts: try: ctx.write_broadcast(message, predicate) except Exception: self._logger.exception( "Error writing '%s'", message.get("command", message) )
Queue a message to be sent to all connected clients.
class ViolationService
-
Expand source code
@with_logger class ViolationService(Service): """ Track who is banned from searching and for how long. Apply progressive discipline for repeated violations. A violation could be anything, but it is usually any time a player fails to connect to a game. """ def __init__(self): # We store a reference to the original `Player` object for logging only self._violations: dict[int, tuple[Player, Violation]] = {} async def initialize(self): self._cleanup_task = at_interval(5, func=self.clear_expired) def clear_expired(self): now = datetime_now() for player, violation in list(self._violations.values()): if violation.is_expired(now): self._clear_violation(player) def register_violations(self, players: list[Player]): now = datetime_now() for player in players: violation = self.get_violation(player) if violation is None or violation.is_expired(now): violation = Violation(time=now) self.set_violation(player, violation) else: violation.register() player.write_message({ "command": "search_violation", **violation.to_dict() }) extra_text = "" if violation.count > 1: delta_text = humanize.precisedelta( violation.get_ban_expiration() - now ) extra_text = f" You can queue again in {delta_text}" player.write_message({ "command": "notice", "style": "info", "text": ( f"You have caused a matchmaking connection failure {violation.count} time(s). " "Multiple failures result in temporary time-outs from matchmaker. " "Please seek support on the forums or discord for persistent issues." + extra_text ) }) def get_violations(self, players: list[Player]) -> dict[Player, Violation]: now = datetime_now() result = {} for player in players: violation = self.get_violation(player) if not violation: continue elif violation.get_ban_expiration() > now: result[player] = violation elif violation.is_expired(now): self._clear_violation(player) return result def get_violation(self, player: Player) -> Optional[Violation]: _, violation = self._violations.get(player.id, (None, None)) return violation def set_violation(self, player: Player, violation: Violation): self._violations[player.id] = (player, violation) def _clear_violation(self, player: Player): violation = self.get_violation(player) self._logger.debug( "Cleared violation for player %s: %s", player.login, violation ) del self._violations[player.id]
Track who is banned from searching and for how long. Apply progressive discipline for repeated violations.
A violation could be anything, but it is usually any time a player fails to connect to a game.
Ancestors
Methods
def clear_expired(self)
-
Expand source code
def clear_expired(self): now = datetime_now() for player, violation in list(self._violations.values()): if violation.is_expired(now): self._clear_violation(player)
def get_violation(self,
player: Player) ‑> Violation | None-
Expand source code
def get_violation(self, player: Player) -> Optional[Violation]: _, violation = self._violations.get(player.id, (None, None)) return violation
def get_violations(self,
players: list[Player]) ‑> dict[Player, Violation]-
Expand source code
def get_violations(self, players: list[Player]) -> dict[Player, Violation]: now = datetime_now() result = {} for player in players: violation = self.get_violation(player) if not violation: continue elif violation.get_ban_expiration() > now: result[player] = violation elif violation.is_expired(now): self._clear_violation(player) return result
def register_violations(self,
players: list[Player])-
Expand source code
def register_violations(self, players: list[Player]): now = datetime_now() for player in players: violation = self.get_violation(player) if violation is None or violation.is_expired(now): violation = Violation(time=now) self.set_violation(player, violation) else: violation.register() player.write_message({ "command": "search_violation", **violation.to_dict() }) extra_text = "" if violation.count > 1: delta_text = humanize.precisedelta( violation.get_ban_expiration() - now ) extra_text = f" You can queue again in {delta_text}" player.write_message({ "command": "notice", "style": "info", "text": ( f"You have caused a matchmaking connection failure {violation.count} time(s). " "Multiple failures result in temporary time-outs from matchmaker. " "Please seek support on the forums or discord for persistent issues." + extra_text ) })
def set_violation(self,
player: Player,
violation: Violation)-
Expand source code
def set_violation(self, player: Player, violation: Violation): self._violations[player.id] = (player, violation)
Inherited members