Module server.matchmaker

The matchmaker system

Used for keeping track of queues of players wanting to play specific kinds of games, currently just used for 1v1 ladder.

Sub-modules

server.matchmaker.algorithm

Matchmaker algorithm implementations

server.matchmaker.map_pool
server.matchmaker.matchmaker_queue
server.matchmaker.pop_timer
server.matchmaker.search

Classes

class CombinedSearch (*searches: Search)

Represents the state of a users search for a match.

Expand source code
class CombinedSearch(Search):
    def __init__(self, *searches: Search):
        assert searches
        rating_type = searches[0].rating_type
        assert all(map(lambda s: s.rating_type == rating_type, searches))

        self.rating_type = rating_type
        self.searches = searches

    @property
    def players(self) -> list[Player]:
        return list(itertools.chain(*[s.players for s in self.searches]))

    @property
    def ratings(self) -> list[Rating]:
        return list(itertools.chain(*[s.ratings for s in self.searches]))

    @property
    def cumulative_rating(self) -> float:
        return sum(s.cumulative_rating for s in self.searches)

    @property
    def average_rating(self) -> float:
        return get_average_rating(self.searches)

    @property
    def raw_ratings(self) -> list[Rating]:
        return list(itertools.chain(*[s.raw_ratings for s in self.searches]))

    @property
    def displayed_ratings(self) -> list[float]:
        return list(itertools.chain(*[s.displayed_ratings for s in self.searches]))

    @property
    def failed_matching_attempts(self) -> int:
        return max(search.failed_matching_attempts for search in self.searches)

    def register_failed_matching_attempt(self):
        for search in self.searches:
            search.register_failed_matching_attempt()

    @property
    def match_threshold(self) -> float:
        """
        Defines the threshold for game quality
        """
        return min(s.match_threshold for s in self.searches)

    @property
    def is_matched(self) -> bool:
        return all(s.is_matched for s in self.searches)

    def done(self) -> bool:
        return all(s.done() for s in self.searches)

    @property
    def is_cancelled(self) -> bool:
        return any(s.is_cancelled for s in self.searches)

    def match(self, other: "Search"):
        """
        Mark as matched with given opponent
        """
        self._logger.info("Combined search matched %s with %s", self.players, other.players)

        for s in self.searches:
            s.match(other)

    async def await_match(self):
        """
        Wait for this search to complete
        """
        await asyncio.wait({s.await_match() for s in self.searches})

    def cancel(self):
        """
        Cancel searching for a match
        """
        for s in self.searches:
            s.cancel()

    def __str__(self):
        return f"CombinedSearch({', '.join(str(s) for s in self.searches)})"

    def __repr__(self):
        return f"CombinedSearch({', '.join(str(s) for s in self.searches)})"

    def get_original_searches(self) -> list[Search]:
        """
        Returns the searches of which this CombinedSearch is comprised
        """
        return list(self.searches)

Ancestors

Instance variables

prop average_rating : float
Expand source code
@property
def average_rating(self) -> float:
    return get_average_rating(self.searches)
prop cumulative_rating : float
Expand source code
@property
def cumulative_rating(self) -> float:
    return sum(s.cumulative_rating for s in self.searches)
prop failed_matching_attempts : int
Expand source code
@property
def failed_matching_attempts(self) -> int:
    return max(search.failed_matching_attempts for search in self.searches)
prop is_cancelled : bool
Expand source code
@property
def is_cancelled(self) -> bool:
    return any(s.is_cancelled for s in self.searches)
prop is_matched : bool
Expand source code
@property
def is_matched(self) -> bool:
    return all(s.is_matched for s in self.searches)
prop match_threshold : float

Defines the threshold for game quality

Expand source code
@property
def match_threshold(self) -> float:
    """
    Defines the threshold for game quality
    """
    return min(s.match_threshold for s in self.searches)
prop players : list[Player]
Expand source code
@property
def players(self) -> list[Player]:
    return list(itertools.chain(*[s.players for s in self.searches]))
prop ratings : list[Rating]
Expand source code
@property
def ratings(self) -> list[Rating]:
    return list(itertools.chain(*[s.ratings for s in self.searches]))
