Module server.matchmaker

The matchmaker system

Keeps track of queues of players who want to play specific kinds of games. Currently it is only used for the 1v1 ladder.

Sub-modules

server.matchmaker.algorithm

Matchmaker algorithm implementations

server.matchmaker.map_pool
server.matchmaker.matchmaker_queue
server.matchmaker.pop_timer
server.matchmaker.search

Classes

class CombinedSearch (*searches: Search)
Expand source code
class CombinedSearch(Search):
    """
    Several `Search` objects bundled together and treated as a single search.

    Every aggregate value is derived from the wrapped searches on access, and
    `match`, `cancel` and `register_failed_matching_attempt` are forwarded to
    each of them.
    """

    def __init__(self, *searches: Search):
        # `Search.__init__` is not called; only the two attributes below are
        # set on the instance.
        assert searches
        rating_type = searches[0].rating_type
        # All combined searches must share one rating type.
        assert all(s.rating_type == rating_type for s in searches)

        self.rating_type = rating_type
        self.searches = searches

    @property
    def players(self) -> list[Player]:
        """Players of all wrapped searches, in search order."""
        return list(
            itertools.chain.from_iterable(s.players for s in self.searches)
        )

    @property
    def ratings(self) -> list[Rating]:
        """Ratings of all wrapped searches, in search order."""
        return list(
            itertools.chain.from_iterable(s.ratings for s in self.searches)
        )

    @property
    def cumulative_rating(self) -> float:
        """Sum of the cumulative ratings of the wrapped searches."""
        return sum(s.cumulative_rating for s in self.searches)

    @property
    def average_rating(self) -> float:
        """Result of `get_average_rating` for the wrapped searches."""
        return get_average_rating(self.searches)

    @property
    def raw_ratings(self) -> list[Rating]:
        """Raw ratings of all wrapped searches, in search order."""
        return list(
            itertools.chain.from_iterable(s.raw_ratings for s in self.searches)
        )

    @property
    def displayed_ratings(self) -> list[float]:
        """Displayed ratings of all wrapped searches, in search order."""
        return list(
            itertools.chain.from_iterable(
                s.displayed_ratings for s in self.searches
            )
        )

    @property
    def failed_matching_attempts(self) -> int:
        """Largest failed-attempt count among the wrapped searches."""
        return max(search.failed_matching_attempts for search in self.searches)

    def register_failed_matching_attempt(self):
        """Register a failed matching attempt on every wrapped search."""
        for search in self.searches:
            search.register_failed_matching_attempt()

    @property
    def match_threshold(self) -> float:
        """
        Defines the threshold for game quality

        This is the smallest threshold among the wrapped searches.
        """
        return min(s.match_threshold for s in self.searches)

    @property
    def is_matched(self) -> bool:
        """`True` only when every wrapped search is matched."""
        return all(s.is_matched for s in self.searches)

    def done(self) -> bool:
        """`True` only when every wrapped search reports `done()`."""
        return all(s.done() for s in self.searches)

    @property
    def is_cancelled(self) -> bool:
        """`True` as soon as any wrapped search is cancelled."""
        return any(s.is_cancelled for s in self.searches)

    def match(self, other: "Search"):
        """
        Mark as matched with given opponent
        """
        self._logger.info("Combined search matched %s with %s", self.players, other.players)

        for s in self.searches:
            s.match(other)

    async def await_match(self):
        """
        Wait for this search to complete

        Returns once `await_match()` of every wrapped search has finished.
        """
        # `asyncio.wait` no longer accepts bare coroutines (deprecated in
        # Python 3.8, rejected with a TypeError since 3.11), so each coroutine
        # is wrapped in a task/future first.
        await asyncio.wait(
            {asyncio.ensure_future(s.await_match()) for s in self.searches}
        )

    def cancel(self):
        """
        Cancel searching for a match
        """
        for s in self.searches:
            s.cancel()

    def __str__(self):
        return f"CombinedSearch({', '.join(str(s) for s in self.searches)})"

    def __repr__(self):
        # The repr deliberately reuses the `str()` form of each wrapped search.
        return f"CombinedSearch({', '.join(str(s) for s in self.searches)})"

    def get_original_searches(self) -> list[Search]:
        """
        Returns the searches of which this CombinedSearch is comprised
        """
        return list(self.searches)

Represents the state of a user's search for a match.

Ancestors

Instance variables

prop average_rating : float
Expand source code
@property
def average_rating(self) -> float:
    """Return `get_average_rating` applied to the wrapped searches."""
    return get_average_rating(self.searches)
prop cumulative_rating : float
Expand source code
@property
def cumulative_rating(self) -> float:
    """Return the sum of the cumulative ratings of all wrapped searches."""
    per_search = [sub.cumulative_rating for sub in self.searches]
    return sum(per_search)
prop failed_matching_attempts : int
Expand source code
@property
def failed_matching_attempts(self) -> int:
    """Return the highest failed-attempt count among the wrapped searches."""
    attempts = [sub.failed_matching_attempts for sub in self.searches]
    return max(attempts)
prop is_cancelled : bool
Expand source code
@property
def is_cancelled(self) -> bool:
    """Return `True` if at least one wrapped search is cancelled."""
    for sub in self.searches:
        if sub.is_cancelled:
            return True
    return False
prop is_matched : bool
Expand source code
@property
def is_matched(self) -> bool:
    """Return `True` only if every wrapped search is matched."""
    for sub in self.searches:
        if not sub.is_matched:
            return False
    return True
prop match_threshold : float
Expand source code
@property
def match_threshold(self) -> float:
    """
    Game quality threshold of the combined search.

    This is the lowest threshold found among the wrapped searches.
    """
    thresholds = [sub.match_threshold for sub in self.searches]
    return min(thresholds)

Defines the threshold for game quality

prop players : list[Player]
Expand source code
@property
def players(self) -> list[Player]:
    """Return the players of all wrapped searches as one flat list."""
    return [player for sub in self.searches for player in sub.players]
prop ratings : list[Rating]
Expand source code
@property
def ratings(self) -> list[Rating]:
    """Return the ratings of all wrapped searches as one flat list."""
    return [rating for sub in self.searches for rating in sub.ratings]
prop raw_ratings : list[Rating]
Expand source code
@property
def raw_ratings(self) -> list[Rating]:
    """Return the raw ratings of all wrapped searches as one flat list."""
    return [rating for sub in self.searches for rating in sub.raw_ratings]

Methods

def done(self) ‑> bool
Expand source code
def done(self) -> bool:
    """Return `True` only if every wrapped search reports `done()`."""
    for sub in self.searches:
        if not sub.done():
            return False
    return True
def get_original_searches(self) ‑> list[Search]
Expand source code
def get_original_searches(self) -> list[Search]:
    """
    Return a new list holding the searches this CombinedSearch wraps.
    """
    return [*self.searches]

Returns the searches of which this CombinedSearch is comprised

Inherited members

