Module server.matchmaker.matchmaker_queue
Classes
class MatchmakerQueue (game_service: GameService,
on_match_found: Callable[[Search, Search, ForwardRef('MatchmakerQueue')], Any],
name: str,
queue_id: int,
featured_mod: str,
rating_type: str,
team_size: int = 1,
params: dict[str, typing.Any] | None = None,
map_pools: Iterable[tuple[MapPool, int | None, int | None]] = ())-
Expand source code
@with_logger
class MatchmakerQueue:
    """
    A single matchmaking queue.

    Pending `Search` objects are kept in insertion order. A background
    coroutine (`queue_pop_timer`) periodically runs the matchmaker over them
    and reports every accepted pairing through `on_match_found`.
    """

    def __init__(
        self,
        game_service: "GameService",
        on_match_found: MatchFoundCallback,
        name: str,
        queue_id: int,
        featured_mod: str,
        rating_type: str,
        team_size: int = 1,
        params: Optional[dict[str, Any]] = None,
        map_pools: Iterable[tuple[MapPool, Optional[int], Optional[int]]] = (),
    ):
        self.game_service = game_service
        self.name = name
        self.id = queue_id
        self.featured_mod = featured_mod
        self.rating_type = rating_type
        self.team_size = team_size
        self.rating_peak = 1000.0
        self.params = params or {}
        # Keyed by map pool id; each value is (map_pool, min_rating, max_rating).
        self.map_pools = {info[0].id: info for info in map_pools}
        # Used as an ordered set: only the keys carry information.
        self._queue: dict[Search, None] = OrderedDict()
        self.on_match_found = on_match_found
        self._is_running = True
        self.timer = PopTimer(self)
        self.matchmaker = TeamMatchMaker()
        # Strong reference to the task started by `initialize`.
        self._pop_timer_task: Optional[asyncio.Task] = None

    @property
    def is_running(self) -> bool:
        """Whether the queue has not been shut down yet."""
        return self._is_running

    def add_map_pool(
        self,
        map_pool: MapPool,
        min_rating: Optional[int],
        max_rating: Optional[int]
    ) -> None:
        """
        Register `map_pool` for the given rating bounds, replacing any entry
        already stored under the same map pool id.
        """
        self.map_pools[map_pool.id] = (map_pool, min_rating, max_rating)

    def get_map_pool_for_rating(self, rating: float) -> Optional[MapPool]:
        """
        Return the first registered map pool whose bounds include `rating`.

        A bound of `None` is treated as unbounded on that side. Returns
        `None` when no registered pool covers the rating.
        """
        for map_pool, min_rating, max_rating in self.map_pools.values():
            if min_rating is not None and rating < min_rating:
                continue
            if max_rating is not None and rating > max_rating:
                continue
            return map_pool
        return None

    def get_game_options(self) -> Optional[dict[str, Any]]:
        """
        Return the "GameOptions" entry of the queue params, or `None` when
        the entry is missing or empty.
        """
        return self.params.get("GameOptions") or None

    def initialize(self):
        """
        Start the background pop timer loop.

        The task is stored on the instance because the event loop only keeps
        weak references to tasks; an unreferenced task may be garbage
        collected before it finishes.
        """
        self._pop_timer_task = asyncio.create_task(self.queue_pop_timer())

    @property
    def num_players(self) -> int:
        """Total number of players over all searches currently queued."""
        return sum(len(search.players) for search in self._queue.keys())

    async def queue_pop_timer(self) -> None:
        """
        Periodically tries to match all Searches in the queue. The amount of
        time until next queue 'pop' is determined by the number of players
        in the queue.
        """
        self._logger.debug("MatchmakerQueue initialized for %s", self.name)
        while self.is_running:
            try:
                await self.timer.next_pop()

                await self.find_matches()

                number_of_unmatched_searches = len(self._queue)
                metrics.unmatched_searches.labels(self.name).set(
                    number_of_unmatched_searches
                )

                # Any searches in the queue at this point were unable to find a
                # match this round and will have higher priority next round.

                self.game_service.mark_dirty(self)
            except asyncio.CancelledError:
                break
            except Exception:
                self._logger.exception(
                    "Unexpected error during queue pop timer loop!"
                )
                # To avoid potential busy loops
                await asyncio.sleep(1)
        self._logger.info("%s queue stopped", self.name)

    async def search(self, search: Search) -> None:
        """
        Search for a match.

        Puts a search object into the queue and awaits completion.
        """
        assert search is not None

        try:
            with MatchmakerSearchTimer(self.name):
                self.push(search)
                await search.await_match()
            self._logger.debug("Search complete: %s", search)
        except CancelledError:
            pass
        finally:
            # If the queue was cancelled, or some other error occurred,
            # make sure to clean up.
            self.game_service.mark_dirty(self)
            if search in self._queue:
                del self._queue[search]

    @synchronized(SpinLock(sleep_duration=1))
    async def find_matches(self) -> None:
        """
        Perform the matchmaking algorithm.

        Note that this function is synchronized such that only one instance of
        MatchmakerQueue can call this function at any given time. This is
        needed in order to safely enable multiqueuing.
        """
        self._logger.info("Searching for matches: %s", self.name)

        searches = list(self._queue.keys())

        # Not enough players for two full teams: nothing can be matched.
        if self.num_players < 2 * self.team_size:
            self._register_unmatched_searches(searches)
            return

        # The matchmaker runs in the loop's default executor so it does not
        # block the event loop while it works.
        loop = asyncio.get_running_loop()
        proposed_matches, unmatched_searches = await loop.run_in_executor(
            None,
            self.matchmaker.find,
            searches,
            self.team_size,
            self.rating_peak,
        )

        # Call self.match on all proposals and filter out the ones that
        # could not be confirmed (e.g. because a search was cancelled).
        matches: list[Match] = []
        for match in proposed_matches:
            if self.match(match[0], match[1]):
                matches.append(match)
            else:
                unmatched_searches.extend(match)

        self._register_unmatched_searches(unmatched_searches)

        for search1, search2 in matches:
            self._report_party_sizes(search1)
            self._report_party_sizes(search2)

            rating_imbalance = abs(
                search1.cumulative_rating - search2.cumulative_rating
            )
            metrics.match_rating_imbalance.labels(self.name).observe(
                rating_imbalance
            )

            ratings = search1.displayed_ratings + search2.displayed_ratings
            rating_variety = max(ratings) - min(ratings)
            metrics.match_rating_variety.labels(self.name).observe(
                rating_variety
            )

            metrics.match_quality.labels(self.name).observe(
                search1.quality_with(search2)
            )
            try:
                self.on_match_found(search1, search2, self)
            except Exception:
                # A failing callback must not prevent the remaining matches
                # from being reported.
                self._logger.exception("Match callback raised an exception!")

    def _report_party_sizes(self, team):
        """Count each original search of `team` in the matched-search metric."""
        for search in team.get_original_searches():
            metrics.matched_matchmaker_searches.labels(
                self.name,
                len(search.players)
            ).inc()

    def _register_unmatched_searches(
        self,
        unmatched_searches: list[Search],
    ):
        """
        Tells all unmatched searches that they went through a failed matching
        attempt.
        """
        for search in unmatched_searches:
            search.register_failed_matching_attempt()
            self._logger.debug(
                "Search %s remained unmatched at threshold %f in attempt number %s",
                search, search.match_threshold, search.failed_matching_attempts
            )

    def push(self, search: Search):
        """
        Push the given search object onto the queue
        """
        self._queue[search] = None
        self.game_service.mark_dirty(self)

    def match(self, s1: Search, s2: Search) -> bool:
        """
        Mark the given two searches as matched

        # Returns
        `True` if matching succeeded or `False` if matching failed.
        """
        if s1.is_matched or s2.is_matched:
            return False
        if s1.is_cancelled or s2.is_cancelled:
            return False
        # Additional failsafe. Ideally this check will never fail.
        if any(
            player.state != PlayerState.SEARCHING_LADDER
            for player in s1.players + s2.players
        ):
            self._logger.warning(
                "Tried to match searches %s and %s while some players had "
                "invalid states: team1: %s team2: %s",
                s1, s2,
                list(p.state for p in s1.players),
                list(p.state for p in s2.players)
            )
            return False

        s1.match(s2)
        s2.match(s1)
        if s1 in self._queue:
            del self._queue[s1]
        if s2 in self._queue:
            del self._queue[s2]

        return True

    def shutdown(self):
        """Stop the pop timer loop and cancel the pending timer."""
        self._is_running = False
        self.timer.cancel()

    def to_dict(self):
        """
        Return a fuzzy representation of the searches currently in the queue
        """
        return {
            "queue_name": self.name,
            "queue_pop_time": datetime.fromtimestamp(
                self.timer.next_queue_pop, timezone.utc
            ).isoformat(),
            "queue_pop_time_delta": round(
                self.timer.next_queue_pop - time.time(),
                ndigits=2
            ),
            "num_players": self.num_players,
            "boundary_80s": [search.boundary_80 for search in self._queue.keys()],
            "boundary_75s": [search.boundary_75 for search in self._queue.keys()],
            # TODO: Remove, the client should query the API for this
            "team_size": self.team_size,
        }

    def __repr__(self):
        return repr(self._queue)