prop raw_ratings : list[Rating]
Expand source code
@property
def raw_ratings(self) -> list[Rating]:
    return list(itertools.chain(*[s.raw_ratings for s in self.searches]))

Methods

def done(self) ‑> bool
def get_original_searches(self) ‑> list[Search]

Returns the searches of which this CombinedSearch is comprised

Inherited members

class MapPool (map_pool_id: int, name: str, maps: Iterable[MapPoolMap] = ())
Expand source code
@with_logger
class MapPool(object):
    def __init__(
        self,
        map_pool_id: int,
        name: str,
        maps: Iterable[MapPoolMap] = ()
    ):
        self.id = map_pool_id
        self.name = name
        self.set_maps(maps)

    def set_maps(self, maps: Iterable[MapPoolMap]) -> None:
        self.maps = {map_.id: map_ for map_ in maps}

    def choose_map(self, played_map_ids: Iterable[int] = ()) -> Map:
        """
        Select a random map who's id does not appear in `played_map_ids`. If
        all map ids appear in the list, then pick one that appears the least
        amount of times.
        """
        if not self.maps:
            self._logger.critical(
                "Trying to choose a map from an empty map pool: %s", self.name
            )
            raise RuntimeError(f"Map pool {self.name} not set!")

        # Make sure the counter has every available map
        counter = Counter(self.maps.keys())
        counter.update(id_ for id_ in played_map_ids if id_ in self.maps)

        least_common = counter.most_common()[::-1]
        least_count = 1
        for id_, count in least_common:
            if isinstance(self.maps[id_], Map):
                least_count = count
                break

        # Trim off the maps with higher play counts
        for i, (_, count) in enumerate(least_common):
            if count > least_count:
                least_common = least_common[:i]
                break

        weights = [self.maps[id_].weight for id_, _ in least_common]

        map_id = random.choices(least_common, weights=weights, k=1)[0][0]
        return self.maps[map_id].get_map()

    def __repr__(self) -> str:
        return f"MapPool({self.id}, {self.name}, {list(self.maps.values())})"

Methods

def choose_map(self, played_map_ids: Iterable[int] = ()) ‑> Map

Select a random map who's id does not appear in played_map_ids. If all map ids appear in the list, then pick one that appears the least amount of times.

