Module server.games.game

Classes

class Game (id: int,
database: FAFDatabase,
game_service: GameService,
game_stats_service: GameStatsService,
host: Player | None = None,
name: str = 'New Game',
map: Map = Map(id=None, folder_name='scmp_007', ranked=False, weight=1),
game_mode: str = 'faf',
matchmaker_queue_id: int | None = None,
rating_type: str | None = None,
displayed_rating_range: InclusiveRange | None = None,
enforce_rating_range: bool = False,
max_players: int = 12,
setup_timeout: int = 60)
Expand source code
class Game:
    """
    Object that lasts for the lifetime of a game on FAF.

    Holds the connections, per-player and per-AI options, reported results
    and validity state of one game, and writes its stats to the database.
    """
    # Class-level defaults. `game_type` is reported to clients by `to_dict`;
    # `init_mode` is not read inside this class (presumably consumed by the
    # code that launches the game -- TODO confirm).
    init_mode = InitMode.NORMAL_LOBBY
    game_type = GameType.CUSTOM

    def __init__(
        self,
        id: int,
        database: "FAFDatabase",
        game_service: "GameService",
        game_stats_service: "GameStatsService",
        host: Optional[Player] = None,
        name: str = "New Game",
        map: Map = MAP_DEFAULT,
        game_mode: str = FeaturedModType.FAF,
        matchmaker_queue_id: Optional[int] = None,
        rating_type: Optional[str] = None,
        displayed_rating_range: Optional[InclusiveRange] = None,
        enforce_rating_range: bool = False,
        max_players: int = 12,
        setup_timeout: int = 60,
    ):
        """
        Create a game in the `INITIALIZING` state and schedule its setup
        timeout.

        # Params
        - `id`: game id; also used for the logger name and the result reports
        - `database`: database handle used to persist stats and results
        - `game_service`: used to look up maps and featured mods, publish
            results and mark the game dirty
        - `game_stats_service`: used to process reported army stats
        - `host`: the hosting player, if any
        - `name`: game title, validated by the `name` setter
        - `map`: initial map; its scenario file seeds the `ScenarioFile`
            game option
        - `game_mode`: featured mod of the game
        - `matchmaker_queue_id`: id of the matchmaker queue, if any
        - `rating_type`: rating type; falls back to `RatingType.GLOBAL`
        - `displayed_rating_range`: falls back to `InclusiveRange()`
        - `enforce_rating_range`: whether `is_visible_to_player` hides the
            game from players outside the displayed rating range
        - `max_players`: initial value of the `Slots` game option
        - `setup_timeout`: seconds after which a game that is still
            `INITIALIZING` gets cancelled

        # Raises
        `ValueError` if `name` is empty or not ascii.
        """
        self.id = id
        self._db = database
        self._results = GameResultReports(id)
        self._army_stats_list = []
        self._players_with_unsent_army_stats = []
        self._game_stats_service = game_stats_service
        self.game_service = game_service
        self._player_options: dict[int, dict[str, Any]] = defaultdict(dict)
        self.hosted_at = None
        self.launched_at = None
        self.finished = False
        self._logger = logging.getLogger(
            f"{self.__class__.__qualname__}.{id}"
        )
        self.visibility = VisibilityState.PUBLIC
        self.host = host
        # Goes through the `name` property setter, which validates the title.
        self.name = name
        self.map = map
        self.password = None
        self._players_at_launch: list[Player] = []
        self.AIs = {}
        self.desyncs = 0
        self.validity = ValidityState.VALID
        self.game_mode = game_mode
        self.rating_type = rating_type or RatingType.GLOBAL
        self.displayed_rating_range = displayed_rating_range or InclusiveRange()
        self.enforce_rating_range = enforce_rating_range
        self.matchmaker_queue_id = matchmaker_queue_id
        self.setup_timeout = setup_timeout
        self.state = GameState.INITIALIZING
        self._connections = {}
        self._configured_player_ids: set[int] = set()
        self.enforce_rating = False
        self.game_options = GameOptions(
            id,
            {
                "AIReplacement": "Off",
                "CheatsEnabled": "false",
                "FogOfWar": "explored",
                "GameSpeed": "normal",
                "NoRushOption": "Off",
                "PrebuiltUnits": "Off",
                "RestrictedCategories": 0,
                "ScenarioFile": (pathlib.PurePath(map.scenario_file)),
                "Slots": max_players,
                "TeamLock": "locked",
                "Unranked": "No",
                "Victory": Victory.DEMORALIZATION,
            }
        )
        # Keep `self.map` and `self.name` in sync with option changes.
        self.game_options.add_async_callback(
            "ScenarioFile",
            self.on_scenario_file_changed,
        )
        self.game_options.add_callback("Title", self.on_title_changed)

        self.mods = {}
        self._hosted_future = asyncio.Future()
        self._finish_lock = asyncio.Lock()

        self._logger.debug("%s created", self)
        # Keep a strong reference to the timeout task: the event loop only
        # holds weak references to tasks, so an otherwise unreferenced task
        # may be garbage collected before it completes.
        self._timeout_task = asyncio.get_event_loop().create_task(
            self.timeout_game(setup_timeout)
        )

    async def timeout_game(self, timeout: int = 60):
        """
        Sleep for `timeout` seconds, then finish the game if it is still in
        the `INITIALIZING` state.
        """
        await asyncio.sleep(timeout)
        if self.state is not GameState.INITIALIZING:
            return

        self._logger.debug("Game setup timed out, cancelling game")
        await self.on_game_finish()

    async def on_scenario_file_changed(self, scenario_path: pathlib.PurePath):
        try:
            map_folder_name = scenario_path.parts[2].lower()
        except IndexError:
            return

        self.map = await self.game_service.get_map(map_folder_name)

    def on_title_changed(self, title: str):
        with contextlib.suppress(ValueError):
            self.name = title

    @property
    def name(self):
        """
        The current game title, as stored by `set_name_unchecked`.
        """
        return self._name

    @name.setter
    def name(self, value: str):
        """
        Verifies that names only contain ascii characters.
        """
        value = value.strip()

        if not value.isascii():
            raise ValueError("Game title must be ascii!")

        if not value:
            raise ValueError("Game title must not be empty!")

        self.set_name_unchecked(value)

    def set_name_unchecked(self, value: str):
        """
        Store `value` as the game name, skipping all validation.

        The name is cut to the length of the `gameName` column so that it
        cannot make mysql INSERT statements fail.
        """
        column_length = game_stats.c.gameName.type.length
        truncated = value[:column_length]
        self._name = truncated

    @property
    def max_players(self) -> int:
        """
        The current value of the `Slots` game option.
        """
        return self.game_options["Slots"]

    @property
    def armies(self) -> frozenset[int]:
        return frozenset(
            self.get_player_option(player.id, "Army")
            for player in self.players
        )

    @property
    def players(self) -> list[Player]:
        """
        Players in the game

        Which list is returned depends on the state:
          - in the LOBBY state: the currently connected players
          - in every other state: the players frozen at launch
        """
        if self.state is not GameState.LOBBY:
            return self._players_at_launch

        return self.get_connected_players()

    def get_connected_players(self) -> list[Player]:
        """
        Get a collection of all players currently connected to the game.
        """
        return [
            player for player in self._connections.keys()
            if player.id in self._configured_player_ids
        ]

    def _is_observer(self, player: Player) -> bool:
        army = self.get_player_option(player.id, "Army")
        return army is None or army < 0

    @property
    def connections(self) -> Iterable["GameConnection"]:
        """
        A view of all game connections currently registered with this game.
        """
        return self._connections.values()

    @property
    def teams(self) -> frozenset[int]:
        """
        A set of all teams of this game's players.
        """
        return frozenset(
            self.get_player_option(player.id, "Team")
            for player in self.players
        )

    @property
    def is_ffa(self) -> bool:
        """
        `True` when there are at least three players and at least one of them
        is on the FFA team.
        """
        return len(self.players) >= 3 and FFA_TEAM in self.teams

    @property
    def is_multi_team(self) -> bool:
        """
        `True` when the players' `Team` options take more than two distinct
        values.
        """
        return len(self.teams) > 2

    @property
    def has_ai(self) -> bool:
        return len(self.AIs) > 0

    @property
    def is_even(self) -> bool:
        """
        If teams are balanced taking into account that players on the FFA team
        are on individual teams.

        # Returns
        `True` iff all teams have the same player count.

        Special cases:

        - `True` if there are zero teams.
        - `False` if there is a single team.
        """
        teams = self.get_team_sets()
        if len(teams) == 0:
            return True
        if len(teams) == 1:
            return False

        team_sizes = set(len(team) for team in teams)
        return len(team_sizes) == 1

    def get_team_sets(self) -> list[set[Player]]:
        """
        Group the players into teams, each represented as a set of players.

        Players on the FFA team each get a one-player set of their own; those
        sets are placed after the regular teams. Raises `GameError` when any
        player has no `Team` option.
        """
        if None in self.teams:
            player_teams = [
                (player, self.get_player_option(player.id, "Team"))
                for player in self.players
            ]
            raise GameError(
                "Missing team for at least one player. (player, team): {}"
                .format(player_teams)
            )

        grouped = defaultdict(set)
        solo_teams = []
        for player in self.players:
            team_id = self.get_player_option(player.id, "Team")
            if team_id != FFA_TEAM:
                grouped[team_id].add(player)
                continue
            solo_teams.append({player})

        return [*grouped.values(), *solo_teams]

    def set_hosted(self):
        """
        Resolve the hosted future and record the current time as `hosted_at`.
        """
        self._hosted_future.set_result(None)
        self.hosted_at = datetime_now()

    async def add_result(
        self,
        reporter: int,
        army: int,
        result_type: str,
        score: int,
        result_metadata: frozenset[str] = frozenset(),
    ):
        """
        Record a result as computed by the game.

        Reports for armies that do not belong to a player of this game, and
        reports whose `result_type` is not a known `ArmyReportedOutcome`, are
        logged and ignored.

        # Params
        - `reporter`: player ID
        - `army`: the army number being reported for
        - `result_type`: a string representing the result
        - `score`: an arbitrary number assigned with the result
        - `result_metadata`: everything preceding the `result_type` in the
            result message from the game, one or more words, optional
        """
        if army not in self.armies:
            self._logger.debug(
                "Ignoring results for unknown army %s: %s %s reported by: %s",
                army, result_type, score, reporter
            )
            return

        try:
            # The enum is looked up by the upper-cased result string.
            outcome = ArmyReportedOutcome(result_type.upper())
        except ValueError:
            self._logger.debug(
                "Ignoring result reported by %s for army %s: %s %s",
                reporter, army, result_type, score
            )
            return

        result = GameResultReport(reporter, army, outcome, score, result_metadata)
        self._results.add(result)
        self._logger.info(
            "%s reported result for army %s: %s %s", reporter, army,
            result_type, score
        )

        # A new result may unblock army stats that were waiting for one.
        self._process_pending_army_stats()

    def _process_pending_army_stats(self):
        """
        Process army stats for every pending player whose army already has a
        reported result with an outcome other than `GameOutcome.UNKNOWN`.
        """
        # Iterate over a snapshot: `_process_army_stats_for_player` removes the
        # player from `_players_with_unsent_army_stats`, and removing from a
        # list while iterating it makes the loop skip the following element.
        for player in list(self._players_with_unsent_army_stats):
            army = self.get_player_option(player.id, "Army")
            if army not in self._results:
                continue

            if any(
                result.outcome is not GameOutcome.UNKNOWN
                for result in self._results[army]
            ):
                self._process_army_stats_for_player(player)

    def _process_army_stats_for_player(self, player):
        """
        Hand the stored army stats to the game stats service for `player`.

        Does nothing while no army stats have been reported or when the
        `CheatsEnabled` option is not "false". Otherwise the player is removed
        from the pending list and the processing is scheduled as a task.
        Any exception raised here is logged and swallowed.
        """
        try:
            if (
                len(self._army_stats_list) == 0
                or self.game_options["CheatsEnabled"] != "false"
            ):
                return

            self._players_with_unsent_army_stats.remove(player)
            # Fire and forget; the result of the task is not awaited here.
            asyncio.create_task(
                self._game_stats_service.process_game_stats(
                    player, self, self._army_stats_list
                )
            )
        except Exception:
            # Never let an error in processing army stats cascade
            self._logger.exception(
                "Army stats could not be processed from player %s in game %s",
                player, self
            )

    def add_game_connection(self, game_connection):
        """
        Register a game connection under its player.

        Raises `GameError` unless the connection is connected to the host and
        the game is in the LOBBY or LIVE state.
        """
        connection_state = game_connection.state
        if connection_state != GameConnectionState.CONNECTED_TO_HOST:
            raise GameError(
                f"Invalid GameConnectionState: {game_connection.state}"
            )

        accepts_connections = (
            self.state is GameState.LOBBY or self.state is GameState.LIVE
        )
        if not accepts_connections:
            raise GameError(f"Invalid GameState: {self.state}")

        self._logger.info("Added game connection %s", game_connection)
        self._connections[game_connection.player] = game_connection

    async def disconnect_player(self, player: Player):
        if player.game_connection not in self._connections.values():
            return

        self._configured_player_ids.discard(player.id)

        if self.state is GameState.LOBBY and player.id in self._player_options:
            del self._player_options[player.id]

        await self.remove_game_connection(player.game_connection)

    async def remove_game_connection(self, game_connection):
        """
        Remove a game connection from this game.

        Connections that are not registered with this game are ignored.

        Will trigger `on_game_finish` if there are no more active connections to the
        game.
        """
        if game_connection not in self._connections.values():
            return

        player = game_connection.player
        del self._connections[player]
        # Detach the player from this game.
        del player.game

        self._logger.info("Removed game connection %s", game_connection)

        await self.check_game_finish(player)

    async def check_game_finish(self, player):
        """
        Decide whether the game is over after `player` left.

        Finishes the game, unless it has already ended, when the simulation
        is finished, no connections remain, or the host left before the game
        went live. Otherwise pending army stats are processed.
        """
        await self.check_sim_end()

        # The lock serialises the finish decision between concurrent callers.
        async with self._finish_lock:
            host_left_lobby = (
                player == self.host and self.state is not GameState.LIVE
            )

            if self.state is not GameState.ENDED and (
                self.finished or
                len(self._connections) == 0 or
                host_left_lobby
            ):
                await self.on_game_finish()
            else:
                self._process_pending_army_stats()

    async def check_sim_end(self):
        if self.finished:
            return
        if self.state is not GameState.LIVE:
            return
        if [conn for conn in self.connections if not conn.finished_sim]:
            return
        self.finished = True
        async with self._db.acquire() as conn:
            await conn.execute(
                game_stats.update().where(
                    game_stats.c.id == self.id
                ).values(
                    endTime=sql_now()
                )
            )

    async def on_game_finish(self):
        """
        End the game.

        For a LIVE game the results are processed, unless there were more
        than 20 desyncs, in which case the game is only marked invalid.
        Errors are logged, and in every case the state becomes `ENDED` and
        the game is marked dirty on the game service.
        """
        try:
            if self.state is GameState.LOBBY:
                self._logger.info("Game cancelled pre launch")
            elif self.state is GameState.INITIALIZING:
                self._logger.info("Game cancelled pre initialization")
            elif self.state is GameState.LIVE:
                self._logger.info("Game finished normally")

                if self.desyncs > 20:
                    await self.mark_invalid(ValidityState.TOO_MANY_DESYNCS)
                    # The `finally` block below still runs after this return.
                    return

                await self.process_game_results()

                self._process_pending_army_stats()
        except Exception:    # pragma: no cover
            self._logger.exception("Error during game end")
        finally:
            self.state = GameState.ENDED

            self.game_service.mark_dirty(self)

    async def _run_pre_rate_validity_checks(self):
        """
        Hook awaited by `resolve_game_results` before outcomes are resolved.

        Does nothing in this class.
        """
        pass

    async def process_game_results(self):
        """
        Persist the reported results, resolve them and publish the outcome
        through the game service.

        When no results were reported the game is marked invalid with
        `UNKNOWN_RESULT` instead.
        """
        if self._results:
            await self.persist_results()

            resolved = await self.resolve_game_results()
            await self.game_service.publish_game_results(resolved)
            return

        await self.mark_invalid(ValidityState.UNKNOWN_RESULT)

    async def resolve_game_results(self) -> EndedGameInfo:
        """
        Build the `EndedGameInfo` summary of this game.

        Raises `GameError` unless the game is LIVE or ENDED. If the team
        outcomes cannot be resolved they stay `UNKNOWN` and a so far valid
        game is marked invalid with `UNKNOWN_RESULT`.
        """
        if self.state not in (GameState.LIVE, GameState.ENDED):
            raise GameError("Cannot rate game that has not been launched.")

        await self._run_pre_rate_validity_checks()

        basic_info = self.get_basic_info()

        team_army_results = [
            [self.get_army_results(player) for player in team]
            for team in basic_info.teams
        ]

        # Default used when resolving the outcomes below fails.
        team_outcomes = [GameOutcome.UNKNOWN for _ in basic_info.teams]
        team_player_partial_outcomes = [
            {self.get_player_outcome(player) for player in team}
            for team in basic_info.teams
        ]

        try:
            # TODO: Remove override once game result messages are reliable
            team_outcomes = (
                self._outcome_override_hook()
                or resolve_game(team_player_partial_outcomes)
            )
        except GameResolutionError:
            if self.validity is ValidityState.VALID:
                await self.mark_invalid(ValidityState.UNKNOWN_RESULT)

        try:
            commander_kills = {
                army_stats["name"]: army_stats["units"]["cdr"]["kills"]
                for army_stats in self._army_stats_list
            }
        except KeyError:
            # A single entry missing one of the keys discards all kills.
            commander_kills = {}

        return EndedGameInfo.from_basic(
            basic_info,
            self.validity,
            team_outcomes,
            commander_kills,
            team_army_results,
        )

    def _outcome_override_hook(self) -> Optional[list[GameOutcome]]:
        """
        Hook consulted by `resolve_game_results` before `resolve_game`.

        A truthy return value is used as the team outcomes. This
        implementation returns `None`, so the outcomes are resolved normally.
        """
        return None

    async def load_results(self):
        """
        Load results from the database

        Replaces the in-memory result reports with the ones stored for this
        game id.
        """
        self._results = await GameResultReports.from_db(self._db, self.id)

    async def persist_results(self):
        """
        Persist game results into the database

        Updates the score, score time and result of each player's
        `game_player_stats` row.

        Requires the game to have been launched and the appropriate rows to
        exist in the database.
        """

        self._logger.debug("Saving scores from game %s", self.id)
        # Collect (score, outcome) per player before touching the database.
        scores = {}
        for player in self.players:
            army = self.get_player_option(player.id, "Army")
            outcome = self.get_player_outcome(player)
            score = self.get_army_score(army)
            scores[player] = (score, outcome)
            self._logger.info(
                "Result for army %s, player: %s: score %s, outcome %s",
                army, player, score, outcome
            )

        async with self._db.acquire() as conn:
            # One parameter set per player for the single UPDATE statement.
            rows = []
            for player, (score, outcome) in scores.items():
                self._logger.info(
                    "Score for player %s: score %s, outcome %s",
                    player, score, outcome,
                )
                rows.append(
                    {
                        "score": score,
                        "result": outcome.name.upper(),
                        "game_id": self.id,
                        "player_id": player.id,
                    }
                )

            update_statement = game_player_stats.update().where(
                and_(
                    game_player_stats.c.gameId == bindparam("game_id"),
                    game_player_stats.c.playerId == bindparam("player_id"),
                )
            ).values(
                score=bindparam("score"),
                scoreTime=sql_now(),
                result=bindparam("result"),
            )
            await conn.deadlock_retry_execute(update_statement, rows)

    def get_basic_info(self) -> BasicGameInfo:
        """
        Bundle the game id, rating type, map id, game mode, mod ids and team
        sets into a `BasicGameInfo`.

        Raises `GameError` (from `get_team_sets`) if a player has no team.
        """
        return BasicGameInfo(
            self.id,
            self.rating_type,
            self.map.id,
            self.game_mode,
            list(self.mods.keys()),
            self.get_team_sets(),
        )

    def set_player_option(self, player_id: int, key: str, value: Any):
        """
        Set game-associative options for given player, by id
        """
        self._configured_player_ids.add(player_id)
        self._player_options[player_id][key] = value

    def get_player_option(self, player_id: int, key: str) -> Optional[Any]:
        """
        Retrieve game-associative options for given player, by their uid
        """
        return self._player_options[player_id].get(key)

    def set_ai_option(self, name, key, value):
        """
        Set game-associative options for given AI, by name
        """
        if name not in self.AIs:
            self.AIs[name] = {}
        self.AIs[name][key] = value

    def clear_slot(self, slot_index):
        """
        A somewhat awkward message while we're still half-slot-associated with
        a bunch of data.

        Just makes sure that any players associated with this slot aren't
        assigned an army or team, and deletes any AI's.
        """
        for player in self.players:
            if self.get_player_option(player.id, "StartSpot") == slot_index:
                self.set_player_option(player.id, "Team", -1)
                self.set_player_option(player.id, "Army", -1)
                self.set_player_option(player.id, "StartSpot", -1)

        to_remove = []
        for ai in self.AIs:
            if self.AIs[ai]["StartSpot"] == slot_index:
                to_remove.append(ai)
        for item in to_remove:
            del self.AIs[item]

    async def validate_game_settings(self):
        """
        Mark the game invalid if it has non-compliant options

        Stops at the first violation found. The game mode specific checks run
        last, and only if everything before them passed.
        """

        # Only allow ranked mods
        has_unranked_mod = any(
            mod_id not in self.game_service.ranked_mods
            for mod_id in self.mods
        )
        if has_unranked_mod:
            await self.mark_invalid(ValidityState.BAD_MOD)
            return

        if self.has_ai:
            await self.mark_invalid(ValidityState.HAS_AI_PLAYERS)
            return
        if self.is_multi_team:
            await self.mark_invalid(ValidityState.MULTI_TEAM)
            return
        if self.is_ffa:
            await self.mark_invalid(ValidityState.FFA_NOT_RANKED)
            return

        # option name -> (required value, validity state when it differs)
        required_options = {
            "AIReplacement": (FA.DISABLED, ValidityState.HAS_AI_PLAYERS),
            "FogOfWar": ("explored", ValidityState.NO_FOG_OF_WAR),
            "CheatsEnabled": (FA.DISABLED, ValidityState.CHEATS_ENABLED),
            "PrebuiltUnits": (FA.DISABLED, ValidityState.PREBUILT_ENABLED),
            "NoRushOption": (FA.DISABLED, ValidityState.NORUSH_ENABLED),
            "RestrictedCategories": (0, ValidityState.BAD_UNIT_RESTRICTIONS),
            "TeamLock": ("locked", ValidityState.UNLOCKED_TEAMS),
            "Unranked": (FA.DISABLED, ValidityState.HOST_SET_UNRANKED)
        }
        options_ok = await self._validate_game_options(required_options)
        if options_ok is False:
            return

        await self.validate_game_mode_settings()

    async def validate_game_mode_settings(self):
        """
        A subset of checks that need to be overridden in coop games.

        Requires at least two players, even teams without missing team
        assignments, and the `DEMORALIZATION` victory condition.
        """
        if len(self.players) < 2:
            await self.mark_invalid(ValidityState.SINGLE_PLAYER)
        elif None in self.teams or not self.is_even:
            await self.mark_invalid(ValidityState.UNEVEN_TEAMS_NOT_RANKED)
        else:
            await self._validate_game_options({
                "Victory": (Victory.DEMORALIZATION, ValidityState.WRONG_VICTORY_CONDITION)
            })

    async def _validate_game_options(
        self,
        valid_options: dict[str, tuple[Any, ValidityState]]
    ) -> bool:
        for key, value in self.game_options.items():
            if key in valid_options:
                valid_value, validity_state = valid_options[key]
                if value != valid_value:
                    await self.mark_invalid(validity_state)
                    return False
        return True

    async def launch(self):
        """
        Mark the game as live.

        Freezes the set of active players so they are remembered if they drop.
        Observers are not included in the frozen set. Afterwards the launch
        hooks run and the game settings are validated.
        """
        assert self.state is GameState.LOBBY
        self.launched_at = time.time()
        # Freeze currently connected players since we need them for rating when
        # the game ends.
        self._players_at_launch = [
            player for player in self.get_connected_players()
            if not self._is_observer(player)
        ]
        # Separate copy: entries are removed as army stats get processed.
        self._players_with_unsent_army_stats = list(self._players_at_launch)

        self.state = GameState.LIVE

        await self.on_game_launched()
        await self.validate_game_settings()

        self._logger.info("Game launched")

    async def on_game_launched(self):
        """
        Set every participating player to `PLAYING` and write the initial
        `game_stats` and `game_player_stats` rows.
        """
        for player in self.players:
            player.state = PlayerState.PLAYING
        await self.update_game_stats()
        await self.update_game_player_stats()

    async def update_game_stats(self):
        """
        Runs at game-start to populate the game_stats table (games that start are ones we actually
        care about recording stats for, after all).

        Also refreshes the map from the game service, marks a so far valid
        game on an unranked map as `BAD_MAP`, and records the matchmaker
        queue of the game if it has one. Requires a host to be set.
        """
        assert self.host is not None

        # Ensure map data is up to date
        self.map = await self.game_service.get_map(self.map.folder_name)

        if self.validity is ValidityState.VALID and not self.map.ranked:
            await self.mark_invalid(ValidityState.BAD_MAP)

        modId = self.game_service.featured_mods[self.game_mode].id

        # Write out the game_stats record.
        # In some cases, games can be invalidated while running: we check for
        # those cases when the game ends and update this record as appropriate.

        # The `gameType` column stores the value of the victory condition.
        game_type = str(self.game_options.get("Victory").value)

        async with self._db.acquire() as conn:
            await conn.execute(
                game_stats.insert().values(
                    id=self.id,
                    gameType=game_type,
                    gameMod=modId,
                    host=self.host.id,
                    mapId=self.map.id,
                    gameName=self.name,
                    validity=self.validity.value,
                )
            )

            if self.matchmaker_queue_id is not None:
                await conn.execute(
                    matchmaker_queue_game.insert().values(
                        matchmaker_queue_id=self.matchmaker_queue_id,
                        game_stats_id=self.id,
                    )
                )

    async def update_game_player_stats(self):
        """
        Insert one `game_player_stats` row per participating player.

        Players without a non-negative team and start spot are treated as
        observers and skipped. Logs a warning and returns when no rows are
        left to insert. Database errors are logged and re-raised.
        """
        query_args = []
        for player in self.players:
            options = {
                key: self.get_player_option(player.id, key)
                for key in ["Team", "StartSpot", "Color", "Faction"]
            }

            is_observer = (
                options["Team"] is None
                or options["Team"] < 0
                or options["StartSpot"] is None
                or options["StartSpot"] < 0
            )
            if is_observer:
                continue

            # DEPRECATED: Rating changes are persisted by the rating service
            # in the `leaderboard_rating_journal` table.
            mean, deviation = player.ratings[self.rating_type]

            query_args.append(
                {
                    "gameId": self.id,
                    "playerId": player.id,
                    "faction": options["Faction"],
                    "color": options["Color"],
                    "team": options["Team"],
                    "place": options["StartSpot"],
                    "mean": mean,
                    "deviation": deviation,
                    "AI": 0,
                    "score": 0,
                }
            )
        if not query_args:
            self._logger.warning("No player options available!")
            return

        try:
            async with self._db.acquire() as conn:
                await conn.execute(game_player_stats.insert().values(query_args))
        except DBAPIError:
            self._logger.exception(
                "Failed to update game_player_stats. Query args %s:", query_args
            )
            raise

    async def mark_invalid(self, new_validity_state: ValidityState):
        """
        Set the validity of the game to `new_validity_state`.

        For a LIVE game the new validity is also written to `game_stats`.
        """
        self._logger.info(
            "Marked as invalid because: %s", repr(new_validity_state)
        )
        self.validity = new_validity_state

        # Before launch nothing needs to be written: the validity is persisted
        # together with the game_stats row when the game starts. Once the game
        # is live a dedicated update query is required.
        if self.state is GameState.LIVE:
            # Currently reached only when a game desynced or was a custom game
            # that terminated too quickly.
            set_validity = game_stats.update().where(
                game_stats.c.id == self.id
            ).values(
                validity=new_validity_state.value
            )
            async with self._db.acquire() as conn:
                await conn.execute(set_validity)

    def get_army_score(self, army):
        """
        Return the score of `army` according to the result reports.
        """
        return self._results.score(army)

    def get_player_outcome(self, player: Player) -> ArmyOutcome:
        army = self.get_player_option(player.id, "Army")
        if army is None:
            return ArmyOutcome.UNKNOWN

        return self._results.outcome(army)

    def get_army_results(self, player: Player) -> ArmyResult:
        """
        Build the `ArmyResult` of a player from their id, army, the name of
        their outcome and the metadata reported for their army.
        """
        army = self.get_player_option(player.id, "Army")
        return ArmyResult(
            player.id,
            army,
            self.get_player_outcome(player).name,
            self._results.metadata(army),
        )

    def report_army_stats(self, stats_json):
        """
        Store the `"stats"` entry of the JSON document `stats_json` as the
        army stats, then process the pending army stats.
        """
        self._army_stats_list = json.loads(stats_json)["stats"]
        self._process_pending_army_stats()

    def is_visible_to_player(self, player: Player) -> bool:
        """
        Determine if a player should see this game in their games list.

        Note: This is a *hot* function, it can have significant impacts on
        performance.
        """
        if self.host is None:
            return False

        if player == self.host or player in self._connections:
            return True

        if (
            self.enforce_rating_range
            and player.ratings[self.rating_type].displayed()
            not in self.displayed_rating_range
        ):
            return False

        if self.visibility is VisibilityState.FRIENDS:
            return player.id in self.host.friends
        else:
            return player.id not in self.host.foes

    def to_dict(self):
        """
        Build the `game_info` message describing this game.

        The game state is reported as "open" (LOBBY), "playing" (LIVE) or
        "closed" (any other state). Player counts and team members are taken
        from the currently connected players.
        """
        client_state = {
            GameState.LOBBY: "open",
            GameState.LIVE: "playing",
            GameState.ENDED: "closed",
            GameState.INITIALIZING: "closed",
        }.get(self.state, "closed")
        connected_players = self.get_connected_players()
        return {
            "command": "game_info",
            "visibility": self.visibility.value,
            "password_protected": self.password is not None,
            "uid": self.id,
            "title": self.name,
            "state": client_state,
            "game_type": self.game_type.value,
            "featured_mod": self.game_mode,
            "sim_mods": self.mods,
            "mapname": self.map.folder_name,
            # DEPRECATED: Use `mapname` instead
            "map_file_path": self.map.file_path,
            "host": self.host.login if self.host else "",
            "num_players": len(connected_players),
            "max_players": self.max_players,
            "hosted_at": self.hosted_at.isoformat() if self.hosted_at else None,
            "launched_at": self.launched_at,
            "rating_type": self.rating_type,
            "rating_min": self.displayed_rating_range.lo,
            "rating_max": self.displayed_rating_range.hi,
            "enforce_rating_range": self.enforce_rating_range,
            # Same grouping as "teams" below, but by player id instead of
            # login. Players without a team are left out of both.
            "teams_ids": [
                {
                    "team_id": team,
                    "player_ids": [
                        player.id for player in connected_players
                        if self.get_player_option(player.id, "Team") == team
                    ]
                }
                for team in self.teams if team is not None
            ],
            "teams": {
                team: [
                    player.login for player in connected_players
                    if self.get_player_option(player.id, "Team") == team
                ]
                for team in self.teams if team is not None
            }
        }

    def __eq__(self, other):
        if not isinstance(other, Game):
            return False
        else:
            return self.id == other.id

    def __hash__(self):
        return self.id.__hash__()

    def __str__(self) -> str:
        return (
            f"Game({self.id}, {self.host.login if self.host else ''}, "
            f"{self.map.file_path})"
        )