Instance variables
prop is_running : bool
-
Expand source code
@property
def is_running(self) -> bool:
    """Report the running flag; it is cleared by `shutdown`."""
    return self._is_running
prop num_players : int
-
Expand source code
@property
def num_players(self) -> int:
    """Count the players across every search currently held in the queue."""
    total = 0
    for queued in self._queue:
        total += len(queued.players)
    return total
Methods
def add_map_pool(self,
map_pool: MapPool,
min_rating: int | None,
max_rating: int | None) ‑> None-
Expand source code
def add_map_pool(
    self,
    map_pool: MapPool,
    min_rating: Optional[int],
    max_rating: Optional[int]
) -> None:
    """
    Store `map_pool` together with its rating bounds.

    Entries are keyed by `map_pool.id`, so adding a pool with an id that is
    already present overwrites the previous entry.
    """
    entry = (map_pool, min_rating, max_rating)
    self.map_pools[map_pool.id] = entry
async def find_matches(self) ‑> None
-
Expand source code
@synchronized(SpinLock(sleep_duration=1))
async def find_matches(self) -> None:
    """
    Run one round of matchmaking over the searches currently queued.

    The `synchronized` decorator is documented here as ensuring that only
    one instance of MatchmakerQueue runs this function at any given time,
    which is needed to safely enable multiqueuing.
    """
    self._logger.info("Searching for matches: %s", self.name)

    queued = list(self._queue)

    # Fewer players than two full teams: every search fails this round.
    if self.num_players < 2 * self.team_size:
        self._register_unmatched_searches(queued)
        return

    # Hand the search for pairings to the loop's default executor.
    proposals, leftovers = await asyncio.get_running_loop().run_in_executor(
        None,
        self.matchmaker.find,
        queued,
        self.team_size,
        self.rating_peak,
    )

    # Keep only the proposals that self.match confirms; members of a
    # rejected proposal count as unmatched for this round.
    matches: list[Match] = []
    for candidate in proposals:
        if not self.match(candidate[0], candidate[1]):
            leftovers.extend(candidate)
            continue
        matches.append(candidate)

    self._register_unmatched_searches(leftovers)

    for search1, search2 in matches:
        for team in (search1, search2):
            self._report_party_sizes(team)

        imbalance = abs(search1.cumulative_rating - search2.cumulative_rating)
        metrics.match_rating_imbalance.labels(self.name).observe(imbalance)

        all_ratings = search1.displayed_ratings + search2.displayed_ratings
        spread = max(all_ratings) - min(all_ratings)
        metrics.match_rating_variety.labels(self.name).observe(spread)

        metrics.match_quality.labels(self.name).observe(
            search1.quality_with(search2)
        )

        try:
            self.on_match_found(search1, search2, self)
        except Exception:
            # Log and carry on so later matches are still reported.
            self._logger.exception("Match callback raised an exception!")