def set_maps(self, maps: Iterable[MapPoolMap]) ‑> None
class MatchmakerQueue (game_service: GameService, on_match_found: Callable[[SearchSearch, ForwardRef('MatchmakerQueue')], Any], name: str, queue_id: int, featured_mod: str, rating_type: str, team_size: int = 1, params: Optional[dict[str, typing.Any]] = None, map_pools: Iterable[tuple[MapPool, Optional[int], Optional[int]]] = ())
Expand source code
@with_logger
class MatchmakerQueue:
    def __init__(
        self,
        game_service: "GameService",
        on_match_found: MatchFoundCallback,
        name: str,
        queue_id: int,
        featured_mod: str,
        rating_type: str,
        team_size: int = 1,
        params: Optional[dict[str, Any]] = None,
        map_pools: Iterable[tuple[MapPool, Optional[int], Optional[int]]] = (),
    ):
        self.game_service = game_service
        self.name = name
        self.id = queue_id
        self.featured_mod = featured_mod
        self.rating_type = rating_type
        self.team_size = team_size
        self.rating_peak = 1000.0
        self.params = params or {}
        self.map_pools = {info[0].id: info for info in map_pools}

        self._queue: dict[Search, None] = OrderedDict()
        self.on_match_found = on_match_found
        self._is_running = True

        self.timer = PopTimer(self)

        self.matchmaker = TeamMatchMaker()

    @property
    def is_running(self) -> bool:
        return self._is_running

    def add_map_pool(
        self,
        map_pool: MapPool,
        min_rating: Optional[int],
        max_rating: Optional[int]
    ) -> None:
        self.map_pools[map_pool.id] = (map_pool, min_rating, max_rating)

    def get_map_pool_for_rating(self, rating: float) -> Optional[MapPool]:
        for map_pool, min_rating, max_rating in self.map_pools.values():
            if min_rating is not None and rating < min_rating:
                continue
            if max_rating is not None and rating > max_rating:
                continue
            return map_pool

    def get_game_options(self) -> dict[str, Any]:
        return self.params.get("GameOptions") or None

    def initialize(self):
        asyncio.create_task(self.queue_pop_timer())

    @property
    def num_players(self) -> int:
        return sum(len(search.players) for search in self._queue.keys())

    async def queue_pop_timer(self) -> None:
        """ Periodically tries to match all Searches in the queue. The amount
        of time until next queue 'pop' is determined by the number of players
        in the queue.
        """
        self._logger.debug("MatchmakerQueue initialized for %s", self.name)
        while self.is_running:
            try:
                await self.timer.next_pop()

                await self.find_matches()

                number_of_unmatched_searches = len(self._queue)
                metrics.unmatched_searches.labels(self.name).set(
                    number_of_unmatched_searches
                )

                # Any searches in the queue at this point were unable to find a
                # match this round and will have higher priority next round.

                self.game_service.mark_dirty(self)
            except asyncio.CancelledError:
                break
            except Exception:
                self._logger.exception(
                    "Unexpected error during queue pop timer loop!"
                )
                # To avoid potential busy loops
                await asyncio.sleep(1)
        self._logger.info("%s queue stopped", self.name)

    async def search(self, search: Search) -> None:
        """
        Search for a match.

        Puts a search object into the queue and awaits completion.
        """
        assert search is not None

        try:
            with MatchmakerSearchTimer(self.name):
                self.push(search)
                await search.await_match()
            self._logger.debug("Search complete: %s", search)
        except CancelledError:
            pass
        finally:
            # If the queue was cancelled, or some other error occurred,
            # make sure to clean up.
            self.game_service.mark_dirty(self)
            if search in self._queue:
                del self._queue[search]

    @synchronized(SpinLock(sleep_duration=1))
    async def find_matches(self) -> None:
        """
        Perform the matchmaking algorithm.

        Note that this function is synchronized such that only one instance of
        MatchmakerQueue can call this function at any given time. This is
        needed in order to safely enable multiqueuing.
        """
        self._logger.info("Searching for matches: %s", self.name)

        searches = list(self._queue.keys())

        if self.num_players < 2 * self.team_size:
            self._register_unmatched_searches(searches)
            return

        # Call self.match on all matches and filter out the ones that were cancelled
        loop = asyncio.get_running_loop()
        proposed_matches, unmatched_searches = await loop.run_in_executor(
            None,
            self.matchmaker.find,
            searches,
            self.team_size,
            self.rating_peak,
        )

        # filter out matches that were cancelled
        matches: list[Match] = []
        for match in proposed_matches:
            if self.match(match[0], match[1]):
                matches.append(match)
            else:
                unmatched_searches.extend(match)

        self._register_unmatched_searches(unmatched_searches)

        for search1, search2 in matches:
            self._report_party_sizes(search1)
            self._report_party_sizes(search2)

            rating_imbalance = abs(search1.cumulative_rating - search2.cumulative_rating)
            metrics.match_rating_imbalance.labels(self.name).observe(rating_imbalance)

            ratings = search1.displayed_ratings + search2.displayed_ratings
            rating_variety = max(ratings) - min(ratings)
            metrics.match_rating_variety.labels(self.name).observe(rating_variety)

            metrics.match_quality.labels(self.name).observe(
                search1.quality_with(search2)
            )
            try:
                self.on_match_found(search1, search2, self)
            except Exception:
                self._logger.exception("Match callback raised an exception!")

    def _report_party_sizes(self, team):
        for search in team.get_original_searches():
            metrics.matched_matchmaker_searches.labels(
                self.name, len(search.players)
            ).inc()

    def _register_unmatched_searches(
        self,
        unmatched_searches: list[Search],
    ):
        """
        Tells all unmatched searches that they went through a failed matching
        attempt.
        """
        for search in unmatched_searches:
            search.register_failed_matching_attempt()
            self._logger.debug(
                "Search %s remained unmatched at threshold %f in attempt number %s",
                search, search.match_threshold, search.failed_matching_attempts
            )

    def push(self, search: Search):
        """ Push the given search object onto the queue """

        self._queue[search] = None
        self.game_service.mark_dirty(self)

    def match(self, s1: Search, s2: Search) -> bool:
        """
        Mark the given two searches as matched

        # Returns
        `True` if matching succeeded or `False` if matching failed.
        """
        if s1.is_matched or s2.is_matched:
            return False
        if s1.is_cancelled or s2.is_cancelled:
            return False
        # Additional failsafe. Ideally this check will never fail.
        if any(
            player.state != PlayerState.SEARCHING_LADDER
            for player in s1.players + s2.players
        ):
            self._logger.warning(
                "Tried to match searches %s and %s while some players had "
                "invalid states: team1: %s team2: %s",
                s1, s2,
                list(p.state for p in s1.players),
                list(p.state for p in s2.players)
            )
            return False

        s1.match(s2)
        s2.match(s1)
        if s1 in self._queue:
            del self._queue[s1]
        if s2 in self._queue:
            del self._queue[s2]

        return True

    def shutdown(self):
        self._is_running = False
        self.timer.cancel()

    def to_dict(self):
        """
        Return a fuzzy representation of the searches currently in the queue
        """
        return {
            "queue_name": self.name,
            "queue_pop_time": datetime.fromtimestamp(
                self.timer.next_queue_pop, timezone.utc
            ).isoformat(),
            "queue_pop_time_delta": round(
                self.timer.next_queue_pop - time.time(),
                ndigits=2
            ),
            "num_players": self.num_players,
            "boundary_80s": [search.boundary_80 for search in self._queue.keys()],
            "boundary_75s": [search.boundary_75 for search in self._queue.keys()],
            # TODO: Remove, the client should query the API for this
            "team_size": self.team_size,
        }

    def __repr__(self):
        return repr(self._queue)