class MapPool (map_pool_id: int,
name: str,
maps: Iterable[MapPoolMap] = ())
Expand source code
@with_logger
class MapPool(object):
    """A named collection of maps, keyed by map id, to choose games from."""

    def __init__(
        self,
        map_pool_id: int,
        name: str,
        maps: Iterable[MapPoolMap] = ()
    ):
        self.id = map_pool_id
        self.name = name
        self.set_maps(maps)

    def set_maps(self, maps: Iterable[MapPoolMap]) -> None:
        """Replace the pool contents with `maps`, indexed by their `id`."""
        by_id = {}
        for pool_map in maps:
            by_id[pool_map.id] = pool_map
        self.maps = by_id

    def choose_map(self, played_map_ids: Iterable[int] = ()) -> Map:
        """
        Pick a weighted random map, preferring the least played ones.

        Maps whose id does not appear in `played_map_ids` are preferred. If
        every map id appears there, one of those appearing the fewest times
        is picked.

        Raises `RuntimeError` when the pool contains no maps.
        """
        if not self.maps:
            self._logger.critical(
                "Trying to choose a map from an empty map pool: %s", self.name
            )
            raise RuntimeError(f"Map pool {self.name} not set!")

        # Seed every pool map with a count of one, then add one per recorded
        # play of a map that belongs to this pool.
        play_counts = Counter(self.maps.keys())
        play_counts.update(id_ for id_ in played_map_ids if id_ in self.maps)

        # Ascending by play count.
        candidates = play_counts.most_common()[::-1]

        # The cut-off is the count of the first entry that is a `Map`
        # instance; it stays at one when there is no such entry.
        cutoff = next(
            (
                count for id_, count in candidates
                if isinstance(self.maps[id_], Map)
            ),
            1,
        )

        # Because the list is sorted, this drops exactly the tail of entries
        # played more often than the cut-off.
        candidates = [entry for entry in candidates if entry[1] <= cutoff]

        weights = [self.maps[id_].weight for id_, _ in candidates]

        chosen_id, _ = random.choices(candidates, weights=weights, k=1)[0]
        return self.maps[chosen_id].get_map()

    def __repr__(self) -> str:
        return f"MapPool({self.id}, {self.name}, {list(self.maps.values())})"

Methods

def choose_map(self, played_map_ids: Iterable[int] = ()) ‑> Map
Expand source code
def choose_map(self, played_map_ids: Iterable[int] = ()) -> Map:
    """
    Pick a weighted random map, preferring the least played ones.

    Maps whose id does not appear in `played_map_ids` are preferred. If
    every map id appears there, one of those appearing the fewest times is
    picked.

    Raises `RuntimeError` when the pool contains no maps.
    """
    if not self.maps:
        self._logger.critical(
            "Trying to choose a map from an empty map pool: %s", self.name
        )
        raise RuntimeError(f"Map pool {self.name} not set!")

    # Seed every pool map with a count of one, then add one per recorded
    # play of a map that belongs to this pool.
    play_counts = Counter(self.maps.keys())
    play_counts.update(id_ for id_ in played_map_ids if id_ in self.maps)

    # Ascending by play count.
    candidates = play_counts.most_common()[::-1]

    # The cut-off is the count of the first entry that is a `Map` instance;
    # it stays at one when there is no such entry.
    cutoff = next(
        (
            count for id_, count in candidates
            if isinstance(self.maps[id_], Map)
        ),
        1,
    )

    # Because the list is sorted, this drops exactly the tail of entries
    # played more often than the cut-off.
    candidates = [entry for entry in candidates if entry[1] <= cutoff]

    weights = [self.maps[id_].weight for id_, _ in candidates]

    chosen_id, _ = random.choices(candidates, weights=weights, k=1)[0]
    return self.maps[chosen_id].get_map()

Select a random map whose id does not appear in played_map_ids. If all map ids appear in the list, then pick one that appears the fewest times.

def set_maps(self,
maps: Iterable[MapPoolMap]) ‑> None
Expand source code
def set_maps(self, maps: Iterable[MapPoolMap]) -> None:
    """Replace the pool contents with `maps`, indexed by their `id`."""
    by_id = {}
    for pool_map in maps:
        by_id[pool_map.id] = pool_map
    self.maps = by_id