Perform the matchmaking algorithm.
Note that this function is synchronized such that only one instance of MatchmakerQueue can call this function at any given time. This is needed in order to safely enable multiqueuing.
def get_game_options(self) ‑> dict[str, typing.Any]
-
Expand source code
def get_game_options(self) -> Optional[dict[str, Any]]:
    """
    Return the "GameOptions" entry of the queue params.

    A missing or falsy entry (for example an empty dict) is normalised to
    `None`, which is why the return type is optional.
    """
    return self.params.get("GameOptions") or None
def get_map_pool_for_rating(self, rating: float) ‑> MapPool | None
-
Expand source code
def get_map_pool_for_rating(self, rating: float) -> Optional[MapPool]:
    """
    Look up the map pool to use for `rating`.

    Registered pools are checked in the order they are stored and the first
    one whose bounds include the rating is returned. A bound of `None`
    means that side is unbounded. Returns `None` if no pool qualifies.
    """
    for pool, lower, upper in self.map_pools.values():
        within_bounds = (
            (lower is None or not rating < lower)
            and (upper is None or not rating > upper)
        )
        if within_bounds:
            return pool
    return None
def initialize(self)
-
Expand source code
def initialize(self):
    """
    Start the background pop timer loop.

    Must be called while an event loop is running. The created task is kept
    on the instance as `_pop_timer_task`: the event loop only holds weak
    references to tasks, so a task nobody references may be garbage
    collected before it finishes.
    """
    self._pop_timer_task = asyncio.create_task(self.queue_pop_timer())
