Module server.gameconnection
Game communication over GpgNet
Classes
class GameConnection (database: FAFDatabase,
game: Game,
player: Player,
protocol: Protocol,
player_service: PlayerService,
games: GameService,
state: GameConnectionState = GameConnectionState.INITIALIZING,
setup_timeout: int = 60)-
Expand source code
class GameConnection(GpgNetServerProtocol): """ Responsible for connections to the game, using the GPGNet protocol """ def __init__( self, database: FAFDatabase, game: Game, player: Player, protocol: Protocol, player_service: PlayerService, games: GameService, state: GameConnectionState = GameConnectionState.INITIALIZING, setup_timeout: int = 60, ): """ Construct a new GameConnection """ super().__init__() self._logger = logging.getLogger( f"{self.__class__.__qualname__}.{game.id}" ) self._db = database self.protocol = protocol self._state = state self.game_service = games self.player_service = player_service self.player = player player.game_connection = self # Set up weak reference to self self.game = game self.setup_timeout = setup_timeout self.finished_sim = False self._logger.debug("GameConnection initializing") if self.state is GameConnectionState.INITIALIZING: asyncio.get_event_loop().create_task( self.timeout_game_connection(setup_timeout) ) async def timeout_game_connection(self, timeout): await asyncio.sleep(timeout) if self.state is GameConnectionState.INITIALIZING: self._logger.debug("GameConection timed out...") await self.abort("Player took too long to start the game") @property def state(self) -> GameConnectionState: return self._state def is_host(self) -> bool: if not self.game or not self.player: return False return ( self.player.state == PlayerState.HOSTING and self.player == self.game.host ) async def send(self, message): """ Send a game message to the client. # Errors May raise `DisconnectedError` NOTE: When calling this on a connection other than `self` make sure to handle `DisconnectedError`, otherwise failure to send the message will cause the caller to be disconnected as well. """ message["target"] = "game" self._logger.log(TRACE, ">> %s: %s", self.player.login, message) await self.protocol.send_message(message) async def _handle_idle_state(self): """ This message is sent by FA when it doesn't know what to do. """ assert self.game if self.player == self.game.host: self.game.state = GameState.LOBBY self._state = GameConnectionState.CONNECTED_TO_HOST self.game.add_game_connection(self) self.player.state = PlayerState.HOSTING else: self._state = GameConnectionState.INITIALIZED self.player.state = PlayerState.JOINING async def _handle_lobby_state(self): """ The game has told us it is ready and listening on self.player.game_port for UDP. We determine the connectivity of the peer and respond appropriately """ player_state = self.player.state if player_state == PlayerState.HOSTING: await self.send_HostGame(self.game.map.folder_name) self.game.set_hosted() # If the player is joining, we connect him to host # followed by the rest of the players. elif player_state == PlayerState.JOINING: await self.connect_to_host(self.game.host.game_connection) if self._state is GameConnectionState.ENDED: # We aborted while trying to connect return self._state = GameConnectionState.CONNECTED_TO_HOST try: self.game.add_game_connection(self) except GameError as e: await self.abort(f"GameError while joining {self.game.id}: {e}") return tasks = [] for peer in self.game.connections: if peer != self and peer.player != self.game.host: self._logger.debug("%s connecting to %s", self.player, peer) tasks.append(self.connect_to_peer(peer)) await asyncio.gather(*tasks) async def connect_to_host(self, peer: "GameConnection"): """ Connect self to a given peer (host) """ if not peer or peer.player.state != PlayerState.HOSTING: await self.abort("The host left the lobby") return await self.send_JoinGame(peer.player.login, peer.player.id) if not peer: await self.abort("The host left the lobby") return await peer.send_ConnectToPeer( player_name=self.player.login, player_uid=self.player.id, offer=True ) async def connect_to_peer(self, peer: "GameConnection"): """ Connect two peers """ if peer is not None: await self.send_ConnectToPeer( player_name=peer.player.login, player_uid=peer.player.id, offer=True ) if peer is not None: with contextlib.suppress(DisconnectedError): await peer.send_ConnectToPeer( player_name=self.player.login, player_uid=self.player.id, offer=False ) async def handle_action(self, command: str, args: list[Any]): """ Handle GpgNetSend messages, wrapped in the JSON protocol """ try: await COMMAND_HANDLERS[command](self, *args) except KeyError: self._logger.warning( "Unrecognized command %s: %s from player %s", command, args, self.player ) except (TypeError, ValueError): self._logger.exception("Bad command arguments") except ConnectionError as e: raise e except Exception as e: # pragma: no cover self._logger.exception( "Something awful happened in a game thread! %s", e ) await self.abort() async def handle_desync(self, *_args): # pragma: no cover self.game.desyncs += 1 async def handle_game_option(self, key: str, value: Any): if not self.is_host(): return await self.game.game_options.set_option(key, value) self._mark_dirty() async def handle_game_mods(self, mode: Any, args: list[Any]): if not self.is_host(): return if mode == "activated": # In this case args is the number of mods if int(args) == 0: self.game.mods = {} elif mode == "uids": uids = str(args).split() self.game.mods = {uid: "Unknown sim mod" for uid in uids} async with self._db.acquire() as conn: rows = await conn.execute( "SELECT `uid`, `name` from `table_mod` WHERE `uid` in :ids", ids=tuple(uids) ) for row in rows: self.game.mods[row.uid] = row.name else: self._logger.warning("Ignoring game mod: %s, %s", mode, args) return self._mark_dirty() async def handle_player_option( self, player_id: Any, key: Any, value: Any ): if not self.is_host(): return self.game.set_player_option(int(player_id), key, value) self._mark_dirty() async def handle_ai_option(self, name: Any, key: Any, value: Any): if not self.is_host(): return self.game.set_ai_option(str(name), key, value) self._mark_dirty() async def handle_clear_slot(self, slot: Any): if not self.is_host(): return self.game.clear_slot(int(slot)) self._mark_dirty() async def handle_game_result(self, army: Any, result: Any): army = int(army) result = str(result).lower() try: *metadata, result_type, score = result.split() except ValueError: self._logger.warning("Invalid result for %s reported: %s", army, result) else: await self.game.add_result( self.player.id, army, result_type, int(score), frozenset(metadata), ) async def handle_operation_complete( self, primary: Any, secondary: Any, delta: str ): """ # Params - `primary`: are primary mission objectives complete? - `secondary`: are secondary mission objectives complete? - `delta`: the time it took to complete the mission """ primary = FA.ENABLED == primary secondary = FA.ENABLED == secondary if not primary: return if not isinstance(self.game, CoopGame): self._logger.warning( "OperationComplete called for non-coop game: %s", self.game.id ) return if self.game.validity != ValidityState.COOP_NOT_RANKED: return secondary, delta = secondary, str(delta) async with self._db.acquire() as conn: result = await conn.execute( select(coop_map.c.id).where( coop_map.c.filename == self.game.map.file_path ) ) row = result.fetchone() if not row: self._logger.debug( "can't find coop map: %s", self.game.map.file_path ) return mission = row.id # Each player in a co-op game will send the OperationComplete # message but we only need to perform this insert once async with self.game.leaderboard_lock: if not self.game.leaderboard_saved: await conn.execute( coop_leaderboard.insert().values( mission=mission, gameuid=self.game.id, secondary=secondary, time=delta, player_count=len(self.game.players), ) ) self.game.leaderboard_saved = True async def handle_json_stats(self, stats: str): try: self.game.report_army_stats(stats) except json.JSONDecodeError as e: self._logger.warning( "Malformed game stats reported by %s: '...%s...'", self.player.login, stats[e.pos-20:e.pos+20] ) async def handle_enforce_rating(self): self.game.enforce_rating = True async def handle_teamkill_report( self, gametime: Any, reporter_id: Any, reporter_name: str, teamkiller_id: Any, teamkiller_name: str, ): """ Sent when a player is teamkilled and clicks the 'Report' button. # Params - `gametime`: seconds of gametime when kill happened - `reporter_id`: reporter id - `reporter_name`: reporter nickname (for debug purpose only) - `teamkiller_id`: teamkiller id - `teamkiller_name`: teamkiller nickname (for debug purpose only) """ pass async def handle_teamkill_happened( self, gametime: Any, victim_id: Any, victim_name: str, teamkiller_id: Any, teamkiller_name: str, ): """ Send automatically by the game whenever a teamkill happens. Takes the same parameters as TeamkillReport. # Params - `gametime`: seconds of gametime when kill happened - `victim_id`: victim id - `victim_name`: victim nickname (for debug purpose only) - `teamkiller_id`: teamkiller id - `teamkiller_name`: teamkiller nickname (for debug purpose only) """ victim_id = int(victim_id) teamkiller_id = int(teamkiller_id) if 0 in (victim_id, teamkiller_id): self._logger.debug("Ignoring teamkill for AI player") return async with self._db.acquire() as conn: await conn.execute( teamkills.insert().values( teamkiller=teamkiller_id, victim=victim_id, game_id=self.game.id, gametime=gametime, ) ) async def handle_ice_message(self, receiver_id: Any, ice_msg: str): receiver_id = int(receiver_id) peer = self.player_service.get_player(receiver_id) if not peer: self._logger.debug( "Ignoring ICE message for unknown player: %s", receiver_id ) return game_connection = peer.game_connection if not game_connection: self._logger.debug( "Ignoring ICE message for player without game connection: %s", receiver_id ) return try: await game_connection.send({ "command": "IceMsg", "args": [int(self.player.id), ice_msg] }) except DisconnectedError: self._logger.debug( "Failed to send ICE message to player due to a disconnect: %s", receiver_id ) async def handle_game_state(self, state: str): """ Changes in game state """ if state == "Idle": await self._handle_idle_state() # Don't mark as dirty return elif state == "Lobby": # TODO: Do we still need to schedule with `ensure_future`? # # We do not yield from the task, since we # need to keep processing other commands while it runs await self._handle_lobby_state() elif state == "Launching": if self.player.state != PlayerState.HOSTING: return if self.game.state is not GameState.LOBBY: self._logger.warning( "Trying to launch game %s in invalid state %s", self.game, self.game.state ) return self._logger.info("Launching game %s", self.game) await self.game.launch() if len(self.game.mods.keys()) > 0: async with self._db.acquire() as conn: uids = list(self.game.mods.keys()) await conn.execute( "UPDATE mod_stats s JOIN mod_version v ON " "v.mod_id = s.mod_id " "SET s.times_played = s.times_played + 1 " "WHERE v.uid in :ids", ids=tuple(uids) ) # Signals that the FA executable has been closed elif state == "Ended": await self.on_connection_closed() self._mark_dirty() async def handle_game_ended(self, *args: list[Any]): """ Signals that the simulation has ended. """ self.finished_sim = True await self.game.check_game_finish(self.player) async def handle_rehost(self, *args: list[Any]): """ Signals that the user has rehosted the game. This is currently unused but included for documentation purposes. """ pass async def handle_launch_status(self, status: str): """ Currently is sent with status `Rejected` if a matchmaker game failed to start due to players using differing game settings. """ pass async def handle_bottleneck(self, *args: list[Any]): """ Not sure what this command means. This is currently unused but included for documentation purposes. """ pass async def handle_bottleneck_cleared(self, *args: list[Any]): """ Not sure what this command means. This is currently unused but included for documentation purposes. """ pass async def handle_disconnected(self, *args: list[Any]): """ Not sure what this command means. This is currently unused but included for documentation purposes. """ pass async def handle_chat(self, message: str): """ Whenever the player sends a chat message during the game lobby. """ pass async def handle_game_full(self): """ Sent when all game slots are full """ pass def _mark_dirty(self): if self.game: self.game_service.mark_dirty(self.game) async def abort(self, log_message: str = ""): """ Abort the connection Removes the GameConnection object from the any associated Game object, and deletes references to Player and Game held by this object. """ try: if self._state is GameConnectionState.ENDED: return self._logger.debug("%s.abort(%s)", self, log_message) if self.game.state is GameState.LOBBY: await self.disconnect_all_peers() self._state = GameConnectionState.ENDED await self.game.remove_game_connection(self) self._mark_dirty() self.player.state = PlayerState.IDLE if self.player.lobby_connection: self.player.lobby_connection.game_connection = None del self.player.game del self.player.game_connection except Exception as ex: # pragma: no cover self._logger.debug("Exception in abort(): %s", ex) async def disconnect_all_peers(self): tasks = [] for peer in self.game.connections: if peer == self: continue tasks.append(peer.send_DisconnectFromPeer(self.player.id)) for fut in asyncio.as_completed(tasks): try: await fut except Exception: self._logger.debug( "peer_sendDisconnectFromPeer failed for player %i", self.player.id, exc_info=True ) async def on_connection_closed(self): """ The connection is closed by the player. """ try: await self.game.disconnect_player(self.player) except Exception as e: # pragma: no cover self._logger.exception(e) finally: await self.abort() async def on_connection_lost(self): """ The connection is lost due to a disconnect from the lobby server. """ try: await self.game.remove_game_connection(self) except Exception as e: # pragma: no cover self._logger.exception(e) finally: await self.abort() def __str__(self): return f"GameConnection({self.player}, {self.game})"
Responsible for connections to the game, using the GPGNet protocol
Construct a new GameConnection
Ancestors
Instance variables
prop state : GameConnectionState
-
Expand source code
@property def state(self) -> GameConnectionState: return self._state
Methods
async def abort(self, log_message: str = '')
-
Expand source code
async def abort(self, log_message: str = ""): """ Abort the connection Removes the GameConnection object from the any associated Game object, and deletes references to Player and Game held by this object. """ try: if self._state is GameConnectionState.ENDED: return self._logger.debug("%s.abort(%s)", self, log_message) if self.game.state is GameState.LOBBY: await self.disconnect_all_peers() self._state = GameConnectionState.ENDED await self.game.remove_game_connection(self) self._mark_dirty() self.player.state = PlayerState.IDLE if self.player.lobby_connection: self.player.lobby_connection.game_connection = None del self.player.game del self.player.game_connection except Exception as ex: # pragma: no cover self._logger.debug("Exception in abort(): %s", ex)
Abort the connection
Removes the GameConnection object from the any associated Game object, and deletes references to Player and Game held by this object.
async def connect_to_host(self,
peer: GameConnection)-
Expand source code
async def connect_to_host(self, peer: "GameConnection"): """ Connect self to a given peer (host) """ if not peer or peer.player.state != PlayerState.HOSTING: await self.abort("The host left the lobby") return await self.send_JoinGame(peer.player.login, peer.player.id) if not peer: await self.abort("The host left the lobby") return await peer.send_ConnectToPeer( player_name=self.player.login, player_uid=self.player.id, offer=True )
Connect self to a given peer (host)
async def connect_to_peer(self,
peer: GameConnection)-
Expand source code
async def connect_to_peer(self, peer: "GameConnection"): """ Connect two peers """ if peer is not None: await self.send_ConnectToPeer( player_name=peer.player.login, player_uid=peer.player.id, offer=True ) if peer is not None: with contextlib.suppress(DisconnectedError): await peer.send_ConnectToPeer( player_name=self.player.login, player_uid=self.player.id, offer=False )
Connect two peers
async def disconnect_all_peers(self)
-
Expand source code
async def disconnect_all_peers(self): tasks = [] for peer in self.game.connections: if peer == self: continue tasks.append(peer.send_DisconnectFromPeer(self.player.id)) for fut in asyncio.as_completed(tasks): try: await fut except Exception: self._logger.debug( "peer_sendDisconnectFromPeer failed for player %i", self.player.id, exc_info=True )
async def handle_action(self, command: str, args: list[typing.Any])
-
Expand source code
async def handle_action(self, command: str, args: list[Any]): """ Handle GpgNetSend messages, wrapped in the JSON protocol """ try: await COMMAND_HANDLERS[command](self, *args) except KeyError: self._logger.warning( "Unrecognized command %s: %s from player %s", command, args, self.player ) except (TypeError, ValueError): self._logger.exception("Bad command arguments") except ConnectionError as e: raise e except Exception as e: # pragma: no cover self._logger.exception( "Something awful happened in a game thread! %s", e ) await self.abort()
Handle GpgNetSend messages, wrapped in the JSON protocol
async def handle_ai_option(self, name: Any, key: Any, value: Any)
-
Expand source code
async def handle_ai_option(self, name: Any, key: Any, value: Any): if not self.is_host(): return self.game.set_ai_option(str(name), key, value) self._mark_dirty()
async def handle_bottleneck(self, *args: list[typing.Any])
-
Expand source code
async def handle_bottleneck(self, *args: list[Any]): """ Not sure what this command means. This is currently unused but included for documentation purposes. """ pass
Not sure what this command means. This is currently unused but included for documentation purposes.
async def handle_bottleneck_cleared(self, *args: list[typing.Any])
-
Expand source code
async def handle_bottleneck_cleared(self, *args: list[Any]): """ Not sure what this command means. This is currently unused but included for documentation purposes. """ pass
Not sure what this command means. This is currently unused but included for documentation purposes.
async def handle_chat(self, message: str)
-
Expand source code
async def handle_chat(self, message: str): """ Whenever the player sends a chat message during the game lobby. """ pass
Whenever the player sends a chat message during the game lobby.
async def handle_clear_slot(self, slot: Any)
-
Expand source code
async def handle_clear_slot(self, slot: Any): if not self.is_host(): return self.game.clear_slot(int(slot)) self._mark_dirty()
async def handle_desync(self, *_args)
-
Expand source code
async def handle_desync(self, *_args): # pragma: no cover self.game.desyncs += 1
async def handle_disconnected(self, *args: list[typing.Any])
-
Expand source code
async def handle_disconnected(self, *args: list[Any]): """ Not sure what this command means. This is currently unused but included for documentation purposes. """ pass
Not sure what this command means. This is currently unused but included for documentation purposes.
async def handle_enforce_rating(self)
-
Expand source code
async def handle_enforce_rating(self): self.game.enforce_rating = True
async def handle_game_ended(self, *args: list[typing.Any])
-
Expand source code
async def handle_game_ended(self, *args: list[Any]): """ Signals that the simulation has ended. """ self.finished_sim = True await self.game.check_game_finish(self.player)
Signals that the simulation has ended.
async def handle_game_full(self)
-
Expand source code
async def handle_game_full(self): """ Sent when all game slots are full """ pass
Sent when all game slots are full
async def handle_game_mods(self, mode: Any, args: list[typing.Any])
-
Expand source code
async def handle_game_mods(self, mode: Any, args: list[Any]): if not self.is_host(): return if mode == "activated": # In this case args is the number of mods if int(args) == 0: self.game.mods = {} elif mode == "uids": uids = str(args).split() self.game.mods = {uid: "Unknown sim mod" for uid in uids} async with self._db.acquire() as conn: rows = await conn.execute( "SELECT `uid`, `name` from `table_mod` WHERE `uid` in :ids", ids=tuple(uids) ) for row in rows: self.game.mods[row.uid] = row.name else: self._logger.warning("Ignoring game mod: %s, %s", mode, args) return self._mark_dirty()
async def handle_game_option(self, key: str, value: Any)
-
Expand source code
async def handle_game_option(self, key: str, value: Any): if not self.is_host(): return await self.game.game_options.set_option(key, value) self._mark_dirty()
async def handle_game_result(self, army: Any, result: Any)
-
Expand source code
async def handle_game_result(self, army: Any, result: Any): army = int(army) result = str(result).lower() try: *metadata, result_type, score = result.split() except ValueError: self._logger.warning("Invalid result for %s reported: %s", army, result) else: await self.game.add_result( self.player.id, army, result_type, int(score), frozenset(metadata), )
async def handle_game_state(self, state: str)
-
Expand source code
async def handle_game_state(self, state: str): """ Changes in game state """ if state == "Idle": await self._handle_idle_state() # Don't mark as dirty return elif state == "Lobby": # TODO: Do we still need to schedule with `ensure_future`? # # We do not yield from the task, since we # need to keep processing other commands while it runs await self._handle_lobby_state() elif state == "Launching": if self.player.state != PlayerState.HOSTING: return if self.game.state is not GameState.LOBBY: self._logger.warning( "Trying to launch game %s in invalid state %s", self.game, self.game.state ) return self._logger.info("Launching game %s", self.game) await self.game.launch() if len(self.game.mods.keys()) > 0: async with self._db.acquire() as conn: uids = list(self.game.mods.keys()) await conn.execute( "UPDATE mod_stats s JOIN mod_version v ON " "v.mod_id = s.mod_id " "SET s.times_played = s.times_played + 1 " "WHERE v.uid in :ids", ids=tuple(uids) ) # Signals that the FA executable has been closed elif state == "Ended": await self.on_connection_closed() self._mark_dirty()
Changes in game state
async def handle_ice_message(self, receiver_id: Any, ice_msg: str)
-
Expand source code
async def handle_ice_message(self, receiver_id: Any, ice_msg: str): receiver_id = int(receiver_id) peer = self.player_service.get_player(receiver_id) if not peer: self._logger.debug( "Ignoring ICE message for unknown player: %s", receiver_id ) return game_connection = peer.game_connection if not game_connection: self._logger.debug( "Ignoring ICE message for player without game connection: %s", receiver_id ) return try: await game_connection.send({ "command": "IceMsg", "args": [int(self.player.id), ice_msg] }) except DisconnectedError: self._logger.debug( "Failed to send ICE message to player due to a disconnect: %s", receiver_id )
async def handle_json_stats(self, stats: str)
-
Expand source code
async def handle_json_stats(self, stats: str): try: self.game.report_army_stats(stats) except json.JSONDecodeError as e: self._logger.warning( "Malformed game stats reported by %s: '...%s...'", self.player.login, stats[e.pos-20:e.pos+20] )
async def handle_launch_status(self, status: str)
-
Expand source code
async def handle_launch_status(self, status: str): """ Currently is sent with status `Rejected` if a matchmaker game failed to start due to players using differing game settings. """ pass
Currently is sent with status
Rejected
if a matchmaker game failed to start due to players using differing game settings. async def handle_operation_complete(self, primary: Any, secondary: Any, delta: str)
-
Expand source code
async def handle_operation_complete( self, primary: Any, secondary: Any, delta: str ): """ # Params - `primary`: are primary mission objectives complete? - `secondary`: are secondary mission objectives complete? - `delta`: the time it took to complete the mission """ primary = FA.ENABLED == primary secondary = FA.ENABLED == secondary if not primary: return if not isinstance(self.game, CoopGame): self._logger.warning( "OperationComplete called for non-coop game: %s", self.game.id ) return if self.game.validity != ValidityState.COOP_NOT_RANKED: return secondary, delta = secondary, str(delta) async with self._db.acquire() as conn: result = await conn.execute( select(coop_map.c.id).where( coop_map.c.filename == self.game.map.file_path ) ) row = result.fetchone() if not row: self._logger.debug( "can't find coop map: %s", self.game.map.file_path ) return mission = row.id # Each player in a co-op game will send the OperationComplete # message but we only need to perform this insert once async with self.game.leaderboard_lock: if not self.game.leaderboard_saved: await conn.execute( coop_leaderboard.insert().values( mission=mission, gameuid=self.game.id, secondary=secondary, time=delta, player_count=len(self.game.players), ) ) self.game.leaderboard_saved = True
Params
primary
: are primary mission objectives complete?secondary
: are secondary mission objectives complete?delta
: the time it took to complete the mission
async def handle_player_option(self, player_id: Any, key: Any, value: Any)
-
Expand source code
async def handle_player_option( self, player_id: Any, key: Any, value: Any ): if not self.is_host(): return self.game.set_player_option(int(player_id), key, value) self._mark_dirty()
async def handle_rehost(self, *args: list[typing.Any])
-
Expand source code
async def handle_rehost(self, *args: list[Any]): """ Signals that the user has rehosted the game. This is currently unused but included for documentation purposes. """ pass
Signals that the user has rehosted the game. This is currently unused but included for documentation purposes.
async def handle_teamkill_happened(self,
gametime: Any,
victim_id: Any,
victim_name: str,
teamkiller_id: Any,
teamkiller_name: str)-
Expand source code
async def handle_teamkill_happened( self, gametime: Any, victim_id: Any, victim_name: str, teamkiller_id: Any, teamkiller_name: str, ): """ Send automatically by the game whenever a teamkill happens. Takes the same parameters as TeamkillReport. # Params - `gametime`: seconds of gametime when kill happened - `victim_id`: victim id - `victim_name`: victim nickname (for debug purpose only) - `teamkiller_id`: teamkiller id - `teamkiller_name`: teamkiller nickname (for debug purpose only) """ victim_id = int(victim_id) teamkiller_id = int(teamkiller_id) if 0 in (victim_id, teamkiller_id): self._logger.debug("Ignoring teamkill for AI player") return async with self._db.acquire() as conn: await conn.execute( teamkills.insert().values( teamkiller=teamkiller_id, victim=victim_id, game_id=self.game.id, gametime=gametime, ) )
Send automatically by the game whenever a teamkill happens. Takes the same parameters as TeamkillReport.
Params
gametime
: seconds of gametime when kill happenedvictim_id
: victim idvictim_name
: victim nickname (for debug purpose only)teamkiller_id
: teamkiller idteamkiller_name
: teamkiller nickname (for debug purpose only)
async def handle_teamkill_report(self,
gametime: Any,
reporter_id: Any,
reporter_name: str,
teamkiller_id: Any,
teamkiller_name: str)-
Expand source code
async def handle_teamkill_report( self, gametime: Any, reporter_id: Any, reporter_name: str, teamkiller_id: Any, teamkiller_name: str, ): """ Sent when a player is teamkilled and clicks the 'Report' button. # Params - `gametime`: seconds of gametime when kill happened - `reporter_id`: reporter id - `reporter_name`: reporter nickname (for debug purpose only) - `teamkiller_id`: teamkiller id - `teamkiller_name`: teamkiller nickname (for debug purpose only) """ pass
Sent when a player is teamkilled and clicks the 'Report' button.
Params
gametime
: seconds of gametime when kill happenedreporter_id
: reporter idreporter_name
: reporter nickname (for debug purpose only)teamkiller_id
: teamkiller idteamkiller_name
: teamkiller nickname (for debug purpose only)
def is_host(self) ‑> bool
-
Expand source code
def is_host(self) -> bool: if not self.game or not self.player: return False return ( self.player.state == PlayerState.HOSTING and self.player == self.game.host )
async def on_connection_closed(self)
-
Expand source code
async def on_connection_closed(self): """ The connection is closed by the player. """ try: await self.game.disconnect_player(self.player) except Exception as e: # pragma: no cover self._logger.exception(e) finally: await self.abort()
The connection is closed by the player.
async def on_connection_lost(self)
-
Expand source code
async def on_connection_lost(self): """ The connection is lost due to a disconnect from the lobby server. """ try: await self.game.remove_game_connection(self) except Exception as e: # pragma: no cover self._logger.exception(e) finally: await self.abort()
The connection is lost due to a disconnect from the lobby server.
async def send(self, message)
-
Expand source code
async def send(self, message): """ Send a game message to the client. # Errors May raise `DisconnectedError` NOTE: When calling this on a connection other than `self` make sure to handle `DisconnectedError`, otherwise failure to send the message will cause the caller to be disconnected as well. """ message["target"] = "game" self._logger.log(TRACE, ">> %s: %s", self.player.login, message) await self.protocol.send_message(message)
Send a game message to the client.
Errors
May raise
DisconnectedError
NOTE: When calling this on a connection other than
self
make sure to handleDisconnectedError
, otherwise failure to send the message will cause the caller to be disconnected as well. async def timeout_game_connection(self, timeout)
-
Expand source code
async def timeout_game_connection(self, timeout): await asyncio.sleep(timeout) if self.state is GameConnectionState.INITIALIZING: self._logger.debug("GameConection timed out...") await self.abort("Player took too long to start the game")
Inherited members