Module server.ladder_service.violation_service

Classes

class Violation (count: int = 1, time: datetime.datetime | None = None)
Expand source code
@dataclass
class Violation:
    """
    Running tally of a player's violations together with the time at which
    the most recent one was recorded.
    """
    count: int
    time: datetime

    def __init__(self, count: int = 1, time: Optional[datetime] = None):
        self.count = count
        # Without an explicit timestamp the violation is stamped with the
        # current time.
        self.time = time if time else datetime_now()

    def register(self):
        """Record one more violation and restamp it with the current time."""
        self.count = self.count + 1
        self.time = datetime_now()

    def get_ban_expiration(self) -> datetime:
        """
        Return the moment at which the ban caused by this violation ends.

        A first violation carries no ban, so it ends at the time it was
        registered. The second violation bans for 10 minutes and every
        later one for 30 minutes.
        """
        if self.count < 2:
            return self.time

        ban_minutes = 10 if self.count == 2 else 30
        return self.time + timedelta(minutes=ban_minutes)

    def get_remaining(self, now: Optional[datetime] = None) -> timedelta:
        """
        Return the time left from `now` (the current time when omitted) until
        the ban expires. The result is negative once the ban is over.
        """
        expires_at = self.get_ban_expiration()
        reference = now if now else datetime_now()
        return expires_at - reference

    def is_expired(self, now: Optional[datetime] = None) -> bool:
        """
        Tell whether the violation history is old enough to be reset. This
        is not the same thing as the ban having ended; use
        `get_ban_expiration` for that.
        """
        reference = now if now else datetime_now()
        # TODO: Config?
        reset_at = self.time + timedelta(hours=1)
        return reset_at <= reference

    def to_dict(self) -> dict:
        """Return the count and the ISO formatted time as a plain dict."""
        return dict(count=self.count, time=self.time.isoformat())

Violation(count: int = 1, time: Optional[datetime.datetime] = None)

Class variables

var count : int
var time : datetime.datetime

Methods

def get_ban_expiration(self) ‑> datetime.datetime
Expand source code
def get_ban_expiration(self) -> datetime:
    """
    Return the moment at which the ban caused by this violation ends.

    A first violation carries no ban, so it ends at the time it was
    registered. The second violation bans for 10 minutes and every later
    one for 30 minutes.
    """
    if self.count < 2:
        return self.time

    ban_minutes = 10 if self.count == 2 else 30
    return self.time + timedelta(minutes=ban_minutes)
def get_remaining(self, now: datetime.datetime | None = None) ‑> datetime.timedelta
Expand source code
def get_remaining(self, now: Optional[datetime] = None) -> timedelta:
    """
    Return the time left from `now` (the current time when omitted) until
    the ban expires. The result is negative once the ban is over.
    """
    expires_at = self.get_ban_expiration()
    reference = now if now else datetime_now()
    return expires_at - reference
def is_expired(self, now: datetime.datetime | None = None) ‑> bool
Expand source code
def is_expired(self, now: Optional[datetime] = None) -> bool:
    """
    Tell whether the violation history is old enough to be reset. This is
    not the same thing as the ban having ended; use `get_ban_expiration`
    for that.
    """
    reference = now if now else datetime_now()
    # TODO: Config?
    reset_at = self.time + timedelta(hours=1)
    return reset_at <= reference

Whether the violation history should be reset. This is different from the ban expiration time which should be checked by calling get_ban_expiration.

def register(self)
Expand source code
def register(self):
    """Record one more violation and restamp it with the current time."""
    self.count = self.count + 1
    self.time = datetime_now()
def to_dict(self) ‑> dict
Expand source code
def to_dict(self) -> dict:
    """Return the count and the ISO formatted time as a plain dict."""
    return dict(count=self.count, time=self.time.isoformat())