def match(self,
s1: Search,
s2: Search) ‑> bool-
Expand source code
def match(self, s1: Search, s2: Search) -> bool:
    """
    Try to pair up two searches.

    On success each search is told about the other and both are removed
    from the queue.

    # Returns
    `True` if matching succeeded or `False` if matching failed.
    """
    # A search that is already matched or was cancelled cannot be paired.
    if s1.is_matched or s2.is_matched or s1.is_cancelled or s2.is_cancelled:
        return False

    # Additional failsafe. Ideally this check will never fail.
    everyone = s1.players + s2.players
    if any(p.state != PlayerState.SEARCHING_LADDER for p in everyone):
        self._logger.warning(
            "Tried to match searches %s and %s while some players had "
            "invalid states: team1: %s team2: %s",
            s1, s2,
            [p.state for p in s1.players],
            [p.state for p in s2.players]
        )
        return False

    s1.match(s2)
    s2.match(s1)

    # Either search may already be gone from the queue; removal is a no-op then.
    self._queue.pop(s1, None)
    self._queue.pop(s2, None)
    return True
Mark the given two searches as matched
Returns
True if matching succeeded or False if matching failed.
def push(self,
search: Search)-
Expand source code
def push(self, search: Search):
    """
    Add `search` to the queue and flag the queue as dirty on the game
    service.
    """
    # The queue dict is used as an ordered set, so the value is a placeholder.
    self._queue[search] = None
    self.game_service.mark_dirty(self)