Object that lasts for the lifetime of a game on FAF.

Subclasses

Class variables

var game_type
var init_mode

Instance variables

prop armies : frozenset[int]
Expand source code
@property
def armies(self) -> frozenset[int]:
    """
    The set of `Army` option values of this game's players.

    Contains `None` if some player has no `Army` option set.
    """
    army_of = self.get_player_option
    return frozenset(army_of(member.id, "Army") for member in self.players)
prop connections : Iterable[GameConnection]
Expand source code
@property
def connections(self) -> Iterable["GameConnection"]:
    """
    A view of all game connections currently registered with this game.
    """
    return self._connections.values()
prop has_ai : bool
Expand source code
@property
def has_ai(self) -> bool:
    """
    `True` when at least one entry is present in `AIs`.
    """
    ai_count = len(self.AIs)
    return ai_count > 0
prop is_even : bool
Expand source code
@property
def is_even(self) -> bool:
    """
    Whether the teams returned by `get_team_sets` (where players on the FFA
    team form individual teams) all have the same size.

    # Returns
    `True` iff all teams have the same player count.

    Special cases:

    - `True` if there are zero teams.
    - `False` if there is a single team.
    """
    team_sets = self.get_team_sets()
    team_count = len(team_sets)
    if team_count < 2:
        # No teams at all counts as even, a lone team does not.
        return team_count == 0

    distinct_sizes = {len(members) for members in team_sets}
    return len(distinct_sizes) == 1