Instance variables

prop is_running : bool
Expand source code
@property
def is_running(self) -> bool:
    return self._is_running
prop num_players : int
Expand source code
@property
def num_players(self) -> int:
    return sum(len(search.players) for search in self._queue.keys())

Methods

def add_map_pool(self, map_pool: MapPool, min_rating: Optional[int], max_rating: Optional[int]) ‑> None
async def find_matches(self) ‑> None

Perform the matchmaking algorithm.

Note that this function is synchronized such that only one instance of MatchmakerQueue can call this function at any given time. This is needed in order to safely enable multiqueuing.

def get_game_options(self) ‑> dict[str, typing.Any]
def get_map_pool_for_rating(self, rating: float) ‑> Optional[MapPool]
def initialize(self)
def match(self, s1: Search, s2: Search) ‑> bool

Mark the given two searches as matched

Returns

True if matching succeeded or False if matching failed.

def push(self, search: Search)

Push the given search object onto the queue

async def queue_pop_timer(self) ‑> None

Periodically tries to match all Searches in the queue. The amount of time until next queue 'pop' is determined by the number of players in the queue.

async def search(self, search: Search) ‑> None

Search for a match.

Puts a search object into the queue and awaits completion.

def shutdown(self)
def to_dict(self)

Return a fuzzy representation of the searches currently in the queue

class PopTimer (queue: MatchmakerQueue)

Calculates when the next pop should happen based on the rate of players queuing.

timer = PopTimer(some_queue)
# Pauses the coroutine until the next queue pop
await timer.next_pop()

The timer will adjust the pop times in an attempt to maintain a fixed queue size on each pop. So generally, the more people are in the queue, the shorter the time will be.

The player queue rate is based on a moving average over the last few pops. The exact size can be set in config.