Push the given search object onto the queue
async def queue_pop_timer(self) ‑> None
-
Expand source code
async def queue_pop_timer(self) -> None:
    """
    Run the matching loop until the queue stops running.

    Every iteration waits for the timer's next pop, runs `find_matches`,
    publishes how many searches are still queued and marks the queue dirty.
    Cancellation ends the loop; any other exception is logged and the loop
    continues after a short pause.
    """
    self._logger.debug("MatchmakerQueue initialized for %s", self.name)

    while self.is_running:
        try:
            await self.timer.next_pop()
            await self.find_matches()

            still_queued = len(self._queue)
            metrics.unmatched_searches.labels(self.name).set(still_queued)

            # Any searches in the queue at this point were unable to find a
            # match this round and will have higher priority next round.
            self.game_service.mark_dirty(self)
        except asyncio.CancelledError:
            break
        except Exception:
            self._logger.exception(
                "Unexpected error during queue pop timer loop!"
            )
            # To avoid potential busy loops
            await asyncio.sleep(1)

    self._logger.info("%s queue stopped", self.name)
Periodically tries to match all Searches in the queue. The amount of time until next queue 'pop' is determined by the number of players in the queue.
async def search(self,
search: Search) ‑> None-
Expand source code
async def search(self, search: Search) -> None:
    """
    Queue `search` and wait until it has been matched.

    The wait is timed with `MatchmakerSearchTimer`. Cancellation is absorbed
    here, and in every case the queue is marked dirty and the search is
    removed from the queue before returning.
    """
    assert search is not None

    try:
        with MatchmakerSearchTimer(self.name):
            self.push(search)
            await search.await_match()
        self._logger.debug("Search complete: %s", search)
    except CancelledError:
        pass
    finally:
        # If the queue was cancelled, or some other error occurred,
        # make sure to clean up.
        self.game_service.mark_dirty(self)
        self._queue.pop(search, None)
Search for a match.
Puts a search object into the queue and awaits completion.
def shutdown(self)
-
Expand source code
def shutdown(self):
    """
    Stop the queue: clear the running flag, then cancel the pop timer.
    """
    self._is_running = False
    self.timer.cancel()
def to_dict(self)
-
Expand source code
def to_dict(self):
    """
    Return a fuzzy representation of the searches currently in the queue
    """
    next_pop = self.timer.next_queue_pop
    pop_time = datetime.fromtimestamp(next_pop, timezone.utc)
    seconds_until_pop = round(next_pop - time.time(), ndigits=2)

    return {
        "queue_name": self.name,
        "queue_pop_time": pop_time.isoformat(),
        "queue_pop_time_delta": seconds_until_pop,
        "num_players": self.num_players,
        "boundary_80s": [queued.boundary_80 for queued in self._queue],
        "boundary_75s": [queued.boundary_75 for queued in self._queue],
        # TODO: Remove, the client should query the API for this
        "team_size": self.team_size,
    }
Return a fuzzy representation of the searches currently in the queue
class MatchmakerSearchTimer (queue_name)
-
Expand source code
class MatchmakerSearchTimer:
    """
    Context manager that measures how long its `with` block takes.

    On exit the elapsed time is observed on
    `metrics.matchmaker_search_duration`, labelled with the queue name and a
    status of "successful", "cancelled" or "errored".
    """

    def __init__(self, queue_name):
        self.queue_name = queue_name

    def __enter__(self):
        self.start_time = time.monotonic()
        # Return the timer so `with MatchmakerSearchTimer(...) as timer` works.
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        total_time = time.monotonic() - self.start_time

        status = self._status_for(exc_type)

        metric = metrics.matchmaker_search_duration.labels(self.queue_name, status)
        metric.observe(total_time)
        # Falls through returning None, so exceptions are never suppressed.

    @staticmethod
    def _status_for(exc_type) -> str:
        """Map the exception type passed to `__exit__` onto a status label."""
        if exc_type is None:
            return "successful"
        # issubclass so that subclasses of CancelledError count as cancelled.
        if issubclass(exc_type, CancelledError):
            return "cancelled"
        return "errored"