class MatchmakerQueue (game_service: GameService,
on_match_found: Callable[[Search, Search, ForwardRef('MatchmakerQueue')], Any],
name: str,
queue_id: int,
featured_mod: str,
rating_type: str,
team_size: int = 1,
params: dict[str, typing.Any] | None = None,
map_pools: Iterable[tuple[MapPool, int | None, int | None]] = ())
Expand source code
@with_logger
class MatchmakerQueue:
    """
    A queue of `Search` objects waiting to be matched against each other.

    A background task (started by `initialize`) waits for the pop timer and
    then runs `find_matches` over everything currently queued.
    """

    def __init__(
        self,
        game_service: "GameService",
        on_match_found: MatchFoundCallback,
        name: str,
        queue_id: int,
        featured_mod: str,
        rating_type: str,
        team_size: int = 1,
        params: Optional[dict[str, Any]] = None,
        map_pools: Iterable[tuple[MapPool, Optional[int], Optional[int]]] = (),
    ):
        self.game_service = game_service
        self.name = name
        self.id = queue_id
        self.featured_mod = featured_mod
        self.rating_type = rating_type
        self.team_size = team_size
        # Passed through to the matchmaker's `find` call.
        self.rating_peak = 1000.0
        self.params = params or {}
        # Map pool id -> (map pool, min rating, max rating)
        self.map_pools = {info[0].id: info for info in map_pools}

        # Used as an insertion-ordered set of searches; values are always None.
        self._queue: dict[Search, None] = OrderedDict()
        self.on_match_found = on_match_found
        self._is_running = True
        # Strong reference to the background task created by `initialize`.
        self._pop_timer_task: Optional[asyncio.Task] = None

        self.timer = PopTimer(self)

        self.matchmaker = TeamMatchMaker()

    @property
    def is_running(self) -> bool:
        """`True` until `shutdown` has been called."""
        return self._is_running

    def add_map_pool(
        self,
        map_pool: MapPool,
        min_rating: Optional[int],
        max_rating: Optional[int]
    ) -> None:
        """
        Register `map_pool` for the given rating range, replacing any entry
        previously stored under the same map pool id.
        """
        self.map_pools[map_pool.id] = (map_pool, min_rating, max_rating)

    def get_map_pool_for_rating(self, rating: float) -> Optional[MapPool]:
        """
        Return the first registered map pool whose rating range contains
        `rating`, or `None` when no pool applies. A bound of `None` means that
        side of the range is open.
        """
        for map_pool, min_rating, max_rating in self.map_pools.values():
            if min_rating is not None and rating < min_rating:
                continue
            if max_rating is not None and rating > max_rating:
                continue
            return map_pool
        return None

    def get_game_options(self) -> Optional[dict[str, Any]]:
        """
        Return the "GameOptions" entry of the queue params, or `None` when
        the entry is missing or empty.
        """
        return self.params.get("GameOptions") or None

    def initialize(self):
        """
        Start the background pop-timer loop.

        Must be called while an event loop is running.
        """
        # The event loop only keeps weak references to tasks, so the task is
        # stored on the instance to keep it from being garbage collected
        # while it is still running.
        self._pop_timer_task = asyncio.create_task(self.queue_pop_timer())

    @property
    def num_players(self) -> int:
        """Total number of players across all queued searches."""
        return sum(len(search.players) for search in self._queue.keys())

    async def queue_pop_timer(self) -> None:
        """ Periodically tries to match all Searches in the queue. The amount
        of time until next queue 'pop' is determined by the number of players
        in the queue.
        """
        self._logger.debug("MatchmakerQueue initialized for %s", self.name)
        while self.is_running:
            try:
                await self.timer.next_pop()

                await self.find_matches()

                number_of_unmatched_searches = len(self._queue)
                metrics.unmatched_searches.labels(self.name).set(
                    number_of_unmatched_searches
                )

                # Any searches in the queue at this point were unable to find a
                # match this round and will have higher priority next round.

                self.game_service.mark_dirty(self)
            except asyncio.CancelledError:
                # Raised when `shutdown` cancels the timer's wait task.
                break
            except Exception:
                self._logger.exception(
                    "Unexpected error during queue pop timer loop!"
                )
                # To avoid potential busy loops
                await asyncio.sleep(1)
        self._logger.info("%s queue stopped", self.name)

    async def search(self, search: Search) -> None:
        """
        Search for a match.

        Puts a search object into the queue and awaits completion.
        """
        assert search is not None

        try:
            with MatchmakerSearchTimer(self.name):
                self.push(search)
                await search.await_match()
            self._logger.debug("Search complete: %s", search)
        except CancelledError:
            pass
        finally:
            # If the queue was cancelled, or some other error occurred,
            # make sure to clean up.
            self.game_service.mark_dirty(self)
            if search in self._queue:
                del self._queue[search]

    @synchronized(SpinLock(sleep_duration=1))
    async def find_matches(self) -> None:
        """
        Perform the matchmaking algorithm.

        Note that this function is synchronized such that only one instance of
        MatchmakerQueue can call this function at any given time. This is
        needed in order to safely enable multiqueuing.
        """
        self._logger.info("Searching for matches: %s", self.name)

        searches = list(self._queue.keys())

        # Not enough players for two full teams, nobody can be matched.
        if self.num_players < 2 * self.team_size:
            self._register_unmatched_searches(searches)
            return

        # Run the matchmaking algorithm in the default executor.
        loop = asyncio.get_running_loop()
        proposed_matches, unmatched_searches = await loop.run_in_executor(
            None,
            self.matchmaker.find,
            searches,
            self.team_size,
            self.rating_peak,
        )

        # Call self.match on all proposed matches and filter out the ones
        # that can no longer be matched (e.g. because they were cancelled).
        matches: list[Match] = []
        for match in proposed_matches:
            if self.match(match[0], match[1]):
                matches.append(match)
            else:
                unmatched_searches.extend(match)

        self._register_unmatched_searches(unmatched_searches)

        for search1, search2 in matches:
            self._report_party_sizes(search1)
            self._report_party_sizes(search2)

            rating_imbalance = abs(search1.cumulative_rating - search2.cumulative_rating)
            metrics.match_rating_imbalance.labels(self.name).observe(rating_imbalance)

            ratings = search1.displayed_ratings + search2.displayed_ratings
            rating_variety = max(ratings) - min(ratings)
            metrics.match_rating_variety.labels(self.name).observe(rating_variety)

            metrics.match_quality.labels(self.name).observe(
                search1.quality_with(search2)
            )
            try:
                self.on_match_found(search1, search2, self)
            except Exception:
                # A failing callback must not prevent the remaining matches
                # from being reported.
                self._logger.exception("Match callback raised an exception!")

    def _report_party_sizes(self, team):
        """Record the party size of every original search in `team`."""
        for search in team.get_original_searches():
            metrics.matched_matchmaker_searches.labels(
                self.name, len(search.players)
            ).inc()

    def _register_unmatched_searches(
        self,
        unmatched_searches: list[Search],
    ):
        """
        Tells all unmatched searches that they went through a failed matching
        attempt.
        """
        for search in unmatched_searches:
            search.register_failed_matching_attempt()
            self._logger.debug(
                "Search %s remained unmatched at threshold %f in attempt number %s",
                search, search.match_threshold, search.failed_matching_attempts
            )

    def push(self, search: Search):
        """ Push the given search object onto the queue """

        self._queue[search] = None
        self.game_service.mark_dirty(self)

    def match(self, s1: Search, s2: Search) -> bool:
        """
        Mark the given two searches as matched

        # Returns
        `True` if matching succeeded or `False` if matching failed.
        """
        if s1.is_matched or s2.is_matched:
            return False
        if s1.is_cancelled or s2.is_cancelled:
            return False
        # Additional failsafe. Ideally this check will never fail.
        if any(
            player.state != PlayerState.SEARCHING_LADDER
            for player in s1.players + s2.players
        ):
            self._logger.warning(
                "Tried to match searches %s and %s while some players had "
                "invalid states: team1: %s team2: %s",
                s1, s2,
                list(p.state for p in s1.players),
                list(p.state for p in s2.players)
            )
            return False

        s1.match(s2)
        s2.match(s1)
        # Matched searches leave the queue, if they are (still) in it.
        if s1 in self._queue:
            del self._queue[s1]
        if s2 in self._queue:
            del self._queue[s2]

        return True

    def shutdown(self):
        """Stop the pop-timer loop and cancel the timer's pending wait."""
        self._is_running = False
        self.timer.cancel()

    def to_dict(self):
        """
        Return a fuzzy representation of the searches currently in the queue
        """
        return {
            "queue_name": self.name,
            "queue_pop_time": datetime.fromtimestamp(
                self.timer.next_queue_pop, timezone.utc
            ).isoformat(),
            "queue_pop_time_delta": round(
                self.timer.next_queue_pop - time.time(),
                ndigits=2
            ),
            "num_players": self.num_players,
            "boundary_80s": [search.boundary_80 for search in self._queue.keys()],
            "boundary_75s": [search.boundary_75 for search in self._queue.keys()],
            # TODO: Remove, the client should query the API for this
            "team_size": self.team_size,
        }

    def __repr__(self):
        return repr(self._queue)

Instance variables

prop is_running : bool
Expand source code
@property
def is_running(self) -> bool:
    """Return the value of the private `_is_running` flag."""
    return self._is_running
prop num_players : int
Expand source code
@property
def num_players(self) -> int:
    """Return the total number of players across all queued searches."""
    party_sizes = [len(queued.players) for queued in self._queue]
    return sum(party_sizes)

Methods

def add_map_pool(self,
map_pool: MapPool,
min_rating: int | None,
max_rating: int | None) ‑> None
Expand source code
def add_map_pool(
    self,
    map_pool: MapPool,
    min_rating: Optional[int],
    max_rating: Optional[int]
) -> None:
    """
    Register `map_pool` together with its rating bounds.

    An entry already stored under the same map pool id is replaced.
    """
    entry = (map_pool, min_rating, max_rating)
    self.map_pools[map_pool.id] = entry