If teams are balanced taking into account that players on the FFA team are on individual teams.

Returns

True iff all teams have the same player count.

Special cases:

  • True if there are zero teams.
  • False if there is a single team.
prop is_ffa : bool
Expand source code
@property
def is_ffa(self) -> bool:
    """
    `True` when the game has at least three players and `FFA_TEAM` is among
    the players' teams.
    """
    return len(self.players) >= 3 and FFA_TEAM in self.teams
prop is_multi_team : bool
Expand source code
@property
def is_multi_team(self) -> bool:
    """
    `True` when the players' team values span more than two teams.
    """
    team_count = len(self.teams)
    return team_count > 2
prop max_players : int
Expand source code
@property
def max_players(self) -> int:
    """
    The value of the "Slots" game option.
    """
    options = self.game_options
    return options["Slots"]
prop name
Expand source code
@property
def name(self):
    """
    The game name as currently stored in `_name`.
    """
    stored_name = self._name
    return stored_name
prop players : list[Player]
Expand source code
@property
def players(self) -> list[Player]:
    """
    Players in the game

    Depending on the state, it is either:
      - (LOBBY) The currently connected players
      - (any other state) The players frozen in `_players_at_launch`
    """
    if self.state is not GameState.LOBBY:
        return self._players_at_launch
    return self.get_connected_players()