Expand source code
@with_logger
class PopTimer(object):
    """ Calculates when the next pop should happen based on the rate of players
    queuing.

        timer = PopTimer(some_queue)
        # Pauses the coroutine until the next queue pop
        await timer.next_pop()

    The timer will adjust the pop times in an attempt to maintain a fixed queue
    size on each pop. So generally, the more people are in the queue, the
    shorter the time will be.

    The player queue rate is based on a moving average over the last few pops.
    The exact size can be set in config.
    """

    def __init__(self, queue: "MatchmakerQueue"):
        self.queue = queue
        # Set up deque's for calculating a moving average
        self.last_queue_amounts: deque[int] = deque(maxlen=config.QUEUE_POP_TIME_MOVING_AVG_SIZE)
        self.last_queue_times: deque[float] = deque(maxlen=config.QUEUE_POP_TIME_MOVING_AVG_SIZE)

        self._last_queue_pop = time()
        # Optimistically schedule first pop for half of the max pop time
        self.next_queue_pop = self._last_queue_pop + (config.QUEUE_POP_TIME_MAX / 2)
        self._wait_task = None

    async def next_pop(self):
        """ Wait for the timer to pop. """

        time_remaining = self.next_queue_pop - time()
        self._logger.info("Next %s wave happening in %is", self.queue.name, time_remaining)
        metrics.matchmaker_queue_pop.labels(self.queue.name).set(int(time_remaining))

        self._wait_task = asyncio.create_task(asyncio.sleep(time_remaining))
        await self._wait_task

        num_players = self.queue.num_players
        metrics.matchmaker_players.labels(self.queue.name).set(num_players)

        self._last_queue_pop = time()
        self.next_queue_pop = self._last_queue_pop + self.time_until_next_pop(
            num_players, time_remaining
        )

    def time_until_next_pop(self, num_queued: int, time_queued: float) -> float:
        """ Calculate how long we should wait for the next queue to pop based
        on the current rate of ladder queues
        """
        # Calculate moving average of player queue rate
        self.last_queue_amounts.append(num_queued)
        self.last_queue_times.append(time_queued)

        total_players = sum(self.last_queue_amounts)
        if total_players == 0:
            return config.QUEUE_POP_TIME_MAX

        total_times = sum(self.last_queue_times)
        if total_times:
            self._logger.debug(
                "Queue rate for %s: %f/s", self.queue.name,
                total_players / total_times
            )

        players_per_match = self.queue.team_size * 2
        desired_players = config.QUEUE_POP_DESIRED_MATCHES * players_per_match
        # Obtained by solving $ NUM_PLAYERS = rate * time $ for time.
        next_pop_time = desired_players * total_times / total_players
        if next_pop_time > config.QUEUE_POP_TIME_MAX:
            self._logger.info(
                "Required time (%.2fs) for %s is larger than max pop time (%ds). "
                "Consider increasing the max pop time",
                next_pop_time, self.queue.name, config.QUEUE_POP_TIME_MAX
            )
            return config.QUEUE_POP_TIME_MAX

        if next_pop_time < config.QUEUE_POP_TIME_MIN:
            self._logger.warning(
                "Required time (%.2fs) for %s is lower than min pop time (%ds). ",
                next_pop_time, self.queue.name, config.QUEUE_POP_TIME_MIN
            )
            return config.QUEUE_POP_TIME_MIN

        return next_pop_time

    def cancel(self):
        if self._wait_task:
            self._wait_task.cancel()

Methods

def cancel(self)
async def next_pop(self)

Wait for the timer to pop.

def time_until_next_pop(self, num_queued: int, time_queued: float) ‑> float

Calculate how long we should wait for the next queue to pop based on the current rate of ladder queues

class Search (players: list[Player], start_time: Optional[float] = None, rating_type: str = 'ladder_1v1', on_matched: Callable[[ForwardRef('Search'), ForwardRef('Search')], Any] = <function Search.<lambda>>)

Represents the state of a users search for a match.