class ViolationService
Expand source code
@with_logger
class ViolationService(Service):
    """
    Keeps a record of which players are barred from searching and for how
    long, escalating the time-out as violations repeat.

    Anything may count as a violation, though it is usually a player
    failing to connect to a game.
    """

    def __init__(self):
        # Keyed by player id. The original `Player` object is stored next to
        # the violation for logging only.
        self._violations: dict[int, tuple[Player, Violation]] = {}

    async def initialize(self):
        """
        Start the recurring task that runs `clear_expired`.

        NOTE(review): the interval of 5 is presumably in seconds; confirm
        against `at_interval`.
        """
        self._cleanup_task = at_interval(5, func=self.clear_expired)

    def clear_expired(self):
        """Drop every stored violation whose history has expired."""
        now = datetime_now()
        # Collect the affected players first; clearing mutates the dict.
        expired_players = [
            player
            for player, violation in self._violations.values()
            if violation.is_expired(now)
        ]
        for player in expired_players:
            self._clear_violation(player)

    def register_violations(self, players: list[Player]):
        """
        Record a violation for each of the given players and tell them.

        A player with no stored violation, or whose stored violation has
        expired, starts over with a fresh one; otherwise the stored one is
        bumped. Each player is then sent a `search_violation` message and a
        `notice` message.
        """
        now = datetime_now()
        for player in players:
            current = self.get_violation(player)
            if current is not None and not current.is_expired(now):
                current.register()
                violation = current
            else:
                violation = Violation(time=now)
                self.set_violation(player, violation)

            search_message = {
                "command": "search_violation",
                **violation.to_dict()
            }
            player.write_message(search_message)

            # The remaining time-out is only mentioned from the second
            # violation on.
            if violation.count > 1:
                time_left = violation.get_ban_expiration() - now
                delta_text = humanize.precisedelta(time_left)
                extra_text = f" You can queue again in {delta_text}"
            else:
                extra_text = ""

            notice_text = (
                f"You have caused a matchmaking connection failure {violation.count} time(s). "
                "Multiple failures result in temporary time-outs from matchmaker. "
                "Please seek support on the forums or discord for persistent issues."
            ) + extra_text
            player.write_message({
                "command": "notice",
                "style": "info",
                "text": notice_text
            })

    def get_violations(self, players: list[Player]) -> dict[Player, Violation]:
        """
        Return the violations of those players whose ban is still running.

        Violations whose ban is over and whose history has also expired are
        cleared as a side effect.
        """
        now = datetime_now()
        active = {}
        for player in players:
            violation = self.get_violation(player)
            if not violation:
                continue

            if violation.get_ban_expiration() > now:
                active[player] = violation
                continue

            if violation.is_expired(now):
                self._clear_violation(player)

        return active

    def get_violation(self, player: Player) -> Optional[Violation]:
        """Return the violation stored for the player, or `None`."""
        entry = self._violations.get(player.id)
        if entry is None:
            return None

        _, violation = entry
        return violation

    def set_violation(self, player: Player, violation: Violation):
        """Store the violation for the player, replacing any earlier one."""
        entry = (player, violation)
        self._violations[player.id] = entry

    def _clear_violation(self, player: Player):
        """Log and remove the violation stored for the player."""
        cleared = self.get_violation(player)
        self._logger.debug(
            "Cleared violation for player %s: %s", player.login, cleared
        )
        del self._violations[player.id]

Track who is banned from searching and for how long. Apply progressive discipline for repeated violations.

A violation could be anything, but it is usually any time a player fails to connect to a game.

Ancestors

Methods

def clear_expired(self)
Expand source code
def clear_expired(self):
    """Drop every stored violation whose history has expired."""
    now = datetime_now()
    # Collect the affected players first; clearing mutates the dict.
    expired_players = [
        player
        for player, violation in self._violations.values()
        if violation.is_expired(now)
    ]
    for player in expired_players:
        self._clear_violation(player)
def get_violation(self,
player: Player) ‑> Violation | None
Expand source code
def get_violation(self, player: Player) -> Optional[Violation]:
    """Return the violation stored for the player, or `None`."""
    entry = self._violations.get(player.id)
    if entry is None:
        return None

    _, violation = entry
    return violation
def get_violations(self,
players: list[Player]) ‑> dict[Player, Violation]
Expand source code
def get_violations(self, players: list[Player]) -> dict[Player, Violation]:
    """
    Return the violations of those players whose ban is still running.

    Violations whose ban is over and whose history has also expired are
    cleared as a side effect.
    """
    now = datetime_now()
    active = {}
    for player in players:
        violation = self.get_violation(player)
        if not violation:
            continue

        if violation.get_ban_expiration() > now:
            active[player] = violation
            continue

        if violation.is_expired(now):
            self._clear_violation(player)

    return active
def register_violations(self,
players: list[Player])
Expand source code
def register_violations(self, players: list[Player]):
    """
    Record a violation for each of the given players and tell them.

    A player with no stored violation, or whose stored violation has
    expired, starts over with a fresh one; otherwise the stored one is
    bumped. Each player is then sent a `search_violation` message and a
    `notice` message.
    """
    now = datetime_now()
    for player in players:
        current = self.get_violation(player)
        if current is not None and not current.is_expired(now):
            current.register()
            violation = current
        else:
            violation = Violation(time=now)
            self.set_violation(player, violation)

        search_message = {
            "command": "search_violation",
            **violation.to_dict()
        }
        player.write_message(search_message)

        # The remaining time-out is only mentioned from the second
        # violation on.
        if violation.count > 1:
            time_left = violation.get_ban_expiration() - now
            delta_text = humanize.precisedelta(time_left)
            extra_text = f" You can queue again in {delta_text}"
        else:
            extra_text = ""

        notice_text = (
            f"You have caused a matchmaking connection failure {violation.count} time(s). "
            "Multiple failures result in temporary time-outs from matchmaker. "
            "Please seek support on the forums or discord for persistent issues."
        ) + extra_text
        player.write_message({
            "command": "notice",
            "style": "info",
            "text": notice_text
        })
def set_violation(self,
player: Player,
violation: Violation)
Expand source code
def set_violation(self, player: Player, violation: Violation):
    """Store the violation for the player, replacing any earlier one."""
    entry = (player, violation)
    self._violations[player.id] = entry

Inherited members