Players in the game

Depending on the state, it is either:

  • (LOBBY) The currently connected players
  • (LIVE) Players who participated in the game

prop teams : frozenset[int]
Expand source code
@property
def teams(self) -> frozenset[int]:
    """
    The set of "Team" option values held by this game's players.
    """
    current_players = self.players
    return frozenset({
        self.get_player_option(member.id, "Team")
        for member in current_players
    })

A set of all teams of this game's players.

Methods

def add_game_connection(self, game_connection)
Expand source code
def add_game_connection(self, game_connection):
    """
    Register a game connection with this game, keyed by its player.

    # Errors
    Raises `GameError` if the connection is not in the `CONNECTED_TO_HOST`
    state, or if the game is in neither the `LOBBY` nor the `LIVE` state.
    """
    if game_connection.state != GameConnectionState.CONNECTED_TO_HOST:
        raise GameError(
            f"Invalid GameConnectionState: {game_connection.state}"
        )

    accepts_connections = (
        self.state is GameState.LOBBY or self.state is GameState.LIVE
    )
    if not accepts_connections:
        raise GameError(f"Invalid GameState: {self.state}")

    self._logger.info("Added game connection %s", game_connection)
    owner = game_connection.player
    self._connections[owner] = game_connection

Add a game connection to this game.