Expand source code
@with_logger
class Search:
    """
    Represents the state of a users search for a match.
    """

    def __init__(
        self,
        players: list[Player],
        start_time: Optional[float] = None,
        rating_type: str = RatingType.LADDER_1V1,
        on_matched: OnMatchedCallback = lambda _1, _2: None
    ):
        assert isinstance(players, list)
        for player in players:
            assert player.ratings[rating_type] is not None

        self.players = players
        self.rating_type = rating_type
        self.start_time = start_time or time.time()
        self._match = asyncio.get_event_loop().create_future()
        self._failed_matching_attempts = 0
        self.on_matched = on_matched

        # Precompute this
        self.quality_against_self = self.quality_with(self)

    def adjusted_rating(self, player: Player) -> Rating:
        """
        Returns an adjusted mean with a simple linear interpolation between current mean and a specified base mean
        """
        mean, dev = player.ratings[self.rating_type]
        game_count = player.game_count[self.rating_type]
        adjusted_mean = ((config.NEWBIE_MIN_GAMES - game_count) * config.NEWBIE_BASE_MEAN
                         + game_count * mean) / config.NEWBIE_MIN_GAMES
        return Rating(adjusted_mean, dev)

    def is_newbie(self, player: Player) -> bool:
        return player.game_count[self.rating_type] <= config.NEWBIE_MIN_GAMES

    def is_single_party(self) -> bool:
        return len(self.players) == 1

    def has_newbie(self) -> bool:
        for player in self.players:
            if self.is_newbie(player):
                return True

        return False

    def num_newbies(self) -> int:
        return sum(self.is_newbie(player) for player in self.players)

    def has_high_rated_player(self) -> bool:
        max_rating = max(self.displayed_ratings)
        return max_rating >= config.HIGH_RATED_PLAYER_MIN_RATING

    def has_top_player(self) -> bool:
        max_rating = max(self.displayed_ratings)
        return max_rating >= config.TOP_PLAYER_MIN_RATING

    @property
    def ratings(self) -> list[Rating]:
        ratings = []
        for player, rating in zip(self.players, self.raw_ratings):
            # New players (less than config.NEWBIE_MIN_GAMES games) match against less skilled opponents
            if self.is_newbie(player):
                rating = self.adjusted_rating(player)
            ratings.append(rating)
        return ratings

    @property
    def cumulative_rating(self) -> float:
        return sum(self.displayed_ratings)

    @property
    def average_rating(self) -> float:
        return statistics.mean(self.displayed_ratings)

    @property
    def raw_ratings(self) -> list[Rating]:
        return [player.ratings[self.rating_type] for player in self.players]

    @property
    def displayed_ratings(self) -> list[float]:
        """
        The client always displays mean - 3 * dev as a player's rating.
        So generally this is perceived as a player's true rating.
        """
        return [rating.displayed() for rating in self.raw_ratings]

    def _nearby_rating_range(self, delta: int) -> tuple[int, int]:
        """
        Returns 'boundary' mu values for player matching. Adjust delta for
        different game qualities.
        """
        mu, _ = self.ratings[0]  # Takes the rating of the first player, only works for 1v1
        rounded_mu = int(math.ceil(mu / 10) * 10)  # Round to 10
        return rounded_mu - delta, rounded_mu + delta

    @property
    def boundary_80(self) -> tuple[int, int]:
        """ Achieves roughly 80% quality. """
        return self._nearby_rating_range(200)

    @property
    def boundary_75(self) -> tuple[int, int]:
        """ Achieves roughly 75% quality. FIXME - why is it MORE restrictive??? """
        return self._nearby_rating_range(100)

    @property
    def failed_matching_attempts(self) -> int:
        return self._failed_matching_attempts

    @property
    def search_expansion(self) -> float:
        """
        Defines how much to expand the search range of game quality due to waiting
        time.

        The threshold will expand linearly with every failed matching attempt
        until it reaches the specified MAX.

        Top players use bigger values to make matching easier.
        """
        if self.has_top_player():
            return min(
                self._failed_matching_attempts * config.LADDER_TOP_PLAYER_SEARCH_EXPANSION_STEP,
                config.LADDER_TOP_PLAYER_SEARCH_EXPANSION_MAX
            )
        elif self.has_newbie():
            return min(
                self._failed_matching_attempts * config.LADDER_NEWBIE_SEARCH_EXPANSION_STEP,
                config.LADDER_NEWBIE_SEARCH_EXPANSION_MAX
            )
        else:
            return min(
                self._failed_matching_attempts * config.LADDER_SEARCH_EXPANSION_STEP,
                config.LADDER_SEARCH_EXPANSION_MAX
            )

    def register_failed_matching_attempt(self):
        """
        Signal that matchmaker tried to match this search but was unsuccessful
        and increase internal counter by one.
        """

        self._failed_matching_attempts += 1

    @property
    def match_threshold(self) -> float:
        """
        Defines the threshold for game quality

        The base minimum quality is determined as 80% of the quality of a game
        against a copy of yourself. This is decreased by `self.search_expansion`
        if search is to be expanded.
        """

        return max(0.8 * self.quality_against_self - self.search_expansion, 0)

    def quality_with(self, other: "Search") -> float:
        assert all(other.raw_ratings)
        assert other.players

        team1 = [trueskill.Rating(*rating) for rating in self.ratings]
        team2 = [trueskill.Rating(*rating) for rating in other.ratings]

        return trueskill.quality([team1, team2])

    @property
    def is_matched(self) -> bool:
        return self._match.done() and not self._match.cancelled()

    def done(self) -> bool:
        return self._match.done()

    @property
    def is_cancelled(self) -> bool:
        return self._match.cancelled()

    def matches_with(self, other: "Search") -> bool:
        """
        Determine if this search is compatible with other given search according
        to both wishes.
        """
        if not isinstance(other, Search):
            return False

        quality = self.quality_with(other)
        return self._match_quality_acceptable(other, quality)

    def _match_quality_acceptable(self, other: "Search", quality: float) -> bool:
        """
        Determine if the given match quality is acceptable.

        This gets it's own function so we can call it from the Matchmaker using
        a cached `quality` value.
        """
        # NOTE: We are assuming for optimization purposes that quality is
        # symmetric. If this ever changes, update here
        return (quality >= self.match_threshold and
                quality >= other.match_threshold)

    def match(self, other: "Search"):
        """
        Mark as matched with given opponent
        """
        self._logger.info("Matched %s with %s", self, other)

        self.on_matched(self, other)

        for player, raw_rating in zip(self.players, self.raw_ratings):
            if self.is_newbie(player):
                mean, dev = raw_rating
                adjusted_mean, adjusted_dev = self.adjusted_rating(player)
                self._logger.info(
                    "Adjusted mean rating for %s with %d games from %.1f to %.1f",
                    player.login,
                    player.game_count[self.rating_type],
                    mean,
                    adjusted_mean
                )
        self._match.set_result(other)

    async def await_match(self):
        """
        Wait for this search to complete
        """
        await asyncio.wait_for(self._match, None)
        return self._match

    def cancel(self):
        """
        Cancel searching for a match
        """
        self._match.cancel()

    def __str__(self) -> str:
        return (
            f"Search({self.rating_type}, {self._players_repr()}, threshold="
            f"{self.match_threshold:.2f}, expansion={self.search_expansion:.2f})"
        )

    def _players_repr(self) -> str:
        contents = ", ".join(
            f"Player({p.login}, {p.id}, {p.ratings[self.rating_type]})"
            for p in self.players
        )
        return f"[{contents}]"

    def __repr__(self) -> str:
        """For debugging"""
        return (
            f"Search({[p.login for p in self.players]}, {self.average_rating}"
            f"{f', FMA: {self.failed_matching_attempts}' if self.failed_matching_attempts else ''}"
            f"{', has_newbie)' if self.has_newbie() else ')'}"
        )

    def get_original_searches(self) -> list["Search"]:
        """
        Returns the searches of which this Search is comprised,
        as if it were a CombinedSearch of one
        """
        return [self]