async def find_matches(self) ‑> None
Expand source code
@synchronized(SpinLock(sleep_duration=1))
async def find_matches(self) -> None:
    """
    Run one round of matchmaking over the searches currently queued.

    The call is synchronized so that only one MatchmakerQueue executes this
    coroutine at any given time, which is needed for multiqueuing to be
    safe.
    """
    self._logger.info("Searching for matches: %s", self.name)

    queued = list(self._queue.keys())

    # Not enough players for two full teams: everybody stays unmatched.
    if self.num_players < 2 * self.team_size:
        self._register_unmatched_searches(queued)
        return

    # Run the matchmaking algorithm in the default executor.
    event_loop = asyncio.get_running_loop()
    proposed_matches, unmatched_searches = await event_loop.run_in_executor(
        None,
        self.matchmaker.find,
        queued,
        self.team_size,
        self.rating_peak,
    )

    # Keep only the proposals that `self.match` accepts; the searches of a
    # rejected proposal count as unmatched.
    matches: list[Match] = []
    for proposal in proposed_matches:
        if not self.match(proposal[0], proposal[1]):
            unmatched_searches.extend(proposal)
            continue
        matches.append(proposal)

    self._register_unmatched_searches(unmatched_searches)

    for team_a, team_b in matches:
        self._report_party_sizes(team_a)
        self._report_party_sizes(team_b)

        imbalance = abs(team_a.cumulative_rating - team_b.cumulative_rating)
        metrics.match_rating_imbalance.labels(self.name).observe(imbalance)

        shown_ratings = team_a.displayed_ratings + team_b.displayed_ratings
        spread = max(shown_ratings) - min(shown_ratings)
        metrics.match_rating_variety.labels(self.name).observe(spread)

        quality_metric = metrics.match_quality.labels(self.name)
        quality_metric.observe(team_a.quality_with(team_b))

        try:
            self.on_match_found(team_a, team_b, self)
        except Exception:
            # A failing callback must not stop the remaining matches from
            # being reported.
            self._logger.exception("Match callback raised an exception!")

Perform the matchmaking algorithm.

Note that this function is synchronized such that only one instance of MatchmakerQueue can call this function at any given time. This is needed in order to safely enable multiqueuing.

def get_game_options(self) ‑> dict[str, typing.Any]
Expand source code
def get_game_options(self) -> Optional[dict[str, Any]]:
    """
    Return the "GameOptions" entry of the queue params.

    Returns `None` when the entry is missing or empty, which is why the
    return type is optional.
    """
    return self.params.get("GameOptions") or None
def get_map_pool_for_rating(self, rating: float) ‑> MapPool | None
Expand source code
def get_map_pool_for_rating(self, rating: float) -> Optional[MapPool]:
    """
    Return the first registered map pool whose rating range contains
    `rating`, or `None` when no pool applies.

    A bound of `None` leaves that side of the range open.
    """
    for pool, lower, upper in self.map_pools.values():
        out_of_range = (
            (lower is not None and rating < lower)
            or (upper is not None and rating > upper)
        )
        if not out_of_range:
            return pool
    return None
def initialize(self)
Expand source code
def initialize(self):
    """
    Start the background pop-timer loop.

    Must be called while an event loop is running.
    """
    # The event loop only keeps weak references to tasks, so the task is
    # stored on the instance to keep it from being garbage collected while
    # it is still running.
    self._pop_timer_task = asyncio.create_task(self.queue_pop_timer())
def match(self,
s1: Search,
s2: Search) ‑> bool
Expand source code
def match(self, s1: Search, s2: Search) -> bool:
    """
    Try to mark two searches as matched with each other.

    # Returns
    `True` if matching succeeded or `False` if matching failed.
    """
    # Searches that were already matched or cancelled cannot be matched.
    if s1.is_matched or s2.is_matched:
        return False
    if s1.is_cancelled or s2.is_cancelled:
        return False

    # Additional failsafe. Ideally this check will never fail.
    everyone = s1.players + s2.players
    if any(
        participant.state != PlayerState.SEARCHING_LADDER
        for participant in everyone
    ):
        self._logger.warning(
            "Tried to match searches %s and %s while some players had "
            "invalid states: team1: %s team2: %s",
            s1, s2,
            [p.state for p in s1.players],
            [p.state for p in s2.players]
        )
        return False

    s1.match(s2)
    s2.match(s1)

    # Matched searches leave the queue, if they are (still) in it.
    self._queue.pop(s1, None)
    self._queue.pop(s2, None)

    return True

Mark the given two searches as matched

Returns

True if matching succeeded or False if matching failed.

def push(self,
search: Search)
Expand source code
def push(self, search: Search):
    """
    Push the given search object onto the queue and mark this queue as dirty
    on the game service.
    """

    # The queue dict is keyed by search; the stored value is always None.
    self._queue[search] = None
    self.game_service.mark_dirty(self)

Push the given search object onto the queue

async def queue_pop_timer(self) ‑> None
Expand source code
async def queue_pop_timer(self) -> None:
    """
    Repeatedly wait for the pop timer and then try to match every Search in
    the queue, for as long as the queue is running.

    How long each wait lasts is decided by the timer.
    """
    self._logger.debug("MatchmakerQueue initialized for %s", self.name)
    while self.is_running:
        try:
            await self.timer.next_pop()
            await self.find_matches()

            # Whatever is still queued here could not be matched this round
            # and will have higher priority next round.
            still_waiting = len(self._queue)
            metrics.unmatched_searches.labels(self.name).set(still_waiting)

            self.game_service.mark_dirty(self)
        except asyncio.CancelledError:
            break
        except Exception:
            self._logger.exception(
                "Unexpected error during queue pop timer loop!"
            )
            # Pause briefly so that a persistent error cannot turn this
            # into a busy loop.
            await asyncio.sleep(1)
    self._logger.info("%s queue stopped", self.name)

Periodically tries to match all Searches in the queue. The amount of time until next queue 'pop' is determined by the number of players in the queue.

async def search(self,
search: Search) ‑> None
Expand source code
async def search(self, search: Search) -> None:
    """
    Queue `search` and wait until it has completed.

    Cancellation is swallowed. However this coroutine ends, the queue is
    marked dirty and the search is removed from the queue if still there.
    """
    assert search is not None

    try:
        with MatchmakerSearchTimer(self.name):
            self.push(search)
            await search.await_match()
        self._logger.debug("Search complete: %s", search)
    except CancelledError:
        pass
    finally:
        # Clean up regardless of whether the search matched, was cancelled,
        # or failed with some other error.
        self.game_service.mark_dirty(self)
        self._queue.pop(search, None)

Search for a match.

Puts a search object into the queue and awaits completion.

def shutdown(self)
Expand source code
def shutdown(self):
    """Clear the running flag and cancel the timer."""
    # The flag is cleared before the timer is cancelled.
    self._is_running = False
    self.timer.cancel()
