Module server.ladder_service.violation_service
Classes
class Violation (count: int = 1, time: datetime.datetime | None = None)
-
Expand source code
@dataclass class Violation: count: int time: datetime def __init__(self, count: int = 1, time: Optional[datetime] = None): self.count = count self.time = time or datetime_now() def register(self): self.count += 1 self.time = datetime_now() def get_ban_expiration(self) -> datetime: if self.count < 2: # No ban, expires as soon as it's registered return self.time elif self.count == 2: return self.time + timedelta(minutes=10) else: return self.time + timedelta(minutes=30) def get_remaining(self, now: Optional[datetime] = None) -> timedelta: return self.get_ban_expiration() - (now or datetime_now()) def is_expired(self, now: Optional[datetime] = None) -> bool: """ Whether the violation history should be reset. This is different from the ban expiration time which should be checked by calling `get_ban_expiration`. """ now = now or datetime_now() # TODO: Config? return self.time + timedelta(hours=1) <= now def to_dict(self) -> dict: return { "count": self.count, "time": self.time.isoformat() }
Violation(count: int = 1, time: Optional[datetime.datetime] = None)
Class variables
var count : int
var time : datetime.datetime
Methods
def get_ban_expiration(self) ‑> datetime.datetime
-
Expand source code
def get_ban_expiration(self) -> datetime: if self.count < 2: # No ban, expires as soon as it's registered return self.time elif self.count == 2: return self.time + timedelta(minutes=10) else: return self.time + timedelta(minutes=30)
def get_remaining(self, now: datetime.datetime | None = None) ‑> datetime.timedelta
-
Expand source code
def get_remaining(self, now: Optional[datetime] = None) -> timedelta: return self.get_ban_expiration() - (now or datetime_now())
def is_expired(self, now: datetime.datetime | None = None) ‑> bool
-
Expand source code
def is_expired(self, now: Optional[datetime] = None) -> bool: """ Whether the violation history should be reset. This is different from the ban expiration time which should be checked by calling `get_ban_expiration`. """ now = now or datetime_now() # TODO: Config? return self.time + timedelta(hours=1) <= now
Whether the violation history should be reset. This is different from the ban expiration time which should be checked by calling
get_ban_expiration
. def register(self)
-
Expand source code
def register(self): self.count += 1 self.time = datetime_now()
def to_dict(self) ‑> dict
-
Expand source code
def to_dict(self) -> dict: return { "count": self.count, "time": self.time.isoformat() }
class ViolationService
-
Expand source code
@with_logger class ViolationService(Service): """ Track who is banned from searching and for how long. Apply progressive discipline for repeated violations. A violation could be anything, but it is usually any time a player fails to connect to a game. """ def __init__(self): # We store a reference to the original `Player` object for logging only self._violations: dict[int, tuple[Player, Violation]] = {} async def initialize(self): self._cleanup_task = at_interval(5, func=self.clear_expired) def clear_expired(self): now = datetime_now() for player, violation in list(self._violations.values()): if violation.is_expired(now): self._clear_violation(player) def register_violations(self, players: list[Player]): now = datetime_now() for player in players: violation = self.get_violation(player) if violation is None or violation.is_expired(now): violation = Violation(time=now) self.set_violation(player, violation) else: violation.register() player.write_message({ "command": "search_violation", **violation.to_dict() }) extra_text = "" if violation.count > 1: delta_text = humanize.precisedelta( violation.get_ban_expiration() - now ) extra_text = f" You can queue again in {delta_text}" player.write_message({ "command": "notice", "style": "info", "text": ( f"You have caused a matchmaking connection failure {violation.count} time(s). " "Multiple failures result in temporary time-outs from matchmaker. " "Please seek support on the forums or discord for persistent issues." + extra_text ) }) def get_violations(self, players: list[Player]) -> dict[Player, Violation]: now = datetime_now() result = {} for player in players: violation = self.get_violation(player) if not violation: continue elif violation.get_ban_expiration() > now: result[player] = violation elif violation.is_expired(now): self._clear_violation(player) return result def get_violation(self, player: Player) -> Optional[Violation]: _, violation = self._violations.get(player.id, (None, None)) return violation def set_violation(self, player: Player, violation: Violation): self._violations[player.id] = (player, violation) def _clear_violation(self, player: Player): violation = self.get_violation(player) self._logger.debug( "Cleared violation for player %s: %s", player.login, violation ) del self._violations[player.id]
Track who is banned from searching and for how long. Apply progressive discipline for repeated violations.
A violation could be anything, but it is usually any time a player fails to connect to a game.
Ancestors
Methods
def clear_expired(self)
-
Expand source code
def clear_expired(self): now = datetime_now() for player, violation in list(self._violations.values()): if violation.is_expired(now): self._clear_violation(player)
def get_violation(self,
player: Player) ‑> Violation | None-
Expand source code
def get_violation(self, player: Player) -> Optional[Violation]: _, violation = self._violations.get(player.id, (None, None)) return violation
def get_violations(self,
players: list[Player]) ‑> dict[Player, Violation]-
Expand source code
def get_violations(self, players: list[Player]) -> dict[Player, Violation]: now = datetime_now() result = {} for player in players: violation = self.get_violation(player) if not violation: continue elif violation.get_ban_expiration() > now: result[player] = violation elif violation.is_expired(now): self._clear_violation(player) return result
def register_violations(self,
players: list[Player])-
Expand source code
def register_violations(self, players: list[Player]): now = datetime_now() for player in players: violation = self.get_violation(player) if violation is None or violation.is_expired(now): violation = Violation(time=now) self.set_violation(player, violation) else: violation.register() player.write_message({ "command": "search_violation", **violation.to_dict() }) extra_text = "" if violation.count > 1: delta_text = humanize.precisedelta( violation.get_ban_expiration() - now ) extra_text = f" You can queue again in {delta_text}" player.write_message({ "command": "notice", "style": "info", "text": ( f"You have caused a matchmaking connection failure {violation.count} time(s). " "Multiple failures result in temporary time-outs from matchmaker. " "Please seek support on the forums or discord for persistent issues." + extra_text ) })
def set_violation(self,
player: Player,
violation: Violation)-
Expand source code
def set_violation(self, player: Player, violation: Violation): self._violations[player.id] = (player, violation)
Inherited members