Module server.rating_service
Post-game rating functionality
Sub-modules
server.rating_service.game_rater
server.rating_service.rating_service
server.rating_service.typedefs
Classes
class GameRatingSummary (game_id: int, rating_type: str, teams: list[TeamRatingSummary])
-
Holds minimal information needed to rate a game. Fields: - game_id: id of the game to rate - rating_type: str (e.g. "ladder1v1") - teams: a list of two TeamRatingSummaries
Expand source code
class GameRatingSummary(NamedTuple): """ Holds minimal information needed to rate a game. Fields: - game_id: id of the game to rate - rating_type: str (e.g. "ladder1v1") - teams: a list of two TeamRatingSummaries """ game_id: int rating_type: str teams: list[TeamRatingSummary] @classmethod def from_game_info_dict(cls, game_info: dict[str]) -> "GameRatingSummary": if len(game_info["teams"]) != 2: raise ValueError("Detected other than two teams.") return cls( game_info["game_id"], game_info["rating_type"], [ TeamRatingSummary( GameOutcome(summary["outcome"]), set(summary["player_ids"]), summary["army_results"], ) for summary in game_info["teams"] ], )
Ancestors
- builtins.tuple
Static methods
def from_game_info_dict(game_info: dict[str]) ‑> GameRatingSummary
Instance variables
var game_id : int
-
Alias for field number 0
var rating_type : str
-
Alias for field number 1
var teams : list[TeamRatingSummary]
-
Alias for field number 2
class RatingService (database: FAFDatabase, player_service: PlayerService, message_queue_service: MessageQueueService)
-
Service responsible for calculating and saving trueskill rating updates. To avoid race conditions, rating updates from a single game ought to be atomic.
Expand source code
@with_logger class RatingService(Service): """ Service responsible for calculating and saving trueskill rating updates. To avoid race conditions, rating updates from a single game ought to be atomic. """ def __init__( self, database: FAFDatabase, player_service: PlayerService, message_queue_service: MessageQueueService ): self._db = database self._player_service_callback = player_service.signal_player_rating_change self._accept_input = False self._queue = asyncio.Queue() self._task = None self._rating_type_ids: Optional[dict[str, int]] = None self.leaderboards: dict[str, Leaderboard] = {} self._message_queue_service = message_queue_service async def initialize(self) -> None: if self._task is not None: self._logger.error("Service already runnning or not properly shut down.") return await self.update_data() self._update_cron = aiocron.crontab("*/10 * * * *", func=self.update_data) self._accept_input = True self._task = asyncio.create_task(self._handle_rating_queue()) async def update_data(self): async with self._db.acquire() as conn: initializer = leaderboard.alias() sql = select( leaderboard.c.id, leaderboard.c.technical_name, initializer.c.technical_name.label("initializer") ).select_from( leaderboard.outerjoin( initializer, leaderboard.c.initializer_id == initializer.c.id ) ) result = await conn.execute(sql) rows = result.fetchall() self.leaderboards.clear() self._rating_type_ids = {} for row in rows: self.leaderboards[row.technical_name] = Leaderboard( row.id, row.technical_name ) self._rating_type_ids[row.technical_name] = row.id # Link the initializers for row in rows: current = self.leaderboards[row.technical_name] init = self.leaderboards.get(row.initializer) if init: current.initializer = init async def enqueue(self, game_info: dict[str]) -> None: if not self._accept_input: self._logger.warning("Dropped rating request %s", game_info) raise ServiceNotReadyError( "RatingService not yet initialized or shutting down." ) summary = GameRatingSummary.from_game_info_dict(game_info) self._logger.debug("Queued up rating request %s", summary) await self._queue.put(summary) rating_service_backlog.set(self._queue.qsize()) async def _handle_rating_queue(self) -> None: self._logger.debug("RatingService started!") try: while self._accept_input or not self._queue.empty(): summary = await self._queue.get() self._logger.debug("Now rating request %s", summary) try: # Make sure we finish writing rating changes even if the # server is shutting down await asyncio.shield(self._rate(summary)) except GameRatingError: self._logger.warning("Error rating game %s", summary) except Exception: # pragma: no cover self._logger.exception("Failed rating request %s", summary) else: self._logger.debug("Done rating request.") finally: self._queue.task_done() rating_service_backlog.set(self._queue.qsize()) except asyncio.CancelledError: pass except Exception: # pragma: no cover self._logger.critical( "Unexpected exception while handling rating queue.", exc_info=True ) self._logger.debug("RatingService stopped.") async def _rate(self, summary: GameRatingSummary) -> None: assert self._rating_type_ids is not None if summary.rating_type not in self._rating_type_ids: raise GameRatingError(f"Unknown rating type {summary.rating_type}.") rater = GameRater(summary) rating_results = [] async with self._db.acquire() as conn: # Fetch all players rating info from the database player_ratings = await self._get_all_player_ratings( conn, rater.player_ids ) rating_result = await self._rate_for_leaderboard( conn, summary.game_id, summary.rating_type, player_ratings, rater ) assert rating_result is not None rating_results.append(rating_result) # TODO: If we add hidden ratings, make sure to check for them here. # Hidden ratings should not affect global. # TODO: Use game_type == "matchmaker" instead? if summary.rating_type != RatingType.GLOBAL: self._logger.debug( "Performing global rating adjustment for players: %s", rater.player_ids ) adjustment_rater = AdjustmentGameRater( rater, rating_result.old_ratings ) global_rating_result = await self._rate_for_leaderboard( conn, summary.game_id, RatingType.GLOBAL, player_ratings, adjustment_rater, update_game_player_stats=False ) if global_rating_result: rating_results.append(global_rating_result) for rating_result in rating_results: await self._publish_rating_changes( rating_result.game_id, rating_result.rating_type, rating_result.old_ratings, rating_result.new_ratings, rating_result.outcome_map ) async def _rate_for_leaderboard( self, conn, game_id: int, rating_type: str, player_ratings: dict[PlayerID, PlayerRatings], rater: GameRater, update_game_player_stats: bool = True ) -> Optional[GameRatingResult]: """ Rates a game using a particular rating_type and GameRater. """ uninitialized_ratings = { # Querying the key will create the value using rating # initialization, sort of like a defaultdict. player_id: Rating(*player_ratings[player_id][rating_type]) for player_id in player_ratings.keys() if rating_type not in player_ratings[player_id] } # Initialize the ratings we need old_ratings = { player_id: Rating(*player_ratings[player_id][rating_type]) for player_id in player_ratings.keys() } new_ratings = rater.compute_rating(old_ratings) if not new_ratings: return None need_initial_ratings = { player_id: rating for player_id, rating in uninitialized_ratings.items() if player_id in new_ratings } if need_initial_ratings: # Ensure that leaderboard entries exist before calling persist. await self._create_initial_ratings( conn, rating_type, need_initial_ratings ) outcome_map = rater.get_outcome_map() # Now persist the changes for all players that get the adjustment. await self._persist_rating_changes( conn, game_id, rating_type, old_ratings, new_ratings, outcome_map, update_game_player_stats=update_game_player_stats ) return GameRatingResult( game_id, rating_type, old_ratings, new_ratings, outcome_map ) async def _create_initial_ratings( self, conn, rating_type: str, ratings: RatingDict ): assert self._rating_type_ids is not None leaderboard_id = self._rating_type_ids[rating_type] values = [ dict( login_id=player_id, mean=rating.mean, deviation=rating.dev, total_games=0, won_games=0, leaderboard_id=leaderboard_id, ) for player_id, rating in ratings.items() ] if values: await conn.execute( leaderboard_rating.insert(), values ) async def _get_all_player_ratings( self, conn, player_ids: list[PlayerID] ) -> dict[PlayerID, PlayerRatings]: sql = select( leaderboard_rating.c.login_id, leaderboard.c.technical_name, leaderboard_rating.c.mean, leaderboard_rating.c.deviation ).join(leaderboard).where( leaderboard_rating.c.login_id.in_(player_ids) ) result = await conn.execute(sql) player_ratings = { player_id: PlayerRatings(self.leaderboards, init=False) for player_id in player_ids } for row in result: player_id, rating_type = row.login_id, row.technical_name player_ratings[player_id][rating_type] = (row.mean, row.deviation) return player_ratings async def _persist_rating_changes( self, conn, game_id: int, rating_type: str, old_ratings: RatingDict, new_ratings: RatingDict, outcomes: dict[PlayerID, GameOutcome], update_game_player_stats: bool = True ) -> None: """ Persist computed ratings to the respective players' selected rating """ assert self._rating_type_ids is not None self._logger.debug("Saving rating change stats for game %i", game_id) ratings = [ (player_id, old_ratings[player_id], new_ratings[player_id]) for player_id in new_ratings.keys() ] for player_id, old_rating, new_rating in ratings: self._logger.debug( "New %s rating for player with id %s: %s -> %s", rating_type, player_id, old_rating, new_rating, ) if update_game_player_stats: # DEPRECATED: game_player_stats table contains rating data. # Use leaderboard_rating_journal instead gps_update_sql = ( game_player_stats.update() .where( and_( game_player_stats.c.playerId == bindparam("player_id"), game_player_stats.c.gameId == game_id, ) ) .values( after_mean=bindparam("after_mean"), after_deviation=bindparam("after_deviation"), mean=bindparam("mean"), deviation=bindparam("deviation"), scoreTime=func.now() ) ) try: result = await conn.execute(gps_update_sql, [ dict( player_id=player_id, after_mean=new_rating.mean, after_deviation=new_rating.dev, mean=old_rating.mean, deviation=old_rating.dev, ) for player_id, old_rating, new_rating in ratings ]) if result.rowcount != len(ratings): self._logger.warning( "gps_update_sql only updated %d out of %d rows for game_id %d", result.rowcount, len(ratings), game_id ) return except pymysql.OperationalError: # Could happen if we drop the rating columns from game_player_stats self._logger.warning( "gps_update_sql failed for game %d, ignoring...", game_id, exc_info=True ) leaderboard_id = self._rating_type_ids[rating_type] journal_insert_sql = leaderboard_rating_journal.insert().values( leaderboard_id=leaderboard_id, rating_mean_before=bindparam("rating_mean_before"), rating_deviation_before=bindparam("rating_deviation_before"), rating_mean_after=bindparam("rating_mean_after"), rating_deviation_after=bindparam("rating_deviation_after"), game_player_stats_id=select(game_player_stats.c.id).where( and_( game_player_stats.c.playerId == bindparam("player_id"), game_player_stats.c.gameId == game_id, ) ).scalar_subquery(), ) await conn.execute(journal_insert_sql, [ dict( player_id=player_id, rating_mean_before=old_rating.mean, rating_deviation_before=old_rating.dev, rating_mean_after=new_rating.mean, rating_deviation_after=new_rating.dev, ) for player_id, old_rating, new_rating in ratings ]) rating_update_sql = ( leaderboard_rating.update() .where( and_( leaderboard_rating.c.login_id == bindparam("player_id"), leaderboard_rating.c.leaderboard_id == leaderboard_id, ) ) .values( mean=bindparam("mean"), deviation=bindparam("deviation"), total_games=leaderboard_rating.c.total_games + 1, won_games=leaderboard_rating.c.won_games + bindparam("increment"), ) ) await conn.execute(rating_update_sql, [ dict( player_id=player_id, mean=new_rating.mean, deviation=new_rating.dev, increment=( 1 if outcomes[player_id] is GameOutcome.VICTORY else 0 ) ) for player_id, _, new_rating in ratings ]) for player_id, new_rating in new_ratings.items(): self._update_player_object(player_id, rating_type, new_rating) def _update_player_object( self, player_id: PlayerID, rating_type: str, new_rating: Rating ) -> None: if self._player_service_callback is None: self._logger.warning( "Tried to send rating change to player service, " "but no service was registered." ) return self._logger.debug( "Sending player rating update for player with id %i", player_id ) self._player_service_callback(player_id, rating_type, new_rating) async def _publish_rating_changes( self, game_id: int, rating_type: str, old_ratings: RatingDict, new_ratings: RatingDict, outcomes: dict[PlayerID, GameOutcome], ): for player_id, new_rating in new_ratings.items(): if player_id not in outcomes: self._logger.error("Missing outcome for player %i", player_id) continue if player_id not in old_ratings: self._logger.error("Missing old rating for player %i", player_id) continue old_rating = old_ratings[player_id] rating_change_dict = { "game_id": game_id, "player_id": player_id, "rating_type": rating_type, "new_rating_mean": new_rating.mean, "new_rating_deviation": new_rating.dev, "old_rating_mean": old_rating.mean, "old_rating_deviation": old_rating.dev, "outcome": outcomes[player_id].value } await self._message_queue_service.publish( config.MQ_EXCHANGE_NAME, "success.rating.update", rating_change_dict, ) async def _join_rating_queue(self) -> None: """ Offers a call that is blocking until the rating queue has been emptied. Mostly for testing purposes. """ await self._queue.join() async def shutdown(self) -> None: """ Finish rating all remaining games, then exit. """ self._accept_input = False self._logger.debug( "Shutdown initiated. Waiting on current queue: %s", self._queue ) if self._queue.empty() and self._task: self._task.cancel() await self._queue.join() self._task = None self._logger.debug("Queue emptied: %s", self._queue) def kill(self) -> None: """ Exit without waiting for the queue to join. """ self._accept_input = False if self._task is not None: self._task.cancel() self._task = None
Ancestors
Methods
async def enqueue(self, game_info: dict[str]) ‑> None
def kill(self) ‑> None
-
Exit without waiting for the queue to join.
async def shutdown(self) ‑> None
-
Finish rating all remaining games, then exit.
async def update_data(self)
Inherited members