async def add_result(self,
reporter: int,
army: int,
result_type: str,
score: int,
result_metadata: frozenset[str] = frozenset())
Expand source code
async def add_result(
    self,
    reporter: int,
    army: int,
    result_type: str,
    score: int,
    result_metadata: frozenset[str] = frozenset(),
):
    """
    Record a result for an army, as computed by the game.

    Reports for armies that are not in `armies`, and reports whose
    `result_type` is not a valid `ArmyReportedOutcome`, are logged at debug
    level and otherwise ignored.

    # Params
    - `reporter`: player ID
    - `army`: the army number being reported for
    - `result_type`: a string representing the result
    - `score`: an arbitrary number assigned with the result
    - `result_metadata`: everything preceding the `result_type` in the
        result message from the game, one or more words, optional
    """
    if army not in self.armies:
        self._logger.debug(
            "Ignoring results for unknown army %s: %s %s reported by: %s",
            army, result_type, score, reporter
        )
        return

    # The outcome enum is looked up by the upper-cased result string.
    try:
        outcome = ArmyReportedOutcome(result_type.upper())
    except ValueError:
        self._logger.debug(
            "Ignoring result reported by %s for army %s: %s %s",
            reporter, army, result_type, score
        )
        return

    self._results.add(
        GameResultReport(reporter, army, outcome, score, result_metadata)
    )
    self._logger.info(
        "%s reported result for army %s: %s %s", reporter, army,
        result_type, score
    )

    self._process_pending_army_stats()

As computed by the game.

Params

  • reporter: player ID
  • army: the army number being reported for
  • result_type: a string representing the result
  • score: an arbitrary number assigned with the result
  • result_metadata: everything preceding the result_type in the result message from the game, one or more words, optional
async def check_game_finish(self, player)
Expand source code
async def check_game_finish(self, player):
    """
    Finish the game if it is over, otherwise process pending army stats.

    After checking for the end of the sim, the game is finished (under the
    finish lock) when it has not ended yet and either `finished` is set, no
    connections remain, or `player` is the host and the game is not live.
    """
    await self.check_sim_end()

    async with self._finish_lock:
        host_left_lobby = (
            self.state is not GameState.LIVE and player == self.host
        )
        game_over = (
            self.finished
            or not self._connections
            or host_left_lobby
        )

        if game_over and self.state is not GameState.ENDED:
            await self.on_game_finish()
        else:
            self._process_pending_army_stats()
async def check_sim_end(self)
Expand source code
async def check_sim_end(self):
    """
    Mark the game as finished once every connection has finished its sim.

    Does nothing when the game is already finished, is not live, or when at
    least one connection has not finished its sim yet. Otherwise sets
    `finished` and writes the end time to the game's `game_stats` row.
    """
    if self.finished or self.state is not GameState.LIVE:
        return
    if any(not connection.finished_sim for connection in self.connections):
        return

    self.finished = True
    async with self._db.acquire() as conn:
        end_time_update = game_stats.update().where(
            game_stats.c.id == self.id
        ).values(
            endTime=sql_now()
        )
        await conn.execute(end_time_update)
def clear_slot(self, slot_index)
Expand source code
def clear_slot(self, slot_index):
    """
    Free the given start slot.

    Every player whose "StartSpot" option equals `slot_index` gets their
    "Team", "Army" and "StartSpot" options reset to -1, and every AI whose
    "StartSpot" equals `slot_index` is deleted from `AIs`.

    AIs that have no "StartSpot" option yet are left untouched. AI entries
    are filled in one option at a time by `set_ai_option`, so such partial
    entries can exist and must not make this method fail.

    # Params
    - `slot_index`: the start spot to clear
    """
    for player in self.players:
        if self.get_player_option(player.id, "StartSpot") == slot_index:
            for key in ("Team", "Army", "StartSpot"):
                self.set_player_option(player.id, key, -1)

    # Collect the names first: entries cannot be deleted from the dict while
    # it is being iterated.
    stale_ais = [
        ai_name for ai_name, ai_options in self.AIs.items()
        if "StartSpot" in ai_options
        and ai_options["StartSpot"] == slot_index
    ]
    for ai_name in stale_ais:
        del self.AIs[ai_name]

A somewhat awkward message while we're still half-slot-associated with a bunch of data.

Just makes sure that any players associated with this slot aren't assigned an army or team, and deletes any AIs.

async def disconnect_player(self,
player: Player)
Expand source code
async def disconnect_player(self, player: Player):
    """
    Remove a player and their game connection from this game.

    Does nothing if the player's game connection is not registered with this
    game. Otherwise the player is no longer considered configured, their
    options are dropped if the game is still in the lobby, and the game
    connection is removed.
    """
    connection = player.game_connection
    if connection not in self._connections.values():
        return

    self._configured_player_ids.discard(player.id)

    if self.state is GameState.LOBBY:
        self._player_options.pop(player.id, None)

    await self.remove_game_connection(connection)
def get_army_results(self,
player: Player) ‑> ArmyResult
Expand source code
def get_army_results(self, player: Player) -> ArmyResult:
    """
    Build the `ArmyResult` for a player from the player's id, their "Army"
    option, the name of their outcome and the result metadata recorded for
    that army.
    """
    army = self.get_player_option(player.id, "Army")
    outcome_name = self.get_player_outcome(player).name
    metadata = self._results.metadata(army)
    return ArmyResult(player.id, army, outcome_name, metadata)
def get_army_score(self, army)
Expand source code
def get_army_score(self, army):
    """
    Look up the score of `army` in the collected result reports.
    """
    reports = self._results
    return reports.score(army)
def get_basic_info(self) ‑> BasicGameInfo
Expand source code
def get_basic_info(self) -> BasicGameInfo:
    """
    Bundle the game id, rating type, map id, game mode, the ids of the
    active mods and the team sets into a `BasicGameInfo`.
    """
    mod_ids = list(self.mods)
    team_sets = self.get_team_sets()
    return BasicGameInfo(
        self.id,
        self.rating_type,
        self.map.id,
        self.game_mode,
        mod_ids,
        team_sets,
    )
def get_connected_players(self) ‑> list[Player]
Expand source code
def get_connected_players(self) -> list[Player]:
    """
    Get the players that have a registered game connection and whose id is
    among the configured player ids.
    """
    configured_ids = self._configured_player_ids
    return [
        candidate for candidate in self._connections
        if candidate.id in configured_ids
    ]

Get a collection of all players currently connected to the game.

def get_player_option(self, player_id: int, key: str) ‑> Any | None
Expand source code
def get_player_option(self, player_id: int, key: str) -> Optional[Any]:
    """
    Retrieve a game-associated option of the given player, by player id.

    Returns `None` when the option has not been set.
    """
    options = self._player_options[player_id]
    return options.get(key)

Retrieve game-associative options for given player, by their uid

def get_player_outcome(self,
player: Player) ‑> ArmyOutcome
Expand source code
def get_player_outcome(self, player: Player) -> ArmyOutcome:
    """
    The outcome recorded for the player's army, or `ArmyOutcome.UNKNOWN`
    when the player has no "Army" option.
    """
    army = self.get_player_option(player.id, "Army")
    return (
        ArmyOutcome.UNKNOWN if army is None
        else self._results.outcome(army)
    )
