Module server.matchmaker
The matchmaker system
Used for keeping track of queues of players wanting to play specific kinds of
games, currently just used for 1v1 ladder
.
Sub-modules
server.matchmaker.algorithm
-
Matchmaker algorithm implementations
server.matchmaker.map_pool
server.matchmaker.matchmaker_queue
server.matchmaker.pop_timer
server.matchmaker.search
Classes
class CombinedSearch (*searches: Search)
-
Expand source code
class CombinedSearch(Search): def __init__(self, *searches: Search): assert searches rating_type = searches[0].rating_type assert all(map(lambda s: s.rating_type == rating_type, searches)) self.rating_type = rating_type self.searches = searches @property def players(self) -> list[Player]: return list(itertools.chain(*[s.players for s in self.searches])) @property def ratings(self) -> list[Rating]: return list(itertools.chain(*[s.ratings for s in self.searches])) @property def cumulative_rating(self) -> float: return sum(s.cumulative_rating for s in self.searches) @property def average_rating(self) -> float: return get_average_rating(self.searches) @property def raw_ratings(self) -> list[Rating]: return list(itertools.chain(*[s.raw_ratings for s in self.searches])) @property def displayed_ratings(self) -> list[float]: return list(itertools.chain(*[s.displayed_ratings for s in self.searches])) @property def failed_matching_attempts(self) -> int: return max(search.failed_matching_attempts for search in self.searches) def register_failed_matching_attempt(self): for search in self.searches: search.register_failed_matching_attempt() @property def match_threshold(self) -> float: """ Defines the threshold for game quality """ return min(s.match_threshold for s in self.searches) @property def is_matched(self) -> bool: return all(s.is_matched for s in self.searches) def done(self) -> bool: return all(s.done() for s in self.searches) @property def is_cancelled(self) -> bool: return any(s.is_cancelled for s in self.searches) def match(self, other: "Search"): """ Mark as matched with given opponent """ self._logger.info("Combined search matched %s with %s", self.players, other.players) for s in self.searches: s.match(other) async def await_match(self): """ Wait for this search to complete """ await asyncio.wait({s.await_match() for s in self.searches}) def cancel(self): """ Cancel searching for a match """ for s in self.searches: s.cancel() def __str__(self): return f"CombinedSearch({', '.join(str(s) for s in self.searches)})" def __repr__(self): return f"CombinedSearch({', '.join(str(s) for s in self.searches)})" def get_original_searches(self) -> list[Search]: """ Returns the searches of which this CombinedSearch is comprised """ return list(self.searches)
Represents the state of a users search for a match.
Ancestors
Instance variables
prop average_rating : float
-
Expand source code
@property def average_rating(self) -> float: return get_average_rating(self.searches)
prop cumulative_rating : float
-
Expand source code
@property def cumulative_rating(self) -> float: return sum(s.cumulative_rating for s in self.searches)
prop failed_matching_attempts : int
-
Expand source code
@property def failed_matching_attempts(self) -> int: return max(search.failed_matching_attempts for search in self.searches)
prop is_cancelled : bool
-
Expand source code
@property def is_cancelled(self) -> bool: return any(s.is_cancelled for s in self.searches)
prop is_matched : bool
-
Expand source code
@property def is_matched(self) -> bool: return all(s.is_matched for s in self.searches)
prop match_threshold : float
-
Expand source code
@property def match_threshold(self) -> float: """ Defines the threshold for game quality """ return min(s.match_threshold for s in self.searches)
Defines the threshold for game quality
prop players : list[Player]
-
Expand source code
@property def players(self) -> list[Player]: return list(itertools.chain(*[s.players for s in self.searches]))
prop ratings : list[Rating]
-
Expand source code
@property def ratings(self) -> list[Rating]: return list(itertools.chain(*[s.ratings for s in self.searches]))
prop raw_ratings : list[Rating]
-
Expand source code
@property def raw_ratings(self) -> list[Rating]: return list(itertools.chain(*[s.raw_ratings for s in self.searches]))
Methods
def done(self) ‑> bool
-
Expand source code
def done(self) -> bool: return all(s.done() for s in self.searches)
def get_original_searches(self) ‑> list[Search]
-
Expand source code
def get_original_searches(self) -> list[Search]: """ Returns the searches of which this CombinedSearch is comprised """ return list(self.searches)
Returns the searches of which this CombinedSearch is comprised
Inherited members
class MapPool (map_pool_id: int,
name: str,
maps: Iterable[MapPoolMap] = ())-
Expand source code
@with_logger class MapPool(object): def __init__( self, map_pool_id: int, name: str, maps: Iterable[MapPoolMap] = () ): self.id = map_pool_id self.name = name self.set_maps(maps) def set_maps(self, maps: Iterable[MapPoolMap]) -> None: self.maps = {map_.id: map_ for map_ in maps} def choose_map(self, played_map_ids: Iterable[int] = ()) -> Map: """ Select a random map who's id does not appear in `played_map_ids`. If all map ids appear in the list, then pick one that appears the least amount of times. """ if not self.maps: self._logger.critical( "Trying to choose a map from an empty map pool: %s", self.name ) raise RuntimeError(f"Map pool {self.name} not set!") # Make sure the counter has every available map counter = Counter(self.maps.keys()) counter.update(id_ for id_ in played_map_ids if id_ in self.maps) least_common = counter.most_common()[::-1] least_count = 1 for id_, count in least_common: if isinstance(self.maps[id_], Map): least_count = count break # Trim off the maps with higher play counts for i, (_, count) in enumerate(least_common): if count > least_count: least_common = least_common[:i] break weights = [self.maps[id_].weight for id_, _ in least_common] map_id = random.choices(least_common, weights=weights, k=1)[0][0] return self.maps[map_id].get_map() def __repr__(self) -> str: return f"MapPool({self.id}, {self.name}, {list(self.maps.values())})"
Methods
def choose_map(self, played_map_ids: Iterable[int] = ()) ‑> Map
-
Expand source code
def choose_map(self, played_map_ids: Iterable[int] = ()) -> Map: """ Select a random map who's id does not appear in `played_map_ids`. If all map ids appear in the list, then pick one that appears the least amount of times. """ if not self.maps: self._logger.critical( "Trying to choose a map from an empty map pool: %s", self.name ) raise RuntimeError(f"Map pool {self.name} not set!") # Make sure the counter has every available map counter = Counter(self.maps.keys()) counter.update(id_ for id_ in played_map_ids if id_ in self.maps) least_common = counter.most_common()[::-1] least_count = 1 for id_, count in least_common: if isinstance(self.maps[id_], Map): least_count = count break # Trim off the maps with higher play counts for i, (_, count) in enumerate(least_common): if count > least_count: least_common = least_common[:i] break weights = [self.maps[id_].weight for id_, _ in least_common] map_id = random.choices(least_common, weights=weights, k=1)[0][0] return self.maps[map_id].get_map()
Select a random map who's id does not appear in
played_map_ids
. If all map ids appear in the list, then pick one that appears the least amount of times. def set_maps(self,
maps: Iterable[MapPoolMap]) ‑> None-
Expand source code
def set_maps(self, maps: Iterable[MapPoolMap]) -> None: self.maps = {map_.id: map_ for map_ in maps}
class MatchmakerQueue (game_service: GameService,
on_match_found: Callable[[Search, Search, ForwardRef('MatchmakerQueue')], Any],
name: str,
queue_id: int,
featured_mod: str,
rating_type: str,
team_size: int = 1,
params: dict[str, typing.Any] | None = None,
map_pools: Iterable[tuple[MapPool, int | None, int | None]] = ())-
Expand source code
@with_logger class MatchmakerQueue: def __init__( self, game_service: "GameService", on_match_found: MatchFoundCallback, name: str, queue_id: int, featured_mod: str, rating_type: str, team_size: int = 1, params: Optional[dict[str, Any]] = None, map_pools: Iterable[tuple[MapPool, Optional[int], Optional[int]]] = (), ): self.game_service = game_service self.name = name self.id = queue_id self.featured_mod = featured_mod self.rating_type = rating_type self.team_size = team_size self.rating_peak = 1000.0 self.params = params or {} self.map_pools = {info[0].id: info for info in map_pools} self._queue: dict[Search, None] = OrderedDict() self.on_match_found = on_match_found self._is_running = True self.timer = PopTimer(self) self.matchmaker = TeamMatchMaker() @property def is_running(self) -> bool: return self._is_running def add_map_pool( self, map_pool: MapPool, min_rating: Optional[int], max_rating: Optional[int] ) -> None: self.map_pools[map_pool.id] = (map_pool, min_rating, max_rating) def get_map_pool_for_rating(self, rating: float) -> Optional[MapPool]: for map_pool, min_rating, max_rating in self.map_pools.values(): if min_rating is not None and rating < min_rating: continue if max_rating is not None and rating > max_rating: continue return map_pool def get_game_options(self) -> dict[str, Any]: return self.params.get("GameOptions") or None def initialize(self): asyncio.create_task(self.queue_pop_timer()) @property def num_players(self) -> int: return sum(len(search.players) for search in self._queue.keys()) async def queue_pop_timer(self) -> None: """ Periodically tries to match all Searches in the queue. The amount of time until next queue 'pop' is determined by the number of players in the queue. """ self._logger.debug("MatchmakerQueue initialized for %s", self.name) while self.is_running: try: await self.timer.next_pop() await self.find_matches() number_of_unmatched_searches = len(self._queue) metrics.unmatched_searches.labels(self.name).set( number_of_unmatched_searches ) # Any searches in the queue at this point were unable to find a # match this round and will have higher priority next round. self.game_service.mark_dirty(self) except asyncio.CancelledError: break except Exception: self._logger.exception( "Unexpected error during queue pop timer loop!" ) # To avoid potential busy loops await asyncio.sleep(1) self._logger.info("%s queue stopped", self.name) async def search(self, search: Search) -> None: """ Search for a match. Puts a search object into the queue and awaits completion. """ assert search is not None try: with MatchmakerSearchTimer(self.name): self.push(search) await search.await_match() self._logger.debug("Search complete: %s", search) except CancelledError: pass finally: # If the queue was cancelled, or some other error occurred, # make sure to clean up. self.game_service.mark_dirty(self) if search in self._queue: del self._queue[search] @synchronized(SpinLock(sleep_duration=1)) async def find_matches(self) -> None: """ Perform the matchmaking algorithm. Note that this function is synchronized such that only one instance of MatchmakerQueue can call this function at any given time. This is needed in order to safely enable multiqueuing. """ self._logger.info("Searching for matches: %s", self.name) searches = list(self._queue.keys()) if self.num_players < 2 * self.team_size: self._register_unmatched_searches(searches) return # Call self.match on all matches and filter out the ones that were cancelled loop = asyncio.get_running_loop() proposed_matches, unmatched_searches = await loop.run_in_executor( None, self.matchmaker.find, searches, self.team_size, self.rating_peak, ) # filter out matches that were cancelled matches: list[Match] = [] for match in proposed_matches: if self.match(match[0], match[1]): matches.append(match) else: unmatched_searches.extend(match) self._register_unmatched_searches(unmatched_searches) for search1, search2 in matches: self._report_party_sizes(search1) self._report_party_sizes(search2) rating_imbalance = abs(search1.cumulative_rating - search2.cumulative_rating) metrics.match_rating_imbalance.labels(self.name).observe(rating_imbalance) ratings = search1.displayed_ratings + search2.displayed_ratings rating_variety = max(ratings) - min(ratings) metrics.match_rating_variety.labels(self.name).observe(rating_variety) metrics.match_quality.labels(self.name).observe( search1.quality_with(search2) ) try: self.on_match_found(search1, search2, self) except Exception: self._logger.exception("Match callback raised an exception!") def _report_party_sizes(self, team): for search in team.get_original_searches(): metrics.matched_matchmaker_searches.labels( self.name, len(search.players) ).inc() def _register_unmatched_searches( self, unmatched_searches: list[Search], ): """ Tells all unmatched searches that they went through a failed matching attempt. """ for search in unmatched_searches: search.register_failed_matching_attempt() self._logger.debug( "Search %s remained unmatched at threshold %f in attempt number %s", search, search.match_threshold, search.failed_matching_attempts ) def push(self, search: Search): """ Push the given search object onto the queue """ self._queue[search] = None self.game_service.mark_dirty(self) def match(self, s1: Search, s2: Search) -> bool: """ Mark the given two searches as matched # Returns `True` if matching succeeded or `False` if matching failed. """ if s1.is_matched or s2.is_matched: return False if s1.is_cancelled or s2.is_cancelled: return False # Additional failsafe. Ideally this check will never fail. if any( player.state != PlayerState.SEARCHING_LADDER for player in s1.players + s2.players ): self._logger.warning( "Tried to match searches %s and %s while some players had " "invalid states: team1: %s team2: %s", s1, s2, list(p.state for p in s1.players), list(p.state for p in s2.players) ) return False s1.match(s2) s2.match(s1) if s1 in self._queue: del self._queue[s1] if s2 in self._queue: del self._queue[s2] return True def shutdown(self): self._is_running = False self.timer.cancel() def to_dict(self): """ Return a fuzzy representation of the searches currently in the queue """ return { "queue_name": self.name, "queue_pop_time": datetime.fromtimestamp( self.timer.next_queue_pop, timezone.utc ).isoformat(), "queue_pop_time_delta": round( self.timer.next_queue_pop - time.time(), ndigits=2 ), "num_players": self.num_players, "boundary_80s": [search.boundary_80 for search in self._queue.keys()], "boundary_75s": [search.boundary_75 for search in self._queue.keys()], # TODO: Remove, the client should query the API for this "team_size": self.team_size, } def __repr__(self): return repr(self._queue)
Instance variables
prop is_running : bool
-
Expand source code
@property def is_running(self) -> bool: return self._is_running
prop num_players : int
-
Expand source code
@property def num_players(self) -> int: return sum(len(search.players) for search in self._queue.keys())
Methods
def add_map_pool(self,
map_pool: MapPool,
min_rating: int | None,
max_rating: int | None) ‑> None-
Expand source code
def add_map_pool( self, map_pool: MapPool, min_rating: Optional[int], max_rating: Optional[int] ) -> None: self.map_pools[map_pool.id] = (map_pool, min_rating, max_rating)
async def find_matches(self) ‑> None
-
Expand source code
@synchronized(SpinLock(sleep_duration=1)) async def find_matches(self) -> None: """ Perform the matchmaking algorithm. Note that this function is synchronized such that only one instance of MatchmakerQueue can call this function at any given time. This is needed in order to safely enable multiqueuing. """ self._logger.info("Searching for matches: %s", self.name) searches = list(self._queue.keys()) if self.num_players < 2 * self.team_size: self._register_unmatched_searches(searches) return # Call self.match on all matches and filter out the ones that were cancelled loop = asyncio.get_running_loop() proposed_matches, unmatched_searches = await loop.run_in_executor( None, self.matchmaker.find, searches, self.team_size, self.rating_peak, ) # filter out matches that were cancelled matches: list[Match] = [] for match in proposed_matches: if self.match(match[0], match[1]): matches.append(match) else: unmatched_searches.extend(match) self._register_unmatched_searches(unmatched_searches) for search1, search2 in matches: self._report_party_sizes(search1) self._report_party_sizes(search2) rating_imbalance = abs(search1.cumulative_rating - search2.cumulative_rating) metrics.match_rating_imbalance.labels(self.name).observe(rating_imbalance) ratings = search1.displayed_ratings + search2.displayed_ratings rating_variety = max(ratings) - min(ratings) metrics.match_rating_variety.labels(self.name).observe(rating_variety) metrics.match_quality.labels(self.name).observe( search1.quality_with(search2) ) try: self.on_match_found(search1, search2, self) except Exception: self._logger.exception("Match callback raised an exception!")
Perform the matchmaking algorithm.
Note that this function is synchronized such that only one instance of MatchmakerQueue can call this function at any given time. This is needed in order to safely enable multiqueuing.
def get_game_options(self) ‑> dict[str, typing.Any]
-
Expand source code
def get_game_options(self) -> dict[str, Any]: return self.params.get("GameOptions") or None
def get_map_pool_for_rating(self, rating: float) ‑> MapPool | None
-
Expand source code
def get_map_pool_for_rating(self, rating: float) -> Optional[MapPool]: for map_pool, min_rating, max_rating in self.map_pools.values(): if min_rating is not None and rating < min_rating: continue if max_rating is not None and rating > max_rating: continue return map_pool
def initialize(self)
-
Expand source code
def initialize(self): asyncio.create_task(self.queue_pop_timer())
def match(self,
s1: Search,
s2: Search) ‑> bool-
Expand source code
def match(self, s1: Search, s2: Search) -> bool: """ Mark the given two searches as matched # Returns `True` if matching succeeded or `False` if matching failed. """ if s1.is_matched or s2.is_matched: return False if s1.is_cancelled or s2.is_cancelled: return False # Additional failsafe. Ideally this check will never fail. if any( player.state != PlayerState.SEARCHING_LADDER for player in s1.players + s2.players ): self._logger.warning( "Tried to match searches %s and %s while some players had " "invalid states: team1: %s team2: %s", s1, s2, list(p.state for p in s1.players), list(p.state for p in s2.players) ) return False s1.match(s2) s2.match(s1) if s1 in self._queue: del self._queue[s1] if s2 in self._queue: del self._queue[s2] return True
Mark the given two searches as matched
Returns
True
if matching succeeded orFalse
if matching failed. def push(self,
search: Search)-
Expand source code
def push(self, search: Search): """ Push the given search object onto the queue """ self._queue[search] = None self.game_service.mark_dirty(self)
Push the given search object onto the queue
async def queue_pop_timer(self) ‑> None
-
Expand source code
async def queue_pop_timer(self) -> None: """ Periodically tries to match all Searches in the queue. The amount of time until next queue 'pop' is determined by the number of players in the queue. """ self._logger.debug("MatchmakerQueue initialized for %s", self.name) while self.is_running: try: await self.timer.next_pop() await self.find_matches() number_of_unmatched_searches = len(self._queue) metrics.unmatched_searches.labels(self.name).set( number_of_unmatched_searches ) # Any searches in the queue at this point were unable to find a # match this round and will have higher priority next round. self.game_service.mark_dirty(self) except asyncio.CancelledError: break except Exception: self._logger.exception( "Unexpected error during queue pop timer loop!" ) # To avoid potential busy loops await asyncio.sleep(1) self._logger.info("%s queue stopped", self.name)
Periodically tries to match all Searches in the queue. The amount of time until next queue 'pop' is determined by the number of players in the queue.
async def search(self,
search: Search) ‑> None-
Expand source code
async def search(self, search: Search) -> None: """ Search for a match. Puts a search object into the queue and awaits completion. """ assert search is not None try: with MatchmakerSearchTimer(self.name): self.push(search) await search.await_match() self._logger.debug("Search complete: %s", search) except CancelledError: pass finally: # If the queue was cancelled, or some other error occurred, # make sure to clean up. self.game_service.mark_dirty(self) if search in self._queue: del self._queue[search]
Search for a match.
Puts a search object into the queue and awaits completion.
def shutdown(self)
-
Expand source code
def shutdown(self): self._is_running = False self.timer.cancel()
def to_dict(self)
-
Expand source code
def to_dict(self): """ Return a fuzzy representation of the searches currently in the queue """ return { "queue_name": self.name, "queue_pop_time": datetime.fromtimestamp( self.timer.next_queue_pop, timezone.utc ).isoformat(), "queue_pop_time_delta": round( self.timer.next_queue_pop - time.time(), ndigits=2 ), "num_players": self.num_players, "boundary_80s": [search.boundary_80 for search in self._queue.keys()], "boundary_75s": [search.boundary_75 for search in self._queue.keys()], # TODO: Remove, the client should query the API for this "team_size": self.team_size, }
Return a fuzzy representation of the searches currently in the queue
class PopTimer (queue: MatchmakerQueue)
-
Expand source code
@with_logger class PopTimer(object): """ Calculates when the next pop should happen based on the rate of players queuing. timer = PopTimer(some_queue) # Pauses the coroutine until the next queue pop await timer.next_pop() The timer will adjust the pop times in an attempt to maintain a fixed queue size on each pop. So generally, the more people are in the queue, the shorter the time will be. The player queue rate is based on a moving average over the last few pops. The exact size can be set in config. """ def __init__(self, queue: "MatchmakerQueue"): self.queue = queue # Set up deque's for calculating a moving average self.last_queue_amounts: deque[int] = deque(maxlen=config.QUEUE_POP_TIME_MOVING_AVG_SIZE) self.last_queue_times: deque[float] = deque(maxlen=config.QUEUE_POP_TIME_MOVING_AVG_SIZE) self._last_queue_pop = time() # Optimistically schedule first pop for half of the max pop time self.next_queue_pop = self._last_queue_pop + (config.QUEUE_POP_TIME_MAX / 2) self._wait_task = None async def next_pop(self): """ Wait for the timer to pop. """ time_remaining = self.next_queue_pop - time() self._logger.info("Next %s wave happening in %is", self.queue.name, time_remaining) metrics.matchmaker_queue_pop.labels(self.queue.name).set(int(time_remaining)) self._wait_task = asyncio.create_task(asyncio.sleep(time_remaining)) await self._wait_task num_players = self.queue.num_players metrics.matchmaker_players.labels(self.queue.name).set(num_players) self._last_queue_pop = time() self.next_queue_pop = self._last_queue_pop + self.time_until_next_pop( num_players, time_remaining ) def time_until_next_pop(self, num_queued: int, time_queued: float) -> float: """ Calculate how long we should wait for the next queue to pop based on the current rate of ladder queues """ # Calculate moving average of player queue rate self.last_queue_amounts.append(num_queued) self.last_queue_times.append(time_queued) total_players = sum(self.last_queue_amounts) if total_players == 0: return config.QUEUE_POP_TIME_MAX total_times = sum(self.last_queue_times) if total_times: self._logger.debug( "Queue rate for %s: %f/s", self.queue.name, total_players / total_times ) players_per_match = self.queue.team_size * 2 desired_players = config.QUEUE_POP_DESIRED_MATCHES * players_per_match # Obtained by solving $ NUM_PLAYERS = rate * time $ for time. next_pop_time = desired_players * total_times / total_players if next_pop_time > config.QUEUE_POP_TIME_MAX: self._logger.info( "Required time (%.2fs) for %s is larger than max pop time (%ds). " "Consider increasing the max pop time", next_pop_time, self.queue.name, config.QUEUE_POP_TIME_MAX ) return config.QUEUE_POP_TIME_MAX if next_pop_time < config.QUEUE_POP_TIME_MIN: self._logger.warning( "Required time (%.2fs) for %s is lower than min pop time (%ds). ", next_pop_time, self.queue.name, config.QUEUE_POP_TIME_MIN ) return config.QUEUE_POP_TIME_MIN return next_pop_time def cancel(self): if self._wait_task: self._wait_task.cancel()
Calculates when the next pop should happen based on the rate of players queuing.
timer = PopTimer(some_queue) # Pauses the coroutine until the next queue pop await timer.next_pop()
The timer will adjust the pop times in an attempt to maintain a fixed queue size on each pop. So generally, the more people are in the queue, the shorter the time will be.
The player queue rate is based on a moving average over the last few pops. The exact size can be set in config.
Methods
def cancel(self)
-
Expand source code
def cancel(self): if self._wait_task: self._wait_task.cancel()
async def next_pop(self)
-
Expand source code
async def next_pop(self): """ Wait for the timer to pop. """ time_remaining = self.next_queue_pop - time() self._logger.info("Next %s wave happening in %is", self.queue.name, time_remaining) metrics.matchmaker_queue_pop.labels(self.queue.name).set(int(time_remaining)) self._wait_task = asyncio.create_task(asyncio.sleep(time_remaining)) await self._wait_task num_players = self.queue.num_players metrics.matchmaker_players.labels(self.queue.name).set(num_players) self._last_queue_pop = time() self.next_queue_pop = self._last_queue_pop + self.time_until_next_pop( num_players, time_remaining )
Wait for the timer to pop.
def time_until_next_pop(self, num_queued: int, time_queued: float) ‑> float
-
Expand source code
def time_until_next_pop(self, num_queued: int, time_queued: float) -> float: """ Calculate how long we should wait for the next queue to pop based on the current rate of ladder queues """ # Calculate moving average of player queue rate self.last_queue_amounts.append(num_queued) self.last_queue_times.append(time_queued) total_players = sum(self.last_queue_amounts) if total_players == 0: return config.QUEUE_POP_TIME_MAX total_times = sum(self.last_queue_times) if total_times: self._logger.debug( "Queue rate for %s: %f/s", self.queue.name, total_players / total_times ) players_per_match = self.queue.team_size * 2 desired_players = config.QUEUE_POP_DESIRED_MATCHES * players_per_match # Obtained by solving $ NUM_PLAYERS = rate * time $ for time. next_pop_time = desired_players * total_times / total_players if next_pop_time > config.QUEUE_POP_TIME_MAX: self._logger.info( "Required time (%.2fs) for %s is larger than max pop time (%ds). " "Consider increasing the max pop time", next_pop_time, self.queue.name, config.QUEUE_POP_TIME_MAX ) return config.QUEUE_POP_TIME_MAX if next_pop_time < config.QUEUE_POP_TIME_MIN: self._logger.warning( "Required time (%.2fs) for %s is lower than min pop time (%ds). ", next_pop_time, self.queue.name, config.QUEUE_POP_TIME_MIN ) return config.QUEUE_POP_TIME_MIN return next_pop_time
Calculate how long we should wait for the next queue to pop based on the current rate of ladder queues
class Search (players: list[Player],
start_time: float | None = None,
rating_type: str = 'ladder_1v1',
on_matched: Callable[[ForwardRef('Search'), ForwardRef('Search')], Any] = <function Search.<lambda>>)-
Expand source code
@with_logger class Search: """ Represents the state of a users search for a match. """ def __init__( self, players: list[Player], start_time: Optional[float] = None, rating_type: str = RatingType.LADDER_1V1, on_matched: OnMatchedCallback = lambda _1, _2: None ): assert isinstance(players, list) for player in players: assert player.ratings[rating_type] is not None self.players = players self.rating_type = rating_type self.start_time = start_time or time.time() self._match = asyncio.get_event_loop().create_future() self._failed_matching_attempts = 0 self.on_matched = on_matched # Precompute this self.quality_against_self = self.quality_with(self) def adjusted_rating(self, player: Player) -> Rating: """ Returns an adjusted mean with a simple linear interpolation between current mean and a specified base mean """ mean, dev = player.ratings[self.rating_type] game_count = player.game_count[self.rating_type] adjusted_mean = ((config.NEWBIE_MIN_GAMES - game_count) * config.NEWBIE_BASE_MEAN + game_count * mean) / config.NEWBIE_MIN_GAMES return Rating(adjusted_mean, dev) def is_newbie(self, player: Player) -> bool: return player.game_count[self.rating_type] <= config.NEWBIE_MIN_GAMES def is_single_party(self) -> bool: return len(self.players) == 1 def has_newbie(self) -> bool: for player in self.players: if self.is_newbie(player): return True return False def num_newbies(self) -> int: return sum(self.is_newbie(player) for player in self.players) def has_high_rated_player(self) -> bool: max_rating = max(self.displayed_ratings) return max_rating >= config.HIGH_RATED_PLAYER_MIN_RATING def has_top_player(self) -> bool: max_rating = max(self.displayed_ratings) return max_rating >= config.TOP_PLAYER_MIN_RATING @property def ratings(self) -> list[Rating]: ratings = [] for player, rating in zip(self.players, self.raw_ratings): # New players (less than config.NEWBIE_MIN_GAMES games) match against less skilled opponents if self.is_newbie(player): rating = self.adjusted_rating(player) ratings.append(rating) return ratings @property def cumulative_rating(self) -> float: return sum(self.displayed_ratings) @property def average_rating(self) -> float: return statistics.mean(self.displayed_ratings) @property def raw_ratings(self) -> list[Rating]: return [player.ratings[self.rating_type] for player in self.players] @property def displayed_ratings(self) -> list[float]: """ The client always displays mean - 3 * dev as a player's rating. So generally this is perceived as a player's true rating. """ return [rating.displayed() for rating in self.raw_ratings] def _nearby_rating_range(self, delta: int) -> tuple[int, int]: """ Returns 'boundary' mu values for player matching. Adjust delta for different game qualities. """ mu, _ = self.ratings[0] # Takes the rating of the first player, only works for 1v1 rounded_mu = int(math.ceil(mu / 10) * 10) # Round to 10 return rounded_mu - delta, rounded_mu + delta @property def boundary_80(self) -> tuple[int, int]: """ Achieves roughly 80% quality. """ return self._nearby_rating_range(200) @property def boundary_75(self) -> tuple[int, int]: """ Achieves roughly 75% quality. FIXME - why is it MORE restrictive??? """ return self._nearby_rating_range(100) @property def failed_matching_attempts(self) -> int: return self._failed_matching_attempts @property def search_expansion(self) -> float: """ Defines how much to expand the search range of game quality due to waiting time. The threshold will expand linearly with every failed matching attempt until it reaches the specified MAX. Top players use bigger values to make matching easier. """ if self.has_top_player(): return min( self._failed_matching_attempts * config.LADDER_TOP_PLAYER_SEARCH_EXPANSION_STEP, config.LADDER_TOP_PLAYER_SEARCH_EXPANSION_MAX ) elif self.has_newbie(): return min( self._failed_matching_attempts * config.LADDER_NEWBIE_SEARCH_EXPANSION_STEP, config.LADDER_NEWBIE_SEARCH_EXPANSION_MAX ) else: return min( self._failed_matching_attempts * config.LADDER_SEARCH_EXPANSION_STEP, config.LADDER_SEARCH_EXPANSION_MAX ) def register_failed_matching_attempt(self): """ Signal that matchmaker tried to match this search but was unsuccessful and increase internal counter by one. """ self._failed_matching_attempts += 1 @property def match_threshold(self) -> float: """ Defines the threshold for game quality The base minimum quality is determined as 80% of the quality of a game against a copy of yourself. This is decreased by `self.search_expansion` if search is to be expanded. """ return max(0.8 * self.quality_against_self - self.search_expansion, 0) def quality_with(self, other: "Search") -> float: assert all(other.raw_ratings) assert other.players team1 = [trueskill.Rating(*rating) for rating in self.ratings] team2 = [trueskill.Rating(*rating) for rating in other.ratings] return trueskill.quality([team1, team2]) @property def is_matched(self) -> bool: return self._match.done() and not self._match.cancelled() def done(self) -> bool: return self._match.done() @property def is_cancelled(self) -> bool: return self._match.cancelled() def matches_with(self, other: "Search") -> bool: """ Determine if this search is compatible with other given search according to both wishes. """ if not isinstance(other, Search): return False quality = self.quality_with(other) return self._match_quality_acceptable(other, quality) def _match_quality_acceptable(self, other: "Search", quality: float) -> bool: """ Determine if the given match quality is acceptable. This gets it's own function so we can call it from the Matchmaker using a cached `quality` value. """ # NOTE: We are assuming for optimization purposes that quality is # symmetric. If this ever changes, update here return (quality >= self.match_threshold and quality >= other.match_threshold) def match(self, other: "Search"): """ Mark as matched with given opponent """ self._logger.info("Matched %s with %s", self, other) self.on_matched(self, other) for player, raw_rating in zip(self.players, self.raw_ratings): if self.is_newbie(player): mean, dev = raw_rating adjusted_mean, adjusted_dev = self.adjusted_rating(player) self._logger.info( "Adjusted mean rating for %s with %d games from %.1f to %.1f", player.login, player.game_count[self.rating_type], mean, adjusted_mean ) self._match.set_result(other) async def await_match(self): """ Wait for this search to complete """ await asyncio.wait_for(self._match, None) return self._match def cancel(self): """ Cancel searching for a match """ self._match.cancel() def __str__(self) -> str: return ( f"Search({self.rating_type}, {self._players_repr()}, threshold=" f"{self.match_threshold:.2f}, expansion={self.search_expansion:.2f})" ) def _players_repr(self) -> str: contents = ", ".join( f"Player({p.login}, {p.id}, {p.ratings[self.rating_type]})" for p in self.players ) return f"[{contents}]" def __repr__(self) -> str: """For debugging""" return ( f"Search({[p.login for p in self.players]}, {self.average_rating}" f"{f', FMA: {self.failed_matching_attempts}' if self.failed_matching_attempts else ''}" f"{', has_newbie)' if self.has_newbie() else ')'}" ) def get_original_searches(self) -> list["Search"]: """ Returns the searches of which this Search is comprised, as if it were a CombinedSearch of one """ return [self]
Represents the state of a users search for a match.
Subclasses
Instance variables
prop average_rating : float
-
Expand source code
@property def average_rating(self) -> float: return statistics.mean(self.displayed_ratings)
prop boundary_75 : tuple[int, int]
-
Expand source code
@property def boundary_75(self) -> tuple[int, int]: """ Achieves roughly 75% quality. FIXME - why is it MORE restrictive??? """ return self._nearby_rating_range(100)
Achieves roughly 75% quality. FIXME - why is it MORE restrictive???
prop boundary_80 : tuple[int, int]
-
Expand source code
@property def boundary_80(self) -> tuple[int, int]: """ Achieves roughly 80% quality. """ return self._nearby_rating_range(200)
Achieves roughly 80% quality.
prop cumulative_rating : float
-
Expand source code
@property def cumulative_rating(self) -> float: return sum(self.displayed_ratings)
prop displayed_ratings : list[float]
-
Expand source code
@property def displayed_ratings(self) -> list[float]: """ The client always displays mean - 3 * dev as a player's rating. So generally this is perceived as a player's true rating. """ return [rating.displayed() for rating in self.raw_ratings]
The client always displays mean - 3 * dev as a player's rating. So generally this is perceived as a player's true rating.
prop failed_matching_attempts : int
-
Expand source code
@property def failed_matching_attempts(self) -> int: return self._failed_matching_attempts
prop is_cancelled : bool
-
Expand source code
@property def is_cancelled(self) -> bool: return self._match.cancelled()
prop is_matched : bool
-
Expand source code
@property def is_matched(self) -> bool: return self._match.done() and not self._match.cancelled()
prop match_threshold : float
-
Expand source code
@property def match_threshold(self) -> float: """ Defines the threshold for game quality The base minimum quality is determined as 80% of the quality of a game against a copy of yourself. This is decreased by `self.search_expansion` if search is to be expanded. """ return max(0.8 * self.quality_against_self - self.search_expansion, 0)
Defines the threshold for game quality
The base minimum quality is determined as 80% of the quality of a game against a copy of yourself. This is decreased by
self.search_expansion
if search is to be expanded. prop ratings : list[Rating]
-
Expand source code
@property def ratings(self) -> list[Rating]: ratings = [] for player, rating in zip(self.players, self.raw_ratings): # New players (less than config.NEWBIE_MIN_GAMES games) match against less skilled opponents if self.is_newbie(player): rating = self.adjusted_rating(player) ratings.append(rating) return ratings
prop raw_ratings : list[Rating]
-
Expand source code
@property def raw_ratings(self) -> list[Rating]: return [player.ratings[self.rating_type] for player in self.players]
prop search_expansion : float
-
Expand source code
@property def search_expansion(self) -> float: """ Defines how much to expand the search range of game quality due to waiting time. The threshold will expand linearly with every failed matching attempt until it reaches the specified MAX. Top players use bigger values to make matching easier. """ if self.has_top_player(): return min( self._failed_matching_attempts * config.LADDER_TOP_PLAYER_SEARCH_EXPANSION_STEP, config.LADDER_TOP_PLAYER_SEARCH_EXPANSION_MAX ) elif self.has_newbie(): return min( self._failed_matching_attempts * config.LADDER_NEWBIE_SEARCH_EXPANSION_STEP, config.LADDER_NEWBIE_SEARCH_EXPANSION_MAX ) else: return min( self._failed_matching_attempts * config.LADDER_SEARCH_EXPANSION_STEP, config.LADDER_SEARCH_EXPANSION_MAX )
Defines how much to expand the search range of game quality due to waiting time.
The threshold will expand linearly with every failed matching attempt until it reaches the specified MAX.
Top players use bigger values to make matching easier.
Methods
def adjusted_rating(self,
player: Player) ‑> Rating-
Expand source code
def adjusted_rating(self, player: Player) -> Rating: """ Returns an adjusted mean with a simple linear interpolation between current mean and a specified base mean """ mean, dev = player.ratings[self.rating_type] game_count = player.game_count[self.rating_type] adjusted_mean = ((config.NEWBIE_MIN_GAMES - game_count) * config.NEWBIE_BASE_MEAN + game_count * mean) / config.NEWBIE_MIN_GAMES return Rating(adjusted_mean, dev)
Returns an adjusted mean with a simple linear interpolation between current mean and a specified base mean
async def await_match(self)
-
Expand source code
async def await_match(self): """ Wait for this search to complete """ await asyncio.wait_for(self._match, None) return self._match
Wait for this search to complete
def cancel(self)
-
Expand source code
def cancel(self): """ Cancel searching for a match """ self._match.cancel()
Cancel searching for a match
def done(self) ‑> bool
-
Expand source code
def done(self) -> bool: return self._match.done()
def get_original_searches(self) ‑> list['Search']
-
Expand source code
def get_original_searches(self) -> list["Search"]: """ Returns the searches of which this Search is comprised, as if it were a CombinedSearch of one """ return [self]
Returns the searches of which this Search is comprised, as if it were a CombinedSearch of one
def has_high_rated_player(self) ‑> bool
-
Expand source code
def has_high_rated_player(self) -> bool: max_rating = max(self.displayed_ratings) return max_rating >= config.HIGH_RATED_PLAYER_MIN_RATING
def has_newbie(self) ‑> bool
-
Expand source code
def has_newbie(self) -> bool: for player in self.players: if self.is_newbie(player): return True return False
def has_top_player(self) ‑> bool
-
Expand source code
def has_top_player(self) -> bool: max_rating = max(self.displayed_ratings) return max_rating >= config.TOP_PLAYER_MIN_RATING
def is_newbie(self,
player: Player) ‑> bool-
Expand source code
def is_newbie(self, player: Player) -> bool: return player.game_count[self.rating_type] <= config.NEWBIE_MIN_GAMES
def is_single_party(self) ‑> bool
-
Expand source code
def is_single_party(self) -> bool: return len(self.players) == 1
def match(self,
other: Search)-
Expand source code
def match(self, other: "Search"): """ Mark as matched with given opponent """ self._logger.info("Matched %s with %s", self, other) self.on_matched(self, other) for player, raw_rating in zip(self.players, self.raw_ratings): if self.is_newbie(player): mean, dev = raw_rating adjusted_mean, adjusted_dev = self.adjusted_rating(player) self._logger.info( "Adjusted mean rating for %s with %d games from %.1f to %.1f", player.login, player.game_count[self.rating_type], mean, adjusted_mean ) self._match.set_result(other)
Mark as matched with given opponent
def matches_with(self,
other: Search) ‑> bool-
Expand source code
def matches_with(self, other: "Search") -> bool: """ Determine if this search is compatible with other given search according to both wishes. """ if not isinstance(other, Search): return False quality = self.quality_with(other) return self._match_quality_acceptable(other, quality)
Determine if this search is compatible with other given search according to both wishes.
def num_newbies(self) ‑> int
-
Expand source code
def num_newbies(self) -> int: return sum(self.is_newbie(player) for player in self.players)
def quality_with(self,
other: Search) ‑> float-
Expand source code
def quality_with(self, other: "Search") -> float: assert all(other.raw_ratings) assert other.players team1 = [trueskill.Rating(*rating) for rating in self.ratings] team2 = [trueskill.Rating(*rating) for rating in other.ratings] return trueskill.quality([team1, team2])
def register_failed_matching_attempt(self)
-
Expand source code
def register_failed_matching_attempt(self): """ Signal that matchmaker tried to match this search but was unsuccessful and increase internal counter by one. """ self._failed_matching_attempts += 1
Signal that matchmaker tried to match this search but was unsuccessful and increase internal counter by one.