def to_dict(self)
Expand source code
def to_dict(self):
    """
    Build a fuzzy, dict-based summary of the searches currently queued.

    The next pop time is reported both as a UTC ISO-8601 timestamp and as
    the number of seconds from now, rounded to two digits.
    """
    next_pop = self.timer.next_queue_pop
    pop_time = datetime.fromtimestamp(next_pop, timezone.utc).isoformat()
    seconds_until_pop = round(next_pop - time.time(), ndigits=2)
    player_count = self.num_players
    boundaries_80 = [queued.boundary_80 for queued in self._queue]
    boundaries_75 = [queued.boundary_75 for queued in self._queue]

    summary = {
        "queue_name": self.name,
        "queue_pop_time": pop_time,
        "queue_pop_time_delta": seconds_until_pop,
        "num_players": player_count,
        "boundary_80s": boundaries_80,
        "boundary_75s": boundaries_75,
        # TODO: Remove, the client should query the API for this
        "team_size": self.team_size,
    }
    return summary

Return a fuzzy representation of the searches currently in the queue

class PopTimer (queue: MatchmakerQueue)
Expand source code
@with_logger
class PopTimer(object):
    """ Decides when a queue should pop next, based on how fast players are
    joining it.

        timer = PopTimer(some_queue)
        # Pauses the coroutine until the next queue pop
        await timer.next_pop()

    Pop times are adjusted in an attempt to keep the queue size on each pop
    roughly constant, so in general a busier queue pops more often.

    The join rate is a moving average over the last few pops; the window
    size is taken from config.
    """

    def __init__(self, queue: "MatchmakerQueue"):
        self.queue = queue
        # Bounded histories used to compute the moving average
        window = config.QUEUE_POP_TIME_MOVING_AVG_SIZE
        self.last_queue_amounts: deque[int] = deque(maxlen=window)
        self.last_queue_times: deque[float] = deque(maxlen=window)

        now = time()
        self._last_queue_pop = now
        # Optimistically schedule first pop for half of the max pop time
        self.next_queue_pop = now + (config.QUEUE_POP_TIME_MAX / 2)
        self._wait_task = None

    async def next_pop(self):
        """ Sleep until the scheduled pop, then schedule the following one. """

        wait_seconds = self.next_queue_pop - time()
        queue_name = self.queue.name
        self._logger.info("Next %s wave happening in %is", queue_name, wait_seconds)
        metrics.matchmaker_queue_pop.labels(self.queue.name).set(int(wait_seconds))

        # The sleep runs in its own task so that `cancel` can interrupt it.
        self._wait_task = asyncio.create_task(asyncio.sleep(wait_seconds))
        await self._wait_task

        queued_players = self.queue.num_players
        metrics.matchmaker_players.labels(self.queue.name).set(queued_players)

        self._last_queue_pop = time()
        delay = self.time_until_next_pop(queued_players, wait_seconds)
        self.next_queue_pop = self._last_queue_pop + delay

    def time_until_next_pop(self, num_queued: int, time_queued: float) -> float:
        """ Record the latest pop in the moving average and return how many
        seconds to wait before the next pop.

        The result is limited to the configured minimum and maximum pop
        times; the maximum is also used when no players were seen within the
        averaging window.
        """
        self.last_queue_amounts.append(num_queued)
        self.last_queue_times.append(time_queued)

        players_seen = sum(self.last_queue_amounts)
        if players_seen == 0:
            return config.QUEUE_POP_TIME_MAX

        seconds_seen = sum(self.last_queue_times)
        if seconds_seen:
            self._logger.debug(
                "Queue rate for %s: %f/s", self.queue.name,
                players_seen / seconds_seen
            )

        desired_players = config.QUEUE_POP_DESIRED_MATCHES * (self.queue.team_size * 2)
        # Obtained by solving $ NUM_PLAYERS = rate * time $ for time.
        wait = desired_players * seconds_seen / players_seen

        if wait > config.QUEUE_POP_TIME_MAX:
            self._logger.info(
                "Required time (%.2fs) for %s is larger than max pop time (%ds). "
                "Consider increasing the max pop time",
                wait, self.queue.name, config.QUEUE_POP_TIME_MAX
            )
            return config.QUEUE_POP_TIME_MAX

        if wait < config.QUEUE_POP_TIME_MIN:
            self._logger.warning(
                "Required time (%.2fs) for %s is lower than min pop time (%ds). ",
                wait, self.queue.name, config.QUEUE_POP_TIME_MIN
            )
            return config.QUEUE_POP_TIME_MIN

        return wait

    def cancel(self):
        """ Cancel the pending wait task, if one has been created. """
        pending = self._wait_task
        if not pending:
            return
        pending.cancel()

Calculates when the next pop should happen based on the rate of players queuing.

timer = PopTimer(some_queue)
# Pauses the coroutine until the next queue pop
await timer.next_pop()

The timer will adjust the pop times in an attempt to maintain a fixed queue size on each pop. So generally, the more people are in the queue, the shorter the time will be.

The player queue rate is based on a moving average over the last few pops. The exact size can be set in config.

Methods

def cancel(self)
Expand source code
def cancel(self):
    """ Cancel the pending wait task, if one has been created. """
    pending = self._wait_task
    if not pending:
        return
    pending.cancel()
async def next_pop(self)
Expand source code
async def next_pop(self):
    """ Sleep until the scheduled pop, then schedule the following one. """

    wait_seconds = self.next_queue_pop - time()
    queue_name = self.queue.name
    self._logger.info("Next %s wave happening in %is", queue_name, wait_seconds)
    metrics.matchmaker_queue_pop.labels(self.queue.name).set(int(wait_seconds))

    # The sleep runs in its own task and is stored in `_wait_task`.
    self._wait_task = asyncio.create_task(asyncio.sleep(wait_seconds))
    await self._wait_task

    queued_players = self.queue.num_players
    metrics.matchmaker_players.labels(self.queue.name).set(queued_players)

    self._last_queue_pop = time()
    delay = self.time_until_next_pop(queued_players, wait_seconds)
    self.next_queue_pop = self._last_queue_pop + delay

Wait for the timer to pop.

def time_until_next_pop(self, num_queued: int, time_queued: float) ‑> float
Expand source code
def time_until_next_pop(self, num_queued: int, time_queued: float) -> float:
    """ Record the latest pop in the moving average and return how many
    seconds to wait before the next pop.

    The result is limited to the configured minimum and maximum pop times;
    the maximum is also used when no players were seen within the averaging
    window.
    """
    self.last_queue_amounts.append(num_queued)
    self.last_queue_times.append(time_queued)

    players_seen = sum(self.last_queue_amounts)
    if players_seen == 0:
        return config.QUEUE_POP_TIME_MAX

    seconds_seen = sum(self.last_queue_times)
    if seconds_seen:
        self._logger.debug(
            "Queue rate for %s: %f/s", self.queue.name,
            players_seen / seconds_seen
        )

    desired_players = config.QUEUE_POP_DESIRED_MATCHES * (self.queue.team_size * 2)
    # Obtained by solving $ NUM_PLAYERS = rate * time $ for time.
    wait = desired_players * seconds_seen / players_seen

    if wait > config.QUEUE_POP_TIME_MAX:
        self._logger.info(
            "Required time (%.2fs) for %s is larger than max pop time (%ds). "
            "Consider increasing the max pop time",
            wait, self.queue.name, config.QUEUE_POP_TIME_MAX
        )
        return config.QUEUE_POP_TIME_MAX

    if wait < config.QUEUE_POP_TIME_MIN:
        self._logger.warning(
            "Required time (%.2fs) for %s is lower than min pop time (%ds). ",
            wait, self.queue.name, config.QUEUE_POP_TIME_MIN
        )
        return config.QUEUE_POP_TIME_MIN

    return wait