def get_team_sets(self) ‑> list[set[Player]]
Expand source code
def get_team_sets(self) -> list[set[Player]]:
    """
    Returns a list of teams represented as sets of players.
    Note that FFA players will be separated into individual teams.

    # Errors
    Raises `GameError` if at least one player has no team.
    """
    if None in self.teams:
        player_teams = [
            (player, self.get_player_option(player.id, "Team"))
            for player in self.players
        ]
        raise GameError(
            "Missing team for at least one player. (player, team): "
            f"{player_teams}"
        )

    grouped = defaultdict(set)
    solo_teams = []
    for player in self.players:
        team_id = self.get_player_option(player.id, "Team")
        if team_id != FFA_TEAM:
            grouped[team_id].add(player)
        else:
            # Each player on the FFA team forms a team of their own.
            solo_teams.append({player})

    return [*grouped.values(), *solo_teams]

Returns a list of teams represented as sets of players. Note that FFA players will be separated into individual teams.

def is_visible_to_player(self,
player: Player) ‑> bool
Expand source code
def is_visible_to_player(self, player: Player) -> bool:
    """
    Determine if a player should see this game in their games list.

    A game without a host is never visible. The host and players with a
    registered game connection always see the game. Otherwise the game is
    hidden when the rating range is enforced and the player's displayed
    rating is outside of it. Beyond that, friends-only games are visible to
    the host's friends, and all other games to everyone who is not a foe of
    the host.

    Note: This is a *hot* function, it can have significant impacts on
    performance.
    """
    host = self.host
    if host is None:
        return False

    if player == host or player in self._connections:
        return True

    if self.enforce_rating_range:
        displayed_rating = player.ratings[self.rating_type].displayed()
        if displayed_rating not in self.displayed_rating_range:
            return False

    if self.visibility is VisibilityState.FRIENDS:
        return player.id in host.friends
    return player.id not in host.foes

Determine if a player should see this game in their games list.

Note: This is a hot function, it can have significant impacts on performance.

async def launch(self)
Expand source code
async def launch(self):
    """
    Mark the game as live.

    Records the launch time and freezes the connected, non-observer players
    so they are remembered if they drop. Afterwards the launch hook runs and
    the game settings are validated.
    """
    assert self.state is GameState.LOBBY
    self.launched_at = time.time()

    # Snapshot the participants now: they are needed for rating when the
    # game ends.
    participants = []
    for player in self.get_connected_players():
        if self._is_observer(player):
            continue
        participants.append(player)
    self._players_at_launch = participants
    self._players_with_unsent_army_stats = list(participants)

    self.state = GameState.LIVE

    await self.on_game_launched()
    await self.validate_game_settings()

    self._logger.info("Game launched")

Mark the game as live.

Freezes the set of active players so they are remembered if they drop.

async def load_results(self)
Expand source code
async def load_results(self):
    """
    Replace the collected result reports with the ones stored in the
    database for this game.
    """
    stored_reports = await GameResultReports.from_db(self._db, self.id)
    self._results = stored_reports

Load results from the database

async def mark_invalid(self,
new_validity_state: ValidityState)
Expand source code
async def mark_invalid(self, new_validity_state: ValidityState):
    """
    Set the validity of the game to `new_validity_state`.

    While the game is live the new validity is also written to the game's
    `game_stats` row right away.
    """
    self._logger.info(
        "Marked as invalid because: %s", repr(new_validity_state)
    )
    self.validity = new_validity_state

    # If we haven't started yet, the invalidity will be persisted to the
    # database when we start. Only a live game needs a special update query
    # to write this information out.
    # Currently, we can only end up in that query if a game desynced or was
    # a custom game that terminated too quickly.
    if self.state is GameState.LIVE:
        async with self._db.acquire() as conn:
            validity_update = game_stats.update().where(
                game_stats.c.id == self.id
            ).values(
                validity=new_validity_state.value
            )
            await conn.execute(validity_update)
async def on_game_finish(self)
Expand source code
async def on_game_finish(self):
    """
    Wrap up the game and move it to the `ENDED` state.

    For a `LIVE` game the results are processed, unless `desyncs` exceeds
    20, in which case the game is only marked invalid. Games still in
    `LOBBY` or `INITIALIZING` are merely logged as cancelled.

    Exceptions raised along the way are logged rather than propagated. In
    every case the state ends up as `ENDED` and the game is marked dirty
    with the game service.
    """
    try:
        if self.state is GameState.LOBBY:
            self._logger.info("Game cancelled pre launch")
        elif self.state is GameState.INITIALIZING:
            self._logger.info("Game cancelled pre initialization")
        elif self.state is GameState.LIVE:
            self._logger.info("Game finished normally")

            if self.desyncs > 20:
                await self.mark_invalid(ValidityState.TOO_MANY_DESYNCS)
                # Results are not processed for such a game; the `finally`
                # block below still runs.
                return

            await self.process_game_results()

            self._process_pending_army_stats()
    except Exception:    # pragma: no cover
        self._logger.exception("Error during game end")
    finally:
        # Runs on every path, including the early return and after a logged
        # exception.
        self.state = GameState.ENDED

        self.game_service.mark_dirty(self)
async def on_game_launched(self)
Expand source code
async def on_game_launched(self):
    """
    Put every player of the game into the `PLAYING` state, then write the
    game stats followed by the game player stats.
    """
    participants = self.players
    for participant in participants:
        participant.state = PlayerState.PLAYING

    await self.update_game_stats()
    await self.update_game_player_stats()
async def on_scenario_file_changed(self, scenario_path: pathlib.PurePath)
Expand source code
async def on_scenario_file_changed(self, scenario_path: pathlib.PurePath):
    """
    Update the game's map from a new scenario file path.

    The third component of the path, lower-cased, is used as the map folder
    name to look up with the game service. Paths with fewer than three
    components are ignored.
    """
    path_parts = scenario_path.parts
    if len(path_parts) < 3:
        return

    map_folder_name = path_parts[2].lower()
    self.map = await self.game_service.get_map(map_folder_name)
def on_title_changed(self, title: str)
Expand source code
def on_title_changed(self, title: str):
    """
    Assign `title` to `name`, ignoring a `ValueError` raised by the
    assignment.
    """
    try:
        self.name = title
    except ValueError:
        # Deliberately ignored.
        pass
async def persist_results(self)
Expand source code
async def persist_results(self):
    """
    Persist game results into the database

    For every player the score of their army and their outcome are written
    to the matching `game_player_stats` row.

    Requires the game to have been launched and the appropriate rows to
    exist in the database.
    """
    self._logger.debug("Saving scores from game %s", self.id)

    results_by_player = {}
    for player in self.players:
        army = self.get_player_option(player.id, "Army")
        outcome = self.get_player_outcome(player)
        score = self.get_army_score(army)
        results_by_player[player] = (score, outcome)
        self._logger.info(
            "Result for army %s, player: %s: score %s, outcome %s",
            army, player, score, outcome
        )

    async with self._db.acquire() as conn:
        rows = []
        for player, (score, outcome) in results_by_player.items():
            self._logger.info(
                "Score for player %s: score %s, outcome %s",
                player, score, outcome,
            )
            rows.append(dict(
                score=score,
                result=outcome.name.upper(),
                game_id=self.id,
                player_id=player.id,
            ))

        # One parameterized UPDATE, executed with one parameter set per row.
        matches_row = and_(
            game_player_stats.c.gameId == bindparam("game_id"),
            game_player_stats.c.playerId == bindparam("player_id"),
        )
        update_statement = game_player_stats.update().where(
            matches_row
        ).values(
            score=bindparam("score"),
            scoreTime=sql_now(),
            result=bindparam("result"),
        )
        await conn.deadlock_retry_execute(update_statement, rows)

Persist game results into the database

Requires the game to have been launched and the appropriate rows to exist in the database.

async def process_game_results(self)
Expand source code
async def process_game_results(self):
    """
    Persist, resolve and publish the results of the game.

    Without any collected results the game is marked invalid with
    `UNKNOWN_RESULT` instead.
    """
    if self._results:
        await self.persist_results()

        game_results = await self.resolve_game_results()
        await self.game_service.publish_game_results(game_results)
    else:
        await self.mark_invalid(ValidityState.UNKNOWN_RESULT)
async def remove_game_connection(self, game_connection)
Expand source code
async def remove_game_connection(self, game_connection):
    """
    Remove a game connection from this game.

    Connections that are not registered with this game are ignored. For a
    registered one, the entry of its player is dropped, the player's `game`
    attribute is deleted and `check_game_finish` is run for the player.

    Will trigger `on_game_finish` if there are no more active connections to the
    game.
    """
    if game_connection not in self._connections.values():
        return

    owner = game_connection.player
    self._connections.pop(owner)
    delattr(owner, "game")

    self._logger.info("Removed game connection %s", game_connection)

    await self.check_game_finish(owner)

Remove a game connection from this game.

Will trigger on_game_finish if there are no more active connections to the game.