Subclasses

Instance variables

prop average_rating : float
Expand source code
@property
def average_rating(self) -> float:
    return statistics.mean(self.displayed_ratings)
prop boundary_75 : tuple[int, int]

Achieves roughly 75% quality. FIXME - why is it MORE restrictive???

Expand source code
@property
def boundary_75(self) -> tuple[int, int]:
    """ Achieves roughly 75% quality. FIXME - why is it MORE restrictive??? """
    return self._nearby_rating_range(100)
prop boundary_80 : tuple[int, int]

Achieves roughly 80% quality.

Expand source code
@property
def boundary_80(self) -> tuple[int, int]:
    """ Achieves roughly 80% quality. """
    return self._nearby_rating_range(200)
prop cumulative_rating : float
Expand source code
@property
def cumulative_rating(self) -> float:
    return sum(self.displayed_ratings)
prop displayed_ratings : list[float]

The client always displays mean - 3 * dev as a player's rating. So generally this is perceived as a player's true rating.

Expand source code
@property
def displayed_ratings(self) -> list[float]:
    """
    The client always displays mean - 3 * dev as a player's rating.
    So generally this is perceived as a player's true rating.
    """
    return [rating.displayed() for rating in self.raw_ratings]
prop failed_matching_attempts : int
Expand source code
@property
def failed_matching_attempts(self) -> int:
    return self._failed_matching_attempts