Calculate how long we should wait for the next queue to pop based on the current rate of ladder queues

class Search (players: list[Player],
start_time: float | None = None,
rating_type: str = 'ladder_1v1',
on_matched: Callable[[ForwardRef('Search'), ForwardRef('Search')], Any] = <function Search.<lambda>>)
Expand source code
@with_logger
class Search:
    """
    Represents the state of a user's search for a match.

    A search holds the players queueing together for one rating type, counts
    how many matching attempts have failed so far, and owns a future that is
    resolved by `match` or cancelled by `cancel`.
    """

    def __init__(
        self,
        players: list[Player],
        start_time: Optional[float] = None,
        rating_type: str = RatingType.LADDER_1V1,
        on_matched: OnMatchedCallback = lambda _1, _2: None
    ):
        """
        Create a search for `players`.

        `players` must be a list and every player needs a rating for
        `rating_type`. `start_time` defaults to the current time when it is
        `None`. `on_matched` is called with `(self, other)` from `match`.
        """
        assert isinstance(players, list)
        for player in players:
            assert player.ratings[rating_type] is not None

        self.players = players
        self.rating_type = rating_type
        # Compare against None instead of relying on truthiness so that an
        # explicitly passed start time of 0 is not replaced by "now".
        self.start_time = time.time() if start_time is None else start_time
        # Resolved with the opposing search by `match`, cancelled by `cancel`
        self._match = asyncio.get_event_loop().create_future()
        self._failed_matching_attempts = 0
        self.on_matched = on_matched

        # Precompute this
        self.quality_against_self = self.quality_with(self)

    def adjusted_rating(self, player: Player) -> Rating:
        """
        Returns an adjusted mean with a simple linear interpolation between
        current mean and a specified base mean.

        With 0 games the mean equals `config.NEWBIE_BASE_MEAN`, with
        `config.NEWBIE_MIN_GAMES` games it equals the player's real mean.
        The deviation is passed through unchanged.
        """
        mean, dev = player.ratings[self.rating_type]
        game_count = player.game_count[self.rating_type]
        adjusted_mean = ((config.NEWBIE_MIN_GAMES - game_count) * config.NEWBIE_BASE_MEAN
                         + game_count * mean) / config.NEWBIE_MIN_GAMES
        return Rating(adjusted_mean, dev)

    def is_newbie(self, player: Player) -> bool:
        """
        Whether `player` has played at most `config.NEWBIE_MIN_GAMES` games
        for this search's rating type.
        """
        return player.game_count[self.rating_type] <= config.NEWBIE_MIN_GAMES

    def is_single_party(self) -> bool:
        """Whether this search consists of exactly one player."""
        return len(self.players) == 1

    def has_newbie(self) -> bool:
        """Whether at least one player of this search is a newbie."""
        return any(self.is_newbie(player) for player in self.players)

    def num_newbies(self) -> int:
        """Number of players in this search that are newbies."""
        return sum(self.is_newbie(player) for player in self.players)

    def has_high_rated_player(self) -> bool:
        """
        Whether the best displayed rating in this search reaches
        `config.HIGH_RATED_PLAYER_MIN_RATING`.
        """
        max_rating = max(self.displayed_ratings)
        return max_rating >= config.HIGH_RATED_PLAYER_MIN_RATING

    def has_top_player(self) -> bool:
        """
        Whether the best displayed rating in this search reaches
        `config.TOP_PLAYER_MIN_RATING`.
        """
        max_rating = max(self.displayed_ratings)
        return max_rating >= config.TOP_PLAYER_MIN_RATING

    @property
    def ratings(self) -> list[Rating]:
        """
        The ratings used for matching, in player order. Newbies get their
        adjusted rating, everybody else their raw rating.
        """
        ratings = []
        for player, rating in zip(self.players, self.raw_ratings):
            # New players (at most config.NEWBIE_MIN_GAMES games) match against less skilled opponents
            if self.is_newbie(player):
                rating = self.adjusted_rating(player)
            ratings.append(rating)
        return ratings

    @property
    def cumulative_rating(self) -> float:
        """Sum of the displayed ratings of all players."""
        return sum(self.displayed_ratings)

    @property
    def average_rating(self) -> float:
        """Arithmetic mean of the displayed ratings of all players."""
        return statistics.mean(self.displayed_ratings)

    @property
    def raw_ratings(self) -> list[Rating]:
        """The unadjusted rating of every player for this rating type."""
        return [player.ratings[self.rating_type] for player in self.players]

    @property
    def displayed_ratings(self) -> list[float]:
        """
        The client always displays mean - 3 * dev as a player's rating.
        So generally this is perceived as a player's true rating.
        """
        return [rating.displayed() for rating in self.raw_ratings]

    def _nearby_rating_range(self, delta: int) -> tuple[int, int]:
        """
        Returns 'boundary' mu values for player matching. Adjust delta for
        different game qualities.
        """
        mu, _ = self.ratings[0]  # Takes the rating of the first player, only works for 1v1
        rounded_mu = int(math.ceil(mu / 10) * 10)  # Round up to a multiple of 10
        return rounded_mu - delta, rounded_mu + delta

    @property
    def boundary_80(self) -> tuple[int, int]:
        """ Achieves roughly 80% quality. """
        return self._nearby_rating_range(200)

    @property
    def boundary_75(self) -> tuple[int, int]:
        """ Achieves roughly 75% quality. FIXME - why is it MORE restrictive??? """
        return self._nearby_rating_range(100)

    @property
    def failed_matching_attempts(self) -> int:
        """Number of failed matching attempts registered so far."""
        return self._failed_matching_attempts

    @property
    def search_expansion(self) -> float:
        """
        Defines how much to expand the search range of game quality due to waiting
        time.

        The threshold will expand linearly with every failed matching attempt
        until it reaches the specified MAX.

        Top players use bigger values to make matching easier.
        """
        # The top player check takes precedence over the newbie check
        if self.has_top_player():
            return min(
                self._failed_matching_attempts * config.LADDER_TOP_PLAYER_SEARCH_EXPANSION_STEP,
                config.LADDER_TOP_PLAYER_SEARCH_EXPANSION_MAX
            )
        elif self.has_newbie():
            return min(
                self._failed_matching_attempts * config.LADDER_NEWBIE_SEARCH_EXPANSION_STEP,
                config.LADDER_NEWBIE_SEARCH_EXPANSION_MAX
            )
        else:
            return min(
                self._failed_matching_attempts * config.LADDER_SEARCH_EXPANSION_STEP,
                config.LADDER_SEARCH_EXPANSION_MAX
            )

    def register_failed_matching_attempt(self):
        """
        Signal that matchmaker tried to match this search but was unsuccessful
        and increase internal counter by one.
        """

        self._failed_matching_attempts += 1

    @property
    def match_threshold(self) -> float:
        """
        Defines the threshold for game quality

        The base minimum quality is determined as 80% of the quality of a game
        against a copy of yourself. This is decreased by `self.search_expansion`
        if search is to be expanded. The result is never lower than 0.
        """

        return max(0.8 * self.quality_against_self - self.search_expansion, 0)

    def quality_with(self, other: "Search") -> float:
        """
        Returns the trueskill quality of a game between the players of this
        search and the players of `other`, based on the (newbie adjusted)
        `ratings` of both sides.
        """
        assert all(other.raw_ratings)
        assert other.players

        team1 = [trueskill.Rating(*rating) for rating in self.ratings]
        team2 = [trueskill.Rating(*rating) for rating in other.ratings]

        return trueskill.quality([team1, team2])

    @property
    def is_matched(self) -> bool:
        """Whether the match future is done without having been cancelled."""
        return self._match.done() and not self._match.cancelled()

    def done(self) -> bool:
        """Whether the match future is done."""
        return self._match.done()

    @property
    def is_cancelled(self) -> bool:
        """Whether the match future was cancelled."""
        return self._match.cancelled()

    def matches_with(self, other: "Search") -> bool:
        """
        Determine if this search is compatible with other given search according
        to both wishes.
        """
        if not isinstance(other, Search):
            return False

        quality = self.quality_with(other)
        return self._match_quality_acceptable(other, quality)

    def _match_quality_acceptable(self, other: "Search", quality: float) -> bool:
        """
        Determine if the given match quality is acceptable.

        This gets its own function so we can call it from the Matchmaker using
        a cached `quality` value.
        """
        # NOTE: We are assuming for optimization purposes that quality is
        # symmetric. If this ever changes, update here
        return (quality >= self.match_threshold and
                quality >= other.match_threshold)

    def match(self, other: "Search"):
        """
        Mark as matched with given opponent

        Calls `on_matched(self, other)`, logs the rating adjustment of every
        newbie in this search and finally resolves the match future with
        `other`.
        """
        self._logger.info("Matched %s with %s", self, other)

        self.on_matched(self, other)

        for player, raw_rating in zip(self.players, self.raw_ratings):
            if self.is_newbie(player):
                # Only the means are logged, the deviations are not needed
                mean, _ = raw_rating
                adjusted_mean, _ = self.adjusted_rating(player)
                self._logger.info(
                    "Adjusted mean rating for %s with %d games from %.1f to %.1f",
                    player.login,
                    player.game_count[self.rating_type],
                    mean,
                    adjusted_mean
                )
        self._match.set_result(other)

    async def await_match(self):
        """
        Wait for this search to complete

        Returns the match future itself (not its result) once it is done.
        """
        await asyncio.wait_for(self._match, None)
        return self._match

    def cancel(self):
        """
        Cancel searching for a match
        """
        self._match.cancel()

    def __str__(self) -> str:
        return (
            f"Search({self.rating_type}, {self._players_repr()}, threshold="
            f"{self.match_threshold:.2f}, expansion={self.search_expansion:.2f})"
        )

    def _players_repr(self) -> str:
        """Bracketed, comma separated description of all players."""
        contents = ", ".join(
            f"Player({p.login}, {p.id}, {p.ratings[self.rating_type]})"
            for p in self.players
        )
        return f"[{contents}]"

    def __repr__(self) -> str:
        """For debugging"""
        return (
            f"Search({[p.login for p in self.players]}, {self.average_rating}"
            f"{f', FMA: {self.failed_matching_attempts}' if self.failed_matching_attempts else ''}"
            f"{', has_newbie)' if self.has_newbie() else ')'}"
        )

    def get_original_searches(self) -> list["Search"]:
        """
        Returns the searches of which this Search is comprised,
        as if it were a CombinedSearch of one
        """
        return [self]