def report_army_stats(self, stats_json)
Expand source code
def report_army_stats(self, stats_json):
    """
    Store the "stats" entry of the given JSON document as the army stats
    list and process pending army stats.
    """
    payload = json.loads(stats_json)
    self._army_stats_list = payload["stats"]
    self._process_pending_army_stats()
async def resolve_game_results(self) ‑> EndedGameInfo
Expand source code
async def resolve_game_results(self) -> EndedGameInfo:
    """
    Resolve the collected results into an `EndedGameInfo`.

    Team outcomes default to `GameOutcome.UNKNOWN`. If resolving them fails
    with a `GameResolutionError`, a game that is still valid is marked
    invalid with `UNKNOWN_RESULT`. Commander kills are read from the army
    stats and left empty if any expected key is missing.

    # Errors
    Raises `GameError` if the game is neither live nor ended.
    """
    if self.state not in (GameState.LIVE, GameState.ENDED):
        raise GameError("Cannot rate game that has not been launched.")

    await self._run_pre_rate_validity_checks()

    basic_info = self.get_basic_info()
    teams = basic_info.teams

    team_army_results = [
        [self.get_army_results(member) for member in team]
        for team in teams
    ]

    team_outcomes = [GameOutcome.UNKNOWN for _ in teams]
    partial_outcomes = [
        {self.get_player_outcome(member) for member in team}
        for team in teams
    ]

    try:
        # TODO: Remove override once game result messages are reliable
        overridden = self._outcome_override_hook()
        team_outcomes = overridden or resolve_game(partial_outcomes)
    except GameResolutionError:
        if self.validity is ValidityState.VALID:
            await self.mark_invalid(ValidityState.UNKNOWN_RESULT)

    try:
        commander_kills = {
            stats["name"]: stats["units"]["cdr"]["kills"]
            for stats in self._army_stats_list
        }
    except KeyError:
        commander_kills = {}

    return EndedGameInfo.from_basic(
        basic_info,
        self.validity,
        team_outcomes,
        commander_kills,
        team_army_results,
    )
def set_ai_option(self, name, key, value)
Expand source code
def set_ai_option(self, name, key, value):
    """
    Set a game-associated option of the given AI, by name.

    An entry for the AI is created if there is none yet.
    """
    ai_options = self.AIs.setdefault(name, {})
    ai_options[key] = value

Set game-associative options for given AI, by name

def set_hosted(self)
Expand source code
def set_hosted(self):
    """
    Resolve the hosted future and record the current time as `hosted_at`.
    """
    hosted_future = self._hosted_future
    hosted_future.set_result(None)
    self.hosted_at = datetime_now()
def set_name_unchecked(self, value: str)
Expand source code
def set_name_unchecked(self, value: str):
    """
    Sets the game name without doing any validity checks.

    The name is cut to the length of the `gameName` column of `game_stats`
    to avoid crashing mysql INSERT statements.
    """
    column_length = game_stats.c.gameName.type.length
    truncated = value[:column_length]
    self._name = truncated

Sets the game name without doing any validity checks.

Truncates the game name to avoid crashing mysql INSERT statements.

def set_player_option(self, player_id: int, key: str, value: Any)
Expand source code
def set_player_option(self, player_id: int, key: str, value: Any):
    """
    Set a game-associated option of the given player, by player id.

    The player id is also added to the configured player ids.
    """
    self._configured_player_ids.add(player_id)
    options = self._player_options[player_id]
    options[key] = value

Set game-associative options for given player, by id

async def timeout_game(self, timeout: int = 60)
Expand source code
async def timeout_game(self, timeout: int = 60):
    """
    Sleep for `timeout` seconds, then finish the game if it is still in the
    `INITIALIZING` state.
    """
    await asyncio.sleep(timeout)
    if self.state is not GameState.INITIALIZING:
        return

    self._logger.debug("Game setup timed out, cancelling game")
    await self.on_game_finish()
def to_dict(self)
Expand source code
def to_dict(self):
    """
    Build the `game_info` message describing this game.

    The game state is reported as "open" for `LOBBY`, "playing" for `LIVE`
    and "closed" for everything else. Player counts and team listings are
    based on the currently connected players; players without a team are
    left out of the team listings.
    """
    client_state = {
        GameState.LOBBY: "open",
        GameState.LIVE: "playing",
    }.get(self.state, "closed")
    connected_players = self.get_connected_players()

    def members_of(team, attribute):
        # The given attribute of every connected player on `team`.
        return [
            getattr(player, attribute) for player in connected_players
            if self.get_player_option(player.id, "Team") == team
        ]

    host_login = self.host.login if self.host else ""
    hosted_at = self.hosted_at.isoformat() if self.hosted_at else None
    return {
        "command": "game_info",
        "visibility": self.visibility.value,
        "password_protected": self.password is not None,
        "uid": self.id,
        "title": self.name,
        "state": client_state,
        "game_type": self.game_type.value,
        "featured_mod": self.game_mode,
        "sim_mods": self.mods,
        "mapname": self.map.folder_name,
        # DEPRECATED: Use `mapname` instead
        "map_file_path": self.map.file_path,
        "host": host_login,
        "num_players": len(connected_players),
        "max_players": self.max_players,
        "hosted_at": hosted_at,
        "launched_at": self.launched_at,
        "rating_type": self.rating_type,
        "rating_min": self.displayed_rating_range.lo,
        "rating_max": self.displayed_rating_range.hi,
        "enforce_rating_range": self.enforce_rating_range,
        "teams_ids": [
            {"team_id": team, "player_ids": members_of(team, "id")}
            for team in self.teams if team is not None
        ],
        "teams": {
            team: members_of(team, "login")
            for team in self.teams if team is not None
        }
    }
async def update_game_player_stats(self)
Expand source code
async def update_game_player_stats(self):
    """
    Insert one `game_player_stats` row per participating player.

    Players without a team or start spot, or with a negative one, are
    treated as observers and skipped. If no rows remain, a warning is logged
    and nothing is written.

    # Errors
    A `DBAPIError` raised by the insert is logged and re-raised.
    """
    rows = []
    for player in self.players:
        team, start_spot, color, faction = (
            self.get_player_option(player.id, key)
            for key in ("Team", "StartSpot", "Color", "Faction")
        )

        if (
            team is None
            or team < 0
            or start_spot is None
            or start_spot < 0
        ):
            # Observers get no row.
            continue

        # DEPRECATED: Rating changes are persisted by the rating service
        # in the `leaderboard_rating_journal` table.
        mean, deviation = player.ratings[self.rating_type]

        rows.append(dict(
            gameId=self.id,
            playerId=player.id,
            faction=faction,
            color=color,
            team=team,
            place=start_spot,
            mean=mean,
            deviation=deviation,
            AI=0,
            score=0,
        ))

    if not rows:
        self._logger.warning("No player options available!")
        return

    try:
        async with self._db.acquire() as conn:
            await conn.execute(game_player_stats.insert().values(rows))
    except DBAPIError:
        self._logger.exception(
            "Failed to update game_player_stats. Query args %s:", rows
        )
        raise
async def update_game_stats(self)
Expand source code
async def update_game_stats(self):
    """
    Runs at game-start to populate the game_stats table (games that start
    are ones we actually care about recording stats for, after all).

    The map is refreshed from the game service first, and a valid game on an
    unranked map is marked invalid with `BAD_MAP`. For games with a
    matchmaker queue id a `matchmaker_queue_game` row is inserted as well.
    """
    assert self.host is not None

    # Ensure map data is up to date
    self.map = await self.game_service.get_map(self.map.folder_name)

    if self.validity is ValidityState.VALID and not self.map.ranked:
        await self.mark_invalid(ValidityState.BAD_MAP)

    featured_mod_id = self.game_service.featured_mods[self.game_mode].id
    victory = self.game_options.get("Victory")
    victory_condition = str(victory.value)

    # Write out the game_stats record.
    # In some cases, games can be invalidated while running: we check for
    # those cases when the game ends and update this record as appropriate.
    async with self._db.acquire() as conn:
        stats_insert = game_stats.insert().values(
            id=self.id,
            gameType=victory_condition,
            gameMod=featured_mod_id,
            host=self.host.id,
            mapId=self.map.id,
            gameName=self.name,
            validity=self.validity.value,
        )
        await conn.execute(stats_insert)

        if self.matchmaker_queue_id is None:
            return

        queue_insert = matchmaker_queue_game.insert().values(
            matchmaker_queue_id=self.matchmaker_queue_id,
            game_stats_id=self.id,
        )
        await conn.execute(queue_insert)

Runs at game-start to populate the game_stats table (games that start are ones we actually care about recording stats for, after all).

