Package server

Forged Alliance Forever lobby server.

Overview

The lobby server handles live state information for the FAF ecosystem. This includes maintaining a list of online players, a list of hosted and ongoing games, and a number of matchmakers. It also performs certain post-game actions like computing and persisting rating changes and updating achievements. Every online player maintains an active TCP connection to the server through which the server synchronizes the current state.

Social

The social components of the lobby server are relatively limited, as the primary social element, chat, is handled by a separate server. The social features handled by the lobby server are therefore limited to:

  • Synchronizing online player state
  • Enforcing global bans
  • Modifying a list of friends and a list of foes
  • Modifying the currently selected avatar

Games

The server supports two ways of discovering games with other players: custom lobbies and matchmakers. Ultimately, however, the lobby server can only help players discover each other and maintain certain meta information about games. Game simulation happens entirely on the client side and is not controlled by the server. Certain messages sent between clients during a game are also relayed to the server. These can be used to determine whether clients were able to connect to each other and what the outcome of the game was.
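
As an illustration of how a relayed outcome message can be interpreted, the sketch below mirrors the string splitting done in `GameConnection.handle_game_result` further down this page. The function name `parse_game_result` and the sample strings are assumptions made for this example and are not part of the server API.

```python
def parse_game_result(result: str) -> tuple[frozenset[str], str, int]:
    """Split a reported result string into (metadata, result_type, score)."""
    # The last two tokens are the result type and the score. Any tokens
    # before them are treated as optional metadata flags.
    # Raises ValueError if fewer than two tokens are present.
    *metadata, result_type, score = result.lower().split()
    return frozenset(metadata), result_type, int(score)


if __name__ == "__main__":
    # Hypothetical sample inputs, chosen only to show the shape of the data.
    print(parse_game_result("Victory 10"))
    print(parse_game_result("recall defeat -10"))
```

A string with fewer than two tokens raises `ValueError`, which is the case the server logs as an invalid result.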

Custom Games

Historically, the standard way to play FAF has been for one player to host a game lobby, set up the desired map and game settings, and for other players to voluntarily join that lobby until the host is satisfied with the players and launches the game. The lobby server facilitates sending certain information about these custom lobbies to all online players (subject to friend/foe rules) as well as managing a game id that can be used to join a specific lobby. This information includes, but is not necessarily limited to:

  • Auto-generated game uid
  • Host-specified game name
  • Host-selected map
  • List of connected players (non-AI only) and their team setup

Matchmaker games

Players may also choose to join a matchmaker queue instead of hosting a game and finding people to play with manually. The matchmaker attempts to create balanced games using players' TrueSkill ratings, and chooses a game host for the automatch lobby. From the server's perspective, automatch games behave virtually identically to custom games, the exception being that players may not request to join them by game id. The exchange of game messages and connectivity establishment happen exactly as they do for custom games.
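
To give a feel for what "balanced" can mean in TrueSkill terms, here is a minimal sketch of the standard two-player TrueSkill match quality formula. It is not taken from the server's matchmaker, which may score candidate games differently. The function name and the `beta` value used in the example are assumptions.

```python
import math


def match_quality_1v1(
    mu1: float, sigma1: float, mu2: float, sigma2: float, beta: float
) -> float:
    """Two-player TrueSkill match quality, a value in (0, 1].

    mu/sigma are each player's rating mean and deviation. beta is the
    performance variability parameter of the rating system (assumed here).
    """
    denom = 2 * beta**2 + sigma1**2 + sigma2**2
    scale = math.sqrt(2 * beta**2 / denom)
    return scale * math.exp(-((mu1 - mu2) ** 2) / (2 * denom))


if __name__ == "__main__":
    # Evenly rated players score higher than players 300 points apart.
    print(match_quality_1v1(1500, 100, 1500, 100, beta=250))
    print(match_quality_1v1(1800, 100, 1500, 100, beta=250))
```

The quality approaches 1 when both means are equal and both deviations are small, and drops as the rating gap or the uncertainty grows.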

Connectivity Establishment

When a player requests to join a game, the lobby server initiates connection establishment between the joining player and the host, and then between the joining player and all other players in the match. Connections are then established using the Interactive Connectivity Establishment (ICE) protocol, with the lobby server acting as the medium for exchanging candidate addresses between clients. If clients require a relay in order to connect to each other, they will authenticate with a separate STUN or TURN server using credentials supplied by a separate API service.
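
The order of signalling messages described above can be sketched as follows. The command names `JoinGame` and `ConnectToPeer` and the `offer` flag follow the `GameConnection` source shown later on this page. The `Signal` type and the `plan_join` function are hypothetical helpers introduced only for this illustration, and the real server sends the peer messages concurrently rather than in a fixed order.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Signal:
    recipient: str                # player who receives the message
    command: str                  # "JoinGame" or "ConnectToPeer"
    peer: str                     # player the recipient should connect to
    offer: Optional[bool] = None  # only meaningful for ConnectToPeer


def plan_join(joiner: str, host: str, others: list[str]) -> list[Signal]:
    """List the messages needed to connect a joining player to a lobby."""
    # Step 1: connect the joiner to the host.
    signals = [
        Signal(joiner, "JoinGame", host),
        Signal(host, "ConnectToPeer", joiner, offer=True),
    ]
    # Step 2: connect the joiner to every other player already in the lobby.
    for peer in others:
        signals.append(Signal(joiner, "ConnectToPeer", peer, offer=True))
        signals.append(Signal(peer, "ConnectToPeer", joiner, offer=False))
    return signals


if __name__ == "__main__":
    for signal in plan_join("alice", "host", ["bob", "carol"]):
        print(signal)
```

Once both sides of a pair have been told about each other, the ICE candidates themselves travel through the lobby server as `IceMsg` messages.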

Achievements

When a game ends, each client will report a summary of the game in the form of a stat report. These stats are then parsed to extract information about events that occurred during the game, such as units built and units killed, and used to unlock or progress achievements for the players.
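
The idea of turning a stat report into achievement updates can be sketched roughly as below. Everything specific here is an assumption made for illustration: the JSON layout (`armies`, `name`, `units_built`), the achievement names and the thresholds are hypothetical and do not reflect the real report format or the real achievement list.

```python
import json

# Hypothetical achievements: name -> number of units that must be built.
ACHIEVEMENT_TARGETS = {"builder_novice": 10, "builder_expert": 100}


def unlocked_achievements(stat_report: str) -> dict[str, list[str]]:
    """Map each army name to the achievements its stats would unlock."""
    report = json.loads(stat_report)
    unlocked = {}
    for army in report["armies"]:
        built = army["units_built"]
        # An achievement unlocks once the counter reaches its target.
        unlocked[army["name"]] = [
            name
            for name, target in ACHIEVEMENT_TARGETS.items()
            if built >= target
        ]
    return unlocked


if __name__ == "__main__":
    sample = json.dumps({
        "armies": [
            {"name": "alice", "units_built": 12},
            {"name": "bob", "units_built": 3},
        ]
    })
    print(unlocked_achievements(sample))
```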

Technical Overview

This section is intended for developers and will outline technical details of how to interact with the server. It will remain relatively high level and implementation agnostic, instead linking to other sections of the documentation that go into more detail.

Protocol

TODO

Legal

  • Copyright © 2012-2014 Gael Honorez
  • Copyright © 2015-2016 Michael Søndergaard sheeo@faforever.com
  • Copyright © 2021 Forged Alliance Forever

Distributed under GPLv3, see license.txt

Sub-modules

server.asyncio_extensions

Some helper functions for common async tasks

server.broadcast_service

server.config

Server config variables

server.configuration_service

Manages periodic reloading of config variables

server.control

Tiny http server for introspecting state

server.core

Server framework …

server.db

Database interaction

server.decorators

Helper decorators

server.exceptions

Common exception definitions

server.factions

Supreme Commander known faction definitions

server.game_service

Manages the lifecycle of active games

server.gameconnection

Game communication over GpgNet

server.games

Type definitions for game objects

server.geoip_service

Manages the GeoIP database

server.health

Kubernetes compatible HTTP health check server.

server.info

Static meta information about the container/process

server.ladder_service

server.lobbyconnection

Handles requests from connected clients

server.matchmaker

The matchmaker system …

server.message_queue_service

Interfaces with RabbitMQ

server.metrics

Prometheus metric definitions

server.oauth_service

server.party_service

Manages interactions between players and parties

server.player_service

Manages connected and authenticated players

server.players

Player type definitions

server.profiler

Analysis of application performance

server.protocol

Protocol format definitions

server.rating

Type definitions for player ratings

server.rating_service

Post-game rating functionality

server.servercontext

Manages a group of connections using the same protocol over the same port

server.stats

Achievements and events

server.team_matchmaker

The team matchmaking system …

server.timing

Helpers for executing async functions on a timer

server.types

General type definitions

server.weakattr

Helpers for non-owned object attributes

Classes

class BroadcastService (server: ServerInstance,
message_queue_service: MessageQueueService,
game_service: GameService,
player_service: PlayerService)
@with_logger
class BroadcastService(Service):
    """
    Broadcast updates about changed entities.
    """

    def __init__(
        self,
        server: "ServerInstance",
        message_queue_service: MessageQueueService,
        game_service: GameService,
        player_service: PlayerService,
    ):
        self.server = server
        self.message_queue_service = message_queue_service
        self.game_service = game_service
        self.player_service = player_service
        self._report_dirties_event = None

    async def initialize(self):
        # Using a lazy interval timer so that the intervals can be changed
        # without restarting the server.
        self._broadcast_dirties_timer = LazyIntervalTimer(
            lambda: config.DIRTY_REPORT_INTERVAL,
            self._monitored_report_dirties,
            start=True
        )
        self._broadcast_ping_timer = LazyIntervalTimer(
            lambda: config.PING_INTERVAL,
            self.broadcast_ping,
            start=True
        )

    async def _monitored_report_dirties(self):
        event = asyncio.Event()
        self._report_dirties_event = event
        try:
            await self.report_dirties()
        finally:
            event.set()

    async def report_dirties(self):
        """
        Send updates about any dirty (changed) entities to connected players.
        This function is called at a fixed interval, which guarantees that any
        given object will not be written out to the clients more than once per
        interval.
        """
        self.game_service.update_active_game_metrics()
        dirty_games = self.game_service.pop_dirty_games()
        dirty_queues = self.game_service.pop_dirty_queues()
        dirty_players = self.player_service.pop_dirty_players()

        if dirty_queues:
            matchmaker_info = {
                "command": "matchmaker_info",
                "queues": [queue.to_dict() for queue in dirty_queues]
            }
            self.server.write_broadcast(matchmaker_info)

        if dirty_players:
            player_info = {
                "command": "player_info",
                "players": [player.to_dict() for player in dirty_players]
            }
            self.server.write_broadcast(player_info)

        game_info = {
            "command": "game_info",
            "games": []
        }
        # TODO: This spams squillions of messages: we should implement per-
        # connection message aggregation at the next abstraction layer down :P
        for game in dirty_games:
            if game.state == GameState.ENDED:
                self.game_service.remove_game(game)

            # So we're going to be broadcasting this to _somebody_...
            message = game.to_dict()
            game_info["games"].append(message)

            self.server.write_broadcast(
                message,
                lambda conn: (
                    conn.authenticated
                    and game.is_visible_to_player(conn.player)
                )
            )

        if dirty_queues:
            await self.message_queue_service.publish(
                config.MQ_EXCHANGE_NAME,
                "broadcast.matchmakerInfo.update",
                matchmaker_info,
                delivery_mode=DeliveryMode.NOT_PERSISTENT
            )

        if dirty_players:
            await self.message_queue_service.publish(
                config.MQ_EXCHANGE_NAME,
                "broadcast.playerInfo.update",
                player_info,
                delivery_mode=DeliveryMode.NOT_PERSISTENT
            )

        if dirty_games:
            await self.message_queue_service.publish(
                config.MQ_EXCHANGE_NAME,
                "broadcast.gameInfo.update",
                game_info,
                delivery_mode=DeliveryMode.NOT_PERSISTENT
            )

    def broadcast_ping(self):
        self.server.write_broadcast({"command": "ping"})

    async def wait_report_dirtes(self):
        """
        Wait for the current report_dirties task to complete.
        """
        if self._report_dirties_event is None:
            return

        await self._report_dirties_event.wait()

    async def graceful_shutdown(self):
        if config.SHUTDOWN_KICK_IDLE_PLAYERS:
            message = (
                "If you're in a game you can continue to play, otherwise you "
                "will be disconnected. If you aren't reconnected automatically "
                "please wait a few minutes and try to connect again."
            )
        else:
            message = (
                "If you're in a game you can continue to play, however, you "
                "will not be able to create any new games until the server has "
                "been restarted."
            )

        delta = humanize.precisedelta(config.SHUTDOWN_GRACE_PERIOD)
        self.server.write_broadcast({
            "command": "notice",
            "style": "info",
            "text": (
                f"The server will be shutting down for maintenance in {delta}! "
                f"{message}"
            )
        })

    async def shutdown(self):
        self.server.write_broadcast({
            "command": "notice",
            "style": "info",
            "text": (
                "The server has been shut down for maintenance "
                "but should be back online soon. If you experience any "
                "problems, please restart your client. <br/><br/>"
                "We apologize for this interruption."
            )
        })

Broadcast updates about changed entities.

Ancestors

Methods

def broadcast_ping(self)
async def report_dirties(self)
Send updates about any dirty (changed) entities to connected players. This function is called at a fixed interval, which guarantees that any given object will not be written out to the clients more than once per interval.
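
The once-per-interval guarantee follows from collecting changed entities in a set and draining that set on each tick. A minimal sketch of the pattern is shown below. The `DirtyTracker` class is a hypothetical stand-in and not the server's own `GameService` or `PlayerService` implementation.

```python
class DirtyTracker:
    """Collects changed entities until the next report drains them."""

    def __init__(self):
        self._dirty = set()

    def mark_dirty(self, entity):
        # Marking the same entity repeatedly keeps a single entry.
        self._dirty.add(entity)

    def pop_dirty(self):
        # Hand back everything marked so far and start a fresh set, so
        # each change is reported by exactly one tick.
        dirty, self._dirty = self._dirty, set()
        return dirty


if __name__ == "__main__":
    tracker = DirtyTracker()
    for _ in range(3):
        tracker.mark_dirty("game-1")  # three changes within one interval
    print(tracker.pop_dirty())        # a single entry for "game-1"
    print(tracker.pop_dirty())        # nothing left until the next change
```

However many times an entity changes between two ticks, it is written out at most once when the timer fires.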

async def wait_report_dirtes(self)
Wait for the current report_dirties task to complete.

Inherited members

class ConfigurationService
@with_logger
class ConfigurationService(Service):
    def __init__(self) -> None:
        self._store = config
        self._task = None

    async def initialize(self) -> None:
        self._task = asyncio.create_task(self._worker_loop())
        self._logger.info("Configuration service initialized")

    async def _worker_loop(self) -> None:
        while True:
            try:
                self._logger.debug("Refreshing configuration variables")
                self._store.refresh()
                await asyncio.sleep(self._store.CONFIGURATION_REFRESH_TIME)
            except Exception:
                self._logger.exception("Error while refreshing config")
                # To prevent a busy loop
                await asyncio.sleep(60)

    async def shutdown(self) -> None:
        if self._task is not None:
            self._logger.info("Configuration service stopping.")
            self._task.cancel()
        self._task = None

Manages periodic reloading of config variables.

Inherits from Service, the base class that all services should inherit from. Services are singleton objects which manage some server task.

Ancestors

Inherited members

class GameConnection (database: FAFDatabase,
game: Game,
player: Player,
protocol: Protocol,
player_service: PlayerService,
games: GameService,
state: GameConnectionState = GameConnectionState.INITIALIZING,
setup_timeout: int = 60)
class GameConnection(GpgNetServerProtocol):
    """
    Responsible for connections to the game, using the GPGNet protocol
    """

    def __init__(
        self,
        database: FAFDatabase,
        game: Game,
        player: Player,
        protocol: Protocol,
        player_service: PlayerService,
        games: GameService,
        state: GameConnectionState = GameConnectionState.INITIALIZING,
        setup_timeout: int = 60,
    ):
        """
        Construct a new GameConnection
        """
        super().__init__()
        self._logger = logging.getLogger(
            f"{self.__class__.__qualname__}.{game.id}"
        )
        self._db = database

        self.protocol = protocol
        self._state = state
        self.game_service = games
        self.player_service = player_service

        self.player = player
        player.game_connection = self  # Set up weak reference to self
        self.game = game

        self.setup_timeout = setup_timeout

        self.finished_sim = False

        self._logger.debug("GameConnection initializing")
        if self.state is GameConnectionState.INITIALIZING:
            asyncio.get_event_loop().create_task(
                self.timeout_game_connection(setup_timeout)
            )

    async def timeout_game_connection(self, timeout):
        await asyncio.sleep(timeout)
        if self.state is GameConnectionState.INITIALIZING:
            self._logger.debug("GameConnection timed out...")
            await self.abort("Player took too long to start the game")

    @property
    def state(self) -> GameConnectionState:
        return self._state

    def is_host(self) -> bool:
        if not self.game or not self.player:
            return False

        return (
            self.player.state == PlayerState.HOSTING and
            self.player == self.game.host
        )

    async def send(self, message):
        """
        Send a game message to the client.

        # Errors
        May raise `DisconnectedError`

        NOTE: When calling this on a connection other than `self` make sure to
        handle `DisconnectedError`, otherwise failure to send the message will
        cause the caller to be disconnected as well.
        """
        message["target"] = "game"

        self._logger.log(TRACE, ">> %s: %s", self.player.login, message)
        await self.protocol.send_message(message)

    async def _handle_idle_state(self):
        """
        This message is sent by FA when it doesn't know what to do.
        """
        assert self.game

        if self.player == self.game.host:
            self.game.state = GameState.LOBBY
            self._state = GameConnectionState.CONNECTED_TO_HOST
            self.game.add_game_connection(self)
            self.player.state = PlayerState.HOSTING
        else:
            self._state = GameConnectionState.INITIALIZED
            self.player.state = PlayerState.JOINING

    async def _handle_lobby_state(self):
        """
        The game has told us it is ready and listening on
        self.player.game_port for UDP.
        We determine the connectivity of the peer and respond
        appropriately
        """
        player_state = self.player.state
        if player_state == PlayerState.HOSTING:
            await self.send_HostGame(self.game.map.folder_name)
            self.game.set_hosted()
        # If the player is joining, we connect him to host
        # followed by the rest of the players.
        elif player_state == PlayerState.JOINING:
            await self.connect_to_host(self.game.host.game_connection)

            if self._state is GameConnectionState.ENDED:
                # We aborted while trying to connect
                return

            self._state = GameConnectionState.CONNECTED_TO_HOST

            try:
                self.game.add_game_connection(self)
            except GameError as e:
                await self.abort(f"GameError while joining {self.game.id}: {e}")
                return

            tasks = []
            for peer in self.game.connections:
                if peer != self and peer.player != self.game.host:
                    self._logger.debug("%s connecting to %s", self.player, peer)
                    tasks.append(self.connect_to_peer(peer))
            await asyncio.gather(*tasks)

    async def connect_to_host(self, peer: "GameConnection"):
        """
        Connect self to a given peer (host)
        """
        if not peer or peer.player.state != PlayerState.HOSTING:
            await self.abort("The host left the lobby")
            return

        await self.send_JoinGame(peer.player.login, peer.player.id)

        if not peer:
            await self.abort("The host left the lobby")
            return

        await peer.send_ConnectToPeer(
            player_name=self.player.login,
            player_uid=self.player.id,
            offer=True
        )

    async def connect_to_peer(self, peer: "GameConnection"):
        """
        Connect two peers
        """
        if peer is not None:
            await self.send_ConnectToPeer(
                player_name=peer.player.login,
                player_uid=peer.player.id,
                offer=True
            )

        if peer is not None:
            with contextlib.suppress(DisconnectedError):
                await peer.send_ConnectToPeer(
                    player_name=self.player.login,
                    player_uid=self.player.id,
                    offer=False
                )

    async def handle_action(self, command: str, args: list[Any]):
        """
        Handle GpgNetSend messages, wrapped in the JSON protocol
        """
        try:
            await COMMAND_HANDLERS[command](self, *args)
        except KeyError:
            self._logger.warning(
                "Unrecognized command %s: %s from player %s",
                command, args, self.player
            )
        except (TypeError, ValueError):
            self._logger.exception("Bad command arguments")
        except ConnectionError as e:
            raise e
        except Exception as e:  # pragma: no cover
            self._logger.exception(
                "Something awful happened in a game thread! %s",
                e
            )
            await self.abort()

    async def handle_desync(self, *_args):  # pragma: no cover
        self.game.desyncs += 1

    async def handle_game_option(self, key: str, value: Any):
        if not self.is_host():
            return

        await self.game.game_options.set_option(key, value)

        self._mark_dirty()

    async def handle_game_mods(self, mode: Any, args: list[Any]):
        if not self.is_host():
            return

        if mode == "activated":
            # In this case args is the number of mods
            if int(args) == 0:
                self.game.mods = {}

        elif mode == "uids":
            uids = str(args).split()
            self.game.mods = {uid: "Unknown sim mod" for uid in uids}
            async with self._db.acquire() as conn:
                rows = await conn.execute(
                    "SELECT `uid`, `name` from `table_mod` WHERE `uid` in :ids",
                    ids=tuple(uids)
                )
                for row in rows:
                    self.game.mods[row.uid] = row.name
        else:
            self._logger.warning("Ignoring game mod: %s, %s", mode, args)
            return

        self._mark_dirty()

    async def handle_player_option(
        self, player_id: Any, key: Any, value: Any
    ):
        if not self.is_host():
            return

        self.game.set_player_option(int(player_id), key, value)
        self._mark_dirty()

    async def handle_ai_option(self, name: Any, key: Any, value: Any):
        if not self.is_host():
            return

        self.game.set_ai_option(str(name), key, value)
        self._mark_dirty()

    async def handle_clear_slot(self, slot: Any):
        if not self.is_host():
            return

        self.game.clear_slot(int(slot))
        self._mark_dirty()

    async def handle_game_result(self, army: Any, result: Any):
        army = int(army)
        result = str(result).lower()

        try:
            *metadata, result_type, score = result.split()
        except ValueError:
            self._logger.warning("Invalid result for %s reported: %s", army, result)
        else:
            await self.game.add_result(
                self.player.id,
                army,
                result_type,
                int(score),
                frozenset(metadata),
            )

    async def handle_operation_complete(
        self, primary: Any, secondary: Any, delta: str
    ):
        """
        # Params
        - `primary`: are primary mission objectives complete?
        - `secondary`: are secondary mission objectives complete?
        - `delta`: the time it took to complete the mission
        """
        primary = FA.ENABLED == primary
        secondary = FA.ENABLED == secondary

        if not primary:
            return

        if not isinstance(self.game, CoopGame):
            self._logger.warning(
                "OperationComplete called for non-coop game: %s", self.game.id
            )
            return

        if self.game.validity != ValidityState.COOP_NOT_RANKED:
            return

        secondary, delta = secondary, str(delta)
        async with self._db.acquire() as conn:
            result = await conn.execute(
                select(coop_map.c.id).where(
                    coop_map.c.filename == self.game.map.file_path
                )
            )
            row = result.fetchone()
            if not row:
                self._logger.debug(
                    "can't find coop map: %s", self.game.map.file_path
                )
                return
            mission = row.id

            # Each player in a co-op game will send the OperationComplete
            # message but we only need to perform this insert once
            async with self.game.leaderboard_lock:
                if not self.game.leaderboard_saved:
                    await conn.execute(
                        coop_leaderboard.insert().values(
                            mission=mission,
                            gameuid=self.game.id,
                            secondary=secondary,
                            time=delta,
                            player_count=len(self.game.players),
                        )
                    )
                    self.game.leaderboard_saved = True

    async def handle_json_stats(self, stats: str):
        try:
            self.game.report_army_stats(stats)
        except json.JSONDecodeError as e:
            self._logger.warning(
                "Malformed game stats reported by %s: '...%s...'",
                self.player.login,
                stats[e.pos-20:e.pos+20]
            )

    async def handle_enforce_rating(self):
        self.game.enforce_rating = True

    async def handle_teamkill_report(
        self,
        gametime: Any,
        reporter_id: Any,
        reporter_name: str,
        teamkiller_id: Any,
        teamkiller_name: str,
    ):
        """
        Sent when a player is teamkilled and clicks the 'Report' button.

        # Params
        - `gametime`: seconds of gametime when kill happened
        - `reporter_id`: reporter id
        - `reporter_name`: reporter nickname (for debug purpose only)
        - `teamkiller_id`: teamkiller id
        - `teamkiller_name`: teamkiller nickname (for debug purpose only)
        """
        pass

    async def handle_teamkill_happened(
        self,
        gametime: Any,
        victim_id: Any,
        victim_name: str,
        teamkiller_id: Any,
        teamkiller_name: str,
    ):
        """
        Sent automatically by the game whenever a teamkill happens. Takes
        the same parameters as TeamkillReport.

        # Params
        - `gametime`: seconds of gametime when kill happened
        - `victim_id`: victim id
        - `victim_name`: victim nickname (for debug purpose only)
        - `teamkiller_id`: teamkiller id
        - `teamkiller_name`: teamkiller nickname (for debug purpose only)
        """
        victim_id = int(victim_id)
        teamkiller_id = int(teamkiller_id)

        if 0 in (victim_id, teamkiller_id):
            self._logger.debug("Ignoring teamkill for AI player")
            return

        async with self._db.acquire() as conn:
            await conn.execute(
                teamkills.insert().values(
                    teamkiller=teamkiller_id,
                    victim=victim_id,
                    game_id=self.game.id,
                    gametime=gametime,
                )
            )

    async def handle_ice_message(self, receiver_id: Any, ice_msg: str):
        receiver_id = int(receiver_id)
        peer = self.player_service.get_player(receiver_id)
        if not peer:
            self._logger.debug(
                "Ignoring ICE message for unknown player: %s", receiver_id
            )
            return

        game_connection = peer.game_connection
        if not game_connection:
            self._logger.debug(
                "Ignoring ICE message for player without game connection: %s", receiver_id
            )
            return

        try:
            await game_connection.send({
                "command": "IceMsg",
                "args": [int(self.player.id), ice_msg]
            })
        except DisconnectedError:
            self._logger.debug(
                "Failed to send ICE message to player due to a disconnect: %s",
                receiver_id
            )

    async def handle_game_state(self, state: str):
        """
        Changes in game state
        """

        if state == "Idle":
            await self._handle_idle_state()
            # Don't mark as dirty
            return

        elif state == "Lobby":
            # TODO: Do we still need to schedule with `ensure_future`?
            #
            # We do not yield from the task, since we
            # need to keep processing other commands while it runs
            await self._handle_lobby_state()

        elif state == "Launching":
            if self.player.state != PlayerState.HOSTING:
                return

            if self.game.state is not GameState.LOBBY:
                self._logger.warning(
                    "Trying to launch game %s in invalid state %s",
                    self.game,
                    self.game.state
                )
                return

            self._logger.info("Launching game %s", self.game)

            await self.game.launch()

            if len(self.game.mods.keys()) > 0:
                async with self._db.acquire() as conn:
                    uids = list(self.game.mods.keys())
                    await conn.execute(
                        "UPDATE mod_stats s JOIN mod_version v ON "
                        "v.mod_id = s.mod_id "
                        "SET s.times_played = s.times_played + 1 "
                        "WHERE v.uid in :ids",
                        ids=tuple(uids)
                    )
        # Signals that the FA executable has been closed
        elif state == "Ended":
            await self.on_connection_closed()
        self._mark_dirty()

    async def handle_game_ended(self, *args: list[Any]):
        """
        Signals that the simulation has ended.
        """
        self.finished_sim = True
        await self.game.check_game_finish(self.player)

    async def handle_rehost(self, *args: list[Any]):
        """
        Signals that the user has rehosted the game. This is currently unused but
        included for documentation purposes.
        """
        pass

    async def handle_launch_status(self, status: str):
        """
        Currently is sent with status `Rejected` if a matchmaker game failed
        to start due to players using differing game settings.
        """
        pass

    async def handle_bottleneck(self, *args: list[Any]):
        """
        Not sure what this command means. This is currently unused but
        included for documentation purposes.
        """
        pass

    async def handle_bottleneck_cleared(self, *args: list[Any]):
        """
        Not sure what this command means. This is currently unused but
        included for documentation purposes.
        """
        pass

    async def handle_disconnected(self, *args: list[Any]):
        """
        Not sure what this command means. This is currently unused but
        included for documentation purposes.
        """
        pass

    async def handle_chat(self, message: str):
        """
        Whenever the player sends a chat message during the game lobby.
        """
        pass

    async def handle_game_full(self):
        """
        Sent when all game slots are full
        """
        pass

    def _mark_dirty(self):
        if self.game:
            self.game_service.mark_dirty(self.game)

    async def abort(self, log_message: str = ""):
        """
        Abort the connection

        Removes the GameConnection object from any associated Game object,
        and deletes references to Player and Game held by this object.
        """
        try:
            if self._state is GameConnectionState.ENDED:
                return

            self._logger.debug("%s.abort(%s)", self, log_message)

            if self.game.state is GameState.LOBBY:
                await self.disconnect_all_peers()

            self._state = GameConnectionState.ENDED
            await self.game.remove_game_connection(self)
            self._mark_dirty()
            self.player.state = PlayerState.IDLE
            if self.player.lobby_connection:
                self.player.lobby_connection.game_connection = None
            del self.player.game
            del self.player.game_connection
        except Exception as ex:  # pragma: no cover
            self._logger.debug("Exception in abort(): %s", ex)

    async def disconnect_all_peers(self):
        tasks = []
        for peer in self.game.connections:
            if peer == self:
                continue

            tasks.append(peer.send_DisconnectFromPeer(self.player.id))

        for fut in asyncio.as_completed(tasks):
            try:
                await fut
            except Exception:
                self._logger.debug(
                    "peer_sendDisconnectFromPeer failed for player %i",
                    self.player.id,
                    exc_info=True
                )

    async def on_connection_closed(self):
        """
        The connection is closed by the player.
        """
        try:
            await self.game.disconnect_player(self.player)
        except Exception as e:  # pragma: no cover
            self._logger.exception(e)
        finally:
            await self.abort()

    async def on_connection_lost(self):
        """
        The connection is lost due to a disconnect from the lobby server.
        """
        try:
            await self.game.remove_game_connection(self)
        except Exception as e:  # pragma: no cover
            self._logger.exception(e)
        finally:
            await self.abort()

    def __str__(self):
        return f"GameConnection({self.player}, {self.game})"

Responsible for connections to the game, using the GPGNet protocol

Construct a new GameConnection

Ancestors

Instance variables

prop state : GameConnectionState
@property
def state(self) -> GameConnectionState:
    return self._state

Methods

async def abort(self, log_message: str = '')
async def abort(self, log_message: str = ""):
    """
    Abort the connection

    Removes the GameConnection object from any associated Game object,
    and deletes references to Player and Game held by this object.
    """
    try:
        if self._state is GameConnectionState.ENDED:
            return

        self._logger.debug("%s.abort(%s)", self, log_message)

        if self.game.state is GameState.LOBBY:
            await self.disconnect_all_peers()

        self._state = GameConnectionState.ENDED
        await self.game.remove_game_connection(self)
        self._mark_dirty()
        self.player.state = PlayerState.IDLE
        if self.player.lobby_connection:
            self.player.lobby_connection.game_connection = None
        del self.player.game
        del self.player.game_connection
    except Exception as ex:  # pragma: no cover
        self._logger.debug("Exception in abort(): %s", ex)

Abort the connection

Removes the GameConnection object from any associated Game object, and deletes references to Player and Game held by this object.

async def connect_to_host(self, peer: GameConnection)
async def connect_to_host(self, peer: "GameConnection"):
    """
    Connect self to a given peer (host)
    """
    if not peer or peer.player.state != PlayerState.HOSTING:
        await self.abort("The host left the lobby")
        return

    await self.send_JoinGame(peer.player.login, peer.player.id)

    if not peer:
        await self.abort("The host left the lobby")
        return

    await peer.send_ConnectToPeer(
        player_name=self.player.login,
        player_uid=self.player.id,
        offer=True
    )

Connect self to a given peer (host)

async def connect_to_peer(self, peer: GameConnection)
async def connect_to_peer(self, peer: "GameConnection"):
    """
    Connect two peers
    """
    if peer is not None:
        await self.send_ConnectToPeer(
            player_name=peer.player.login,
            player_uid=peer.player.id,
            offer=True
        )

    if peer is not None:
        with contextlib.suppress(DisconnectedError):
            await peer.send_ConnectToPeer(
                player_name=self.player.login,
                player_uid=self.player.id,
                offer=False
            )

Connect two peers

async def disconnect_all_peers(self)
async def disconnect_all_peers(self):
    tasks = []
    for peer in self.game.connections:
        if peer == self:
            continue

        tasks.append(peer.send_DisconnectFromPeer(self.player.id))

    for fut in asyncio.as_completed(tasks):
        try:
            await fut
        except Exception:
            self._logger.debug(
                "peer_sendDisconnectFromPeer failed for player %i",
                self.player.id,
                exc_info=True
            )
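The listing above awaits every notification and logs failures individually, so one unreachable peer does not stop the others from being notified. A minimal standalone sketch of that pattern, where `notify` is a hypothetical stand-in for `peer.send_DisconnectFromPeer` and the failure rule (even ids fail) is invented purely for illustration:

```python
import asyncio
import logging

logger = logging.getLogger("example")


async def notify(peer_id: int) -> int:
    # Hypothetical stand-in for peer.send_DisconnectFromPeer(...).
    # Even ids fail here only so the error path is exercised.
    if peer_id % 2 == 0:
        raise ConnectionError(f"peer {peer_id} is gone")
    return peer_id


async def notify_all(peer_ids: list[int]) -> list[int]:
    delivered = []
    # Coroutines passed to as_completed are scheduled concurrently and
    # yielded in completion order.
    tasks = [notify(peer_id) for peer_id in peer_ids]
    for fut in asyncio.as_completed(tasks):
        try:
            delivered.append(await fut)
        except Exception:
            # A failed notification is logged and the loop continues.
            logger.debug("notification failed", exc_info=True)
    return sorted(delivered)
```

Calling `asyncio.run(notify_all([1, 2, 3, 4]))` collects only the ids whose notification succeeded.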
async def handle_action(self, command: str, args: list[typing.Any])
async def handle_action(self, command: str, args: list[Any]):
    """
    Handle GpgNetSend messages, wrapped in the JSON protocol
    """
    try:
        await COMMAND_HANDLERS[command](self, *args)
    except KeyError:
        self._logger.warning(
            "Unrecognized command %s: %s from player %s",
            command, args, self.player
        )
    except (TypeError, ValueError):
        self._logger.exception("Bad command arguments")
    except ConnectionError as e:
        raise e
    except Exception as e:  # pragma: no cover
        self._logger.exception(
            "Something awful happened in a game thread! %s",
            e
        )
        await self.abort()

Handle GpgNetSend messages, wrapped in the JSON protocol
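The handler above looks the command name up in a table and calls the matching coroutine with the message arguments. The following is a reduced sketch of that dispatch shape, not the server's actual table: the `Connection` class, the `HANDLERS` dict, and the two command names are assumptions chosen for illustration.

```python
import asyncio
import logging
from typing import Any

logger = logging.getLogger("example")


class Connection:
    def __init__(self) -> None:
        self.desyncs = 0
        self.cleared = None

    async def handle_desync(self, *_args: Any) -> None:
        self.desyncs += 1

    async def handle_clear_slot(self, slot: Any) -> None:
        # int() raises ValueError for malformed input such as "x"
        self.cleared = int(slot)

    async def handle_action(self, command: str, args: list[Any]) -> None:
        try:
            await HANDLERS[command](self, *args)
        except KeyError:
            logger.warning("Unrecognized command %s: %s", command, args)
        except (TypeError, ValueError):
            # Wrong argument count (TypeError) or bad values (ValueError)
            logger.exception("Bad command arguments")


# Hypothetical command names mapped to unbound handler methods.
HANDLERS = {
    "Desync": Connection.handle_desync,
    "ClearSlot": Connection.handle_clear_slot,
}
```

One property of this shape worth keeping in mind: a `KeyError` raised inside a handler is caught by the same `except KeyError` branch and would be reported as an unrecognized command.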

async def handle_ai_option(self, name: Any, key: Any, value: Any)
async def handle_ai_option(self, name: Any, key: Any, value: Any):
    if not self.is_host():
        return

    self.game.set_ai_option(str(name), key, value)
    self._mark_dirty()
async def handle_bottleneck(self, *args: list[typing.Any])
async def handle_bottleneck(self, *args: list[Any]):
    """
    Not sure what this command means. This is currently unused but
    included for documentation purposes.
    """
    pass

Not sure what this command means. This is currently unused but included for documentation purposes.

async def handle_bottleneck_cleared(self, *args: list[typing.Any])
async def handle_bottleneck_cleared(self, *args: list[Any]):
    """
    Not sure what this command means. This is currently unused but
    included for documentation purposes.
    """
    pass

Not sure what this command means. This is currently unused but included for documentation purposes.

async def handle_chat(self, message: str)
async def handle_chat(self, message: str):
    """
    Whenever the player sends a chat message during the game lobby.
    """
    pass

Whenever the player sends a chat message during the game lobby.

async def handle_clear_slot(self, slot: Any)
async def handle_clear_slot(self, slot: Any):
    if not self.is_host():
        return

    self.game.clear_slot(int(slot))
    self._mark_dirty()
async def handle_desync(self, *_args)
async def handle_desync(self, *_args):  # pragma: no cover
    self.game.desyncs += 1
async def handle_disconnected(self, *args: list[typing.Any])
async def handle_disconnected(self, *args: list[Any]):
    """
    Not sure what this command means. This is currently unused but
    included for documentation purposes.
    """
    pass

Not sure what this command means. This is currently unused but included for documentation purposes.

async def handle_enforce_rating(self)
async def handle_enforce_rating(self):
    self.game.enforce_rating = True
async def handle_game_ended(self, *args: list[typing.Any])
async def handle_game_ended(self, *args: list[Any]):
    """
    Signals that the simulation has ended.
    """
    self.finished_sim = True
    await self.game.check_game_finish(self.player)

Signals that the simulation has ended.

async def handle_game_full(self)
async def handle_game_full(self):
    """
    Sent when all game slots are full
    """
    pass

Sent when all game slots are full

async def handle_game_mods(self, mode: Any, args: list[typing.Any])
async def handle_game_mods(self, mode: Any, args: list[Any]):
    if not self.is_host():
        return

    if mode == "activated":
        # In this case args is the number of mods
        if int(args) == 0:
            self.game.mods = {}

    elif mode == "uids":
        uids = str(args).split()
        self.game.mods = {uid: "Unknown sim mod" for uid in uids}
        async with self._db.acquire() as conn:
            rows = await conn.execute(
                "SELECT `uid`, `name` from `table_mod` WHERE `uid` in :ids",
                ids=tuple(uids)
            )
            for row in rows:
                self.game.mods[row.uid] = row.name
    else:
        self._logger.warning("Ignoring game mod: %s, %s", mode, args)
        return

    self._mark_dirty()
async def handle_game_option(self, key: str, value: Any)
async def handle_game_option(self, key: str, value: Any):
    if not self.is_host():
        return

    await self.game.game_options.set_option(key, value)

    self._mark_dirty()
async def handle_game_result(self, army: Any, result: Any)
async def handle_game_result(self, army: Any, result: Any):
    army = int(army)
    result = str(result).lower()

    try:
        *metadata, result_type, score = result.split()
    except ValueError:
        self._logger.warning("Invalid result for %s reported: %s", army, result)
    else:
        await self.game.add_result(
            self.player.id,
            army,
            result_type,
            int(score),
            frozenset(metadata),
        )
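The starred unpacking in `handle_game_result` treats the last two whitespace-separated words as the result type and score, and everything before them as metadata. A small standalone sketch of just that parsing step follows; `parse_result` is a hypothetical helper and the sample strings are assumptions about what a client might send.

```python
def parse_result(result: str) -> tuple[frozenset[str], str, int]:
    # The last two words are the result type and the score; any leading
    # words are collected as metadata. Fewer than two words raises
    # ValueError from the unpacking, as does a non-numeric score.
    *metadata, result_type, score = str(result).lower().split()
    return frozenset(metadata), result_type, int(score)


print(parse_result("Victory 10"))
print(parse_result("recall defeat -10"))
```

A single-word string such as `"score"` cannot be unpacked into two names and raises `ValueError`, which is the case the handler logs as an invalid result.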
async def handle_game_state(self, state: str)
async def handle_game_state(self, state: str):
    """
    Changes in game state
    """

    if state == "Idle":
        await self._handle_idle_state()
        # Don't mark as dirty
        return

    elif state == "Lobby":
        # TODO: Do we still need to schedule with `ensure_future`?
        #
        # We do not yield from the task, since we
        # need to keep processing other commands while it runs
        await self._handle_lobby_state()

    elif state == "Launching":
        if self.player.state != PlayerState.HOSTING:
            return

        if self.game.state is not GameState.LOBBY:
            self._logger.warning(
                "Trying to launch game %s in invalid state %s",
                self.game,
                self.game.state
            )
            return

        self._logger.info("Launching game %s", self.game)

        await self.game.launch()

        if len(self.game.mods.keys()) > 0:
            async with self._db.acquire() as conn:
                uids = list(self.game.mods.keys())
                await conn.execute(
                    "UPDATE mod_stats s JOIN mod_version v ON "
                    "v.mod_id = s.mod_id "
                    "SET s.times_played = s.times_played + 1 "
                    "WHERE v.uid in :ids",
                    ids=tuple(uids)
                )
    # Signals that the FA executable has been closed
    elif state == "Ended":
        await self.on_connection_closed()
    self._mark_dirty()

Changes in game state

async def handle_ice_message(self, receiver_id: Any, ice_msg: str)
async def handle_ice_message(self, receiver_id: Any, ice_msg: str):
    receiver_id = int(receiver_id)
    peer = self.player_service.get_player(receiver_id)
    if not peer:
        self._logger.debug(
            "Ignoring ICE message for unknown player: %s", receiver_id
        )
        return

    game_connection = peer.game_connection
    if not game_connection:
        self._logger.debug(
            "Ignoring ICE message for player without game connection: %s", receiver_id
        )
        return

    try:
        await game_connection.send({
            "command": "IceMsg",
            "args": [int(self.player.id), ice_msg]
        })
    except DisconnectedError:
        self._logger.debug(
            "Failed to send ICE message to player due to a disconnect: %s",
            receiver_id
        )
async def handle_json_stats(self, stats: str)
async def handle_json_stats(self, stats: str):
    try:
        self.game.report_army_stats(stats)
    except json.JSONDecodeError as e:
        self._logger.warning(
            "Malformed game stats reported by %s: '...%s...'",
            self.player.login,
            stats[max(e.pos - 20, 0):e.pos + 20]
        )
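The warning above logs roughly 20 characters on either side of the position where decoding failed. A standalone sketch of extracting such a window, clamping the start index so that an error near the beginning of a long payload cannot turn into a negative slice index (`error_context` is a hypothetical helper, not part of the server):

```python
import json


def error_context(text: str, width: int = 20) -> str:
    """Return the text surrounding a JSON decode error, or "" if valid."""
    try:
        json.loads(text)
    except json.JSONDecodeError as e:
        # Without max(), e.pos - width would be negative for early
        # errors and the slice would count from the end of the string.
        start = max(e.pos - width, 0)
        return text[start:e.pos + width]
    return ""
```

For a long payload whose error sits in the first few characters, the clamped slice still returns the beginning of the string, whereas an unclamped negative start index can yield an empty snippet.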
async def handle_launch_status(self, status: str)
async def handle_launch_status(self, status: str):
    """
    Currently is sent with status `Rejected` if a matchmaker game failed
    to start due to players using differing game settings.
    """
    pass

Currently is sent with status Rejected if a matchmaker game failed to start due to players using differing game settings.

async def handle_operation_complete(self, primary: Any, secondary: Any, delta: str)
async def handle_operation_complete(
    self, primary: Any, secondary: Any, delta: str
):
    """
    # Params
    - `primary`: are primary mission objectives complete?
    - `secondary`: are secondary mission objectives complete?
    - `delta`: the time it took to complete the mission
    """
    primary = FA.ENABLED == primary
    secondary = FA.ENABLED == secondary

    if not primary:
        return

    if not isinstance(self.game, CoopGame):
        self._logger.warning(
            "OperationComplete called for non-coop game: %s", self.game.id
        )
        return

    if self.game.validity != ValidityState.COOP_NOT_RANKED:
        return

    secondary, delta = secondary, str(delta)
    async with self._db.acquire() as conn:
        result = await conn.execute(
            select(coop_map.c.id).where(
                coop_map.c.filename == self.game.map.file_path
            )
        )
        row = result.fetchone()
        if not row:
            self._logger.debug(
                "can't find coop map: %s", self.game.map.file_path
            )
            return
        mission = row.id

        # Each player in a co-op game will send the OperationComplete
        # message but we only need to perform this insert once
        async with self.game.leaderboard_lock:
            if not self.game.leaderboard_saved:
                await conn.execute(
                    coop_leaderboard.insert().values(
                        mission=mission,
                        gameuid=self.game.id,
                        secondary=secondary,
                        time=delta,
                        player_count=len(self.game.players),
                    )
                )
                self.game.leaderboard_saved = True

Params

  • primary: are primary mission objectives complete?
  • secondary: are secondary mission objectives complete?
  • delta: the time it took to complete the mission
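As the comment in the listing notes, every player in a co-op game reports completion, but the leaderboard row should only be written once. A minimal sketch of that lock-plus-flag guard, using a hypothetical `FakeGame` with an in-memory list in place of the database insert:

```python
import asyncio


class FakeGame:
    """Hypothetical stand-in for the game object used by the handler."""

    def __init__(self) -> None:
        self.leaderboard_lock = asyncio.Lock()
        self.leaderboard_saved = False
        self.rows: list[str] = []


async def report_complete(game: FakeGame, player: str) -> None:
    # The lock serializes reporters; the flag makes later ones no-ops.
    async with game.leaderboard_lock:
        if not game.leaderboard_saved:
            await asyncio.sleep(0)  # stands in for the awaited insert
            game.rows.append(player)
            game.leaderboard_saved = True


async def main() -> list[str]:
    game = FakeGame()
    await asyncio.gather(
        *(report_complete(game, p) for p in ("a", "b", "c"))
    )
    return game.rows
```

Checking the flag inside the lock matters: without the lock, a second reporter could pass the check while the first is still awaiting the insert.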
async def handle_player_option(self, player_id: Any, key: Any, value: Any)
async def handle_player_option(
    self, player_id: Any, key: Any, value: Any
):
    if not self.is_host():
        return

    self.game.set_player_option(int(player_id), key, value)
    self._mark_dirty()
async def handle_rehost(self, *args: list[typing.Any])
async def handle_rehost(self, *args: list[Any]):
    """
    Signals that the user has rehosted the game. This is currently unused but
    included for documentation purposes.
    """
    pass

Signals that the user has rehosted the game. This is currently unused but included for documentation purposes.

async def handle_teamkill_happened(self, gametime: Any, victim_id: Any, victim_name: str, teamkiller_id: Any, teamkiller_name: str)
async def handle_teamkill_happened(
    self,
    gametime: Any,
    victim_id: Any,
    victim_name: str,
    teamkiller_id: Any,
    teamkiller_name: str,
):
    """
    Sent automatically by the game whenever a teamkill happens. Takes
    the same parameters as TeamkillReport.

    # Params
    - `gametime`: seconds of gametime when kill happened
    - `victim_id`: victim id
    - `victim_name`: victim nickname (for debug purpose only)
    - `teamkiller_id`: teamkiller id
    - `teamkiller_name`: teamkiller nickname (for debug purpose only)
    """
    victim_id = int(victim_id)
    teamkiller_id = int(teamkiller_id)

    if 0 in (victim_id, teamkiller_id):
        self._logger.debug("Ignoring teamkill for AI player")
        return

    async with self._db.acquire() as conn:
        await conn.execute(
            teamkills.insert().values(
                teamkiller=teamkiller_id,
                victim=victim_id,
                game_id=self.game.id,
                gametime=gametime,
            )
        )

Sent automatically by the game whenever a teamkill happens. Takes the same parameters as TeamkillReport.

Params

  • gametime: seconds of gametime when kill happened
  • victim_id: victim id
  • victim_name: victim nickname (for debug purpose only)
  • teamkiller_id: teamkiller id
  • teamkiller_name: teamkiller nickname (for debug purpose only)
async def handle_teamkill_report(self, gametime: Any, reporter_id: Any, reporter_name: str, teamkiller_id: Any, teamkiller_name: str)
async def handle_teamkill_report(
    self,
    gametime: Any,
    reporter_id: Any,
    reporter_name: str,
    teamkiller_id: Any,
    teamkiller_name: str,
):
    """
    Sent when a player is teamkilled and clicks the 'Report' button.

    # Params
    - `gametime`: seconds of gametime when kill happened
    - `reporter_id`: reporter id
    - `reporter_name`: reporter nickname (for debug purpose only)
    - `teamkiller_id`: teamkiller id
    - `teamkiller_name`: teamkiller nickname (for debug purpose only)
    """
    pass

Sent when a player is teamkilled and clicks the 'Report' button.

Params

  • gametime: seconds of gametime when kill happened
  • reporter_id: reporter id
  • reporter_name: reporter nickname (for debug purpose only)
  • teamkiller_id: teamkiller id
  • teamkiller_name: teamkiller nickname (for debug purpose only)
def is_host(self) ‑> bool
def is_host(self) -> bool:
    if not self.game or not self.player:
        return False

    return (
        self.player.state == PlayerState.HOSTING and
        self.player == self.game.host
    )
async def on_connection_closed(self)
async def on_connection_closed(self):
    """
    The connection is closed by the player.
    """
    try:
        await self.game.disconnect_player(self.player)
    except Exception as e:  # pragma: no cover
        self._logger.exception(e)
    finally:
        await self.abort()

The connection is closed by the player.

async def on_connection_lost(self)
async def on_connection_lost(self):
    """
    The connection is lost due to a disconnect from the lobby server.
    """
    try:
        await self.game.remove_game_connection(self)
    except Exception as e:  # pragma: no cover
        self._logger.exception(e)
    finally:
        await self.abort()

The connection is lost due to a disconnect from the lobby server.

async def send(self, message)
async def send(self, message):
    """
    Send a game message to the client.

    # Errors
    May raise `DisconnectedError`

    NOTE: When calling this on a connection other than `self` make sure to
    handle `DisconnectedError`, otherwise failure to send the message will
    cause the caller to be disconnected as well.
    """
    message["target"] = "game"

    self._logger.log(TRACE, ">> %s: %s", self.player.login, message)
    await self.protocol.send_message(message)

Send a game message to the client.

Errors

May raise DisconnectedError

NOTE: When calling this on a connection other than self make sure to handle DisconnectedError, otherwise failure to send the message will cause the caller to be disconnected as well.
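The note above can be illustrated with a short sketch in which a message is relayed to another player's connection and a disconnect on the receiving side is swallowed rather than propagated. `FakeConnection` and `relay` are hypothetical, and the `DisconnectedError` defined here is only a local stand-in for the server's exception of the same name.

```python
import asyncio
import contextlib


class DisconnectedError(ConnectionError):
    """Local stand-in for the server's DisconnectedError."""


class FakeConnection:
    def __init__(self, connected: bool) -> None:
        self.connected = connected
        self.sent: list[dict] = []

    async def send(self, message: dict) -> None:
        if not self.connected:
            raise DisconnectedError("peer is gone")
        message["target"] = "game"
        self.sent.append(message)


async def relay(peer: FakeConnection, message: dict) -> bool:
    # A failure on the *peer's* connection must not escape into the
    # caller's own message loop, so it is suppressed here.
    with contextlib.suppress(DisconnectedError):
        await peer.send(message)
        return True
    return False
```

`connect_to_peer` and `handle_ice_message` in this class follow the same idea, using `contextlib.suppress` and an explicit `try`/`except` respectively.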

async def timeout_game_connection(self, timeout)
async def timeout_game_connection(self, timeout):
    await asyncio.sleep(timeout)
    if self.state is GameConnectionState.INITIALIZING:
        self._logger.debug("GameConnection timed out...")
        await self.abort("Player took too long to start the game")
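`timeout_game_connection` acts as a watchdog: it sleeps for the timeout and aborts only if the connection is still initializing when it wakes up. A self-contained sketch of that behaviour, assuming the coroutine is scheduled as a background task; the `State` enum and `Conn` class are simplified, hypothetical stand-ins.

```python
import asyncio
import enum


class State(enum.Enum):
    INITIALIZING = 0
    CONNECTED = 1
    ENDED = 2


class Conn:
    def __init__(self) -> None:
        self.state = State.INITIALIZING

    async def abort(self, log_message: str = "") -> None:
        self.state = State.ENDED

    async def timeout_game_connection(self, timeout: float) -> None:
        await asyncio.sleep(timeout)
        # Only abort if nothing advanced the state in the meantime.
        if self.state is State.INITIALIZING:
            await self.abort("Player took too long to start the game")


async def demo(connect_in_time: bool) -> State:
    conn = Conn()
    watchdog = asyncio.create_task(conn.timeout_game_connection(0.01))
    if connect_in_time:
        conn.state = State.CONNECTED
    await watchdog
    return conn.state
```

Because the check happens after the sleep, a connection that progresses before the deadline is left untouched and the watchdog simply finishes.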

Inherited members

class GameService (database: FAFDatabase, player_service, game_stats_service, rating_service: RatingService, message_queue_service: MessageQueueService)
@with_logger
class GameService(Service):
    """
    Utility class for maintaining lifecycle of games
    """

    def __init__(
        self,
        database: FAFDatabase,
        player_service,
        game_stats_service,
        rating_service: RatingService,
        message_queue_service: MessageQueueService
    ):
        self._db = database
        self._dirty_games: set[Game] = set()
        self._dirty_queues: set[MatchmakerQueue] = set()
        self.player_service = player_service
        self.game_stats_service = game_stats_service
        self._rating_service = rating_service
        self._message_queue_service = message_queue_service
        self.game_id_counter = 0
        self._allow_new_games = False
        self._drain_event = None

        # Populated below in update_data.
        self.featured_mods = dict()

        # A set of mod ids that are allowed in ranked games
        self.ranked_mods: set[str] = set()

        # A cache of map_version info needed by Game
        self.map_info_cache = LRUCache(maxsize=256)

        # The active games, keyed by game id
        self._games: dict[int, Game] = dict()

    async def initialize(self) -> None:
        await self.initialise_game_counter()
        await self.update_data()
        self._update_cron = aiocron.crontab(
            "*/10 * * * *", func=self.update_data
        )
        self._allow_new_games = True

    async def initialise_game_counter(self):
        async with self._db.acquire() as conn:
            # InnoDB, unusually, doesn't allow insertion of values greater than the next expected
            # value into an auto_increment field. We'd like to do that, because we no longer insert
            # games into the database when they don't start, so game ids aren't contiguous (as
            # unstarted games consume ids that never get written out).
            # So, id has to just be an integer primary key, no auto-increment: we handle its
            # incrementing here in game service, but have to do this slightly expensive query on
            # startup (though the primary key index probably makes it super fast anyway).
            # This is definitely a better choice than inserting useless rows when games are created,
            # doing LAST_INSERT_ID to get the id number, and then doing an UPDATE when the actual
            # data to go into the row becomes available: we now only do a single insert for each
            # game, and don't end up with 800,000 junk rows in the database.
            sql = "SELECT MAX(id) FROM game_stats"
            self.game_id_counter = await conn.scalar(sql) or 0

    async def update_data(self):
        """
        Loads from the database the mostly-constant things that it doesn't make sense to query every
        time we need, but which can in principle change over time.
        """
        async with self._db.acquire() as conn:
            rows = await conn.execute(
                select(
                    game_featuredMods.c.id,
                    game_featuredMods.c.gamemod,
                    game_featuredMods.c.name,
                    game_featuredMods.c.description,
                    game_featuredMods.c.publish,
                    game_featuredMods.c.order
                )
            )

            for row in rows:
                self.featured_mods[row.gamemod] = FeaturedMod(
                    row.id,
                    row.gamemod,
                    row.name,
                    row.description,
                    row.publish,
                    row.order
                )

            result = await conn.execute(
                "SELECT uid FROM table_mod WHERE ranked = 1"
            )

            # Turn resultset into a set of uids
            self.ranked_mods = {row.uid for row in result}

    async def get_map(self, folder_name: str) -> Map:
        folder_name = folder_name.lower()
        filename = f"maps/{folder_name}.zip"

        map = self.map_info_cache.get(filename)
        if map is not None:
            return map

        async with self._db.acquire() as conn:
            result = await conn.execute(
                select(
                    map_version.c.id,
                    map_version.c.filename,
                    map_version.c.ranked,
                )
                .where(
                    func.lower(map_version.c.filename) == filename
                )
            )
            row = result.fetchone()
            if not row:
                # The map requested is not in the database. This is fine as
                # players may be using privately shared or generated maps that
                # are not in the vault.
                return Map(
                    id=None,
                    folder_name=folder_name,
                    ranked=NeroxisGeneratedMap.is_neroxis_map(folder_name),
                )

            map = Map(
                id=row.id,
                folder_name=folder_name,
                ranked=row.ranked
            )
            self.map_info_cache[filename] = map
            return map

    def mark_dirty(self, obj: Union[Game, MatchmakerQueue]):
        if isinstance(obj, Game):
            self._dirty_games.add(obj)
        elif isinstance(obj, MatchmakerQueue):
            self._dirty_queues.add(obj)

    def pop_dirty_games(self) -> set[Game]:
        dirty_games = self._dirty_games
        self._dirty_games = set()

        return dirty_games

    def pop_dirty_queues(self) -> set[MatchmakerQueue]:
        dirty_queues = self._dirty_queues
        self._dirty_queues = set()

        return dirty_queues

    def create_uid(self) -> int:
        self.game_id_counter += 1

        return self.game_id_counter

    def create_game(
        self,
        game_mode: str,
        game_class: type[Game] = CustomGame,
        visibility=VisibilityState.PUBLIC,
        host: Optional[Player] = None,
        name: Optional[str] = None,
        map: Map = MAP_DEFAULT,
        password: Optional[str] = None,
        matchmaker_queue_id: Optional[int] = None,
        **kwargs
    ):
        """
        Main entrypoint for creating new games
        """
        if not self._allow_new_games:
            raise DisabledError()

        game_id = self.create_uid()
        game_args = {
            "database": self._db,
            "id": game_id,
            "host": host,
            "name": name,
            "map": map,
            "game_mode": game_mode,
            "game_service": self,
            "game_stats_service": self.game_stats_service,
            "matchmaker_queue_id": matchmaker_queue_id,
        }
        game_args.update(kwargs)
        game = game_class(**game_args)

        self._games[game_id] = game

        game.visibility = visibility
        game.password = password

        self.mark_dirty(game)
        return game

    def update_active_game_metrics(self):
        modes = list(self.featured_mods.keys())

        game_counter = Counter(
            (
                game.game_mode if game.game_mode in modes else "other",
                game.state
            )
            for game in self._games.values()
        )

        for state in GameState:
            for mode in modes + ["other"]:
                metrics.active_games.labels(mode, state.name).set(
                    game_counter[(mode, state)]
                )

        rating_type_counter = Counter(
            (
                game.rating_type,
                game.state
            )
            for game in self._games.values()
        )

        for state in GameState:
            for rating_type in rating_type_counter.keys():
                metrics.active_games_by_rating_type.labels(rating_type, state.name).set(
                    rating_type_counter[(rating_type, state)]
                )

    @property
    def all_games(self) -> ValuesView[Game]:
        return self._games.values()

    @property
    def live_games(self) -> list[Game]:
        return [
            game
            for game in self.all_games
            if game.state is GameState.LIVE
        ]

    @property
    def open_games(self) -> list[Game]:
        """
        Return all games that meet the client's definition of "not closed".
        Server game states are mapped to client game states as follows:

            GameState.LOBBY: "open",
            GameState.LIVE: "playing",
            GameState.ENDED: "closed",
            GameState.INITIALIZING: "closed",

        The client ignores everything "closed". This property fetches all such not-closed games.
        """
        return [
            game
            for game in self.all_games
            if game.state in (GameState.LOBBY, GameState.LIVE)
        ]

    @property
    def pending_games(self) -> list[Game]:
        return [
            game
            for game in self.all_games
            if game.state in (GameState.LOBBY, GameState.INITIALIZING)
        ]

    def remove_game(self, game: Game):
        if game.id in self._games:
            self._logger.debug("Removing game %s", game)
            del self._games[game.id]

        if (
            self._drain_event is not None
            and not self._drain_event.is_set()
            and not self._games
        ):
            self._drain_event.set()

    def __getitem__(self, item: int) -> Game:
        return self._games[item]

    def __contains__(self, item):
        return item in self._games

    async def publish_game_results(self, game_results: EndedGameInfo):
        result_dict = game_results.to_dict()
        await self._message_queue_service.publish(
            config.MQ_EXCHANGE_NAME,
            "success.gameResults.create",
            result_dict,
        )

        if (
            game_results.validity is ValidityState.VALID
            and game_results.rating_type is not None
        ):
            metrics.rated_games.labels(game_results.rating_type).inc()
            # TODO: Remove when rating service starts listening to message queue
            await self._rating_service.enqueue(result_dict)

    async def drain_games(self):
        """
        Wait for all games to finish.
        """
        if not self._games:
            return

        if not self._drain_event:
            self._drain_event = asyncio.Event()

        await self._drain_event.wait()

    async def graceful_shutdown(self):
        self._allow_new_games = False

        await self.close_lobby_games()

    async def close_lobby_games(self):
        self._logger.info("Closing all games currently in lobby")
        for game in self.pending_games:
            for game_connection in list(game.connections):
                # Tell the client to kill the FA process
                game_connection.player.write_message({
                    "command": "notice",
                    "style": "kill"
                })
                await game_connection.abort()

Utility class for maintaining lifecycle of games

Ancestors

Instance variables

prop all_games : ValuesView[Game]
@property
def all_games(self) -> ValuesView[Game]:
    return self._games.values()
prop live_games : list[Game]
@property
def live_games(self) -> list[Game]:
    return [
        game
        for game in self.all_games
        if game.state is GameState.LIVE
    ]
prop open_games : list[Game]
@property
def open_games(self) -> list[Game]:
    """
    Return all games that meet the client's definition of "not closed".
    Server game states are mapped to client game states as follows:

        GameState.LOBBY: "open",
        GameState.LIVE: "playing",
        GameState.ENDED: "closed",
        GameState.INITIALIZING: "closed",

    The client ignores everything "closed". This property fetches all such not-closed games.
    """
    return [
        game
        for game in self.all_games
        if game.state in (GameState.LOBBY, GameState.LIVE)
    ]

Return all games that meet the client's definition of "not closed". Server game states are mapped to client game states as follows:

GameState.LOBBY: "open",
GameState.LIVE: "playing",
GameState.ENDED: "closed",
GameState.INITIALIZING: "closed",

The client ignores everything "closed". This property fetches all such not-closed games.
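
The mapping above can be sketched as a lookup table. This is only an illustration: the `GameState` enum below is a stand-in defined for the example, and `CLIENT_STATE` and `is_visible_to_client` are hypothetical names that do not exist in the server.

```python
from enum import Enum, auto


class GameState(Enum):
    # Stand-in for the server's GameState enum (assumed members only).
    INITIALIZING = auto()
    LOBBY = auto()
    LIVE = auto()
    ENDED = auto()


# Server state -> client state, as listed in the docstring.
CLIENT_STATE = {
    GameState.LOBBY: "open",
    GameState.LIVE: "playing",
    GameState.ENDED: "closed",
    GameState.INITIALIZING: "closed",
}


def is_visible_to_client(state: GameState) -> bool:
    """Return True for states the client does not treat as closed."""
    return CLIENT_STATE[state] != "closed"
```

Filtering on `is_visible_to_client` selects the same states as the `(GameState.LOBBY, GameState.LIVE)` membership test used by `open_games`.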

prop pending_games : list[Game]
@property
def pending_games(self) -> list[Game]:
    return [
        game
        for game in self.all_games
        if game.state in (GameState.LOBBY, GameState.INITIALIZING)
    ]

Methods

async def close_lobby_games(self)
async def close_lobby_games(self):
    self._logger.info("Closing all games currently in lobby")
    for game in self.pending_games:
        for game_connection in list(game.connections):
            # Tell the client to kill the FA process
            game_connection.player.write_message({
                "command": "notice",
                "style": "kill"
            })
            await game_connection.abort()
def create_game(self,
game_mode: str,
game_class: type[Game] = server.games.custom_game.CustomGame,
visibility=VisibilityState.PUBLIC,
host: Player | None = None,
name: str | None = None,
map: Map = Map(id=None, folder_name='scmp_007', ranked=False, weight=1),
password: str | None = None,
matchmaker_queue_id: int | None = None,
**kwargs)
def create_game(
    self,
    game_mode: str,
    game_class: type[Game] = CustomGame,
    visibility=VisibilityState.PUBLIC,
    host: Optional[Player] = None,
    name: Optional[str] = None,
    map: Map = MAP_DEFAULT,
    password: Optional[str] = None,
    matchmaker_queue_id: Optional[int] = None,
    **kwargs
):
    """
    Main entrypoint for creating new games
    """
    if not self._allow_new_games:
        raise DisabledError()

    game_id = self.create_uid()
    game_args = {
        "database": self._db,
        "id": game_id,
        "host": host,
        "name": name,
        "map": map,
        "game_mode": game_mode,
        "game_service": self,
        "game_stats_service": self.game_stats_service,
        "matchmaker_queue_id": matchmaker_queue_id,
    }
    game_args.update(kwargs)
    game = game_class(**game_args)

    self._games[game_id] = game

    game.visibility = visibility
    game.password = password

    self.mark_dirty(game)
    return game

Main entrypoint for creating new games

def create_uid(self) ‑> int
def create_uid(self) -> int:
    self.game_id_counter += 1

    return self.game_id_counter
async def drain_games(self)
async def drain_games(self):
    """
    Wait for all games to finish.
    """
    if not self._games:
        return

    if not self._drain_event:
        self._drain_event = asyncio.Event()

    await self._drain_event.wait()

Wait for all games to finish.
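
The interplay between `drain_games` and `remove_game` can be sketched in isolation. The `DrainableRegistry` class and `demo` coroutine below are hypothetical and exist only for this example; they mirror the lazily created `asyncio.Event` that is set once the last entry is removed.

```python
import asyncio


class DrainableRegistry:
    """Hypothetical stand-in for the game registry's drain behaviour."""

    def __init__(self):
        self._items = {}
        self._drain_event = None

    def add(self, key, value):
        self._items[key] = value

    def remove(self, key):
        self._items.pop(key, None)
        # Wake up any drain() waiter once the registry is empty.
        if (
            self._drain_event is not None
            and not self._drain_event.is_set()
            and not self._items
        ):
            self._drain_event.set()

    async def drain(self):
        # Nothing to wait for if the registry is already empty.
        if not self._items:
            return
        if self._drain_event is None:
            self._drain_event = asyncio.Event()
        await self._drain_event.wait()


async def demo():
    registry = DrainableRegistry()
    registry.add(1, "game one")
    registry.add(2, "game two")

    waiter = asyncio.create_task(registry.drain())
    await asyncio.sleep(0)  # let the waiter start and block on the event
    still_waiting = not waiter.done()

    registry.remove(1)
    registry.remove(2)
    await asyncio.wait_for(waiter, timeout=1)
    return still_waiting, waiter.done()
```

Running `asyncio.run(demo())` shows the waiter blocking while entries remain and completing after the last removal.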

async def get_map(self, folder_name: str) ‑> Map
async def get_map(self, folder_name: str) -> Map:
    folder_name = folder_name.lower()
    filename = f"maps/{folder_name}.zip"

    map = self.map_info_cache.get(filename)
    if map is not None:
        return map

    async with self._db.acquire() as conn:
        result = await conn.execute(
            select(
                map_version.c.id,
                map_version.c.filename,
                map_version.c.ranked,
            )
            .where(
                func.lower(map_version.c.filename) == filename
            )
        )
        row = result.fetchone()
        if not row:
            # The map requested is not in the database. This is fine as
            # players may be using privately shared or generated maps that
            # are not in the vault.
            return Map(
                id=None,
                folder_name=folder_name,
                ranked=NeroxisGeneratedMap.is_neroxis_map(folder_name),
            )

        map = Map(
            id=row.id,
            folder_name=folder_name,
            ranked=row.ranked
        )
        self.map_info_cache[filename] = map
        return map
async def initialise_game_counter(self)
async def initialise_game_counter(self):
    async with self._db.acquire() as conn:
        # InnoDB, unusually, doesn't allow insertion of values greater than the next expected
        # value into an auto_increment field. We'd like to do that, because we no longer insert
        # games into the database when they don't start, so game ids aren't contiguous (as
        # unstarted games consume ids that never get written out).
        # So, id has to just be an integer primary key, no auto-increment: we handle its
        # incrementing here in game service, but have to do this slightly expensive query on
        # startup (though the primary key index probably makes it super fast anyway).
        # This is definitely a better choice than inserting useless rows when games are created,
        # doing LAST_INSERT_ID to get the id number, and then doing an UPDATE when the actual
        # data to go into the row becomes available: we now only do a single insert for each
        # game, and don't end up with 800,000 junk rows in the database.
        sql = "SELECT MAX(id) FROM game_stats"
        self.game_id_counter = await conn.scalar(sql) or 0
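
The combination of `initialise_game_counter` and `create_uid` can be sketched with an in-memory SQLite database. This is an assumption-laden illustration: the server uses its own async database layer rather than `sqlite3`, and `GameIdAllocator` and `make_demo_db` are hypothetical names introduced for the example.

```python
import sqlite3


class GameIdAllocator:
    """Hypothetical helper: hands out ids above the highest stored one."""

    def __init__(self, conn: sqlite3.Connection):
        # MAX(id) is NULL (None) for an empty table, so fall back to 0.
        row = conn.execute("SELECT MAX(id) FROM game_stats").fetchone()
        self.counter = row[0] or 0

    def create_uid(self) -> int:
        # Ids are handed out in memory; unstarted games never reach the table.
        self.counter += 1
        return self.counter


def make_demo_db(ids):
    """Build an in-memory table holding the given game ids (demo only)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE game_stats (id INTEGER PRIMARY KEY)")
    conn.executemany(
        "INSERT INTO game_stats (id) VALUES (?)", [(i,) for i in ids]
    )
    return conn
```

Because the counter lives in memory, gaps left by games that never start cost nothing in the database; the only query happens once at startup.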
def mark_dirty(self,
obj: Game | MatchmakerQueue)
def mark_dirty(self, obj: Union[Game, MatchmakerQueue]):
    if isinstance(obj, Game):
        self._dirty_games.add(obj)
    elif isinstance(obj, MatchmakerQueue):
        self._dirty_queues.add(obj)
def pop_dirty_games(self) ‑> set[Game]
def pop_dirty_games(self) -> set[Game]:
    dirty_games = self._dirty_games
    self._dirty_games = set()

    return dirty_games
def pop_dirty_queues(self) ‑> set[MatchmakerQueue]
def pop_dirty_queues(self) -> set[MatchmakerQueue]:
    dirty_queues = self._dirty_queues
    self._dirty_queues = set()

    return dirty_queues
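
The `mark_dirty` and `pop_dirty_*` methods follow a swap-and-return pattern: the current set is handed to the caller and replaced with a fresh one. A minimal sketch, using a hypothetical `DirtyTracker` class that is not part of the server:

```python
class DirtyTracker:
    """Hypothetical illustration of the swap-and-return pattern."""

    def __init__(self):
        self._dirty = set()

    def mark_dirty(self, obj):
        # A set deduplicates objects marked more than once between pops.
        self._dirty.add(obj)

    def pop_dirty(self):
        # Hand the current set to the caller and start a new one, so
        # objects marked after this call are not lost or reported twice.
        dirty = self._dirty
        self._dirty = set()
        return dirty
```

Swapping in a new set instead of calling `clear()` lets the caller keep iterating over the returned set without it changing underneath them.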
async def publish_game_results(self,
game_results: EndedGameInfo)
async def publish_game_results(self, game_results: EndedGameInfo):
    result_dict = game_results.to_dict()
    await self._message_queue_service.publish(
        config.MQ_EXCHANGE_NAME,
        "success.gameResults.create",
        result_dict,
    )

    if (
        game_results.validity is ValidityState.VALID
        and game_results.rating_type is not None
    ):
        metrics.rated_games.labels(game_results.rating_type).inc()
        # TODO: Remove when rating service starts listening to message queue
        await self._rating_service.enqueue(result_dict)
def remove_game(self,
game: Game)
def remove_game(self, game: Game):
    if game.id in self._games:
        self._logger.debug("Removing game %s", game)
        del self._games[game.id]

    if (
        self._drain_event is not None
        and not self._drain_event.is_set()
        and not self._games
    ):
        self._drain_event.set()
def update_active_game_metrics(self)
def update_active_game_metrics(self):
    modes = list(self.featured_mods.keys())

    game_counter = Counter(
        (
            game.game_mode if game.game_mode in modes else "other",
            game.state
        )
        for game in self._games.values()
    )

    for state in GameState:
        for mode in modes + ["other"]:
            metrics.active_games.labels(mode, state.name).set(
                game_counter[(mode, state)]
            )

    rating_type_counter = Counter(
        (
            game.rating_type,
            game.state
        )
        for game in self._games.values()
    )

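    # Counter keys are (rating_type, state) tuples, so collect the
    # distinct rating types before looking up each combination.
    rating_types = {rating_type for rating_type, _ in rating_type_counter}

    for state in GameState:
        for rating_type in rating_types:
            metrics.active_games_by_rating_type.labels(rating_type, state.name).set(
                rating_type_counter[(rating_type, state)]
            )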
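
Counting by `(mode, state)` pairs relies on two `Counter` properties: its keys are the tuples themselves, and a missing key reads as 0 rather than raising. A small sketch with made-up data, where `count_by_mode_and_state` is a hypothetical helper and states are plain strings instead of `GameState` members:

```python
from collections import Counter


def count_by_mode_and_state(games, known_modes):
    """Count (mode, state) pairs, folding unknown modes into "other"."""
    return Counter(
        (mode if mode in known_modes else "other", state)
        for mode, state in games
    )


# Made-up sample data: (game_mode, state) pairs.
games = [
    ("faf", "LIVE"),
    ("faf", "LIVE"),
    ("coop", "LOBBY"),
    ("mymod", "LIVE"),
]
counts = count_by_mode_and_state(games, ["faf", "coop"])
```

Because missing keys read as 0, every mode and state combination can be written to a gauge, which resets gauges for combinations that no longer have any games.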
async def update_data(self)
async def update_data(self):
    """
    Loads from the database the mostly-constant things that it doesn't make sense to query every
    time we need, but which can in principle change over time.
    """
    async with self._db.acquire() as conn:
        rows = await conn.execute(
            select(
                game_featuredMods.c.id,
                game_featuredMods.c.gamemod,
                game_featuredMods.c.name,
                game_featuredMods.c.description,
                game_featuredMods.c.publish,
                game_featuredMods.c.order
            )
        )

        for row in rows:
            self.featured_mods[row.gamemod] = FeaturedMod(
                row.id,
                row.gamemod,
                row.name,
                row.description,
                row.publish,
                row.order
            )

        result = await conn.execute(
            "SELECT uid FROM table_mod WHERE ranked = 1"
        )

        # Turn resultset into a set of uids
        self.ranked_mods = {row.uid for row in result}

Loads from the database the mostly-constant things that it doesn't make sense to query every time we need, but which can in principle change over time.

Inherited members

class GameStatsService (event_service: EventService,
achievement_service: AchievementService)
@with_logger
class GameStatsService(Service):
    def __init__(
        self,
        event_service: EventService,
        achievement_service: AchievementService
    ):
        self._event_service = event_service
        self._achievement_service = achievement_service

    async def process_game_stats(
        self,
        player: Player,
        game: Game,
        army_stats_list: list
    ):
        try:
            await self._process_game_stats(player, game, army_stats_list)
        except KeyError as e:
            self._logger.info("Malformed game stats. KeyError: %s", e)
        except Exception:
            self._logger.exception(
                "Error processing game stats for %s in game %d",
                player.login,
                game.id
            )

    async def _process_game_stats(
        self,
        player: Player,
        game: Game,
        army_stats_list: list
    ):
        stats = None
        number_of_humans = 0
        highest_score = 0
        highest_scorer = None

        for army_stats in army_stats_list:
            if army_stats["type"] == "AI" and army_stats["name"] != "civilian":
                self._logger.debug("Ignoring AI game reported by %s", player.login)
                return

            if army_stats["type"] == "Human":
                number_of_humans += 1

                if highest_score < army_stats["general"]["score"]:
                    highest_score = army_stats["general"]["score"]
                    highest_scorer = army_stats["name"]

            if army_stats["name"] == player.login:
                stats = army_stats

        if number_of_humans < 2:
            self._logger.debug("Ignoring single player game reported by %s", player.login)
            return

        if stats is None:
            self._logger.warning("Player %s reported stats of a game he was not part of", player.login)
            return

        army_result = game.get_player_outcome(player)
        if army_result is ArmyOutcome.UNKNOWN:
            self._logger.warning("No army result available for player %s", player.login)
            return

        self._logger.debug("Processing game stats for player: %s", player.login)

        faction = stats["faction"]
        # Stores achievements to batch update
        a_queue = []
        # Stores events to batch update
        e_queue = []
        self._logger.debug("Army result for %s => %s ", player, army_result)

        survived = army_result is ArmyOutcome.VICTORY
        blueprint_stats = stats["blueprints"]
        unit_stats = stats["units"]
        scored_highest = highest_scorer == player.login

        if survived and game.rating_type == RatingType.LADDER_1V1:
            self._unlock(ACH_FIRST_SUCCESS, a_queue)

        self._increment(ACH_NOVICE, 1, a_queue)
        self._increment(ACH_JUNIOR, 1, a_queue)
        self._increment(ACH_SENIOR, 1, a_queue)
        self._increment(ACH_VETERAN, 1, a_queue)
        self._increment(ACH_ADDICT, 1, a_queue)

        self._faction_played(faction, survived, a_queue, e_queue)
        self._category_stats(unit_stats, survived, a_queue, e_queue)
        self._killed_acus(unit_stats, survived, a_queue)
        self._built_mercies(_count_built_units(blueprint_stats, Unit.MERCY), a_queue)
        self._built_fire_beetles(_count_built_units(blueprint_stats, Unit.FIRE_BEETLE), a_queue)
        self._built_salvations(_count_built_units(blueprint_stats, Unit.SALVATION), survived, a_queue)
        self._built_yolona_oss(_count_built_units(blueprint_stats, Unit.YOLONA_OSS), survived, a_queue)
        self._built_paragons(_count_built_units(blueprint_stats, Unit.PARAGON), survived, a_queue)
        self._built_atlantis(_count_built_units(blueprint_stats, Unit.ATLANTIS), a_queue)
        self._built_tempests(_count_built_units(blueprint_stats, Unit.TEMPEST), a_queue)
        self._built_scathis(_count_built_units(blueprint_stats, Unit.SCATHIS), survived, a_queue)
        self._built_mavors(_count_built_units(blueprint_stats, Unit.MAVOR), survived, a_queue)
        self._built_czars(_count_built_units(blueprint_stats, Unit.CZAR), a_queue)
        self._built_ahwassas(_count_built_units(blueprint_stats, Unit.AHWASSA), a_queue)
        self._built_ythothas(_count_built_units(blueprint_stats, Unit.YTHOTHA), a_queue)
        self._built_fatboys(_count_built_units(blueprint_stats, Unit.FATBOY), a_queue)
        self._built_monkeylords(_count_built_units(blueprint_stats, Unit.MONKEYLORD), a_queue)
        self._built_galactic_colossus(_count_built_units(blueprint_stats, Unit.GALACTIC_COLOSSUS), a_queue)
        self._built_soul_rippers(_count_built_units(blueprint_stats, Unit.SOUL_RIPPER), a_queue)
        self._built_megaliths(_count_built_units(blueprint_stats, Unit.MEGALITH), a_queue)
        self._built_asfs(_count_built_units(blueprint_stats, *ASFS), a_queue)
        self._built_transports(unit_stats["transportation"].get("built", 0), a_queue)
        self._built_sacus(unit_stats["sacu"].get("built", 0), a_queue)
        self._lowest_acu_health(_count(blueprint_stats, lambda x: x.get("lowest_health", 0), *ACUS), survived, a_queue)
        self._highscore(scored_highest, number_of_humans, a_queue)

        await self._achievement_service.execute_batch_update(player.id, a_queue)
        await self._event_service.execute_batch_update(player.id, e_queue)

    def _category_stats(self, unit_stats, survived, achievements_queue, events_queue):
        built_air = unit_stats["air"].get("built", 0)
        built_land = unit_stats["land"].get("built", 0)
        built_naval = unit_stats["naval"].get("built", 0)
        built_experimentals = unit_stats["experimental"].get("built", 0)

        self._record_event(EVENT_BUILT_AIR_UNITS, built_air, events_queue)
        self._record_event(EVENT_LOST_AIR_UNITS, unit_stats["air"].get("lost", 0), events_queue)
        self._record_event(EVENT_BUILT_LAND_UNITS, built_land, events_queue)
        self._record_event(EVENT_LOST_LAND_UNITS, unit_stats["land"].get("lost", 0), events_queue)
        self._record_event(EVENT_BUILT_NAVAL_UNITS, built_naval, events_queue)
        self._record_event(EVENT_LOST_NAVAL_UNITS, unit_stats["naval"].get("lost", 0), events_queue)
        self._record_event(EVENT_LOST_ACUS, unit_stats["cdr"].get("lost", 0), events_queue)
        self._record_event(EVENT_BUILT_TECH_1_UNITS, unit_stats["tech1"].get("built", 0), events_queue)
        self._record_event(EVENT_LOST_TECH_1_UNITS, unit_stats["tech1"].get("lost", 0), events_queue)
        self._record_event(EVENT_BUILT_TECH_2_UNITS, unit_stats["tech2"].get("built", 0), events_queue)
        self._record_event(EVENT_LOST_TECH_2_UNITS, unit_stats["tech2"].get("lost", 0), events_queue)
        self._record_event(EVENT_BUILT_TECH_3_UNITS, unit_stats["tech3"].get("built", 0), events_queue)
        self._record_event(EVENT_LOST_TECH_3_UNITS, unit_stats["tech3"].get("lost", 0), events_queue)
        self._record_event(EVENT_BUILT_EXPERIMENTALS, built_experimentals, events_queue)
        self._record_event(EVENT_LOST_EXPERIMENTALS, unit_stats["experimental"].get("lost", 0), events_queue)
        self._record_event(EVENT_BUILT_ENGINEERS, unit_stats["engineer"].get("built", 0), events_queue)
        self._record_event(EVENT_LOST_ENGINEERS, unit_stats["engineer"].get("lost", 0), events_queue)

        if survived:
            if built_air > built_land and built_air > built_naval:
                self._increment(ACH_WRIGHT_BROTHER, 1, achievements_queue)
                self._increment(ACH_WINGMAN, 1, achievements_queue)
                self._increment(ACH_KING_OF_THE_SKIES, 1, achievements_queue)
            elif built_land > built_air and built_land > built_naval:
                self._increment(ACH_MILITIAMAN, 1, achievements_queue)
                self._increment(ACH_GRENADIER, 1, achievements_queue)
                self._increment(ACH_FIELD_MARSHAL, 1, achievements_queue)
            elif built_naval > built_land and built_naval > built_air:
                self._increment(ACH_LANDLUBBER, 1, achievements_queue)
                self._increment(ACH_SEAMAN, 1, achievements_queue)
                self._increment(ACH_ADMIRAL_OF_THE_FLEET, 1, achievements_queue)

            if built_experimentals > 0:
                self._increment(ACH_DR_EVIL, built_experimentals, achievements_queue)

                if built_experimentals >= 3:
                    self._increment(ACH_TECHIE, 1, achievements_queue)
                    self._increment(ACH_I_LOVE_BIG_TOYS, 1, achievements_queue)
                    self._increment(ACH_EXPERIMENTALIST, 1, achievements_queue)

    def _faction_played(self, faction, survived, achievements_queue, events_queue):
        if faction == Faction.aeon:
            self._record_event(EVENT_AEON_PLAYS, 1, events_queue)

            if survived:
                self._record_event(EVENT_AEON_WINS, 1, events_queue)
                self._increment(ACH_AURORA, 1, achievements_queue)
                self._increment(ACH_BLAZE, 1, achievements_queue)
                self._increment(ACH_SERENITY, 1, achievements_queue)
        elif faction == Faction.cybran:
            self._record_event(EVENT_CYBRAN_PLAYS, 1, events_queue)

            if survived:
                self._record_event(EVENT_CYBRAN_WINS, 1, events_queue)
                self._increment(ACH_MANTIS, 1, achievements_queue)
                self._increment(ACH_WAGNER, 1, achievements_queue)
                self._increment(ACH_TREBUCHET, 1, achievements_queue)
        elif faction == Faction.uef:
            self._record_event(EVENT_UEF_PLAYS, 1, events_queue)

            if survived:
                self._record_event(EVENT_UEF_WINS, 1, events_queue)
                self._increment(ACH_MA12_STRIKER, 1, achievements_queue)
                self._increment(ACH_RIPTIDE, 1, achievements_queue)
                self._increment(ACH_DEMOLISHER, 1, achievements_queue)
        elif faction == Faction.seraphim:
            self._record_event(EVENT_SERAPHIM_PLAYS, 1, events_queue)

            if survived:
                self._record_event(EVENT_SERAPHIM_WINS, 1, events_queue)
                self._increment(ACH_THAAM, 1, achievements_queue)
                self._increment(ACH_YENZYNE, 1, achievements_queue)
                self._increment(ACH_SUTHANUS, 1, achievements_queue)

    def _killed_acus(self, unit_stats, survived, achievements_queue):
        killed_acus = unit_stats["cdr"].get("kills", 0)

        if killed_acus > 0:
            self._increment(ACH_DONT_MESS_WITH_ME, killed_acus, achievements_queue)

        if killed_acus >= 3 and survived:
            self._unlock(ACH_HATTRICK, achievements_queue)

    def _built_mercies(self, count, achievements_queue):
        self._increment(ACH_NO_MERCY, count, achievements_queue)

    def _built_fire_beetles(self, count, achievements_queue):
        self._increment(ACH_DEADLY_BUGS, count, achievements_queue)

    def _built_salvations(self, count, survived, achievements_queue):
        if survived and count > 0:
            self._unlock(ACH_RAINMAKER, achievements_queue)

    def _built_yolona_oss(self, count, survived, achievements_queue):
        if survived and count > 0:
            self._unlock(ACH_NUCLEAR_WAR, achievements_queue)

    def _built_paragons(self, count, survived, achievements_queue):
        if survived and count > 0:
            self._unlock(ACH_SO_MUCH_RESOURCES, achievements_queue)

    def _built_atlantis(self, count, achievements_queue):
        self._increment(ACH_IT_AINT_A_CITY, count, achievements_queue)

    def _built_tempests(self, count, achievements_queue):
        self._increment(ACH_STORMY_SEA, count, achievements_queue)

    def _built_scathis(self, count, survived, achievements_queue):
        if survived and count > 0:
            self._unlock(ACH_MAKE_IT_HAIL, achievements_queue)

    def _built_mavors(self, count, survived, achievements_queue):
        if survived and count > 0:
            self._unlock(ACH_I_HAVE_A_CANON, achievements_queue)

    def _built_czars(self, count, achievements_queue):
        self._increment(ACH_DEATH_FROM_ABOVE, count, achievements_queue)

    def _built_ahwassas(self, count, achievements_queue):
        self._increment(ACH_ASS_WASHER, count, achievements_queue)

    def _built_ythothas(self, count, achievements_queue):
        self._increment(ACH_ALIEN_INVASION, count, achievements_queue)

    def _built_fatboys(self, count, achievements_queue):
        self._increment(ACH_FATTER_IS_BETTER, count, achievements_queue)

    def _built_monkeylords(self, count, achievements_queue):
        self._increment(ACH_ARACHNOLOGIST, count, achievements_queue)

    def _built_galactic_colossus(self, count, achievements_queue):
        self._increment(ACH_INCOMING_ROBOTS, count, achievements_queue)

    def _built_soul_rippers(self, count, achievements_queue):
        self._increment(ACH_FLYING_DEATH, count, achievements_queue)

    def _built_megaliths(self, count, achievements_queue):
        self._increment(ACH_HOLY_CRAB, count, achievements_queue)

    def _built_transports(self, count, achievements_queue):
        self._increment(ACH_THE_TRANSPORTER, count, achievements_queue)

    def _built_sacus(self, count, achievements_queue):
        self._set_steps_at_least(ACH_WHO_NEEDS_SUPPORT, count, achievements_queue)

    def _built_asfs(self, count, achievements_queue):
        self._set_steps_at_least(ACH_WHAT_A_SWARM, count, achievements_queue)

    def _lowest_acu_health(self, health, survived, achievements_queue):
        if 0 < health < 500 and survived:
            self._unlock(ACH_THAT_WAS_CLOSE, achievements_queue)

    def _highscore(self, scored_highest, number_of_humans, achievements_queue):
        if scored_highest and number_of_humans >= 8:
            self._unlock(ACH_TOP_SCORE, achievements_queue)
            self._increment(ACH_UNBEATABLE, 1, achievements_queue)

    def _unlock(self, achievement_id, achievements_queue):
        self._achievement_service.unlock(achievement_id, achievements_queue)

    def _increment(self, achievement_id, steps, achievements_queue):
        self._achievement_service.increment(achievement_id, steps, achievements_queue)

    def _set_steps_at_least(self, achievement_id, steps, achievements_queue):
        self._achievement_service.set_steps_at_least(achievement_id, steps, achievements_queue)

    def _record_event(self, event_id, count, events_queue):
        self._event_service.record_event(event_id, count, events_queue)

All services should inherit from this class.

Services are singleton objects which manage some server task.

Ancestors

Methods

async def process_game_stats(self,
player: Player,
game: Game,
army_stats_list: list)
async def process_game_stats(
    self,
    player: Player,
    game: Game,
    army_stats_list: list
):
    try:
        await self._process_game_stats(player, game, army_stats_list)
    except KeyError as e:
        self._logger.info("Malformed game stats. KeyError: %s", e)
    except Exception:
        self._logger.exception(
            "Error processing game stats for %s in game %d",
            player.login,
            game.id
        )

Inherited members

class GeoIpService
@with_logger
class GeoIpService(Service):
    """
    Service for managing the GeoIp database. This includes an asyncio crontab
    which periodically checks if the current file is out of date. If it is, then
    the service will try to download a new file from the url in `server.config`.

    Provides an interface for getting data out of the database.
    """

    def __init__(self):
        self.refresh_file_path()
        config.register_callback("GEO_IP_DATABASE_PATH", self.refresh_file_path)

        self.db = None
        self.db_update_time = None

    def refresh_file_path(self):
        self.file_path = config.GEO_IP_DATABASE_PATH

    async def initialize(self) -> None:
        self.check_geoip_db_file_updated()

        await self.check_update_geoip_db()
        # crontab: min hour day month day_of_week
        # Run every Wednesday because GeoLite2 is updated every first Tuesday
        # of the month.
        self._update_cron = aiocron.crontab(
            "0 0 * * 3", func=self.check_update_geoip_db
        )
        self._check_file_timer = Timer(
            60 * 10, self.check_geoip_db_file_updated, start=True
        )

    def check_geoip_db_file_updated(self):
        """
        Checks if the local database file has been updated by a server admin
        and loads it if it has.
        """
        if not os.path.isfile(self.file_path):
            return

        if self.db is None:
            # We haven't loaded the file before
            self.load_db()
        else:
            assert self.db_update_time is not None
            # We have loaded the file, so check if it has been updated

            date_modified = datetime.fromtimestamp(
                os.path.getmtime(self.file_path)
            )
            if date_modified > self.db_update_time:
                self.load_db()

    async def check_update_geoip_db(self) -> None:
        """
        Check if the geoip database is old and update it if so.
        """
        if not config.GEO_IP_LICENSE_KEY:
            self._logger.warning(
                "GEO_IP_LICENSE_KEY not set! Unable to download GeoIP database!"
            )
            return

        self._logger.debug("Checking if geoip database needs updating")
        try:
            date_modified = datetime.fromtimestamp(
                os.path.getmtime(self.file_path)
            )
            delta = datetime.now() - date_modified

            if delta.days > config.GEO_IP_DATABASE_MAX_AGE_DAYS:
                self._logger.info("Geoip database is out of date")
                await self.download_geoip_db()
        except FileNotFoundError:    # pragma: no cover
            self._logger.warning("Geoip database is missing...")
            await self.download_geoip_db()
        except asyncio.TimeoutError:    # pragma: no cover
            self._logger.warning(
                "Failed to download database file! "
                "Check the network connection and try again"
            )
        except Exception as e:    # pragma: no cover
            self._logger.exception(e)
            raise e

        self.load_db()

    async def download_geoip_db(self) -> None:
        """
        Download the geoip database to a file. If the downloaded file is not
        a valid gzip file, then it does NOT overwrite the old file.
        """
        assert config.GEO_IP_LICENSE_KEY is not None

        self._logger.info("Downloading new geoip database")

        # Download new file to a temp location
        with TemporaryFile() as temp_file:
            await self._download_file(
                config.GEO_IP_DATABASE_URL,
                config.GEO_IP_LICENSE_KEY,
                temp_file
            )
            temp_file.seek(0)

            # Unzip the archive and overwrite the old file
            try:
                with tarfile.open(fileobj=temp_file, mode="r:gz") as tar:
                    with open(self.file_path, "wb") as f_out:
                        f_in = extract_file(tar, "GeoLite2-Country.mmdb")
                        shutil.copyfileobj(f_in, f_out)
            except (tarfile.TarError) as e:    # pragma: no cover
                self._logger.warning("Failed to extract downloaded file!")
                raise e
        self._logger.info("New database download complete")

    async def _download_file(
        self,
        url: str,
        license_key: str,
        fileobj: IO[bytes]
    ) -> None:
        """
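        Download a file using aiohttp, write it to a file object and verify
        its MD5 checksum.

        # Params
        - `url`: The url to download from
        - `license_key`: The license key sent with the request
        - `fileobj`: Binary file object to write the downloaded data to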
        """

        chunk_size = 1024
        params = {
            "edition_id": "GeoLite2-Country",
            "license_key": license_key,
            "suffix": "tar.gz"
        }

        async def get_checksum(session):
            async with session.get(url, params={
                **params,
                "suffix": params["suffix"] + ".md5"
            }, timeout=60 * 20) as resp:
                return await resp.text()

        async def get_db_file_with_checksum(session):
            hasher = hashlib.md5()
            async with session.get(url, params=params, timeout=60 * 20) as resp:
                while True:
                    chunk = await resp.content.read(chunk_size)
                    if not chunk:
                        break
                    fileobj.write(chunk)
                    hasher.update(chunk)

            return hasher.hexdigest()

        async with aiohttp.ClientSession(raise_for_status=True) as session:
            checksum, our_hash = await asyncio.gather(
                get_checksum(session),
                get_db_file_with_checksum(session)
            )

        if checksum != our_hash:
            raise Exception(
                f"Hashes did not match! Expected {checksum} got {our_hash}"
            )

    def load_db(self) -> None:
        """
        Loads the database into memory.
        """
        # Set the time first, if the file is corrupted we don't need to try
        # loading it again anyways
        self.db_update_time = datetime.now()

        try:
            new_db = maxminddb.open_database(self.file_path)
        except (InvalidDatabaseError, OSError, ValueError):
            self._logger.exception(
                "Failed to load maxmind db! Maybe the download was interrupted"
            )
        else:
            if self.db is not None:
                self.db.close()

            self.db = new_db
            self._logger.info(
                "File loaded successfully from %s", self.file_path
            )

    def country(self, address: str) -> str:
        """
        Look up an ip address in the db and return its country code.
        """
        default_value = ""
        if self.db is None:
            return default_value

        entry = self.db.get(address)
        if entry is None:
            return default_value

        return str(entry.get("country", {}).get("iso_code", default_value))

    async def shutdown(self):
        if self.db is not None:
            self.db.close()

Service for managing the GeoIp database. This includes an asyncio crontab which periodically checks if the current file is out of date. If it is, then the service will try to download a new file from the URL in server.config.

Provides an interface for getting data out of the database.

Ancestors

Methods

def check_geoip_db_file_updated(self)
def check_geoip_db_file_updated(self):
    """
    Checks if the local database file has been updated by a server admin
    and loads it if it has.
    """
    if not os.path.isfile(self.file_path):
        return

    if self.db is None:
        # We haven't loaded the file before
        self.load_db()
    else:
        assert self.db_update_time is not None
        # We have loaded the file, so check if it has been updated

        date_modified = datetime.fromtimestamp(
            os.path.getmtime(self.file_path)
        )
        if date_modified > self.db_update_time:
            self.load_db()

Checks if the local database file has been updated by a server admin and loads it if it has.
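The reload decision described above comes down to comparing the file's modification time with the time of the last load. A minimal sketch of that comparison, assuming a hypothetical standalone helper `needs_reload` that is not part of the service:

```python
import os
from datetime import datetime
from typing import Optional


def needs_reload(file_path: str, last_loaded: Optional[datetime]) -> bool:
    """Hypothetical helper: return True if the file should be (re)loaded."""
    if not os.path.isfile(file_path):
        # There is no file to load
        return False
    if last_loaded is None:
        # The file exists but has never been loaded
        return True
    # Reload only if the file changed after the last load
    date_modified = datetime.fromtimestamp(os.path.getmtime(file_path))
    return date_modified > last_loaded
```

Because the service records the load time even when loading fails, a corrupted file would not be retried under this scheme until it is modified again.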

async def check_update_geoip_db(self) ‑> None
async def check_update_geoip_db(self) -> None:
    """
    Check if the geoip database is old and update it if so.
    """
    if not config.GEO_IP_LICENSE_KEY:
        self._logger.warning(
            "GEO_IP_LICENSE_KEY not set! Unable to download GeoIP database!"
        )
        return

    self._logger.debug("Checking if geoip database needs updating")
    try:
        date_modified = datetime.fromtimestamp(
            os.path.getmtime(self.file_path)
        )
        delta = datetime.now() - date_modified

        if delta.days > config.GEO_IP_DATABASE_MAX_AGE_DAYS:
            self._logger.info("Geoip database is out of date")
            await self.download_geoip_db()
    except FileNotFoundError:    # pragma: no cover
        self._logger.warning("Geoip database is missing...")
        await self.download_geoip_db()
    except asyncio.TimeoutError:    # pragma: no cover
        self._logger.warning(
            "Failed to download database file! "
            "Check the network connection and try again"
        )
    except Exception as e:    # pragma: no cover
        self._logger.exception(e)
        raise e

    self.load_db()

Check if the geoip database is old and update it if so.
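The age check can be sketched in isolation. The helper name `is_out_of_date` and its parameters are assumptions made for illustration; the service itself reads the limit from `config.GEO_IP_DATABASE_MAX_AGE_DAYS`.

```python
import os
from datetime import datetime


def is_out_of_date(file_path: str, max_age_days: int) -> bool:
    """Hypothetical helper: True if the file is older than `max_age_days`.

    Raises FileNotFoundError if the file does not exist, which a caller
    can treat as "missing, download it".
    """
    date_modified = datetime.fromtimestamp(os.path.getmtime(file_path))
    delta = datetime.now() - date_modified
    # `timedelta.days` counts whole days only, so a file that is
    # `max_age_days` days and a few hours old is not yet out of date.
    return delta.days > max_age_days
```

Note the strict comparison: with a limit of 7, a download is only triggered once the file is at least 8 whole days old.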

def country(self, address: str) ‑> str
def country(self, address: str) -> str:
    """
    Look up an ip address in the db and return its country code.
    """
    default_value = ""
    if self.db is None:
        return default_value

    entry = self.db.get(address)
    if entry is None:
        return default_value

    return str(entry.get("country", {}).get("iso_code", default_value))

Look up an ip address in the db and return its country code.
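The lookup falls back to an empty string at every level: no record, no "country" key, or no "iso_code" key. A small sketch of that fallback chain on plain dictionaries, where `country_code` is a hypothetical helper and the sample records are made up for illustration:

```python
from typing import Any, Optional


def country_code(entry: Optional[dict[str, Any]], default: str = "") -> str:
    """Hypothetical helper mirroring the nested lookup with defaults."""
    if entry is None:
        # The address was not found in the database
        return default
    # A missing "country" or "iso_code" key also yields the default
    return str(entry.get("country", {}).get("iso_code", default))


print(country_code({"country": {"iso_code": "DE"}}))
print(country_code({"continent": {"code": "EU"}}))
print(country_code(None))
```

Callers therefore never have to handle `None`; an unknown country is always the empty string.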

async def download_geoip_db(self) ‑> None
async def download_geoip_db(self) -> None:
    """
    Download the geoip database to a file. If the downloaded file is not
    a valid gzip file, then it does NOT overwrite the old file.
    """
    assert config.GEO_IP_LICENSE_KEY is not None

    self._logger.info("Downloading new geoip database")

    # Download new file to a temp location
    with TemporaryFile() as temp_file:
        await self._download_file(
            config.GEO_IP_DATABASE_URL,
            config.GEO_IP_LICENSE_KEY,
            temp_file
        )
        temp_file.seek(0)

        # Unzip the archive and overwrite the old file
        try:
            with tarfile.open(fileobj=temp_file, mode="r:gz") as tar:
                with open(self.file_path, "wb") as f_out:
                    f_in = extract_file(tar, "GeoLite2-Country.mmdb")
                    shutil.copyfileobj(f_in, f_out)
        except (tarfile.TarError) as e:    # pragma: no cover
            self._logger.warning("Failed to extract downloaded file!")
            raise e
    self._logger.info("New database download complete")

Download the geoip database to a file. If the downloaded file is not a valid gzip file, then it does NOT overwrite the old file.
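The guarantee about invalid downloads follows from the order of operations: the archive is opened before the output file is. The sketch below illustrates this with in-memory files. `extract_member` is a hypothetical stand-in for the `extract_file` helper, whose real behavior may differ, and the directory name inside the archive is made up.

```python
import io
import os
import shutil
import tarfile
from typing import IO


def extract_member(archive: IO[bytes], member_name: str, out: IO[bytes]) -> None:
    """Copy the archive member whose base name is `member_name` into `out`."""
    # tarfile.open raises tarfile.ReadError (a TarError) if the data does
    # not start with a valid gzip header, before anything is written to `out`.
    with tarfile.open(fileobj=archive, mode="r:gz") as tar:
        for member in tar.getmembers():
            if os.path.basename(member.name) != member_name:
                continue
            f_in = tar.extractfile(member)
            if f_in is None:
                continue  # not a regular file, e.g. a directory
            shutil.copyfileobj(f_in, out)
            return
    raise FileNotFoundError(member_name)


def make_archive(path_in_archive: str, data: bytes) -> io.BytesIO:
    """Build a small in-memory .tar.gz for demonstration purposes."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:gz") as tar:
        info = tarfile.TarInfo(name=path_in_archive)
        info.size = len(data)
        tar.addfile(info, io.BytesIO(data))
    buf.seek(0)
    return buf


archive = make_archive("GeoLite2-Country_example/GeoLite2-Country.mmdb", b"fake-db")
out = io.BytesIO()
extract_member(archive, "GeoLite2-Country.mmdb", out)
```

Note that this only covers archives that cannot be opened at all. An error raised after the output file has been opened for writing could still leave a truncated file behind.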

def load_db(self) ‑> None
def load_db(self) -> None:
    """
    Loads the database into memory.
    """
    # Set the time first, if the file is corrupted we don't need to try
    # loading it again anyways
    self.db_update_time = datetime.now()

    try:
        new_db = maxminddb.open_database(self.file_path)
    except (InvalidDatabaseError, OSError, ValueError):
        self._logger.exception(
            "Failed to load maxmind db! Maybe the download was interrupted"
        )
    else:
        if self.db is not None:
            self.db.close()

        self.db = new_db
        self._logger.info(
            "File loaded successfully from %s", self.file_path
        )

Loads the database into memory.
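Loading follows a swap-on-success pattern: the new reader is opened first, and the old one is closed only once the new one is known to be usable. A minimal sketch of that pattern, assuming a hypothetical `FakeReader` class and `swap_reader` function in place of the real database reader:

```python
from typing import Callable, Optional


class FakeReader:
    """Hypothetical stand-in for a database reader with a `close` method."""

    def __init__(self, name: str):
        self.name = name
        self.closed = False

    def close(self) -> None:
        self.closed = True


def swap_reader(
    current: Optional[FakeReader],
    open_new: Callable[[], FakeReader],
) -> Optional[FakeReader]:
    """Return the reader to use after attempting a reload."""
    try:
        new_reader = open_new()
    except (OSError, ValueError):
        # Loading failed: keep serving lookups from the old reader
        return current
    # Loading succeeded: release the old reader, then switch over
    if current is not None:
        current.close()
    return new_reader
```

With this ordering a failed reload never leaves the service without a database if one was loaded before.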

def refresh_file_path(self)
def refresh_file_path(self):
    self.file_path = config.GEO_IP_DATABASE_PATH

Inherited members

class LadderService (database: FAFDatabase,
game_service: GameService,
violation_service: ViolationService)
@with_logger
class LadderService(Service):
    """
    Service responsible for managing the automatches. Does matchmaking, updates
    statistics, and launches the games.
    """

    def __init__(
        self,
        database: FAFDatabase,
        game_service: GameService,
        violation_service: ViolationService,
    ):
        self._db = database
        self._informed_players: set[Player] = set()
        self.game_service = game_service
        self.queues = {}
        self.violation_service = violation_service

        self._searches: dict[Player, dict[str, Search]] = defaultdict(dict)
        self._allow_new_searches = True

    async def initialize(self) -> None:
        await self.update_data()
        self._update_cron = aiocron.crontab("*/10 * * * *", func=self.update_data)

    async def update_data(self) -> None:
        async with self._db.acquire() as conn:
            map_pool_maps = await self.fetch_map_pools(conn)
            db_queues = await self.fetch_matchmaker_queues(conn)

        for name, info in db_queues.items():
            if name not in self.queues:
                queue = MatchmakerQueue(
                    self.game_service,
                    self.on_match_found,
                    name=name,
                    queue_id=info["id"],
                    featured_mod=info["mod"],
                    rating_type=info["rating_type"],
                    team_size=info["team_size"],
                    params=info.get("params")
                )
                self.queues[name] = queue
                queue.initialize()
            else:
                queue = self.queues[name]
                queue.featured_mod = info["mod"]
                queue.rating_type = info["rating_type"]
                queue.team_size = info["team_size"]
                queue.rating_peak = await self.fetch_rating_peak(info["rating_type"])
            queue.map_pools.clear()
            for map_pool_id, min_rating, max_rating in info["map_pools"]:
                map_pool_name, map_list = map_pool_maps[map_pool_id]
                if not map_list:
                    self._logger.warning(
                        "Map pool '%s' is empty! Some %s games will "
                        "likely fail to start!",
                        map_pool_name,
                        name
                    )
                queue.add_map_pool(
                    MapPool(map_pool_id, map_pool_name, map_list),
                    min_rating,
                    max_rating
                )
        # Remove queues that don't exist anymore
        for queue_name in list(self.queues.keys()):
            if queue_name not in db_queues:
                self.queues[queue_name].shutdown()
                del self.queues[queue_name]

    async def fetch_map_pools(self, conn) -> dict[int, tuple[str, list[Map]]]:
        result = await conn.execute(
            select(
                map_pool.c.id,
                map_pool.c.name,
                map_pool_map_version.c.weight,
                map_pool_map_version.c.map_params,
                map_version.c.id.label("map_id"),
                map_version.c.filename,
                map_version.c.ranked,
            ).select_from(
                map_pool.outerjoin(map_pool_map_version)
                .outerjoin(map_version)
            )
        )
        map_pool_maps = {}
        for row in result:
            id_ = row.id
            name = row.name
            if id_ not in map_pool_maps:
                map_pool_maps[id_] = (name, list())
            _, map_list = map_pool_maps[id_]
            if row.map_id is not None:
                # Database filenames contain the maps/ prefix and .zip suffix.
                # This comes from the content server which hosts the files at
                # https://content.faforever.com/maps/name.zip
                folder_name = re.match(r"maps/(.+)\.zip", row.filename).group(1)
                map_list.append(
                    Map(
                        id=row.map_id,
                        folder_name=folder_name,
                        ranked=row.ranked,
                        weight=row.weight,
                    )
                )
            elif row.map_params is not None:
                try:
                    params = json.loads(row.map_params)
                    map_type = params["type"]
                    if map_type == "neroxis":
                        map_list.append(
                            NeroxisGeneratedMap.of(params, row.weight)
                        )
                    else:
                        self._logger.warning(
                            "Unsupported map type %s in pool %s",
                            map_type,
                            row.id
                        )

                except Exception:
                    self._logger.warning(
                        "Failed to load map in map pool %d. "
                        "Parameters are '%s'",
                        row.id,
                        row.map_params,
                        exc_info=True
                    )

        return map_pool_maps

    async def fetch_matchmaker_queues(self, conn):
        result = await conn.execute(
            select(
                matchmaker_queue.c.id,
                matchmaker_queue.c.technical_name,
                matchmaker_queue.c.team_size,
                matchmaker_queue.c.params,
                matchmaker_queue_map_pool.c.map_pool_id,
                matchmaker_queue_map_pool.c.min_rating,
                matchmaker_queue_map_pool.c.max_rating,
                game_featuredMods.c.gamemod,
                leaderboard.c.technical_name.label("rating_type")
            )
            .select_from(
                matchmaker_queue
                .join(matchmaker_queue_map_pool)
                .join(game_featuredMods)
                .join(leaderboard)
            ).where(matchmaker_queue.c.enabled == true())
        )
        # So we don't log the same error multiple times when a queue has several
        # map pools
        errored = set()
        matchmaker_queues = defaultdict(lambda: defaultdict(list))
        for row in result:
            name = row.technical_name
            if name in errored:
                continue
            info = matchmaker_queues[name]
            try:
                info["id"] = row.id
                info["mod"] = row.gamemod
                info["rating_type"] = row.rating_type
                info["team_size"] = row.team_size
                info["params"] = json.loads(row.params) if row.params else None
                info["map_pools"].append((
                    row.map_pool_id,
                    row.min_rating,
                    row.max_rating
                ))
            except Exception:
                self._logger.warning(
                    "Unable to load queue '%s'!",
                    name,
                    exc_info=True
                )
                del matchmaker_queues[name]
                errored.add(name)
        return matchmaker_queues

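    async def fetch_rating_peak(self, rating_type):
        """
        Estimate the rating peak of a leaderboard as the mean of
        `mean - 3 * deviation` over the latest 1000 rating journal entries
        (highest ids first). Falls back to 1000.0 if there are no entries.
        """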
        async with self._db.acquire() as conn:
            result = await conn.execute(
                select(
                    leaderboard_rating_journal.c.rating_mean_before,
                    leaderboard_rating_journal.c.rating_deviation_before
                )
                .select_from(leaderboard_rating_journal.join(leaderboard))
                .where(leaderboard.c.technical_name == rating_type)
                .order_by(leaderboard_rating_journal.c.id.desc())
                .limit(1000)
            )
            rows = result.fetchall()
            rowcount = len(rows)

            rating_peak = 1000.0
            if rowcount > 0:
                rating_peak = statistics.mean(
                    row.rating_mean_before - 3 * row.rating_deviation_before for row in rows
                )
            metrics.leaderboard_rating_peak.labels(rating_type).set(rating_peak)

            if rowcount < 100:
                self._logger.warning(
                    "Could only fetch %s ratings for %s queue.",
                    rowcount,
                    rating_type
                )

            if rating_peak < 600 or rating_peak > 1200:
                self._logger.warning(
                    "Estimated rating peak for %s is %s. This could lead to issues with matchmaking.",
                    rating_type,
                    rating_peak
                )
            else:
                self._logger.info(
                    "Estimated rating peak for %s is %s.",
                    rating_type,
                    rating_peak
                )

            return rating_peak

    def start_search(
        self,
        players: list[Player],
        queue_name: str,
        on_matched: OnMatchedCallback = lambda _1, _2: None
    ):
        if not self._allow_new_searches:
            raise DisabledError()

        timeouts = self.violation_service.get_violations(players)
        if timeouts:
            self._logger.debug("timeouts: %s", timeouts)
            times = [
                {
                    "player": p.id,
                    "expires_at": violation.get_ban_expiration().isoformat()
                }
                for p, violation in timeouts.items()
            ]
            for player in players:
                player.write_message({
                    "command": "search_timeout",
                    "timeouts": times
                })
                # TODO: Do we need this or is `search_timeout` enough?
                player.write_message({
                    "command": "search_info",
                    "queue_name": queue_name,
                    "state": "stop"
                })
                # For compatibility with clients that don't understand
                # `search_timeout` only. This may be removed at any time.
                if len(times) == 1:
                    s = ""
                    are = "is"
                else:
                    s = "s"
                    are = "are"
                names = ", ".join(p.login for p in timeouts)
                max_time = humanize.naturaldelta(
                    max(
                        timeouts.values(),
                        key=lambda v: v.get_ban_expiration()
                    ).get_remaining()
                )
                player.write_message({
                    "command": "notice",
                    "style": "info",
                    "text": f"Player{s} {names} {are} timed out for {max_time}"
                })
            return
        # Cancel any existing searches that players have for this queue
        for player in players:
            if queue_name in self._searches[player]:
                self._cancel_search(player, queue_name)

        queue = self.queues[queue_name]
        search = Search(
            players,
            rating_type=queue.rating_type,
            on_matched=on_matched
        )

        for player in players:
            player.state = PlayerState.SEARCHING_LADDER

            self.write_rating_progress(player, queue.rating_type)

            player.write_message({
                "command": "search_info",
                "queue_name": queue_name,
                "state": "start"
            })

            self._searches[player][queue_name] = search

        self._logger.info("%s started searching for %s", search, queue_name)

        asyncio.create_task(queue.search(search))

    def cancel_search(
        self,
        initiator: Player,
        queue_name: Optional[str] = None
    ) -> None:
        if queue_name is None:
            queue_names = list(self._searches[initiator].keys())
        else:
            queue_names = [queue_name]

        for queue_name in queue_names:
            self._cancel_search(initiator, queue_name)

    def _cancel_search(self, initiator: Player, queue_name: str) -> None:
        """
        Cancel search for a specific player/queue.
        """
        cancelled_search = self._clear_search(initiator, queue_name)
        if cancelled_search is None:
            self._logger.debug(
                "Ignoring request to cancel a search that does not exist: "
                "%s, %s",
                initiator,
                queue_name
            )
            return
        cancelled_search.cancel()

        for player in cancelled_search.players:
            player.write_message({
                "command": "search_info",
                "queue_name": queue_name,
                "state": "stop"
            })
            if (
                not self._searches[player]
                and player.state == PlayerState.SEARCHING_LADDER
            ):
                player.state = PlayerState.IDLE
        self._logger.info(
            "%s stopped searching for %s", cancelled_search, queue_name
        )

    def _clear_search(
        self,
        initiator: Player,
        queue_name: str
    ) -> Optional[Search]:
        """
        Remove a search from the searches dictionary.

        Does NOT cancel the search.
        """
        search = self._searches[initiator].get(queue_name)

        if search is not None:
            for player in search.players:
                del self._searches[player][queue_name]

        return search

    def write_rating_progress(self, player: Player, rating_type: str) -> None:
        if player not in self._informed_players:
            self._informed_players.add(player)
            _, deviation = player.ratings[rating_type]

            if deviation > 490:
                player.write_message({
                    "command": "notice",
                    "style": "info",
                    "text": (
                        "<i>Welcome to the matchmaker</i><br><br><b>The "
                        "matchmaking system needs to calibrate your skill level; "
                        "your first few games may be more imbalanced as the "
                        "system attempts to learn your capability as a player."
                        "</b><br><b>"
                        "Afterwards, you'll be more reliably matched up with "
                        "people of your skill level: so don't worry if your "
                        "first few games are uneven. This will improve as you "
                        "play!</b>"
                    )
                })

    def on_match_found(
        self,
        s1: Search,
        s2: Search,
        queue: MatchmakerQueue
    ) -> None:
        """
        Callback for when a match is generated by a matchmaker queue.

        NOTE: This function is called while the matchmaker search lock is held,
        so it should only perform fast operations.
        """
        try:
            msg = {"command": "match_found", "queue_name": queue.name}

            for player in s1.players + s2.players:
                player.state = PlayerState.STARTING_AUTOMATCH
                player.write_message(msg)

                # Cancel any other searches
                queue_names = list(
                    name for name in self._searches[player].keys()
                    if name != queue.name
                )
                for queue_name in queue_names:
                    self._cancel_search(player, queue_name)

                self._clear_search(player, queue.name)

            asyncio.create_task(self.start_game(s1.players, s2.players, queue))
        except Exception:
            self._logger.exception(
                "Error processing match between searches %s, and %s",
                s1, s2
            )

    def start_game(
        self,
        team1: list[Player],
        team2: list[Player],
        queue: MatchmakerQueue
    ) -> Awaitable[None]:
        # We want assertion errors to trigger when the caller attempts to
        # create the coroutine, not when the coroutine starts executing.
        assert len(team1) == len(team2)

        return self._start_game(team1, team2, queue)

    async def _start_game(
        self,
        team1: list[Player],
        team2: list[Player],
        queue: MatchmakerQueue
    ) -> None:
        self._logger.debug(
            "Starting %s game between %s and %s",
            queue.name,
            [p.login for p in team1],
            [p.login for p in team2]
        )
        game = None
        try:
            host = team1[0]
            all_players = team1 + team2
            all_guests = all_players[1:]

            played_map_ids = await self.get_game_history(
                all_players,
                queue.id,
                limit=config.LADDER_ANTI_REPETITION_LIMIT
            )

            def get_displayed_rating(player: Player) -> float:
                return player.ratings[queue.rating_type].displayed()

            ratings = (get_displayed_rating(player) for player in all_players)
            func = MAP_POOL_RATING_SELECTION_FUNCTIONS.get(
                config.MAP_POOL_RATING_SELECTION,
                statistics.mean
            )
            rating = func(ratings)

            pool = queue.get_map_pool_for_rating(rating)
            if not pool:
                raise RuntimeError(f"No map pool available for rating {rating}!")
            game_map = pool.choose_map(played_map_ids)

            game = self.game_service.create_game(
                game_class=LadderGame,
                game_mode=queue.featured_mod,
                host=host,
                name="Matchmaker Game",
                map=game_map,
                matchmaker_queue_id=queue.id,
                rating_type=queue.rating_type,
                max_players=len(all_players),
            )
            game.init_mode = InitMode.AUTO_LOBBY
            game.set_name_unchecked(game_name(team1, team2))

            team1 = sorted(team1, key=get_displayed_rating)
            team2 = sorted(team2, key=get_displayed_rating)

            # Shuffle the teams such that direct opponents remain the same
            zipped_teams = list(zip(team1, team2))
            random.shuffle(zipped_teams)

            for i, player in enumerate(
                player for pair in zipped_teams for player in pair
            ):
                # FA uses lua and lua arrays are 1-indexed
                slot = i + 1
                # 2 if even, 3 if odd
                team = (i % 2) + 2
                player.game = game

                # Set player options without triggering the logic for
                # determining that players have actually connected to the game.
                game._player_options[player.id]["Faction"] = player.faction.value
                game._player_options[player.id]["Team"] = team
                game._player_options[player.id]["StartSpot"] = slot
                game._player_options[player.id]["Army"] = slot
                game._player_options[player.id]["Color"] = slot

            game_options = queue.get_game_options()
            if game_options:
                game.game_options.update(game_options)

            self._logger.debug("Starting ladder game: %s", game)

            def make_game_options(player: Player) -> GameLaunchOptions:
                return GameLaunchOptions(
                    mapname=game_map.folder_name,
                    expected_players=len(all_players),
                    game_options=game_options,
                    team=game.get_player_option(player.id, "Team"),
                    faction=game.get_player_option(player.id, "Faction"),
                    map_position=game.get_player_option(player.id, "StartSpot")
                )

            await self.launch_match(game, host, all_guests, make_game_options)
            self._logger.debug("Ladder game launched successfully %s", game)
            metrics.matches.labels(queue.name, MatchLaunch.SUCCESSFUL).inc()
        except Exception as e:
            abandoning_players = []
            if isinstance(e, NotConnectedError):
                self._logger.info(
                    "Ladder game failed to start! %s setup timed out",
                    game
                )
                metrics.matches.labels(queue.name, MatchLaunch.TIMED_OUT).inc()
                abandoning_players = e.players
            elif isinstance(e, GameClosedError):
                self._logger.info(
                    "Ladder game %s failed to start! "
                    "Player %s closed their game instance",
                    game, e.player
                )
                metrics.matches.labels(queue.name, MatchLaunch.ABORTED_BY_PLAYER).inc()
                abandoning_players = [e.player]
            else:
                # All timeout errors should be transformed by the match starter.
                assert not isinstance(e, asyncio.TimeoutError)

                self._logger.exception("Ladder game failed to start %s", game)
                metrics.matches.labels(queue.name, MatchLaunch.ERRORED).inc()

            if game:
                await game.on_game_finish()

            game_id = game.id if game else None
            msg = {"command": "match_cancelled", "game_id": game_id}
            for player in all_players:
                player.write_message(msg)

            if abandoning_players:
                self._logger.info(
                    "Players failed to connect: %s",
                    abandoning_players
                )
                self.violation_service.register_violations(abandoning_players)

    async def launch_match(
        self,
        game: LadderGame,
        host: Player,
        guests: list[Player],
        make_game_options: Callable[[Player], GameLaunchOptions]
    ):
        # Launch the host
        if host.lobby_connection is None:
            raise NotConnectedError([host])

        host.lobby_connection.write_launch_game(
            game,
            is_host=True,
            options=make_game_options(host)
        )

        try:
            await game.wait_hosted(60)
        except asyncio.TimeoutError:
            raise NotConnectedError([host])
        finally:
            # TODO: Once the client supports `match_cancelled`, don't
            # send `launch_game` to the client if the host timed out. Until
            # then, failing to send `launch_game` will cause the client to
            # think it is searching for ladder, even though the server has
            # already removed it from the queue.

            # Launch the guests
            not_connected_guests = [
                player for player in guests
                if player.lobby_connection is None
            ]
            if not_connected_guests:
                raise NotConnectedError(not_connected_guests)

            for guest in guests:
                assert guest.lobby_connection is not None

                guest.lobby_connection.write_launch_game(
                    game,
                    is_host=False,
                    options=make_game_options(guest)
                )
        try:
            await game.wait_launched(60 + 10 * len(guests))
        except asyncio.TimeoutError:
            connected_players = game.get_connected_players()
            raise NotConnectedError([
                player for player in guests
                if player not in connected_players
            ])

    async def get_game_history(
        self,
        players: list[Player],
        queue_id: int,
        limit: int = 3
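    ) -> list[int]:
        """
        Return the ids of the maps that `players` played in the given queue
        during the last day, with at most `limit` recent games per player.
        """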
        async with self._db.acquire() as conn:
            result = []
            for player in players:
                query = select(
                    game_stats.c.mapId,
                ).select_from(
                    game_player_stats
                    .join(game_stats)
                    .join(matchmaker_queue_game)
                ).where(
                    and_(
                        game_player_stats.c.playerId == player.id,
                        game_stats.c.startTime >= func.DATE_SUB(
                            func.now(),
                            text("interval 1 day")
                        ),
                        matchmaker_queue_game.c.matchmaker_queue_id == queue_id
                    )
                ).order_by(
                    game_stats.c.startTime.desc(),
                    # Timestamps only have second resolution, so for this to
                    # work correctly in the unit tests we also need id
                    game_stats.c.id.desc()
                ).limit(limit)

                result.extend([
                    row.mapId for row in await conn.execute(query)
                ])
        return result

    def on_connection_lost(self, conn: "LobbyConnection") -> None:
        if not conn.player:
            return

        player = conn.player
        self.cancel_search(player)
        del self._searches[player]
        if player in self._informed_players:
            self._informed_players.remove(player)

    async def graceful_shutdown(self):
        self._allow_new_searches = False

        for queue in self.queues.values():
            queue.shutdown()

        for player, searches in self._searches.items():
            for queue_name in list(searches.keys()):
                self._cancel_search(player, queue_name)

Service responsible for managing automatches. It runs matchmaking, updates statistics, and launches the games.

Ancestors

Methods

def cancel_search(self, initiator: Player, queue_name: Optional[str] = None) -> None
def cancel_search(
    self,
    initiator: Player,
    queue_name: Optional[str] = None
) -> None:
    if queue_name is None:
        queue_names = list(self._searches[initiator].keys())
    else:
        queue_names = [queue_name]

    for queue_name in queue_names:
        self._cancel_search(initiator, queue_name)
async def fetch_map_pools(self, conn) -> dict[int, tuple[str, list[Map]]]
async def fetch_map_pools(self, conn) -> dict[int, tuple[str, list[Map]]]:
    result = await conn.execute(
        select(
            map_pool.c.id,
            map_pool.c.name,
            map_pool_map_version.c.weight,
            map_pool_map_version.c.map_params,
            map_version.c.id.label("map_id"),
            map_version.c.filename,
            map_version.c.ranked,
        ).select_from(
            map_pool.outerjoin(map_pool_map_version)
            .outerjoin(map_version)
        )
    )
    map_pool_maps = {}
    for row in result:
        id_ = row.id
        name = row.name
        if id_ not in map_pool_maps:
            map_pool_maps[id_] = (name, list())
        _, map_list = map_pool_maps[id_]
        if row.map_id is not None:
            # Database filenames contain the maps/ prefix and .zip suffix.
            # This comes from the content server which hosts the files at
            # https://content.faforever.com/maps/name.zip
            folder_name = re.match(r"maps/(.+)\.zip", row.filename).group(1)
            map_list.append(
                Map(
                    id=row.map_id,
                    folder_name=folder_name,
                    ranked=row.ranked,
                    weight=row.weight,
                )
            )
        elif row.map_params is not None:
            try:
                params = json.loads(row.map_params)
                map_type = params["type"]
                if map_type == "neroxis":
                    map_list.append(
                        NeroxisGeneratedMap.of(params, row.weight)
                    )
                else:
                    self._logger.warning(
                        "Unsupported map type %s in pool %s",
                        map_type,
                        row.id
                    )

            except Exception:
                self._logger.warning(
                    "Failed to load map in map pool %d. "
                    "Parameters are '%s'",
                    row.id,
                    row.map_params,
                    exc_info=True
                )

    return map_pool_maps
async def fetch_matchmaker_queues(self, conn)
async def fetch_matchmaker_queues(self, conn):
    result = await conn.execute(
        select(
            matchmaker_queue.c.id,
            matchmaker_queue.c.technical_name,
            matchmaker_queue.c.team_size,
            matchmaker_queue.c.params,
            matchmaker_queue_map_pool.c.map_pool_id,
            matchmaker_queue_map_pool.c.min_rating,
            matchmaker_queue_map_pool.c.max_rating,
            game_featuredMods.c.gamemod,
            leaderboard.c.technical_name.label("rating_type")
        )
        .select_from(
            matchmaker_queue
            .join(matchmaker_queue_map_pool)
            .join(game_featuredMods)
            .join(leaderboard)
        ).where(matchmaker_queue.c.enabled == true())
    )
    # So we don't log the same error multiple times when a queue has several
    # map pools
    errored = set()
    matchmaker_queues = defaultdict(lambda: defaultdict(list))
    for row in result:
        name = row.technical_name
        if name in errored:
            continue
        info = matchmaker_queues[name]
        try:
            info["id"] = row.id
            info["mod"] = row.gamemod
            info["rating_type"] = row.rating_type
            info["team_size"] = row.team_size
            info["params"] = json.loads(row.params) if row.params else None
            info["map_pools"].append((
                row.map_pool_id,
                row.min_rating,
                row.max_rating
            ))
        except Exception:
            self._logger.warning(
                "Unable to load queue '%s'!",
                name,
                exc_info=True
            )
            del matchmaker_queues[name]
            errored.add(name)
    return matchmaker_queues
async def fetch_rating_peak(self, rating_type)
async def fetch_rating_peak(self, rating_type):
    async with self._db.acquire() as conn:
        result = await conn.execute(
            select(
                leaderboard_rating_journal.c.rating_mean_before,
                leaderboard_rating_journal.c.rating_deviation_before
            )
            .select_from(leaderboard_rating_journal.join(leaderboard))
            .where(leaderboard.c.technical_name == rating_type)
            .order_by(leaderboard_rating_journal.c.id.desc())
            .limit(1000)
        )
        rows = result.fetchall()
        rowcount = len(rows)

        rating_peak = 1000.0
        if rowcount > 0:
            rating_peak = statistics.mean(
                row.rating_mean_before - 3 * row.rating_deviation_before for row in rows
            )
        metrics.leaderboard_rating_peak.labels(rating_type).set(rating_peak)

        if rowcount < 100:
            self._logger.warning(
                "Could only fetch %s ratings for %s queue.",
                rowcount,
                rating_type
            )

        if rating_peak < 600 or rating_peak > 1200:
            self._logger.warning(
                "Estimated rating peak for %s is %s. This could lead to issues with matchmaking.",
                rating_type,
                rating_peak
            )
        else:
            self._logger.info(
                "Estimated rating peak for %s is %s.",
                rating_type,
                rating_peak
            )

        return rating_peak
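
As a rough illustration of the estimate computed in `fetch_rating_peak`, the sketch below averages `mean - 3 * deviation` over a list of `(mean, deviation)` pairs and falls back to 1000.0 when there are none. The function name `estimate_rating_peak` and the plain tuples are assumptions made for this example; the service itself reads these values from `leaderboard_rating_journal`.

```python
import statistics


def estimate_rating_peak(rows, default=1000.0):
    """Hypothetical helper mirroring the arithmetic in fetch_rating_peak.

    rows is a list of (rating_mean, rating_deviation) pairs.
    """
    if not rows:
        # No journal entries: keep the default peak.
        return default
    # Displayed rating is mean - 3 * deviation; the peak is their average.
    return statistics.mean(mean - 3 * deviation for mean, deviation in rows)
```

With the two entries `(1500, 100)` and `(1200, 200)`, the displayed ratings are 1200 and 600, so the estimate is 900, which falls inside the 600 to 1200 band that the method logs at info level.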
async def get_game_history(self, players: list[Player], queue_id: int, limit: int = 3) -> list[int]
async def get_game_history(
    self,
    players: list[Player],
    queue_id: int,
    limit: int = 3
) -> list[int]:
    async with self._db.acquire() as conn:
        result = []
        for player in players:
            query = select(
                game_stats.c.mapId,
            ).select_from(
                game_player_stats
                .join(game_stats)
                .join(matchmaker_queue_game)
            ).where(
                and_(
                    game_player_stats.c.playerId == player.id,
                    game_stats.c.startTime >= func.DATE_SUB(
                        func.now(),
                        text("interval 1 day")
                    ),
                    matchmaker_queue_game.c.matchmaker_queue_id == queue_id
                )
            ).order_by(
                game_stats.c.startTime.desc(),
                # Timestamps only have second resolution, so for this to
                # work correctly in the unit tests we also need id
                game_stats.c.id.desc()
            ).limit(limit)

            result.extend([
                row.mapId for row in await conn.execute(query)
            ])
    return result
async def launch_match(self, game: LadderGame, host: Player, guests: list[Player], make_game_options: Callable[[Player], GameLaunchOptions])
async def launch_match(
    self,
    game: LadderGame,
    host: Player,
    guests: list[Player],
    make_game_options: Callable[[Player], GameLaunchOptions]
):
    # Launch the host
    if host.lobby_connection is None:
        raise NotConnectedError([host])

    host.lobby_connection.write_launch_game(
        game,
        is_host=True,
        options=make_game_options(host)
    )

    try:
        await game.wait_hosted(60)
    except asyncio.TimeoutError:
        raise NotConnectedError([host])
    finally:
        # TODO: Once the client supports `match_cancelled`, don't
        # send `launch_game` to the client if the host timed out. Until
        # then, failing to send `launch_game` will cause the client to
        # think it is searching for ladder, even though the server has
        # already removed it from the queue.

        # Launch the guests
        not_connected_guests = [
            player for player in guests
            if player.lobby_connection is None
        ]
        if not_connected_guests:
            raise NotConnectedError(not_connected_guests)

        for guest in guests:
            assert guest.lobby_connection is not None

            guest.lobby_connection.write_launch_game(
                game,
                is_host=False,
                options=make_game_options(guest)
            )
    try:
        await game.wait_launched(60 + 10 * len(guests))
    except asyncio.TimeoutError:
        connected_players = game.get_connected_players()
        raise NotConnectedError([
            player for player in guests
            if player not in connected_players
        ])
def on_match_found(self, s1: Search, s2: Search, queue: MatchmakerQueue) -> None
def on_match_found(
    self,
    s1: Search,
    s2: Search,
    queue: MatchmakerQueue
) -> None:
    """
    Callback for when a match is generated by a matchmaker queue.

    NOTE: This function is called while the matchmaker search lock is held,
    so it should only perform fast operations.
    """
    try:
        msg = {"command": "match_found", "queue_name": queue.name}

        for player in s1.players + s2.players:
            player.state = PlayerState.STARTING_AUTOMATCH
            player.write_message(msg)

            # Cancel any other searches
            queue_names = list(
                name for name in self._searches[player].keys()
                if name != queue.name
            )
            for queue_name in queue_names:
                self._cancel_search(player, queue_name)

            self._clear_search(player, queue.name)

        asyncio.create_task(self.start_game(s1.players, s2.players, queue))
    except Exception:
        self._logger.exception(
            "Error processing match between searches %s, and %s",
            s1, s2
        )

Callback for when a match is generated by a matchmaker queue.

NOTE: This function is called while the matchmaker search lock is held, so it should only perform fast operations.
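
The note above describes a common asyncio pattern: a synchronous callback does only quick bookkeeping and hands the slow part to the event loop with `asyncio.create_task`. The following is a minimal sketch of that pattern, not the server's code; `slow_start_game`, `run_demo`, the string match identifier, and the `tasks` set are all hypothetical names introduced for the example.

```python
import asyncio


async def slow_start_game(match: str, log: list[str]) -> None:
    # Stand-in for slow work such as database queries or waiting on clients.
    await asyncio.sleep(0.01)
    log.append(f"started {match}")


def on_match_found(match: str, log: list[str], tasks: set) -> None:
    # Fast, synchronous bookkeeping only: the caller still holds the lock.
    log.append(f"matched {match}")
    # Schedule the slow part instead of awaiting it here.
    task = asyncio.create_task(slow_start_game(match, log))
    # Keep a reference so the task is not garbage collected before it runs.
    tasks.add(task)
    task.add_done_callback(tasks.discard)


async def run_demo() -> list[str]:
    log: list[str] = []
    tasks: set = set()
    lock = asyncio.Lock()
    async with lock:
        on_match_found("a-vs-b", log, tasks)
        log.append("callback returned")
    # The scheduled task only makes progress once the caller awaits.
    await asyncio.gather(*tasks)
    return log
```

Running `asyncio.run(run_demo())` shows that the callback returns before the slow coroutine has done any work, so the lock is held only for the bookkeeping.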

def start_game(self, team1: list[Player], team2: list[Player], queue: MatchmakerQueue) -> Awaitable[None]
def start_game(
    self,
    team1: list[Player],
    team2: list[Player],
    queue: MatchmakerQueue
) -> Awaitable[None]:
    # We want assertion errors to trigger when the caller attempts to
    # create the async function, not when the function starts executing.
    assert len(team1) == len(team2)

    return self._start_game(team1, team2, queue)
def start_search(self, players: list[Player], queue_name: str, on_matched: OnMatchedCallback = lambda _1, _2: None)
def start_search(
    self,
    players: list[Player],
    queue_name: str,
    on_matched: OnMatchedCallback = lambda _1, _2: None
):
    if not self._allow_new_searches:
        raise DisabledError()

    timeouts = self.violation_service.get_violations(players)
    if timeouts:
        self._logger.debug("timeouts: %s", timeouts)
        times = [
            {
                "player": p.id,
                "expires_at": violation.get_ban_expiration().isoformat()
            }
            for p, violation in timeouts.items()
        ]
        for player in players:
            player.write_message({
                "command": "search_timeout",
                "timeouts": times
            })
            # TODO: Do we need this or is `search_timeout` enough?
            player.write_message({
                "command": "search_info",
                "queue_name": queue_name,
                "state": "stop"
            })
            # For compatibility with clients that don't understand
            # `search_timeout` only. This may be removed at any time.
            if len(times) == 1:
                s = ""
                are = "is"
            else:
                s = "s"
                are = "are"
            names = ", ".join(p.login for p in timeouts)
            max_time = humanize.naturaldelta(
                max(
                    timeouts.values(),
                    key=lambda v: v.get_ban_expiration()
                ).get_remaining()
            )
            player.write_message({
                "command": "notice",
                "style": "info",
                "text": f"Player{s} {names} {are} timed out for {max_time}"
            })
        return
    # Cancel any existing searches that players have for this queue
    for player in players:
        if queue_name in self._searches[player]:
            self._cancel_search(player, queue_name)

    queue = self.queues[queue_name]
    search = Search(
        players,
        rating_type=queue.rating_type,
        on_matched=on_matched
    )

    for player in players:
        player.state = PlayerState.SEARCHING_LADDER

        self.write_rating_progress(player, queue.rating_type)

        player.write_message({
            "command": "search_info",
            "queue_name": queue_name,
            "state": "start"
        })

        self._searches[player][queue_name] = search

    self._logger.info("%s started searching for %s", search, queue_name)

    asyncio.create_task(queue.search(search))
async def update_data(self) -> None
async def update_data(self) -> None:
    async with self._db.acquire() as conn:
        map_pool_maps = await self.fetch_map_pools(conn)
        db_queues = await self.fetch_matchmaker_queues(conn)

    for name, info in db_queues.items():
        if name not in self.queues:
            queue = MatchmakerQueue(
                self.game_service,
                self.on_match_found,
                name=name,
                queue_id=info["id"],
                featured_mod=info["mod"],
                rating_type=info["rating_type"],
                team_size=info["team_size"],
                params=info.get("params")
            )
            self.queues[name] = queue
            queue.initialize()
        else:
            queue = self.queues[name]
            queue.featured_mod = info["mod"]
            queue.rating_type = info["rating_type"]
            queue.team_size = info["team_size"]
            queue.rating_peak = await self.fetch_rating_peak(info["rating_type"])
        queue.map_pools.clear()
        for map_pool_id, min_rating, max_rating in info["map_pools"]:
            map_pool_name, map_list = map_pool_maps[map_pool_id]
            if not map_list:
                self._logger.warning(
                    "Map pool '%s' is empty! Some %s games will "
                    "likely fail to start!",
                    map_pool_name,
                    name
                )
            queue.add_map_pool(
                MapPool(map_pool_id, map_pool_name, map_list),
                min_rating,
                max_rating
            )
    # Remove queues that don't exist anymore
    for queue_name in list(self.queues.keys()):
        if queue_name not in db_queues:
            self.queues[queue_name].shutdown()
            del self.queues[queue_name]
def write_rating_progress(self, player: Player, rating_type: str) -> None
def write_rating_progress(self, player: Player, rating_type: str) -> None:
    if player not in self._informed_players:
        self._informed_players.add(player)
        _, deviation = player.ratings[rating_type]

        if deviation > 490:
            player.write_message({
                "command": "notice",
                "style": "info",
                "text": (
                    "<i>Welcome to the matchmaker</i><br><br><b>The "
                    "matchmaking system needs to calibrate your skill level; "
                    "your first few games may be more imbalanced as the "
                    "system attempts to learn your capability as a player."
                    "</b><br><b>"
                    "Afterwards, you'll be more reliably matched up with "
                    "people of your skill level: so don't worry if your "
                    "first few games are uneven. This will improve as you "
                    "play!</b>"
                )
            })

Inherited members

class MessageQueueService
@with_logger
class MessageQueueService(Service):
    """
    Service handling connection to the message queue
    and providing an interface to publish messages.
    """

    def __init__(self) -> None:
        self._connection = None
        self._channel = None
        self._exchanges = {}
        self._exchange_types = {}
        self._is_ready = False

        config.register_callback("MQ_USER", self.reconnect)
        config.register_callback("MQ_PASSWORD", self.reconnect)
        config.register_callback("MQ_VHOST", self.reconnect)
        config.register_callback("MQ_SERVER", self.reconnect)
        config.register_callback("MQ_PORT", self.reconnect)

    @synchronizedmethod("initialization_lock")
    async def initialize(self) -> None:
        if self._is_ready:
            return

        try:
            await self._connect()
        except ConnectionAttemptFailed:
            return
        self._is_ready = True

        await self._declare_exchange(config.MQ_EXCHANGE_NAME, ExchangeType.TOPIC)

    async def _connect(self) -> None:
        try:
            self._connection = await aio_pika.connect_robust(
                "amqp://{user}:{password}@{server}:{port}/{vhost}".format(
                    user=config.MQ_USER,
                    password=config.MQ_PASSWORD,
                    vhost=config.MQ_VHOST,
                    server=config.MQ_SERVER,
                    port=config.MQ_PORT,
                ),
                loop=asyncio.get_running_loop(),
            )
        except ConnectionError as e:
            self._logger.warning(
                "Unable to connect to RabbitMQ. Is it running?", exc_info=True
            )
            raise ConnectionAttemptFailed from e
        except ProbableAuthenticationError as e:
            self._logger.warning(
                "Unable to connect to RabbitMQ. Incorrect credentials?", exc_info=True
            )
            raise ConnectionAttemptFailed from e
        except Exception as e:
            self._logger.warning(
                "Unable to connect to RabbitMQ due to unhandled exception %s. Incorrect vhost?",
                e,
                exc_info=True,
            )
            raise ConnectionAttemptFailed from e

        self._channel = await self._connection.channel(publisher_confirms=False)
        self._logger.debug("Connected to RabbitMQ %r", self._connection)

    async def declare_exchange(
        self, exchange_name: str, exchange_type: ExchangeType = ExchangeType.TOPIC, durable: bool = True
    ) -> None:
        await self.initialize()
        if not self._is_ready:
            self._logger.warning(
                "Not connected to RabbitMQ, unable to declare exchange."
            )
            return

        await self._declare_exchange(exchange_name, exchange_type, durable)

    async def _declare_exchange(
        self, exchange_name: str, exchange_type: ExchangeType, durable: bool = True
    ) -> None:
        new_exchange = await self._channel.declare_exchange(
            exchange_name, exchange_type, durable
        )

        self._exchanges[exchange_name] = new_exchange
        self._exchange_types[exchange_name] = exchange_type

    @synchronizedmethod("initialization_lock")
    async def shutdown(self) -> None:
        self._is_ready = False
        await self._shutdown()

    async def _shutdown(self) -> None:
        if self._channel is not None:
            await self._channel.close()
            self._channel = None

        if self._connection is not None:
            await self._connection.close()
            self._connection = None

    async def publish(
        self,
        exchange_name: str,
        routing: str,
        payload: dict,
        mandatory: bool = False,
        delivery_mode: DeliveryMode = DeliveryMode.PERSISTENT,
    ) -> None:
        await self.publish_many(
            exchange_name,
            routing,
            [payload],
            mandatory=mandatory,
            delivery_mode=delivery_mode
        )

    async def publish_many(
        self,
        exchange_name: str,
        routing: str,
        payloads: Iterable[dict],
        mandatory: bool = False,
        delivery_mode: DeliveryMode = DeliveryMode.PERSISTENT,
    ) -> None:
        if not self._is_ready:
            self._logger.warning(
                "Not connected to RabbitMQ, unable to publish message."
            )
            return

        exchange = self._exchanges.get(exchange_name)
        if exchange is None:
            raise KeyError(f"Unknown exchange {exchange_name}.")

        async with self._channel.transaction():
            for payload in payloads:
                message = aio_pika.Message(
                    json.dumps(payload).encode(),
                    delivery_mode=delivery_mode
                )
                await exchange.publish(
                    message,
                    routing_key=routing,
                    mandatory=mandatory
                )
                self._logger.log(
                    TRACE,
                    "Published message %s to %s/%s",
                    payload,
                    exchange_name,
                    routing
                )

    @synchronizedmethod("initialization_lock")
    async def reconnect(self) -> None:
        self._is_ready = False
        await self._shutdown()

        try:
            await self._connect()
        except ConnectionAttemptFailed:
            return

        for exchange_name in list(self._exchanges.keys()):
            await self._declare_exchange(
                exchange_name, self._exchange_types[exchange_name]
            )
        self._is_ready = True

Service handling connection to the message queue and providing an interface to publish messages.

Ancestors

Methods

async def declare_exchange(self, exchange_name: str, exchange_type: aio_pika.abc.ExchangeType = ExchangeType.TOPIC, durable: bool = True) -> None
async def declare_exchange(
    self, exchange_name: str, exchange_type: ExchangeType = ExchangeType.TOPIC, durable: bool = True
) -> None:
    await self.initialize()
    if not self._is_ready:
        self._logger.warning(
            "Not connected to RabbitMQ, unable to declare exchange."
        )
        return

    await self._declare_exchange(exchange_name, exchange_type, durable)
async def publish(self, exchange_name: str, routing: str, payload: dict, mandatory: bool = False, delivery_mode: aio_pika.abc.DeliveryMode = DeliveryMode.PERSISTENT) -> None
async def publish(
    self,
    exchange_name: str,
    routing: str,
    payload: dict,
    mandatory: bool = False,
    delivery_mode: DeliveryMode = DeliveryMode.PERSISTENT,
) -> None:
    await self.publish_many(
        exchange_name,
        routing,
        [payload],
        mandatory=mandatory,
        delivery_mode=delivery_mode
    )
async def publish_many(self, exchange_name: str, routing: str, payloads: Iterable[dict], mandatory: bool = False, delivery_mode: aio_pika.abc.DeliveryMode = DeliveryMode.PERSISTENT) -> None
async def publish_many(
    self,
    exchange_name: str,
    routing: str,
    payloads: Iterable[dict],
    mandatory: bool = False,
    delivery_mode: DeliveryMode = DeliveryMode.PERSISTENT,
) -> None:
    if not self._is_ready:
        self._logger.warning(
            "Not connected to RabbitMQ, unable to publish message."
        )
        return

    exchange = self._exchanges.get(exchange_name)
    if exchange is None:
        raise KeyError(f"Unknown exchange {exchange_name}.")

    async with self._channel.transaction():
        for payload in payloads:
            message = aio_pika.Message(
                json.dumps(payload).encode(),
                delivery_mode=delivery_mode
            )
            await exchange.publish(
                message,
                routing_key=routing,
                mandatory=mandatory
            )
            self._logger.log(
                TRACE,
                "Published message %s to %s/%s",
                payload,
                exchange_name,
                routing
            )
async def reconnect(self) -> None
@synchronizedmethod("initialization_lock")
async def reconnect(self) -> None:
    self._is_ready = False
    await self._shutdown()

    try:
        await self._connect()
    except ConnectionAttemptFailed:
        return

    for exchange_name in list(self._exchanges.keys()):
        await self._declare_exchange(
            exchange_name, self._exchange_types[exchange_name]
        )
    self._is_ready = True

Inherited members

class OAuthService
@with_logger
class OAuthService(Service, name="oauth_service"):
    """
    Service for managing the OAuth token logins and verification.
    """

    def __init__(self):
        self.public_keys = {}
        self._last_key_fetch_time = None

    async def initialize(self) -> None:
        await self.retrieve_public_keys()
        # crontab: min hour day month day_of_week
        # Run every 10 minutes to update public keys.
        self._update_cron = aiocron.crontab(
            "*/10 * * * *", func=self.retrieve_public_keys
        )

    @synchronizedmethod
    async def get_public_keys(self) -> dict:
        """
        Return cached keys, or fetch them if they're missing
        """
        if not self.public_keys:
            # Rate limit requests so we don't spam the endpoint when it's down
            if (
                not self._last_key_fetch_time or
                time.monotonic() - self._last_key_fetch_time > 5
            ):
                await self.retrieve_public_keys()

            if not self.public_keys:
                raise RuntimeError("jwks could not be retrieved")

        return self.public_keys

    async def retrieve_public_keys(self) -> None:
        """
        Get the latest jwks from the hydra endpoint
        """
        self._last_key_fetch_time = time.monotonic()
        try:
            async with aiohttp.ClientSession(raise_for_status=True) as session:
                async with session.get(config.HYDRA_JWKS_URI) as resp:
                    jwks = await resp.json()
                    self.public_keys = {
                        jwk["kid"]: RSAAlgorithm.from_jwk(jwk)
                        for jwk in jwks["keys"]
                    }
            self._logger.info("Got public keys from %s", config.HYDRA_JWKS_URI)
        except Exception:
            self._logger.exception(
                "Unable to retrieve jwks, token login will be unavailable!"
            )

    async def get_player_id_from_token(self, token: str) -> int:
        """
        Decode the JWT to get the player_id
        """
        # Ensures that if we're missing the jwks we will try to fetch them on
        # each new login request. This way our login functionality will be
        # restored as soon as possible
        keys = await self.get_public_keys()
        try:
            kid = jwt.get_unverified_header(token)["kid"]
            key = keys[kid]
            decoded = jwt.decode(
                token,
                key=key,
                algorithms=["RS256"],
                options={"verify_aud": False}
            )

            if "lobby" not in decoded["scp"]:
                raise AuthenticationError(
                    "Token does not have permission to login to the lobby server",
                    "token"
                )

            return int(decoded["sub"])
        except (InvalidTokenError, KeyError, ValueError):
            raise AuthenticationError("Token signature was invalid", "token")

Service for managing the OAuth token logins and verification.

Ancestors

Methods

async def get_player_id_from_token(self, token: str) -> int
async def get_player_id_from_token(self, token: str) -> int:
    """
    Decode the JWT to get the player_id
    """
    # Ensures that if we're missing the jwks we will try to fetch them on
    # each new login request. This way our login functionality will be
    # restored as soon as possible
    keys = await self.get_public_keys()
    try:
        kid = jwt.get_unverified_header(token)["kid"]
        key = keys[kid]
        decoded = jwt.decode(
            token,
            key=key,
            algorithms=["RS256"],
            options={"verify_aud": False}
        )

        if "lobby" not in decoded["scp"]:
            raise AuthenticationError(
                "Token does not have permission to login to the lobby server",
                "token"
            )

        return int(decoded["sub"])
    except (InvalidTokenError, KeyError, ValueError):
        raise AuthenticationError("Token signature was invalid", "token")

Decode the JWT to get the player_id
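
Once the signature has been verified, the remaining work is a pair of checks on the decoded claims. The sketch below isolates those checks on a plain dictionary so they can be read without a JWT library. `player_id_from_claims` and the local `AuthenticationError` class are hypothetical stand-ins, and the example assumes `scp` is a list of scope strings.

```python
class AuthenticationError(Exception):
    """Hypothetical stand-in for the server's exception type."""


def player_id_from_claims(claims: dict) -> int:
    """Return the player id from already-verified claims."""
    try:
        # The token must carry the "lobby" scope to log in.
        if "lobby" not in claims["scp"]:
            raise AuthenticationError(
                "Token does not have permission to login to the lobby server"
            )
        # The subject claim holds the player id as a string.
        return int(claims["sub"])
    except (KeyError, ValueError):
        # Missing claims or a non-numeric subject are treated as invalid.
        raise AuthenticationError("Token was invalid")
```

For example, `player_id_from_claims({"scp": ["lobby"], "sub": "42"})` returns `42`, while a token lacking the `lobby` scope or the `sub` claim raises `AuthenticationError`.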

async def get_public_keys(self) -> dict
@synchronizedmethod
async def get_public_keys(self) -> dict:
    """
    Return cached keys, or fetch them if they're missing
    """
    if not self.public_keys:
        # Rate limit requests so we don't spam the endpoint when it's down
        if (
            not self._last_key_fetch_time or
            time.monotonic() - self._last_key_fetch_time > 5
        ):
            await self.retrieve_public_keys()

        if not self.public_keys:
            raise RuntimeError("jwks could not be retrieved")

    return self.public_keys

Return cached keys, or fetch them if they're missing
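
The caching behaviour can be sketched without any network code: fetch only when the cache is empty, and no more often than once per interval while the source is failing. This is a simplified, synchronous illustration under stated assumptions. `RateLimitedCache`, its `fetch` callable, and the injectable `clock` are hypothetical names; the service itself is asynchronous and fetches from the configured JWKS endpoint.

```python
import time
from typing import Callable, Optional


class RateLimitedCache:
    """Hypothetical stand-in for the key cache; not the server's class."""

    def __init__(
        self,
        fetch: Callable[[], dict],
        min_interval: float = 5.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch
        self._min_interval = min_interval
        self._clock = clock
        self._last_fetch: Optional[float] = None
        self.values: dict = {}

    def get(self) -> dict:
        if not self.values:
            now = self._clock()
            # Only retry once the previous attempt is old enough.
            if (
                self._last_fetch is None
                or now - self._last_fetch > self._min_interval
            ):
                self._last_fetch = now
                try:
                    self.values = self._fetch()
                except Exception:
                    # A failed fetch leaves the cache empty.
                    self.values = {}
            if not self.values:
                raise RuntimeError("values could not be retrieved")
        return self.values
```

Injecting the clock keeps the rate limit easy to exercise in a test: repeated calls within the interval raise `RuntimeError` without calling `fetch` again.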

async def retrieve_public_keys(self) -> None
async def retrieve_public_keys(self) -> None:
    """
    Get the latest jwks from the hydra endpoint
    """
    self._last_key_fetch_time = time.monotonic()
    try:
        async with aiohttp.ClientSession(raise_for_status=True) as session:
            async with session.get(config.HYDRA_JWKS_URI) as resp:
                jwks = await resp.json()
                self.public_keys = {
                    jwk["kid"]: RSAAlgorithm.from_jwk(jwk)
                    for jwk in jwks["keys"]
                }
        self._logger.info("Got public keys from %s", config.HYDRA_JWKS_URI)
    except Exception:
        self._logger.exception(
            "Unable to retrieve jwks, token login will be unavailable!"
        )

Get the latest jwks from the hydra endpoint

Inherited members

class PartyService (game_service: GameService)
@with_logger
class PartyService(Service):
    """
    Service responsible for managing the player parties.

    Logically, we consider players to always be in a party, either alone, or
    with other players.
    """

    def __init__(self, game_service: GameService):
        self.game_service = game_service
        self.player_parties: dict[Player, PlayerParty] = {}
        self._dirty_parties: set[PlayerParty] = set()

    async def initialize(self):
        self._update_task = at_interval(1, self.update_dirties)

    async def shutdown(self):
        self._update_task.stop()

    async def update_dirties(self):
        if not self._dirty_parties:
            return

        dirty_parties = self._dirty_parties
        self._dirty_parties = set()

        for party in dirty_parties:
            try:
                self.write_broadcast_party(party)
            except Exception:  # pragma: no cover
                self._logger.exception(
                    "Unexpected exception while sending party updates!"
                )

    def write_broadcast_party(self, party, members=None):
        """
        Send a party update to all players in the party
        """
        if not members:
            members = iter(party)
        msg = {
            "command": "update_party",
            **party.to_dict()
        }
        for member in members:
            # Will re-encode the message for each player
            member.player.write_message(msg)

    def get_party(self, owner: Player) -> PlayerParty:
        party = self.player_parties.get(owner)
        if not party:
            party = PlayerParty(owner)
            self.player_parties[owner] = party

        return party

    def mark_dirty(self, party: PlayerParty):
        self._dirty_parties.add(party)

    def invite_player_to_party(self, sender: Player, recipient: Player):
        """
        Creates a new party for `sender` if one doesn't exist, and invites
        `recipient` to that party.
        """
        if sender not in self.player_parties:
            self.player_parties[sender] = PlayerParty(sender)

        party = self.player_parties[sender]

        if party.owner != sender:
            raise ClientError("You do not own this party.", recoverable=True)

        party.add_invited_player(recipient)
        recipient.write_message({
            "command": "party_invite",
            "sender": sender.id
        })

    async def accept_invite(self, recipient: Player, sender: Player):
        party = self.player_parties.get(sender)
        if (
            not party or
            recipient not in party.invited_players or
            party.invited_players[recipient].is_expired()
        ):
            # TODO: Localize with a proper message
            raise ClientError("You are not invited to that party (anymore)", recoverable=True)

        if sender.state is PlayerState.SEARCHING_LADDER:
            # TODO: Localize with a proper message
            raise ClientError("That party is already in queue", recoverable=True)

        old_party = self.player_parties.get(recipient)
        if old_party is not None:
            # Preserve state (like faction selection) from the old party
            member = old_party.get_member_by_player(recipient)
            assert member is not None

            await self.leave_party(recipient)
            party.add_member(member)
        else:
            party.add_player(recipient)

        self.player_parties[recipient] = party
        self.mark_dirty(party)

    async def kick_player_from_party(self, owner: Player, kicked_player: Player):
        if owner not in self.player_parties:
            raise ClientError("You are not in a party.", recoverable=True)

        party = self.player_parties[owner]

        if party.owner != owner:
            raise ClientError("You do not own that party.", recoverable=True)

        if kicked_player not in party:
            # Client state appears to be out of date
            await party.send_party(owner)
            return

        party.remove_player(kicked_player)
        del self.player_parties[kicked_player]

        kicked_player.write_message({"command": "kicked_from_party"})

        self.mark_dirty(party)

    async def leave_party(self, player: Player):
        if player not in self.player_parties:
            raise ClientError("You are not in a party.", recoverable=True)

        party = self.player_parties[player]
        self._remove_player_from_party(player, party)
        # TODO: Remove?
        await party.send_party(player)

    def _remove_player_from_party(self, player, party):
        party.remove_player(player)
        del self.player_parties[player]

        if party.is_disbanded():
            self.remove_party(party)
            return

        self.mark_dirty(party)

    def set_factions(self, player: Player, factions: list[Faction]):
        if player not in self.player_parties:
            self.player_parties[player] = PlayerParty(player)

        party = self.player_parties[player]
        party.set_factions(player, factions)
        self.mark_dirty(party)

    def remove_party(self, party):
        # Remove all players who were in the party
        for member in party:
            self._logger.info("Removing party for player %s", member.player)
            if party == self.player_parties.get(member.player):
                del self.player_parties[member.player]
            else:
                self._logger.warning(
                    "Player %s was in two parties at once!", member.player
                )

        members = party.members
        party.clear()
        # TODO: Send a special "disbanded" command?
        self.write_broadcast_party(party, members=members)

    def on_connection_lost(self, conn: "LobbyConnection") -> None:
        if not conn.player or conn.player not in self.player_parties:
            return

        self._remove_player_from_party(
            conn.player,
            self.player_parties[conn.player]
        )

Service responsible for managing the player parties.

Logically, we consider players to always be in a party, either alone, or with other players.
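
PartyService does not broadcast on every change. `mark_dirty` collects changed parties in a set, and `update_dirties`, scheduled once per second through `at_interval`, sends one update per party in that set. A minimal sketch of this batching pattern follows; `DirtyBatcher` and its `send` callback are hypothetical names, not part of the server.

```python
class DirtyBatcher:
    """Hypothetical illustration of the dirty-set batching pattern."""

    def __init__(self, send):
        self._send = send
        self._dirty = set()

    def mark_dirty(self, item):
        # Marking the same item repeatedly still yields a single send
        self._dirty.add(item)

    def flush(self) -> int:
        if not self._dirty:
            return 0

        # Swap in a fresh set first, so anything marked while sending
        # lands in the next batch instead of being lost
        batch, self._dirty = self._dirty, set()

        for item in batch:
            try:
                self._send(item)
            except Exception:
                # One failing send must not block the rest of the batch
                pass

        return len(batch)


sent = []
batcher = DirtyBatcher(sent.append)
for name in ("party-a", "party-a", "party-b", "party-a"):
    batcher.mark_dirty(name)
batcher.flush()  # Sends "party-a" and "party-b" once each
```

The trade-off is latency for volume: a burst of changes to one party costs a single message per member, at the price of a delay of up to one interval.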

Ancestors

  • Service

Methods

async def accept_invite(self, recipient: Player, sender: Player)
async def accept_invite(self, recipient: Player, sender: Player):
    party = self.player_parties.get(sender)
    if (
        not party or
        recipient not in party.invited_players or
        party.invited_players[recipient].is_expired()
    ):
        # TODO: Localize with a proper message
        raise ClientError("You are not invited to that party (anymore)", recoverable=True)

    if sender.state is PlayerState.SEARCHING_LADDER:
        # TODO: Localize with a proper message
        raise ClientError("That party is already in queue", recoverable=True)

    old_party = self.player_parties.get(recipient)
    if old_party is not None:
        # Preserve state (like faction selection) from the old party
        member = old_party.get_member_by_player(recipient)
        assert member is not None

        await self.leave_party(recipient)
        party.add_member(member)
    else:
        party.add_player(recipient)

    self.player_parties[recipient] = party
    self.mark_dirty(party)

def get_party(self, owner: Player) -> PlayerParty
def get_party(self, owner: Player) -> PlayerParty:
    party = self.player_parties.get(owner)
    if not party:
        party = PlayerParty(owner)
        self.player_parties[owner] = party

    return party

def invite_player_to_party(self, sender: Player, recipient: Player)
def invite_player_to_party(self, sender: Player, recipient: Player):
    """
    Creates a new party for `sender` if one doesn't exist, and invites
    `recipient` to that party.
    """
    if sender not in self.player_parties:
        self.player_parties[sender] = PlayerParty(sender)

    party = self.player_parties[sender]

    if party.owner != sender:
        raise ClientError("You do not own this party.", recoverable=True)

    party.add_invited_player(recipient)
    recipient.write_message({
        "command": "party_invite",
        "sender": sender.id
    })

Creates a new party for sender if one doesn't exist, and invites recipient to that party.

async def kick_player_from_party(self, owner: Player, kicked_player: Player)
async def kick_player_from_party(self, owner: Player, kicked_player: Player):
    if owner not in self.player_parties:
        raise ClientError("You are not in a party.", recoverable=True)

    party = self.player_parties[owner]

    if party.owner != owner:
        raise ClientError("You do not own that party.", recoverable=True)

    if kicked_player not in party:
        # Client state appears to be out of date
        await party.send_party(owner)
        return

    party.remove_player(kicked_player)
    del self.player_parties[kicked_player]

    kicked_player.write_message({"command": "kicked_from_party"})

    self.mark_dirty(party)

async def leave_party(self, player: Player)
async def leave_party(self, player: Player):
    if player not in self.player_parties:
        raise ClientError("You are not in a party.", recoverable=True)

    party = self.player_parties[player]
    self._remove_player_from_party(player, party)
    # TODO: Remove?
    await party.send_party(player)

def mark_dirty(self, party: PlayerParty)
def mark_dirty(self, party: PlayerParty):
    self._dirty_parties.add(party)

def remove_party(self, party)
def remove_party(self, party):
    # Remove all players who were in the party
    for member in party:
        self._logger.info("Removing party for player %s", member.player)
        if party == self.player_parties.get(member.player):
            del self.player_parties[member.player]
        else:
            self._logger.warning(
                "Player %s was in two parties at once!", member.player
            )

    members = party.members
    party.clear()
    # TODO: Send a special "disbanded" command?
    self.write_broadcast_party(party, members=members)

def set_factions(self, player: Player, factions: list[Faction])
def set_factions(self, player: Player, factions: list[Faction]):
    if player not in self.player_parties:
        self.player_parties[player] = PlayerParty(player)

    party = self.player_parties[player]
    party.set_factions(player, factions)
    self.mark_dirty(party)

async def update_dirties(self)
async def update_dirties(self):
    if not self._dirty_parties:
        return

    dirty_parties = self._dirty_parties
    self._dirty_parties = set()

    for party in dirty_parties:
        try:
            self.write_broadcast_party(party)
        except Exception:  # pragma: no cover
            self._logger.exception(
                "Unexpected exception while sending party updates!"
            )

def write_broadcast_party(self, party, members=None)
def write_broadcast_party(self, party, members=None):
    """
    Send a party update to all players in the party
    """
    if not members:
        members = iter(party)
    msg = {
        "command": "update_party",
        **party.to_dict()
    }
    for member in members:
        # Will re-encode the message for each player
        member.player.write_message(msg)

Send a party update to all players in the party
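
The broadcast can be exercised with stand-in objects. In the sketch below, `FakePlayer`, `FakeMember` and `FakeParty` are hypothetical, and the fields returned by `to_dict` are assumptions, since the real payload of `PlayerParty.to_dict` is not shown in this section. Only the `"command": "update_party"` key and the fallback to iterating the party are taken from the listing above.

```python
class FakePlayer:
    """Hypothetical player that records messages instead of sending them."""

    def __init__(self, player_id):
        self.id = player_id
        self.outbox = []

    def write_message(self, msg):
        self.outbox.append(msg)


class FakeMember:
    def __init__(self, player):
        self.player = player


class FakeParty:
    def __init__(self, owner, members):
        self.owner = owner
        self.members = members

    def __iter__(self):
        return iter(self.members)

    def to_dict(self):
        # Assumed payload; the real fields may differ
        return {
            "owner": self.owner.id,
            "members": [m.player.id for m in self.members],
        }


def broadcast(party, members=None):
    # An empty or missing member list falls back to the party itself
    if not members:
        members = iter(party)
    msg = {"command": "update_party", **party.to_dict()}
    for member in members:
        member.player.write_message(msg)


alice, bob = FakePlayer(1), FakePlayer(2)
party = FakeParty(alice, [FakeMember(alice), FakeMember(bob)])
broadcast(party)
```

Passing `members` explicitly is what lets `remove_party` notify former members after the party has been cleared.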

class PlayerService (database: FAFDatabase)
@with_logger
class PlayerService(Service):
    def __init__(self, database: FAFDatabase):
        self._db = database
        self._players = dict()

        # Static-ish data fields.
        self.uniqueid_exempt = {}
        self._dirty_players = set()

    async def initialize(self) -> None:
        await self.update_data()
        self._update_cron = aiocron.crontab(
            "*/10 * * * *", func=self.update_data
        )

    def __len__(self):
        return len(self._players)

    def __iter__(self):
        return self._players.values().__iter__()

    def __getitem__(self, player_id: int) -> Optional[Player]:
        return self._players.get(player_id)

    def __setitem__(self, player_id: int, player: Player):
        self._players[player_id] = player
        metrics.players_online.set(len(self._players))

    @property
    def all_players(self) -> ValuesView[Player]:
        return self._players.values()

    def mark_dirty(self, player: Player):
        self._dirty_players.add(player)

    def pop_dirty_players(self) -> set[Player]:
        dirty_players = self._dirty_players
        self._dirty_players = set()

        return dirty_players

    async def fetch_player_data(self, player):
        async with self._db.acquire() as conn:
            result = await conn.execute(
                select(user_group.c.technical_name)
                .select_from(user_group_assignment.join(user_group))
                .where(user_group_assignment.c.user_id == player.id)
            )
            player.user_groups = {row.technical_name for row in result}

            sql = select(
                avatars_list.c.url,
                avatars_list.c.tooltip,
                clan.c.tag
            ).select_from(
                login
                .outerjoin(clan_membership)
                .outerjoin(clan)
                .outerjoin(
                    avatars,
                    onclause=and_(
                        avatars.c.idUser == login.c.id,
                        avatars.c.selected == 1
                    )
                )
                .outerjoin(avatars_list)
            ).where(login.c.id == player.id)  # yapf: disable

            result = await conn.execute(sql)
            row = result.fetchone()
            if not row:
                self._logger.warning(
                    "Did not find data for player with id %i",
                    player.id
                )
                return

            row = row._mapping
            player.clan = row.get(clan.c.tag)

            url, tooltip = (
                row.get(avatars_list.c.url),
                row.get(avatars_list.c.tooltip)
            )
            if url and tooltip:
                player.avatar = {"url": url, "tooltip": tooltip}

            await self._fetch_player_ratings(player, conn)

    async def _fetch_player_ratings(self, player, conn):
        sql = select(
            leaderboard_rating.c.mean,
            leaderboard_rating.c.deviation,
            leaderboard_rating.c.total_games,
            leaderboard.c.technical_name,
        ).select_from(
            leaderboard.join(leaderboard_rating)
        ).where(
            leaderboard_rating.c.login_id == player.id
        )
        result = await conn.execute(sql)

        retrieved_ratings = {
            row.technical_name: (
                (row.mean, row.deviation),
                row.total_games
            )
            for row in result
        }
        for rating_type, (rating, total_games) in retrieved_ratings.items():
            player.ratings[rating_type] = rating
            player.game_count[rating_type] = total_games

    def remove_player(self, player: Player):
        if player.id in self._players:
            # This signals that the player is now disconnected
            del player.lobby_connection
            del self._players[player.id]
            metrics.players_online.set(len(self._players))
            self.mark_dirty(player)

    async def has_permission_role(self, player: Player, role_name: str) -> bool:
        async with self._db.acquire() as conn:
            result = await conn.execute(
                select(group_permission.c.id)
                .select_from(
                    user_group_assignment
                    .join(group_permission_assignment, onclause=(
                        user_group_assignment.c.group_id ==
                        group_permission_assignment.c.group_id
                    ))
                    .join(group_permission)
                )
                .where(
                    and_(
                        user_group_assignment.c.user_id == player.id,
                        group_permission.c.technical_name == role_name
                    )
                )
            )
            row = result.fetchone()
            return row is not None

    def is_uniqueid_exempt(self, user_id: int) -> bool:
        return user_id in self.uniqueid_exempt

    def get_player(self, player_id: int) -> Optional[Player]:
        return self._players.get(player_id)

    def signal_player_rating_change(
        self, player_id: int, rating_type: str, new_rating: Rating
    ) -> None:
        player = self.get_player(player_id)
        if player is None:
            self._logger.debug(
                "Received rating change for player with id %i not in PlayerService.",
                player_id
            )
            return

        self._logger.debug(
            "Received rating change for player %s.", player
        )
        player.ratings[rating_type] = new_rating
        player.game_count[rating_type] += 1
        self.mark_dirty(player)

    async def update_data(self):
        """
        Update rarely-changing data. Currently this is only the list of users
        exempt from the uniqueid check.
        """
        async with self._db.acquire() as conn:
            # UniqueID-exempt users.
            result = await conn.execute(
                "SELECT `user_id` FROM uniqueid_exempt"
            )
            self.uniqueid_exempt = frozenset(map(lambda x: x[0], result))

    async def kick_idle_players(self):
        for fut in asyncio.as_completed([
            player.lobby_connection.abort("Graceful shutdown.")
            for player in self.all_players
            if player.state == PlayerState.IDLE
            if player.lobby_connection is not None
        ]):
            try:
                await fut
            except Exception:
                self._logger.debug(
                    "Error while aborting connection",
                    exc_info=True
                )

    def on_connection_lost(self, conn: "LobbyConnection") -> None:
        if not conn.player:
            return

        self.remove_player(conn.player)

        self._logger.debug(
            "Removed player %d, %s, %d",
            conn.player.id,
            conn.player.login,
            conn.session
        )

    async def graceful_shutdown(self):
        if config.SHUTDOWN_KICK_IDLE_PLAYERS:
            self._kick_idle_task = at_interval(1, self.kick_idle_players)

All services should inherit from this class.

Services are singleton objects which manage some server task.

Ancestors

  • Service

Instance variables

prop all_players : ValuesView[Player]
@property
def all_players(self) -> ValuesView[Player]:
    return self._players.values()

Methods

async def fetch_player_data(self, player)
async def fetch_player_data(self, player):
    async with self._db.acquire() as conn:
        result = await conn.execute(
            select(user_group.c.technical_name)
            .select_from(user_group_assignment.join(user_group))
            .where(user_group_assignment.c.user_id == player.id)
        )
        player.user_groups = {row.technical_name for row in result}

        sql = select(
            avatars_list.c.url,
            avatars_list.c.tooltip,
            clan.c.tag
        ).select_from(
            login
            .outerjoin(clan_membership)
            .outerjoin(clan)
            .outerjoin(
                avatars,
                onclause=and_(
                    avatars.c.idUser == login.c.id,
                    avatars.c.selected == 1
                )
            )
            .outerjoin(avatars_list)
        ).where(login.c.id == player.id)  # yapf: disable

        result = await conn.execute(sql)
        row = result.fetchone()
        if not row:
            self._logger.warning(
                "Did not find data for player with id %i",
                player.id
            )
            return

        row = row._mapping
        player.clan = row.get(clan.c.tag)

        url, tooltip = (
            row.get(avatars_list.c.url),
            row.get(avatars_list.c.tooltip)
        )
        if url and tooltip:
            player.avatar = {"url": url, "tooltip": tooltip}

        await self._fetch_player_ratings(player, conn)

def get_player(self, player_id: int) -> Player | None
def get_player(self, player_id: int) -> Optional[Player]:
    return self._players.get(player_id)

async def has_permission_role(self, player: Player, role_name: str) -> bool
async def has_permission_role(self, player: Player, role_name: str) -> bool:
    async with self._db.acquire() as conn:
        result = await conn.execute(
            select(group_permission.c.id)
            .select_from(
                user_group_assignment
                .join(group_permission_assignment, onclause=(
                    user_group_assignment.c.group_id ==
                    group_permission_assignment.c.group_id
                ))
                .join(group_permission)
            )
            .where(
                and_(
                    user_group_assignment.c.user_id == player.id,
                    group_permission.c.technical_name == role_name
                )
            )
        )
        row = result.fetchone()
        return row is not None

def is_uniqueid_exempt(self, user_id: int) -> bool
def is_uniqueid_exempt(self, user_id: int) -> bool:
    return user_id in self.uniqueid_exempt

async def kick_idle_players(self)
async def kick_idle_players(self):
    for fut in asyncio.as_completed([
        player.lobby_connection.abort("Graceful shutdown.")
        for player in self.all_players
        if player.state == PlayerState.IDLE
        if player.lobby_connection is not None
    ]):
        try:
            await fut
        except Exception:
            self._logger.debug(
                "Error while aborting connection",
                exc_info=True
            )

def mark_dirty(self, player: Player)
def mark_dirty(self, player: Player):
    self._dirty_players.add(player)

def pop_dirty_players(self) -> set[Player]
def pop_dirty_players(self) -> set[Player]:
    dirty_players = self._dirty_players
    self._dirty_players = set()

    return dirty_players

def remove_player(self, player: Player)
def remove_player(self, player: Player):
    if player.id in self._players:
        # This signals that the player is now disconnected
        del player.lobby_connection
        del self._players[player.id]
        metrics.players_online.set(len(self._players))
        self.mark_dirty(player)

def signal_player_rating_change(self, player_id: int, rating_type: str, new_rating: trueskill.Rating) -> None
def signal_player_rating_change(
    self, player_id: int, rating_type: str, new_rating: Rating
) -> None:
    player = self.get_player(player_id)
    if player is None:
        self._logger.debug(
            "Received rating change for player with id %i not in PlayerService.",
            player_id
        )
        return

    self._logger.debug(
        "Received rating change for player %s.", player
    )
    player.ratings[rating_type] = new_rating
    player.game_count[rating_type] += 1
    self.mark_dirty(player)

async def update_data(self)
async def update_data(self):
    """
    Update rarely-changing data. Currently this is only the list of users
    exempt from the uniqueid check.
    """
    async with self._db.acquire() as conn:
        # UniqueID-exempt users.
        result = await conn.execute(
            "SELECT `user_id` FROM uniqueid_exempt"
        )
        self.uniqueid_exempt = frozenset(map(lambda x: x[0], result))

Update rarely-changing data. Currently this is only the list of users exempt from the uniqueid check.

class RatingService (database: FAFDatabase, player_service: PlayerService, message_queue_service: MessageQueueService)
@with_logger
class RatingService(Service):
    """
    Service responsible for calculating and saving trueskill rating updates.
    To avoid race conditions, rating updates from a single game ought to be
    atomic.
    """

    def __init__(
        self,
        database: FAFDatabase,
        player_service: PlayerService,
        message_queue_service: MessageQueueService
    ):
        self._db = database
        self._player_service_callback = player_service.signal_player_rating_change
        self._accept_input = False
        self._queue = asyncio.Queue()
        self._task = None
        self._rating_type_ids: Optional[dict[str, int]] = None
        self.leaderboards: dict[str, Leaderboard] = {}
        self._message_queue_service = message_queue_service

    async def initialize(self) -> None:
        if self._task is not None:
            self._logger.error("Service already running or not properly shut down.")
            return

        await self.update_data()
        self._update_cron = aiocron.crontab("*/10 * * * *", func=self.update_data)
        self._accept_input = True
        self._task = asyncio.create_task(self._handle_rating_queue())

    async def update_data(self):
        async with self._db.acquire() as conn:
            initializer = leaderboard.alias()
            sql = select(
                leaderboard.c.id,
                leaderboard.c.technical_name,
                initializer.c.technical_name.label("initializer")
            ).select_from(
                leaderboard.outerjoin(
                    initializer,
                    leaderboard.c.initializer_id == initializer.c.id
                )
            )
            result = await conn.execute(sql)
            rows = result.fetchall()

            self.leaderboards.clear()
            self._rating_type_ids = {}
            for row in rows:
                self.leaderboards[row.technical_name] = Leaderboard(
                    row.id,
                    row.technical_name
                )
                self._rating_type_ids[row.technical_name] = row.id

            # Link the initializers
            for row in rows:
                current = self.leaderboards[row.technical_name]
                init = self.leaderboards.get(row.initializer)
                if init:
                    current.initializer = init

    async def enqueue(self, game_info: dict[str]) -> None:
        if not self._accept_input:
            self._logger.warning("Dropped rating request %s", game_info)
            raise ServiceNotReadyError(
                "RatingService not yet initialized or shutting down."
            )

        summary = GameRatingSummary.from_game_info_dict(game_info)
        self._logger.debug("Queued up rating request %s", summary)
        await self._queue.put(summary)
        rating_service_backlog.set(self._queue.qsize())

    async def _handle_rating_queue(self) -> None:
        self._logger.debug("RatingService started!")
        try:
            while self._accept_input or not self._queue.empty():
                summary = await self._queue.get()
                self._logger.debug("Now rating request %s", summary)

                try:
                    # Make sure we finish writing rating changes even if the
                    # server is shutting down
                    await asyncio.shield(self._rate(summary))
                except GameRatingError:
                    self._logger.warning("Error rating game %s", summary)
                except Exception:  # pragma: no cover
                    self._logger.exception("Failed rating request %s", summary)
                else:
                    self._logger.debug("Done rating request.")
                finally:
                    self._queue.task_done()
                rating_service_backlog.set(self._queue.qsize())
        except asyncio.CancelledError:
            pass
        except Exception:  # pragma: no cover
            self._logger.critical(
                "Unexpected exception while handling rating queue.",
                exc_info=True
            )

        self._logger.debug("RatingService stopped.")

    async def _rate(self, summary: GameRatingSummary) -> None:
        assert self._rating_type_ids is not None

        if summary.rating_type not in self._rating_type_ids:
            raise GameRatingError(f"Unknown rating type {summary.rating_type}.")

        rater = GameRater(summary)
        rating_results = []

        async with self._db.acquire() as conn:
            # Fetch all players' rating info from the database
            player_ratings = await self._get_all_player_ratings(
                conn, rater.player_ids
            )
            rating_result = await self._rate_for_leaderboard(
                conn,
                summary.game_id,
                summary.rating_type,
                player_ratings,
                rater
            )
            assert rating_result is not None
            rating_results.append(rating_result)

            # TODO: If we add hidden ratings, make sure to check for them here.
            # Hidden ratings should not affect global.
            # TODO: Use game_type == "matchmaker" instead?
            if summary.rating_type != RatingType.GLOBAL:
                self._logger.debug(
                    "Performing global rating adjustment for players: %s",
                    rater.player_ids
                )
                adjustment_rater = AdjustmentGameRater(
                    rater,
                    rating_result.old_ratings
                )
                global_rating_result = await self._rate_for_leaderboard(
                    conn,
                    summary.game_id,
                    RatingType.GLOBAL,
                    player_ratings,
                    adjustment_rater,
                    update_game_player_stats=False
                )
                if global_rating_result:
                    rating_results.append(global_rating_result)

        for rating_result in rating_results:
            await self._publish_rating_changes(
                rating_result.game_id,
                rating_result.rating_type,
                rating_result.old_ratings,
                rating_result.new_ratings,
                rating_result.outcome_map
            )

    async def _rate_for_leaderboard(
        self,
        conn,
        game_id: int,
        rating_type: str,
        player_ratings: dict[PlayerID, PlayerRatings],
        rater: GameRater,
        update_game_player_stats: bool = True
    ) -> Optional[GameRatingResult]:
        """
        Rates a game using a particular rating_type and GameRater.
        """
        uninitialized_ratings = {
            # Querying the key will create the value using rating
            # initialization, sort of like a defaultdict.
            player_id: Rating(*player_ratings[player_id][rating_type])
            for player_id in player_ratings.keys()
            if rating_type not in player_ratings[player_id]
        }
        # Initialize the ratings we need
        old_ratings = {
            player_id: Rating(*player_ratings[player_id][rating_type])
            for player_id in player_ratings.keys()
        }

        new_ratings = rater.compute_rating(old_ratings)
        if not new_ratings:
            return None

        need_initial_ratings = {
            player_id: rating
            for player_id, rating in uninitialized_ratings.items()
            if player_id in new_ratings
        }
        if need_initial_ratings:
            # Ensure that leaderboard entries exist before calling persist.
            await self._create_initial_ratings(
                conn,
                rating_type,
                need_initial_ratings
            )

        outcome_map = rater.get_outcome_map()
        # Now persist the changes for all players that get the adjustment.
        await self._persist_rating_changes(
            conn,
            game_id,
            rating_type,
            old_ratings,
            new_ratings,
            outcome_map,
            update_game_player_stats=update_game_player_stats
        )

        return GameRatingResult(
            game_id,
            rating_type,
            old_ratings,
            new_ratings,
            outcome_map
        )

    async def _create_initial_ratings(
        self,
        conn,
        rating_type: str,
        ratings: RatingDict
    ):
        assert self._rating_type_ids is not None

        leaderboard_id = self._rating_type_ids[rating_type]

        values = [
            dict(
                login_id=player_id,
                mean=rating.mean,
                deviation=rating.dev,
                total_games=0,
                won_games=0,
                leaderboard_id=leaderboard_id,
            )
            for player_id, rating in ratings.items()
        ]
        if values:
            await conn.execute(
                leaderboard_rating.insert(),
                values
            )

    async def _get_all_player_ratings(
        self, conn, player_ids: list[PlayerID]
    ) -> dict[PlayerID, PlayerRatings]:
        sql = select(
            leaderboard_rating.c.login_id,
            leaderboard.c.technical_name,
            leaderboard_rating.c.mean,
            leaderboard_rating.c.deviation
        ).join(leaderboard).where(
            leaderboard_rating.c.login_id.in_(player_ids)
        )
        result = await conn.execute(sql)

        player_ratings = {
            player_id: PlayerRatings(self.leaderboards, init=False)
            for player_id in player_ids
        }

        for row in result:
            player_id, rating_type = row.login_id, row.technical_name
            player_ratings[player_id][rating_type] = (row.mean, row.deviation)

        return player_ratings

    async def _persist_rating_changes(
        self,
        conn,
        game_id: int,
        rating_type: str,
        old_ratings: RatingDict,
        new_ratings: RatingDict,
        outcomes: dict[PlayerID, GameOutcome],
        update_game_player_stats: bool = True
    ) -> None:
        """
        Persist computed ratings to the respective players' selected rating
        """
        assert self._rating_type_ids is not None

        self._logger.debug("Saving rating change stats for game %i", game_id)

        ratings = [
            (player_id, old_ratings[player_id], new_ratings[player_id])
            for player_id in new_ratings.keys()
        ]

        for player_id, old_rating, new_rating in ratings:
            self._logger.debug(
                "New %s rating for player with id %s: %s -> %s",
                rating_type,
                player_id,
                old_rating,
                new_rating,
            )

        if update_game_player_stats:
            # DEPRECATED: game_player_stats table contains rating data.
            # Use leaderboard_rating_journal instead
            gps_update_sql = (
                game_player_stats.update()
                .where(
                    and_(
                        game_player_stats.c.playerId == bindparam("player_id"),
                        game_player_stats.c.gameId == game_id,
                    )
                )
                .values(
                    after_mean=bindparam("after_mean"),
                    after_deviation=bindparam("after_deviation"),
                    mean=bindparam("mean"),
                    deviation=bindparam("deviation"),
                    scoreTime=func.now()
                )
            )
            try:
                result = await conn.execute(gps_update_sql, [
                    dict(
                        player_id=player_id,
                        after_mean=new_rating.mean,
                        after_deviation=new_rating.dev,
                        mean=old_rating.mean,
                        deviation=old_rating.dev,
                    )
                    for player_id, old_rating, new_rating in ratings
                ])

                if result.rowcount != len(ratings):
                    self._logger.warning(
                        "gps_update_sql only updated %d out of %d rows for game_id %d",
                        result.rowcount,
                        len(ratings),
                        game_id
                    )
                    return
            except pymysql.OperationalError:
                # Could happen if we drop the rating columns from game_player_stats
                self._logger.warning(
                    "gps_update_sql failed for game %d, ignoring...",
                    game_id,
                    exc_info=True
                )

        leaderboard_id = self._rating_type_ids[rating_type]

        journal_insert_sql = leaderboard_rating_journal.insert().values(
            leaderboard_id=leaderboard_id,
            rating_mean_before=bindparam("rating_mean_before"),
            rating_deviation_before=bindparam("rating_deviation_before"),
            rating_mean_after=bindparam("rating_mean_after"),
            rating_deviation_after=bindparam("rating_deviation_after"),
            game_player_stats_id=select(game_player_stats.c.id).where(
                and_(
                    game_player_stats.c.playerId == bindparam("player_id"),
                    game_player_stats.c.gameId == game_id,
                )
            ).scalar_subquery(),
        )
        await conn.execute(journal_insert_sql, [
            dict(
                player_id=player_id,
                rating_mean_before=old_rating.mean,
                rating_deviation_before=old_rating.dev,
                rating_mean_after=new_rating.mean,
                rating_deviation_after=new_rating.dev,
            )
            for player_id, old_rating, new_rating in ratings
        ])

        rating_update_sql = (
            leaderboard_rating.update()
            .where(
                and_(
                    leaderboard_rating.c.login_id == bindparam("player_id"),
                    leaderboard_rating.c.leaderboard_id == leaderboard_id,
                )
            )
            .values(
                mean=bindparam("mean"),
                deviation=bindparam("deviation"),
                total_games=leaderboard_rating.c.total_games + 1,
                won_games=leaderboard_rating.c.won_games + bindparam("increment"),
            )
        )
        await conn.execute(rating_update_sql, [
            dict(
                player_id=player_id,
                mean=new_rating.mean,
                deviation=new_rating.dev,
                increment=(
                    1 if outcomes[player_id] is GameOutcome.VICTORY else 0
                )
            )
            for player_id, _, new_rating in ratings
        ])

        for player_id, new_rating in new_ratings.items():
            self._update_player_object(player_id, rating_type, new_rating)

    def _update_player_object(
        self, player_id: PlayerID, rating_type: str, new_rating: Rating
    ) -> None:
        if self._player_service_callback is None:
            self._logger.warning(
                "Tried to send rating change to player service, "
                "but no service was registered."
            )
            return

        self._logger.debug(
            "Sending player rating update for player with id %i", player_id
        )
        self._player_service_callback(player_id, rating_type, new_rating)

    async def _publish_rating_changes(
        self,
        game_id: int,
        rating_type: str,
        old_ratings: RatingDict,
        new_ratings: RatingDict,
        outcomes: dict[PlayerID, GameOutcome],
    ):
        for player_id, new_rating in new_ratings.items():
            if player_id not in outcomes:
                self._logger.error("Missing outcome for player %i", player_id)
                continue
            if player_id not in old_ratings:
                self._logger.error("Missing old rating for player %i", player_id)
                continue

            old_rating = old_ratings[player_id]

            rating_change_dict = {
                "game_id": game_id,
                "player_id": player_id,
                "rating_type": rating_type,
                "new_rating_mean": new_rating.mean,
                "new_rating_deviation": new_rating.dev,
                "old_rating_mean": old_rating.mean,
                "old_rating_deviation": old_rating.dev,
                "outcome": outcomes[player_id].value
            }

            await self._message_queue_service.publish(
                config.MQ_EXCHANGE_NAME,
                "success.rating.update",
                rating_change_dict,
            )

    async def _join_rating_queue(self) -> None:
        """
        Blocks until the rating queue has been emptied.
        Mostly for testing purposes.
        """
        await self._queue.join()

    async def shutdown(self) -> None:
        """
        Finish rating all remaining games, then exit.
        """
        self._accept_input = False
        self._logger.debug(
            "Shutdown initiated. Waiting on current queue: %s", self._queue
        )
        if self._queue.empty() and self._task:
            self._task.cancel()
        await self._queue.join()
        self._task = None
        self._logger.debug("Queue emptied: %s", self._queue)

    def kill(self) -> None:
        """
        Exit without waiting for the queue to join.
        """
        self._accept_input = False
        if self._task is not None:
            self._task.cancel()
            self._task = None

Service responsible for calculating and saving TrueSkill rating updates. To avoid race conditions, rating updates from a single game ought to be atomic.
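
One way to read "atomic" here is all-or-nothing: either every player's rating change from a game is saved, or none is. The sketch below illustrates that idea with the standard-library `sqlite3` module. The `rating` table, the helper names, and the use of SQLite are assumptions made for illustration only; they are not the service's actual schema or database driver.

```python
import sqlite3


def make_demo_db() -> sqlite3.Connection:
    # Hypothetical single-table schema, used only for this illustration.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE rating (player_id INTEGER PRIMARY KEY, mean REAL)")
    conn.executemany("INSERT INTO rating VALUES (?, ?)", [(1, 1500.0), (2, 1500.0)])
    conn.commit()
    return conn


def apply_game_ratings(conn: sqlite3.Connection, new_means: dict[int, float]) -> None:
    """Save every rating change from one game, or none of them."""
    # Used as a context manager, the connection commits if the block
    # succeeds and rolls back if it raises.
    with conn:
        for player_id, mean in new_means.items():
            cursor = conn.execute(
                "UPDATE rating SET mean = ? WHERE player_id = ?",
                (mean, player_id),
            )
            if cursor.rowcount != 1:
                raise LookupError(f"no rating row for player {player_id}")


def current_means(conn: sqlite3.Connection) -> dict[int, float]:
    rows = conn.execute("SELECT player_id, mean FROM rating ORDER BY player_id")
    return dict(rows)
```

If the second player in a game has no rating row, the update already made for the first player is rolled back, so readers never observe a half-rated game.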

Methods

async def enqueue(self, game_info: dict[str]) ‑> None
async def enqueue(self, game_info: dict[str]) -> None:
    if not self._accept_input:
        self._logger.warning("Dropped rating request %s", game_info)
        raise ServiceNotReadyError(
            "RatingService not yet initialized or shutting down."
        )

    summary = GameRatingSummary.from_game_info_dict(game_info)
    self._logger.debug("Queued up rating request %s", summary)
    await self._queue.put(summary)
    rating_service_backlog.set(self._queue.qsize())
def kill(self) ‑> None
def kill(self) -> None:
    """
    Exit without waiting for the queue to join.
    """
    self._accept_input = False
    if self._task is not None:
        self._task.cancel()
        self._task = None

Exit without waiting for the queue to join.

async def shutdown(self) ‑> None
async def shutdown(self) -> None:
    """
    Finish rating all remaining games, then exit.
    """
    self._accept_input = False
    self._logger.debug(
        "Shutdown initiated. Waiting on current queue: %s", self._queue
    )
    if self._queue.empty() and self._task:
        self._task.cancel()
    await self._queue.join()
    self._task = None
    self._logger.debug("Queue emptied: %s", self._queue)

Finish rating all remaining games, then exit.
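
The difference between `shutdown` and `kill` comes down to whether `Queue.join()` is awaited before the worker task is cancelled. The following is a minimal sketch of that pattern with a plain `asyncio.Queue`; the `QueueWorker` class and its `processed` list are hypothetical stand-ins, and the real service's worker loop may differ in detail (for example in when it cancels the task).

```python
import asyncio


class QueueWorker:
    """Hypothetical stand-in for a service that drains a work queue."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._accept_input = True
        self._task = None
        self.processed: list = []

    def start(self) -> None:
        self._task = asyncio.create_task(self._run())

    async def _run(self) -> None:
        while True:
            item = await self._queue.get()
            try:
                await asyncio.sleep(0)  # stands in for the real rating work
                self.processed.append(item)
            finally:
                # join() only returns once every item has been marked done.
                self._queue.task_done()

    async def enqueue(self, item) -> None:
        if not self._accept_input:
            raise RuntimeError("worker is shutting down")
        await self._queue.put(item)

    async def shutdown(self) -> None:
        # Refuse new work, wait for queued work to finish, then stop.
        self._accept_input = False
        await self._queue.join()
        self.kill()

    def kill(self) -> None:
        # Stop immediately; anything still queued is abandoned.
        self._accept_input = False
        if self._task is not None:
            self._task.cancel()
            self._task = None


async def demo() -> list:
    worker = QueueWorker()
    worker.start()
    for game_id in range(3):
        await worker.enqueue(game_id)
    await worker.shutdown()
    return worker.processed
```

Calling `asyncio.run(demo())` processes all three queued items before returning, whereas calling `kill()` in place of `shutdown()` would cancel the worker without waiting for them.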

async def update_data(self)
async def update_data(self):
    async with self._db.acquire() as conn:
        initializer = leaderboard.alias()
        sql = select(
            leaderboard.c.id,
            leaderboard.c.technical_name,
            initializer.c.technical_name.label("initializer")
        ).select_from(
            leaderboard.outerjoin(
                initializer,
                leaderboard.c.initializer_id == initializer.c.id
            )
        )
        result = await conn.execute(sql)
        rows = result.fetchall()

        self.leaderboards.clear()
        self._rating_type_ids = {}
        for row in rows:
            self.leaderboards[row.technical_name] = Leaderboard(
                row.id,
                row.technical_name
            )
            self._rating_type_ids[row.technical_name] = row.id

        # Link the initializers
        for row in rows:
            current = self.leaderboards[row.technical_name]
            init = self.leaderboards.get(row.initializer)
            if init:
                current.initializer = init

class ServerInstance(name: str, database: FAFDatabase, loop: asyncio.base_events.BaseEventLoop)
class ServerInstance(object):
    """
    A class representing a shared server state. Each `ServerInstance` may be
    exposed on multiple ports, but each port will share the same internal server
    state, i.e. the same players, games, etc.
    """

    def __init__(
        self,
        name: str,
        database: FAFDatabase,
        loop: asyncio.BaseEventLoop,
        # For testing
        _override_services: Optional[dict[str, Service]] = None
    ):
        self.name = name
        self._logger = logging.getLogger(self.name)
        self.database = database
        self.loop = loop

        self.started = False

        self.contexts: set[ServerContext] = set()

        self.services = _override_services or create_services({
            "server": self,
            "database": self.database,
            "loop": self.loop,
        })

        self.connection_factory = lambda: LobbyConnection(
            database=database,
            geoip=self.services["geo_ip_service"],
            game_service=self.services["game_service"],
            players=self.services["player_service"],
            ladder_service=self.services["ladder_service"],
            party_service=self.services["party_service"],
            rating_service=self.services["rating_service"],
            oauth_service=self.services["oauth_service"],
        )

    def write_broadcast(
        self,
        message,
        predicate=lambda conn: conn.authenticated
    ):
        """
        Queue a message to be sent to all connected clients.
        """
        self._logger.log(TRACE, "]]: %s", message)
        metrics.server_broadcasts.inc()

        for ctx in self.contexts:
            try:
                ctx.write_broadcast(message, predicate)
            except Exception:
                self._logger.exception(
                    "Error writing '%s'",
                    message.get("command", message)
                )

    @synchronizedmethod
    async def start_services(self) -> None:
        if self.started:
            return

        num_services = len(self.services)
        self._logger.debug("Initializing %s services", num_services)

        async def initialize(service):
            start = time.perf_counter()
            await service.initialize()
            service._logger.debug(
                "%s initialized in %0.2f seconds",
                service.__class__.__name__,
                time.perf_counter() - start
            )

        await asyncio.gather(*[
            initialize(service) for service in self.services.values()
        ])

        self._logger.debug("Initialized %s services", num_services)

        self.started = True

    async def listen(
        self,
        address: tuple[str, int],
        name: Optional[str] = None,
        protocol_class: type[Protocol] = QDataStreamProtocol,
        proxy: bool = False,
    ) -> ServerContext:
        """
        Start listening on a new address.

        # Params
        - `address`: Tuple indicating the host, port to listen on.
        - `name`: String used to identify this context in log messages. The
            default is to use the `protocol_class` name.
        - `protocol_class`: The protocol class implementation to use.
        - `proxy`: Boolean indicating whether or not to use the PROXY protocol.
            See: https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
        """
        if not self.started:
            await self.start_services()

        ctx = ServerContext(
            f"{self.name}[{name or protocol_class.__name__}]",
            self.connection_factory,
            list(self.services.values()),
            protocol_class
        )
        await ctx.listen(*address, proxy=proxy)

        self.contexts.add(ctx)

        return ctx

    async def graceful_shutdown(self):
        """
        Start a graceful shut down of the server.

        1. Notify all services of graceful shutdown
        """
        self._logger.info("Initiating graceful shutdown")

        await map_suppress(
            lambda service: service.graceful_shutdown(),
            self.services.values(),
            logger=self._logger,
            msg="when starting graceful shutdown of service "
        )

    async def shutdown(self):
        """
        Immediately shut down the server.

        1. Stop accepting new connections
        2. Stop all services
        3. Close all existing connections
        """
        self._logger.info("Initiating full shutdown")

        await self._stop_contexts()
        await self._shutdown_services()
        await self._shutdown_contexts()

        self.contexts.clear()
        self.started = False

    async def drain(self):
        """
        Wait for all games to end.
        """
        game_service: GameService = self.services["game_service"]
        broadcast_service: BroadcastService = self.services["broadcast_service"]
        try:
            await asyncio.wait_for(
                game_service.drain_games(),
                timeout=config.SHUTDOWN_GRACE_PERIOD
            )
        except asyncio.CancelledError:
            self._logger.debug(
                "Stopped waiting for games to end due to forced shutdown"
            )
        except asyncio.TimeoutError:
            self._logger.warning(
                "Graceful shutdown period ended! %s games are still live!",
                len(game_service.live_games)
            )
        finally:
            # The report_dirties loop is responsible for clearing dirty games
            # and broadcasting the update messages to players and to RabbitMQ.
            # We need to wait here for that loop to complete; otherwise it is
            # possible for the services to be shut down in between clearing the
            # games and posting the messages, causing the posts to fail.
            await broadcast_service.wait_report_dirtes()

    async def _shutdown_services(self):
        await map_suppress(
            lambda service: service.shutdown(),
            self.services.values(),
            logger=self._logger,
            msg="when shutting down service "
        )

    async def _stop_contexts(self):
        await map_suppress(
            lambda ctx: ctx.stop(),
            self.contexts,
            logger=self._logger,
            msg="when stopping context "
        )

    async def _shutdown_contexts(self):
        await map_suppress(
            lambda ctx: ctx.shutdown(),
            self.contexts,
            logger=self._logger,
            msg="when shutting down context "
        )

A class representing a shared server state. Each ServerInstance may be exposed on multiple ports, but each port will share the same internal server state, i.e. the same players, games, etc.
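
The idea of several listening ports sharing one state object can be sketched with plain `asyncio` servers. Everything in this example (`SharedState`, `serve_on`, `login`, the one-line text protocol) is invented for illustration and is not part of the lobby server's API; it only shows that two listeners built around the same object observe the same players.

```python
import asyncio


class SharedState:
    """Hypothetical stand-in for the state a server instance owns."""

    def __init__(self) -> None:
        self.players: set[str] = set()


async def serve_on(state: SharedState, port: int = 0) -> asyncio.AbstractServer:
    async def handle(reader, writer):
        # Toy protocol: the client sends its name, the server replies with
        # the number of players currently known to the shared state.
        name = (await reader.readline()).decode().strip()
        state.players.add(name)
        writer.write(f"{len(state.players)}\n".encode())
        await writer.drain()
        writer.close()

    # Port 0 asks the operating system for any free port.
    return await asyncio.start_server(handle, "127.0.0.1", port)


async def login(server: asyncio.AbstractServer, name: str) -> int:
    port = server.sockets[0].getsockname()[1]
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(f"{name}\n".encode())
    await writer.drain()
    reply = await reader.readline()
    writer.close()
    await writer.wait_closed()
    return int(reply)


async def demo():
    state = SharedState()
    first_port = await serve_on(state)
    second_port = await serve_on(state)
    try:
        seen_by_first = await login(first_port, "alice")
        seen_by_second = await login(second_port, "bob")
    finally:
        first_port.close()
        second_port.close()
    return seen_by_first, seen_by_second, sorted(state.players)
```

A player who connects through the second listener is counted together with the one who connected through the first, because both handlers close over the same `SharedState`.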

Methods

async def drain(self)
async def drain(self):
    """
    Wait for all games to end.
    """
    game_service: GameService = self.services["game_service"]
    broadcast_service: BroadcastService = self.services["broadcast_service"]
    try:
        await asyncio.wait_for(
            game_service.drain_games(),
            timeout=config.SHUTDOWN_GRACE_PERIOD
        )
    except asyncio.CancelledError:
        self._logger.debug(
            "Stopped waiting for games to end due to forced shutdown"
        )
    except asyncio.TimeoutError:
        self._logger.warning(
            "Graceful shutdown period ended! %s games are still live!",
            len(game_service.live_games)
        )
    finally:
        # The report_dirties loop is responsible for clearing dirty games
        # and broadcasting the update messages to players and to RabbitMQ.
        # We need to wait here for that loop to complete; otherwise it is
        # possible for the services to be shut down in between clearing the
        # games and posting the messages, causing the posts to fail.
        await broadcast_service.wait_report_dirtes()

Wait for all games to end.
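
The shape of this method (a bounded wait followed by a flush that always runs) can be shown in isolation. In this sketch `wait_for_games` and `flush_updates` are hypothetical callables standing in for `game_service.drain_games` and `broadcast_service.wait_report_dirtes`, and the grace period is passed in rather than read from configuration.

```python
import asyncio


async def drain_sketch(wait_for_games, flush_updates, grace_period: float) -> str:
    try:
        # Give running games a bounded amount of time to finish.
        await asyncio.wait_for(wait_for_games(), timeout=grace_period)
        outcome = "all games ended"
    except asyncio.TimeoutError:
        outcome = "grace period expired"
    finally:
        # Runs on success, on timeout, and on cancellation, so pending
        # updates are flushed before anything else is torn down.
        await flush_updates()
    return outcome


async def demo():
    flushes = []

    async def long_game():
        await asyncio.sleep(60)

    async def no_games():
        return None

    async def flush():
        flushes.append(True)

    slow = await drain_sketch(long_game, flush, grace_period=0.01)
    fast = await drain_sketch(no_games, flush, grace_period=0.01)
    return slow, fast, len(flushes)
```

In both the timeout case and the normal case the flush coroutine is awaited exactly once.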

async def graceful_shutdown(self)
async def graceful_shutdown(self):
    """
    Start a graceful shut down of the server.

    1. Notify all services of graceful shutdown
    """
    self._logger.info("Initiating graceful shutdown")

    await map_suppress(
        lambda service: service.graceful_shutdown(),
        self.services.values(),
        logger=self._logger,
        msg="when starting graceful shutdown of service "
    )

Start a graceful shut down of the server.

  1. Notify all services of graceful shutdown
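
The `map_suppress` helper used above is not shown on this page. A plausible sketch of such a helper follows, under the assumption that it applies a coroutine function to every item and logs failures instead of propagating them. The name `map_suppress_sketch`, its return value, and the choice to run the calls concurrently are all assumptions; the real helper may behave differently.

```python
import asyncio
import logging


async def map_suppress_sketch(func, items, logger=logging.getLogger(__name__), msg=""):
    """Call `func(item)` for every item; log errors instead of raising."""
    items = list(items)
    results = await asyncio.gather(
        *(func(item) for item in items),
        return_exceptions=True,  # collect exceptions rather than raising
    )
    failed = []
    for item, result in zip(items, results):
        if isinstance(result, Exception):
            logger.error("Unexpected error %s%s", msg, item, exc_info=result)
            failed.append(item)
    return failed


class DemoService:
    """Hypothetical service used only to exercise the helper."""

    def __init__(self, name: str, fail: bool = False) -> None:
        self.name = name
        self.fail = fail
        self.notified = False

    def __repr__(self) -> str:
        return self.name

    async def graceful_shutdown(self) -> None:
        if self.fail:
            raise RuntimeError("could not shut down")
        self.notified = True


async def demo():
    services = [DemoService("a"), DemoService("b", fail=True), DemoService("c")]
    failed = await map_suppress_sketch(
        lambda service: service.graceful_shutdown(),
        services,
        msg="when starting graceful shutdown of service ",
    )
    return [s.name for s in failed], [s.notified for s in services]
```

The point of the pattern is that one misbehaving service cannot stop the remaining services from being notified.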
async def listen(self, address: tuple[str, int], name: str | None = None, protocol_class: type[Protocol] = server.protocol.qdatastream.QDataStreamProtocol, proxy: bool = False) ‑> ServerContext
async def listen(
    self,
    address: tuple[str, int],
    name: Optional[str] = None,
    protocol_class: type[Protocol] = QDataStreamProtocol,
    proxy: bool = False,
) -> ServerContext:
    """
    Start listening on a new address.

    # Params
    - `address`: Tuple indicating the host, port to listen on.
    - `name`: String used to identify this context in log messages. The
        default is to use the `protocol_class` name.
    - `protocol_class`: The protocol class implementation to use.
    - `proxy`: Boolean indicating whether or not to use the PROXY protocol.
        See: https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
    """
    if not self.started:
        await self.start_services()

    ctx = ServerContext(
        f"{self.name}[{name or protocol_class.__name__}]",
        self.connection_factory,
        list(self.services.values()),
        protocol_class
    )
    await ctx.listen(*address, proxy=proxy)

    self.contexts.add(ctx)

    return ctx

Start listening on a new address.

Params

  • address: Tuple indicating the host, port to listen on.
  • name: String used to identify this context in log messages. The default is to use the protocol_class name.
  • protocol_class: The protocol class implementation to use.
  • proxy: Boolean indicating whether or not to use the PROXY protocol. See: https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
async def shutdown(self)
async def shutdown(self):
    """
    Immediately shut down the server.

    1. Stop accepting new connections
    2. Stop all services
    3. Close all existing connections
    """
    self._logger.info("Initiating full shutdown")

    await self._stop_contexts()
    await self._shutdown_services()
    await self._shutdown_contexts()

    self.contexts.clear()
    self.started = False

Immediately shut down the server.

  1. Stop accepting new connections
  2. Stop all services
  3. Close all existing connections
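
The ordering of the three steps can be illustrated with toy objects that record what happens to them. `DemoContext`, `DemoService`, and `full_shutdown` are hypothetical names; the sketch runs each phase sequentially for clarity, which the real implementation does not necessarily do.

```python
import asyncio


class DemoContext:
    """Hypothetical listening context that records its lifecycle calls."""

    def __init__(self, log: list) -> None:
        self.log = log

    async def stop(self) -> None:
        self.log.append("stop accepting connections")

    async def shutdown(self) -> None:
        self.log.append("close existing connections")


class DemoService:
    def __init__(self, log: list) -> None:
        self.log = log

    async def shutdown(self) -> None:
        self.log.append("stop service")


async def full_shutdown(contexts, services) -> None:
    # Phase 1: no new clients can arrive while the rest is torn down.
    for ctx in contexts:
        await ctx.stop()
    # Phase 2: services stop while existing connections are still open.
    for service in services:
        await service.shutdown()
    # Phase 3: only now are the remaining connections closed.
    for ctx in contexts:
        await ctx.shutdown()


async def demo() -> list:
    log: list = []
    await full_shutdown([DemoContext(log)], [DemoService(log)])
    return log
```

Closing connections last presumably lets services still reach connected clients while they shut down, although the source does not state the reason.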
async def start_services(self) ‑> None
@synchronizedmethod
async def start_services(self) -> None:
    if self.started:
        return

    num_services = len(self.services)
    self._logger.debug("Initializing %s services", num_services)

    async def initialize(service):
        start = time.perf_counter()
        await service.initialize()
        service._logger.debug(
            "%s initialized in %0.2f seconds",
            service.__class__.__name__,
            time.perf_counter() - start
        )

    await asyncio.gather(*[
        initialize(service) for service in self.services.values()
    ])

    self._logger.debug("Initialized %s services", num_services)

    self.started = True
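
The start-up logic combines two ideas: all services are initialized concurrently with `asyncio.gather`, and the `started` flag makes repeated calls harmless. The sketch below assumes that `synchronizedmethod` serializes concurrent callers (its implementation is not shown here) and models that with an `asyncio.Lock`; `DemoService` and `Starter` are hypothetical.

```python
import asyncio


class DemoService:
    def __init__(self, delay: float) -> None:
        self.delay = delay
        self.initialized = 0

    async def initialize(self) -> None:
        await asyncio.sleep(self.delay)  # stands in for real start-up work
        self.initialized += 1


class Starter:
    def __init__(self, services) -> None:
        self.services = services
        self.started = False
        # Assumed equivalent of the `synchronizedmethod` decorator.
        self._lock = asyncio.Lock()

    async def start_services(self) -> None:
        async with self._lock:
            if self.started:
                return
            # All initializers run concurrently, so start-up takes roughly
            # as long as the slowest service rather than the sum of all.
            await asyncio.gather(*(s.initialize() for s in self.services))
            self.started = True


async def demo():
    services = [DemoService(0.01), DemoService(0.02)]
    starter = Starter(services)
    # Two concurrent callers: the second waits for the lock and then
    # returns early because `started` is already set.
    await asyncio.gather(starter.start_services(), starter.start_services())
    return starter.started, [s.initialized for s in services]
```

Without the lock, both callers could pass the `started` check before either finished, and every service would be initialized twice.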
def write_broadcast(self, message, predicate=lambda conn: conn.authenticated)
def write_broadcast(
    self,
    message,
    predicate=lambda conn: conn.authenticated
):
    """
    Queue a message to be sent to all connected clients.
    """
    self._logger.log(TRACE, "]]: %s", message)
    metrics.server_broadcasts.inc()

    for ctx in self.contexts:
        try:
            ctx.write_broadcast(message, predicate)
        except Exception:
            self._logger.exception(
                "Error writing '%s'",
                message.get("command", message)
            )

Queue a message to be sent to all connected clients.
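
The `predicate` argument decides which connections receive the message; by default only authenticated ones do. A small self-contained sketch of that filtering, using hypothetical `DemoConnection` and `DemoContext` classes in place of the real connection and context types:

```python
import logging

logger = logging.getLogger(__name__)


class DemoConnection:
    def __init__(self, name: str, authenticated: bool) -> None:
        self.name = name
        self.authenticated = authenticated
        self.inbox: list = []

    def write(self, message: dict) -> None:
        self.inbox.append(message)


class DemoContext:
    def __init__(self, connections) -> None:
        self.connections = connections

    def write_broadcast(self, message: dict, predicate) -> None:
        for conn in self.connections:
            if predicate(conn):
                conn.write(message)


def broadcast(contexts, message: dict, predicate=lambda conn: conn.authenticated) -> None:
    for ctx in contexts:
        try:
            ctx.write_broadcast(message, predicate)
        except Exception:
            # A failure in one context must not block delivery to the rest.
            logger.exception("Error writing '%s'", message.get("command", message))


alice = DemoConnection("alice", authenticated=True)
guest = DemoConnection("guest", authenticated=False)
contexts = [DemoContext([alice]), DemoContext([guest])]

# Default predicate: only the authenticated connection receives this.
broadcast(contexts, {"command": "notice", "text": "hello"})
# Custom predicate: every connection receives this.
broadcast(contexts, {"command": "notice", "text": "maintenance"}, lambda conn: True)
```

After both calls, `alice.inbox` holds two messages and `guest.inbox` holds only the second.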

class ViolationService
@with_logger
class ViolationService(Service):
    """
    Track who is banned from searching and for how long. Apply progressive
    discipline for repeated violations.

    A violation could be anything, but it is usually any time a player fails
    to connect to a game.
    """

    def __init__(self):
        # We store a reference to the original `Player` object for logging only
        self._violations: dict[int, tuple[Player, Violation]] = {}

    async def initialize(self):
        self._cleanup_task = at_interval(5, func=self.clear_expired)

    def clear_expired(self):
        now = datetime_now()
        for player, violation in list(self._violations.values()):
            if violation.is_expired(now):
                self._clear_violation(player)

    def register_violations(self, players: list[Player]):
        now = datetime_now()
        for player in players:
            violation = self.get_violation(player)
            if violation is None or violation.is_expired(now):
                violation = Violation(time=now)
                self.set_violation(player, violation)
            else:
                violation.register()

            player.write_message({
                "command": "search_violation",
                **violation.to_dict()
            })
            extra_text = ""
            if violation.count > 1:
                delta_text = humanize.precisedelta(
                    violation.get_ban_expiration() - now
                )
                extra_text = f" You can queue again in {delta_text}"
            player.write_message({
                "command": "notice",
                "style": "info",
                "text": (
                    f"You have caused a matchmaking connection failure {violation.count} time(s). "
                    "Multiple failures result in temporary time-outs from matchmaker. "
                    "Please seek support on the forums or discord for persistent issues." +
                    extra_text
                )
            })

    def get_violations(self, players: list[Player]) -> dict[Player, Violation]:
        now = datetime_now()
        result = {}
        for player in players:
            violation = self.get_violation(player)
            if not violation:
                continue
            elif violation.get_ban_expiration() > now:
                result[player] = violation
            elif violation.is_expired(now):
                self._clear_violation(player)

        return result

    def get_violation(self, player: Player) -> Optional[Violation]:
        _, violation = self._violations.get(player.id, (None, None))
        return violation

    def set_violation(self, player: Player, violation: Violation):
        self._violations[player.id] = (player, violation)

    def _clear_violation(self, player: Player):
        violation = self.get_violation(player)
        self._logger.debug(
            "Cleared violation for player %s: %s",
            player.login,
            violation
        )
        del self._violations[player.id]

Track who is banned from searching and for how long. Apply progressive discipline for repeated violations.

A violation could be anything, but it is usually any time a player fails to connect to a game.
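
The `Violation` class that carries the "progressive discipline" is not shown in this section. The sketch below is one way such a class could look. The method names mirror the calls made by `ViolationService`, but the durations (10 minutes, 30 minutes, one hour until expiry) and the explicit `now` argument to `register` are assumptions chosen for illustration. The only behavior taken from the source is that a first violation carries no waiting period, since the service only mentions a queue delay when `count > 1`.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class ViolationSketch:
    """Hypothetical model of a repeat-offence counter with growing bans."""

    time: datetime  # when the most recent violation happened
    count: int = 1

    def register(self, now: datetime) -> None:
        # Another violation while the previous one has not yet expired.
        self.count += 1
        self.time = now

    def get_ban_expiration(self) -> datetime:
        if self.count < 2:
            # First offence: a warning only, no waiting period.
            return self.time
        if self.count == 2:
            return self.time + timedelta(minutes=10)  # assumed duration
        return self.time + timedelta(minutes=30)  # assumed duration

    def is_expired(self, now: datetime) -> bool:
        # Assumed: the record is forgotten one hour after the last offence.
        return self.time + timedelta(hours=1) <= now


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
violation = ViolationSketch(time=start)
first_ban_end = violation.get_ban_expiration()  # equals `start`: no ban yet

violation.register(start + timedelta(minutes=1))
second_ban_end = violation.get_ban_expiration()  # ten minutes after the repeat
```

Keeping the ban expiration separate from the record's expiry means a player can be allowed to queue again while the offence still counts toward the next, longer time-out.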

Methods

def clear_expired(self)
def clear_expired(self):
    now = datetime_now()
    for player, violation in list(self._violations.values()):
        if violation.is_expired(now):
            self._clear_violation(player)
def get_violation(self, player: Player) ‑> Violation | None
def get_violation(self, player: Player) -> Optional[Violation]:
    _, violation = self._violations.get(player.id, (None, None))
    return violation
def get_violations(self, players: list[Player]) ‑> dict[Player, Violation]
def get_violations(self, players: list[Player]) -> dict[Player, Violation]:
    now = datetime_now()
    result = {}
    for player in players:
        violation = self.get_violation(player)
        if not violation:
            continue
        elif violation.get_ban_expiration() > now:
            result[player] = violation
        elif violation.is_expired(now):
            self._clear_violation(player)

    return result
def register_violations(self, players: list[Player])
def register_violations(self, players: list[Player]):
    now = datetime_now()
    for player in players:
        violation = self.get_violation(player)
        if violation is None or violation.is_expired(now):
            violation = Violation(time=now)
            self.set_violation(player, violation)
        else:
            violation.register()

        player.write_message({
            "command": "search_violation",
            **violation.to_dict()
        })
        extra_text = ""
        if violation.count > 1:
            delta_text = humanize.precisedelta(
                violation.get_ban_expiration() - now
            )
            extra_text = f" You can queue again in {delta_text}"
        player.write_message({
            "command": "notice",
            "style": "info",
            "text": (
                f"You have caused a matchmaking connection failure {violation.count} time(s). "
                "Multiple failures result in temporary time-outs from matchmaker. "
                "Please seek support on the forums or discord for persistent issues." +
                extra_text
            )
        })
def set_violation(self, player: Player, violation: Violation)
def set_violation(self, player: Player, violation: Violation):
    self._violations[player.id] = (player, violation)

Inherited members