Represents the state of a user's search for a match.

Subclasses

Instance variables

prop average_rating : float
Expand source code
@property
def average_rating(self) -> float:
    """Arithmetic mean of the displayed ratings of all players."""
    displayed = self.displayed_ratings
    return statistics.mean(displayed)
prop boundary_75 : tuple[int, int]
Expand source code
@property
def boundary_75(self) -> tuple[int, int]:
    """
    Rating range that achieves roughly 75% quality.

    FIXME - why is it MORE restrictive (delta of 100) than the 80% range???
    """
    delta = 100
    return self._nearby_rating_range(delta)

Achieves roughly 75% quality. FIXME - why is it MORE restrictive???

prop boundary_80 : tuple[int, int]
Expand source code
@property
def boundary_80(self) -> tuple[int, int]:
    """Rating range (delta of 200) that achieves roughly 80% quality."""
    delta = 200
    return self._nearby_rating_range(delta)

Achieves roughly 80% quality.

prop cumulative_rating : float
Expand source code
@property
def cumulative_rating(self) -> float:
    """Sum of the displayed ratings of all players."""
    displayed = self.displayed_ratings
    return sum(displayed)
prop displayed_ratings : list[float]
Expand source code
@property
def displayed_ratings(self) -> list[float]:
    """
    The rating of every player as the client shows it.

    The client always displays mean - 3 * dev, so this is what players
    generally perceive as their true rating.
    """
    shown = []
    for entry in self.raw_ratings:
        shown.append(entry.displayed())
    return shown

The client always displays mean - 3 * dev as a player's rating. So generally this is perceived as a player's true rating.

prop failed_matching_attempts : int
Expand source code
@property
def failed_matching_attempts(self) -> int:
    """Number of failed matching attempts registered so far."""
    attempts = self._failed_matching_attempts
    return attempts
prop is_cancelled : bool
Expand source code
@property
def is_cancelled(self) -> bool:
    """Whether the underlying match future was cancelled."""
    pending = self._match
    return pending.cancelled()
prop is_matched : bool
Expand source code
@property
def is_matched(self) -> bool:
    """Whether the match future is done without having been cancelled."""
    pending = self._match
    return pending.done() and not pending.cancelled()
prop match_threshold : float
Expand source code
@property
def match_threshold(self) -> float:
    """
    Minimum game quality this search accepts.

    Starts at 80% of the quality of a game against a copy of yourself and
    is lowered by `self.search_expansion`. The result is never below 0.
    """
    base_quality = 0.8 * self.quality_against_self
    return max(base_quality - self.search_expansion, 0)

Defines the threshold for game quality

The base minimum quality is determined as 80% of the quality of a game against a copy of yourself. This is decreased by self.search_expansion if search is to be expanded.

prop ratings : list[Rating]
Expand source code
@property
def ratings(self) -> list[Rating]:
    """
    The ratings used for matching, in player order.

    Players for which `is_newbie` holds get their adjusted rating so that
    they match against less skilled opponents; everybody else keeps the
    raw rating.
    """
    return [
        self.adjusted_rating(member) if self.is_newbie(member) else raw
        for member, raw in zip(self.players, self.raw_ratings)
    ]