async def validate_game_mode_settings(self)
Expand source code
async def validate_game_mode_settings(self):
    """
    A subset of checks that need to be overridden in coop games.

    Marks the game invalid when it has fewer than two players, or when a
    player has no team or the teams are not even. Otherwise the "Victory"
    option is validated against `Victory.DEMORALIZATION`.
    """
    if len(self.players) < 2:
        await self.mark_invalid(ValidityState.SINGLE_PLAYER)
    elif None in self.teams or not self.is_even:
        await self.mark_invalid(ValidityState.UNEVEN_TEAMS_NOT_RANKED)
    else:
        await self._validate_game_options({
            "Victory": (
                Victory.DEMORALIZATION,
                ValidityState.WRONG_VICTORY_CONDITION,
            ),
        })

A subset of checks that need to be overridden in coop games.

async def validate_game_settings(self)
Expand source code
async def validate_game_settings(self):
    """
    Mark the game invalid if it has non-compliant options

    The checks run in order and stop at the first failure: unranked mods,
    AI players, more than two teams, FFA, the game options listed below and
    finally the game mode specific checks.
    """
    # Only allow ranked mods
    has_unranked_mod = any(
        mod_id not in self.game_service.ranked_mods
        for mod_id in self.mods
    )
    if has_unranked_mod:
        await self.mark_invalid(ValidityState.BAD_MOD)
        return

    if self.has_ai:
        await self.mark_invalid(ValidityState.HAS_AI_PLAYERS)
        return
    if self.is_multi_team:
        await self.mark_invalid(ValidityState.MULTI_TEAM)
        return
    if self.is_ffa:
        await self.mark_invalid(ValidityState.FFA_NOT_RANKED)
        return

    # Maps an option name to a pair of the value it is checked against and
    # the validity state associated with that option.
    required_options = {
        "AIReplacement": (FA.DISABLED, ValidityState.HAS_AI_PLAYERS),
        "FogOfWar": ("explored", ValidityState.NO_FOG_OF_WAR),
        "CheatsEnabled": (FA.DISABLED, ValidityState.CHEATS_ENABLED),
        "PrebuiltUnits": (FA.DISABLED, ValidityState.PREBUILT_ENABLED),
        "NoRushOption": (FA.DISABLED, ValidityState.NORUSH_ENABLED),
        "RestrictedCategories": (0, ValidityState.BAD_UNIT_RESTRICTIONS),
        "TeamLock": ("locked", ValidityState.UNLOCKED_TEAMS),
        "Unranked": (FA.DISABLED, ValidityState.HOST_SET_UNRANKED)
    }
    options_ok = await self._validate_game_options(required_options)
    if options_ok is not False:
        await self.validate_game_mode_settings()

Mark the game invalid if it has non-compliant options

class GameError (*args, **kwargs)
Expand source code
class GameError(Exception):
    """
    Error raised by `Game` when an operation cannot be carried out.
    """

Common base class for all non-exit exceptions.

Ancestors

  • builtins.Exception
  • builtins.BaseException
class GameOptions (id: int, *args, **kwargs)
Expand source code
class GameOptions(dict):
    """
    Dictionary of game options that normalizes selected values when they are
    set and notifies registered callbacks about the new value.

    Callbacks are registered per option key. Synchronous callbacks run
    inline. Asynchronous callbacks are awaited by `set_option`, or scheduled
    as background tasks when the option is set with `options[key] = value`.
    """

    def __init__(self, id: int, *args, **kwargs):
        """
        # Params
        - `id`: used to name this instance's logger
        - `*args`, `**kwargs`: initial content, passed on to `dict`
        """
        super().__init__(*args, **kwargs)
        self._logger = logging.getLogger(
            f"{self.__class__.__qualname__}.{id}"
        )
        self.callbacks = defaultdict(list)
        self.async_callbacks = defaultdict(list)
        # Strong references to the tasks spawned by `__setitem__`. The event
        # loop only keeps weak references to tasks, so without these a
        # callback task could be garbage collected before it completes.
        self._background_tasks: set[asyncio.Task] = set()

    def add_callback(self, key: str, callback: Callable[[Any], Any]):
        """
        Register a synchronous callback that is called with the new value
        whenever the option `key` is set.
        """
        self.callbacks[key].append(callback)

    def add_async_callback(
        self,
        key: str,
        callback: Callable[[Any], Awaitable[Any]],
    ):
        """
        Register an asynchronous callback that is called with the new value
        whenever the option `key` is set.
        """
        self.async_callbacks[key].append(callback)

    async def set_option(self, k: str, v: Any) -> None:
        """
        Set an option and wait for all of its callbacks to complete.

        Synchronous callbacks run first, then the asynchronous callbacks are
        awaited together. Exceptions raised by callbacks are logged.
        """
        v = self._set_option(k, v)
        self._run_sync_callbacks(k, v)

        await asyncio.gather(*(
            self._log_async_exception(
                async_callback(v),
                k,
                v,
            )
            for async_callback in self.async_callbacks.get(k, ())
        ))

    def __setitem__(self, k: str, v: Any) -> None:
        """
        Set an option without waiting for its asynchronous callbacks.

        Synchronous callbacks run inline. Each asynchronous callback is
        scheduled as a task, which requires a running event loop whenever
        such callbacks are registered for `k`.
        """
        v = self._set_option(k, v)
        self._run_sync_callbacks(k, v)

        for async_callback in self.async_callbacks.get(k, ()):
            task = asyncio.create_task(
                self._log_async_exception(
                    async_callback(v),
                    k,
                    v,
                )
            )
            # Keep the task alive until it is done, then let go of it.
            self._background_tasks.add(task)
            task.add_done_callback(self._background_tasks.discard)

    def _set_option(self, k: str, v: Any) -> Any:
        """
        Set the new value potentially transforming it first. Returns the value
        that was set.

        - "Victory": values that are not a `Victory` are looked up by their
            upper-cased name. Unknown names are rejected with a warning: the
            stored value stays as it is and `None` is returned.
        - "Slots": converted with `int`.
        - "ScenarioFile": converted to a `pathlib.PurePath`.
        """
        if k == "Victory" and not isinstance(v, Victory):
            victory = Victory.__members__.get(v.upper())
            if victory is None:
                victory = self.get("Victory")
                self._logger.warning(
                    "Invalid victory type '%s'! Using '%s' instead.",
                    v,
                    victory.name if victory else None,
                )
                # NOTE: nothing is stored on this path, yet the callers
                # still run the callbacks, passing them `None`.
                return
            v = victory
        elif k == "Slots":
            v = int(v)
        elif k == "ScenarioFile":
            # Convert to a posix path. Since posix paths are also interpreted
            # the same way as windows paths (but not the other way around!) we
            # can do this by parsing as a PureWindowsPath first
            v = pathlib.PurePath(pathlib.PureWindowsPath(v).as_posix())

        super().__setitem__(k, v)

        return v

    def _run_sync_callbacks(self, k: str, v: Any):
        """
        Call every synchronous callback registered for `k` with `v`.

        An exception raised by a callback is logged and does not keep the
        remaining callbacks from running.
        """
        for callback in self.callbacks.get(k, ()):
            try:
                callback(v)
            except Exception:
                self._logger.exception(
                    "Error running callback for '%s' (value %r)",
                    k,
                    v,
                )

    async def _log_async_exception(self, coro: Awaitable[Any], k: str, v: Any):
        """
        Await `coro` and return its result. An exception raised by it is
        logged instead of propagated, in which case `None` is returned.
        """
        try:
            return await coro
        except Exception:
            self._logger.exception(
                "Error running async callback for '%s' (value %r)",
                k,
                v,
            )

dict() -> new empty dictionary
dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs
dict(iterable) -> new dictionary initialized as if via:
    d = {}
    for k, v in iterable:
        d[k] = v
dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)

Ancestors

  • builtins.dict

Methods

def add_async_callback(self, key: str, callback: Callable[[Any], Awaitable[Any]])
Expand source code
def add_async_callback(
    self,
    key: str,
    callback: Callable[[Any], Awaitable[Any]],
):
    """
    Append `callback` to the asynchronous callbacks registered for `key`.
    """
    registered = self.async_callbacks[key]
    registered.append(callback)
def add_callback(self, key: str, callback: Callable[[Any], Any])
Expand source code
def add_callback(self, key: str, callback: Callable[[Any], Any]):
    """
    Append `callback` to the synchronous callbacks registered for `key`.
    """
    registered = self.callbacks[key]
    registered.append(callback)
async def set_option(self, k: str, v: Any) ‑> None
Expand source code
async def set_option(self, k: str, v: Any) -> None:
    """
    Set an option and wait for all of its callbacks to complete.

    The value is stored through `_set_option`, the synchronous callbacks are
    run with the value it returned, and finally the asynchronous callbacks
    registered for `k` are awaited together.
    """
    v = self._set_option(k, v)
    self._run_sync_callbacks(k, v)

    pending = [
        self._log_async_exception(async_callback(v), k, v)
        for async_callback in self.async_callbacks.get(k, ())
    ]
    await asyncio.gather(*pending)