prop is_cancelled : bool
Expand source code
@property
def is_cancelled(self) -> bool:
    return self._match.cancelled()
prop is_matched : bool
Expand source code
@property
def is_matched(self) -> bool:
    return self._match.done() and not self._match.cancelled()
prop match_threshold : float

Defines the threshold for game quality

The base minimum quality is determined as 80% of the quality of a game against a copy of yourself. This is decreased by self.search_expansion if search is to be expanded.

Expand source code
@property
def match_threshold(self) -> float:
    """
    Defines the threshold for game quality

    The base minimum quality is determined as 80% of the quality of a game
    against a copy of yourself. This is decreased by `self.search_expansion`
    if search is to be expanded.
    """

    return max(0.8 * self.quality_against_self - self.search_expansion, 0)
prop ratings : list[Rating]
Expand source code
@property
def ratings(self) -> list[Rating]:
    ratings = []
    for player, rating in zip(self.players, self.raw_ratings):
        # New players (less than config.NEWBIE_MIN_GAMES games) match against less skilled opponents
        if self.is_newbie(player):
            rating = self.adjusted_rating(player)
        ratings.append(rating)
    return ratings
prop raw_ratings : list[Rating]
Expand source code
@property
def raw_ratings(self) -> list[Rating]:
    return [player.ratings[self.rating_type] for player in self.players]
prop search_expansion : float

Defines how much to expand the search range of game quality due to waiting time.

The threshold will expand linearly with every failed matching attempt until it reaches the specified MAX.

Top players use bigger values to make matching easier.

Expand source code
@property
def search_expansion(self) -> float:
    """
    Defines how much to expand the search range of game quality due to waiting
    time.

    The threshold will expand linearly with every failed matching attempt
    until it reaches the specified MAX.

    Top players use bigger values to make matching easier.
    """
    if self.has_top_player():
        return min(
            self._failed_matching_attempts * config.LADDER_TOP_PLAYER_SEARCH_EXPANSION_STEP,
            config.LADDER_TOP_PLAYER_SEARCH_EXPANSION_MAX
        )
    elif self.has_newbie():
        return min(
            self._failed_matching_attempts * config.LADDER_NEWBIE_SEARCH_EXPANSION_STEP,
            config.LADDER_NEWBIE_SEARCH_EXPANSION_MAX
        )
    else:
        return min(
            self._failed_matching_attempts * config.LADDER_SEARCH_EXPANSION_STEP,
            config.LADDER_SEARCH_EXPANSION_MAX
        )

Methods

def adjusted_rating(self, player: Player) ‑> Rating

Returns an adjusted mean with a simple linear interpolation between current mean and a specified base mean

async def await_match(self)

Wait for this search to complete

def cancel(self)

Cancel searching for a match

def done(self) ‑> bool
def get_original_searches(self) ‑> list['Search']

Returns the searches of which this Search is comprised, as if it were a CombinedSearch of one

def has_high_rated_player(self) ‑> bool
def has_newbie(self) ‑> bool
def has_top_player(self) ‑> bool
def is_newbie(self, player: Player) ‑> bool
def is_single_party(self) ‑> bool
def match(self, other: Search)

Mark as matched with given opponent

def matches_with(self, other: Search) ‑> bool

Determine if this search is compatible with other given search according to both wishes.

def num_newbies(self) ‑> int
def quality_with(self, other: Search) ‑> float
def register_failed_matching_attempt(self)

Signal that matchmaker tried to match this search but was unsuccessful and increase internal counter by one.