prop raw_ratings : list[Rating]
Expand source code
@property
def raw_ratings(self) -> list[Rating]:
    """The unadjusted rating of every player for this search's rating type."""
    collected = []
    for member in self.players:
        collected.append(member.ratings[self.rating_type])
    return collected
prop search_expansion : float
Expand source code
@property
def search_expansion(self) -> float:
    """
    How far the accepted game quality range is widened due to waiting.

    The expansion grows linearly with the number of failed matching
    attempts and is capped at a maximum. Step and maximum depend on the
    kind of search: top players are checked first, then newbies, then the
    general ladder values apply. Top players use bigger values to make
    matching easier.
    """
    if self.has_top_player():
        step = config.LADDER_TOP_PLAYER_SEARCH_EXPANSION_STEP
        limit = config.LADDER_TOP_PLAYER_SEARCH_EXPANSION_MAX
    elif self.has_newbie():
        step = config.LADDER_NEWBIE_SEARCH_EXPANSION_STEP
        limit = config.LADDER_NEWBIE_SEARCH_EXPANSION_MAX
    else:
        step = config.LADDER_SEARCH_EXPANSION_STEP
        limit = config.LADDER_SEARCH_EXPANSION_MAX

    return min(self._failed_matching_attempts * step, limit)

Defines how much to expand the search range of game quality due to waiting time.

The threshold will expand linearly with every failed matching attempt until it reaches the specified MAX.

Top players use bigger values to make matching easier.

Methods

def adjusted_rating(self,
player: Player) ‑> Rating
Expand source code
def adjusted_rating(self, player: Player) -> Rating:
    """
    Rating of `player` with the mean pulled towards a configured base mean.

    The mean is a linear interpolation: with 0 games it equals
    `config.NEWBIE_BASE_MEAN`, with `config.NEWBIE_MIN_GAMES` games it
    equals the player's real mean. The deviation is kept as is.
    """
    real_mean, deviation = player.ratings[self.rating_type]
    games_played = player.game_count[self.rating_type]
    games_missing = config.NEWBIE_MIN_GAMES - games_played
    weighted_sum = (games_missing * config.NEWBIE_BASE_MEAN
                    + games_played * real_mean)
    return Rating(weighted_sum / config.NEWBIE_MIN_GAMES, deviation)

Returns an adjusted mean with a simple linear interpolation between current mean and a specified base mean

async def await_match(self)
Expand source code
async def await_match(self):
    """
    Block until this search has completed.

    Waits without a timeout and then returns the match future itself
    (not its result).
    """
    await asyncio.wait_for(self._match, timeout=None)
    return self._match

Wait for this search to complete

def cancel(self)
Expand source code
def cancel(self):
    """
    Stop searching for a match by cancelling the match future.
    """
    pending = self._match
    pending.cancel()

Cancel searching for a match

def done(self) ‑> bool
Expand source code
def done(self) -> bool:
    """Whether the underlying match future is done."""
    pending = self._match
    return pending.done()
def get_original_searches(self) ‑> list['Search']
Expand source code
def get_original_searches(self) -> list["Search"]:
    """
    Returns a one-element list containing this search, i.e. the searches
    it is comprised of as if it were a CombinedSearch of one.
    """
    originals = [self]
    return originals

Returns the searches of which this Search is comprised, as if it were a CombinedSearch of one

def has_high_rated_player(self) ‑> bool
Expand source code
def has_high_rated_player(self) -> bool:
    """
    Whether the best displayed rating in this search reaches
    `config.HIGH_RATED_PLAYER_MIN_RATING`.
    """
    return max(self.displayed_ratings) >= config.HIGH_RATED_PLAYER_MIN_RATING
def has_newbie(self) ‑> bool
Expand source code
def has_newbie(self) -> bool:
    """Whether at least one player of this search counts as a newbie."""
    return any(self.is_newbie(member) for member in self.players)
def has_top_player(self) ‑> bool
Expand source code
def has_top_player(self) -> bool:
    """
    Whether the best displayed rating in this search reaches
    `config.TOP_PLAYER_MIN_RATING`.
    """
    return max(self.displayed_ratings) >= config.TOP_PLAYER_MIN_RATING
def is_newbie(self,
player: Player) ‑> bool
Expand source code
def is_newbie(self, player: Player) -> bool:
    """
    Whether `player` has played at most `config.NEWBIE_MIN_GAMES` games
    for this search's rating type.
    """
    games_played = player.game_count[self.rating_type]
    return games_played <= config.NEWBIE_MIN_GAMES
def is_single_party(self) ‑> bool
Expand source code
def is_single_party(self) -> bool:
    """Whether this search consists of exactly one player."""
    party_size = len(self.players)
    return party_size == 1
def match(self,
other: Search)
Expand source code
def match(self, other: "Search"):
    """
    Mark this search as matched with the given opponent.

    Invokes `on_matched(self, other)`, logs the mean adjustment of every
    newbie in this search and finally resolves the match future with
    `other`.
    """
    log = self._logger
    log.info("Matched %s with %s", self, other)

    self.on_matched(self, other)

    for member, raw in zip(self.players, self.raw_ratings):
        if not self.is_newbie(member):
            continue
        # Only the means are reported; both values must still unpack as
        # (mean, deviation) pairs
        original_mean, _ = raw
        newbie_mean, _ = self.adjusted_rating(member)
        log.info(
            "Adjusted mean rating for %s with %d games from %.1f to %.1f",
            member.login,
            member.game_count[self.rating_type],
            original_mean,
            newbie_mean
        )
    self._match.set_result(other)

Mark as matched with given opponent

def matches_with(self,
other: Search) ‑> bool
Expand source code
def matches_with(self, other: "Search") -> bool:
    """
    Check whether this search and `other` are compatible according to the
    wishes of both sides.

    Anything that is not a `Search` never matches.
    """
    if isinstance(other, Search):
        quality = self.quality_with(other)
        return self._match_quality_acceptable(other, quality)

    return False

Determine if this search is compatible with other given search according to both wishes.

def num_newbies(self) ‑> int
Expand source code
def num_newbies(self) -> int:
    """Number of players in this search for which `is_newbie` holds."""
    newbie_flags = map(self.is_newbie, self.players)
    return sum(newbie_flags)
def quality_with(self,
other: Search) ‑> float
Expand source code
def quality_with(self, other: "Search") -> float:
    """
    Returns the trueskill quality of a game between the players of this
    search and the players of `other`, using the `ratings` of both sides.
    """
    assert all(other.raw_ratings)
    assert other.players

    # One team per search, own team first
    teams = [
        [trueskill.Rating(*rating) for rating in search.ratings]
        for search in (self, other)
    ]
    return trueskill.quality(teams)
def register_failed_matching_attempt(self)
Expand source code
def register_failed_matching_attempt(self):
    """
    Record that the matchmaker tried to match this search without success
    by incrementing the internal counter by one.
    """
    self._failed_matching_attempts = self._failed_matching_attempts + 1

Signal that matchmaker tried to match this search but was unsuccessful and increase internal counter by one.