Module server.gameconnection
Game communication over GpgNet
Classes
class GameConnection (database: FAFDatabase, game: Game, player: Player, protocol: Protocol, player_service: PlayerService, games: GameService, state: GameConnectionState = GameConnectionState.INITIALIZING, setup_timeout: int = 60)
-
Responsible for connections to the game, using the GPGNet protocol
Construct a new GameConnection
Expand source code
class GameConnection(GpgNetServerProtocol): """ Responsible for connections to the game, using the GPGNet protocol """ def __init__( self, database: FAFDatabase, game: Game, player: Player, protocol: Protocol, player_service: PlayerService, games: GameService, state: GameConnectionState = GameConnectionState.INITIALIZING, setup_timeout: int = 60, ): """ Construct a new GameConnection """ super().__init__() self._logger = logging.getLogger( f"{self.__class__.__qualname__}.{game.id}" ) self._db = database self.protocol = protocol self._state = state self.game_service = games self.player_service = player_service self.player = player player.game_connection = self # Set up weak reference to self self.game = game self.setup_timeout = setup_timeout self.finished_sim = False self._logger.debug("GameConnection initializing") if self.state is GameConnectionState.INITIALIZING: asyncio.get_event_loop().create_task( self.timeout_game_connection(setup_timeout) ) async def timeout_game_connection(self, timeout): await asyncio.sleep(timeout) if self.state is GameConnectionState.INITIALIZING: self._logger.debug("GameConection timed out...") await self.abort("Player took too long to start the game") @property def state(self) -> GameConnectionState: return self._state def is_host(self) -> bool: if not self.game or not self.player: return False return ( self.player.state == PlayerState.HOSTING and self.player == self.game.host ) async def send(self, message): """ Send a game message to the client. # Errors May raise `DisconnectedError` NOTE: When calling this on a connection other than `self` make sure to handle `DisconnectedError`, otherwise failure to send the message will cause the caller to be disconnected as well. """ message["target"] = "game" self._logger.log(TRACE, ">> %s: %s", self.player.login, message) await self.protocol.send_message(message) async def _handle_idle_state(self): """ This message is sent by FA when it doesn't know what to do. """ assert self.game if self.player == self.game.host: self.game.state = GameState.LOBBY self._state = GameConnectionState.CONNECTED_TO_HOST self.game.add_game_connection(self) self.player.state = PlayerState.HOSTING else: self._state = GameConnectionState.INITIALIZED self.player.state = PlayerState.JOINING async def _handle_lobby_state(self): """ The game has told us it is ready and listening on self.player.game_port for UDP. We determine the connectivity of the peer and respond appropriately """ player_state = self.player.state if player_state == PlayerState.HOSTING: await self.send_HostGame(self.game.map.folder_name) self.game.set_hosted() # If the player is joining, we connect him to host # followed by the rest of the players. elif player_state == PlayerState.JOINING: await self.connect_to_host(self.game.host.game_connection) if self._state is GameConnectionState.ENDED: # We aborted while trying to connect return self._state = GameConnectionState.CONNECTED_TO_HOST try: self.game.add_game_connection(self) except GameError as e: await self.abort(f"GameError while joining {self.game.id}: {e}") return tasks = [] for peer in self.game.connections: if peer != self and peer.player != self.game.host: self._logger.debug("%s connecting to %s", self.player, peer) tasks.append(self.connect_to_peer(peer)) await asyncio.gather(*tasks) async def connect_to_host(self, peer: "GameConnection"): """ Connect self to a given peer (host) """ if not peer or peer.player.state != PlayerState.HOSTING: await self.abort("The host left the lobby") return await self.send_JoinGame(peer.player.login, peer.player.id) if not peer: await self.abort("The host left the lobby") return await peer.send_ConnectToPeer( player_name=self.player.login, player_uid=self.player.id, offer=True ) async def connect_to_peer(self, peer: "GameConnection"): """ Connect two peers """ if peer is not None: await self.send_ConnectToPeer( player_name=peer.player.login, player_uid=peer.player.id, offer=True ) if peer is not None: with contextlib.suppress(DisconnectedError): await peer.send_ConnectToPeer( player_name=self.player.login, player_uid=self.player.id, offer=False ) async def handle_action(self, command: str, args: list[Any]): """ Handle GpgNetSend messages, wrapped in the JSON protocol """ try: await COMMAND_HANDLERS[command](self, *args) except KeyError: self._logger.warning( "Unrecognized command %s: %s from player %s", command, args, self.player ) except (TypeError, ValueError): self._logger.exception("Bad command arguments") except ConnectionError as e: raise e except Exception as e: # pragma: no cover self._logger.exception( "Something awful happened in a game thread! %s", e ) await self.abort() async def handle_desync(self, *_args): # pragma: no cover self.game.desyncs += 1 async def handle_game_option(self, key: str, value: Any): if not self.is_host(): return await self.game.game_options.set_option(key, value) self._mark_dirty() async def handle_game_mods(self, mode: Any, args: list[Any]): if not self.is_host(): return if mode == "activated": # In this case args is the number of mods if int(args) == 0: self.game.mods = {} elif mode == "uids": uids = str(args).split() self.game.mods = {uid: "Unknown sim mod" for uid in uids} async with self._db.acquire() as conn: rows = await conn.execute( "SELECT `uid`, `name` from `table_mod` WHERE `uid` in :ids", ids=tuple(uids) ) for row in rows: self.game.mods[row.uid] = row.name else: self._logger.warning("Ignoring game mod: %s, %s", mode, args) return self._mark_dirty() async def handle_player_option( self, player_id: Any, key: Any, value: Any ): if not self.is_host(): return self.game.set_player_option(int(player_id), key, value) self._mark_dirty() async def handle_ai_option(self, name: Any, key: Any, value: Any): if not self.is_host(): return self.game.set_ai_option(str(name), key, value) self._mark_dirty() async def handle_clear_slot(self, slot: Any): if not self.is_host(): return self.game.clear_slot(int(slot)) self._mark_dirty() async def handle_game_result(self, army: Any, result: Any): army = int(army) result = str(result).lower() try: *metadata, result_type, score = result.split() except ValueError: self._logger.warning("Invalid result for %s reported: %s", army, result) else: await self.game.add_result( self.player.id, army, result_type, int(score), frozenset(metadata), ) async def handle_operation_complete( self, primary: Any, secondary: Any, delta: str ): """ # Params - `primary`: are primary mission objectives complete? - `secondary`: are secondary mission objectives complete? - `delta`: the time it took to complete the mission """ primary = FA.ENABLED == primary secondary = FA.ENABLED == secondary if not primary: return if not isinstance(self.game, CoopGame): self._logger.warning( "OperationComplete called for non-coop game: %s", self.game.id ) return if self.game.validity != ValidityState.COOP_NOT_RANKED: return secondary, delta = secondary, str(delta) async with self._db.acquire() as conn: result = await conn.execute( select(coop_map.c.id).where( coop_map.c.filename == self.game.map.file_path ) ) row = result.fetchone() if not row: self._logger.debug( "can't find coop map: %s", self.game.map.file_path ) return mission = row.id # Each player in a co-op game will send the OperationComplete # message but we only need to perform this insert once async with self.game.leaderboard_lock: if not self.game.leaderboard_saved: await conn.execute( coop_leaderboard.insert().values( mission=mission, gameuid=self.game.id, secondary=secondary, time=delta, player_count=len(self.game.players), ) ) self.game.leaderboard_saved = True async def handle_json_stats(self, stats: str): try: self.game.report_army_stats(stats) except json.JSONDecodeError as e: self._logger.warning( "Malformed game stats reported by %s: '...%s...'", self.player.login, stats[e.pos-20:e.pos+20] ) async def handle_enforce_rating(self): self.game.enforce_rating = True async def handle_teamkill_report( self, gametime: Any, reporter_id: Any, reporter_name: str, teamkiller_id: Any, teamkiller_name: str, ): """ Sent when a player is teamkilled and clicks the 'Report' button. # Params - `gametime`: seconds of gametime when kill happened - `reporter_id`: reporter id - `reporter_name`: reporter nickname (for debug purpose only) - `teamkiller_id`: teamkiller id - `teamkiller_name`: teamkiller nickname (for debug purpose only) """ pass async def handle_teamkill_happened( self, gametime: Any, victim_id: Any, victim_name: str, teamkiller_id: Any, teamkiller_name: str, ): """ Send automatically by the game whenever a teamkill happens. Takes the same parameters as TeamkillReport. # Params - `gametime`: seconds of gametime when kill happened - `victim_id`: victim id - `victim_name`: victim nickname (for debug purpose only) - `teamkiller_id`: teamkiller id - `teamkiller_name`: teamkiller nickname (for debug purpose only) """ victim_id = int(victim_id) teamkiller_id = int(teamkiller_id) if 0 in (victim_id, teamkiller_id): self._logger.debug("Ignoring teamkill for AI player") return async with self._db.acquire() as conn: await conn.execute( teamkills.insert().values( teamkiller=teamkiller_id, victim=victim_id, game_id=self.game.id, gametime=gametime, ) ) async def handle_ice_message(self, receiver_id: Any, ice_msg: str): receiver_id = int(receiver_id) peer = self.player_service.get_player(receiver_id) if not peer: self._logger.debug( "Ignoring ICE message for unknown player: %s", receiver_id ) return game_connection = peer.game_connection if not game_connection: self._logger.debug( "Ignoring ICE message for player without game connection: %s", receiver_id ) return try: await game_connection.send({ "command": "IceMsg", "args": [int(self.player.id), ice_msg] }) except DisconnectedError: self._logger.debug( "Failed to send ICE message to player due to a disconnect: %s", receiver_id ) async def handle_game_state(self, state: str): """ Changes in game state """ if state == "Idle": await self._handle_idle_state() # Don't mark as dirty return elif state == "Lobby": # TODO: Do we still need to schedule with `ensure_future`? # # We do not yield from the task, since we # need to keep processing other commands while it runs await self._handle_lobby_state() elif state == "Launching": if self.player.state != PlayerState.HOSTING: return if self.game.state is not GameState.LOBBY: self._logger.warning( "Trying to launch game %s in invalid state %s", self.game, self.game.state ) return self._logger.info("Launching game %s", self.game) await self.game.launch() if len(self.game.mods.keys()) > 0: async with self._db.acquire() as conn: uids = list(self.game.mods.keys()) await conn.execute( "UPDATE mod_stats s JOIN mod_version v ON " "v.mod_id = s.mod_id " "SET s.times_played = s.times_played + 1 " "WHERE v.uid in :ids", ids=tuple(uids) ) # Signals that the FA executable has been closed elif state == "Ended": await self.on_connection_closed() self._mark_dirty() async def handle_game_ended(self, *args: list[Any]): """ Signals that the simulation has ended. """ self.finished_sim = True await self.game.check_game_finish(self.player) async def handle_rehost(self, *args: list[Any]): """ Signals that the user has rehosted the game. This is currently unused but included for documentation purposes. """ pass async def handle_launch_status(self, status: str): """ Currently is sent with status `Rejected` if a matchmaker game failed to start due to players using differing game settings. """ pass async def handle_bottleneck(self, *args: list[Any]): """ Not sure what this command means. This is currently unused but included for documentation purposes. """ pass async def handle_bottleneck_cleared(self, *args: list[Any]): """ Not sure what this command means. This is currently unused but included for documentation purposes. """ pass async def handle_disconnected(self, *args: list[Any]): """ Not sure what this command means. This is currently unused but included for documentation purposes. """ pass async def handle_chat(self, message: str): """ Whenever the player sends a chat message during the game lobby. """ pass async def handle_game_full(self): """ Sent when all game slots are full """ pass def _mark_dirty(self): if self.game: self.game_service.mark_dirty(self.game) async def abort(self, log_message: str = ""): """ Abort the connection Removes the GameConnection object from the any associated Game object, and deletes references to Player and Game held by this object. """ try: if self._state is GameConnectionState.ENDED: return self._logger.debug("%s.abort(%s)", self, log_message) if self.game.state is GameState.LOBBY: await self.disconnect_all_peers() self._state = GameConnectionState.ENDED await self.game.remove_game_connection(self) self._mark_dirty() self.player.state = PlayerState.IDLE if self.player.lobby_connection: self.player.lobby_connection.game_connection = None del self.player.game del self.player.game_connection except Exception as ex: # pragma: no cover self._logger.debug("Exception in abort(): %s", ex) async def disconnect_all_peers(self): tasks = [] for peer in self.game.connections: if peer == self: continue tasks.append(peer.send_DisconnectFromPeer(self.player.id)) for fut in asyncio.as_completed(tasks): try: await fut except Exception: self._logger.debug( "peer_sendDisconnectFromPeer failed for player %i", self.player.id, exc_info=True ) async def on_connection_closed(self): """ The connection is closed by the player. """ try: await self.game.disconnect_player(self.player) except Exception as e: # pragma: no cover self._logger.exception(e) finally: await self.abort() async def on_connection_lost(self): """ The connection is lost due to a disconnect from the lobby server. """ try: await self.game.remove_game_connection(self) except Exception as e: # pragma: no cover self._logger.exception(e) finally: await self.abort() def __str__(self): return f"GameConnection({self.player}, {self.game})"
Ancestors
Instance variables
prop state : GameConnectionState
-
Expand source code
@property def state(self) -> GameConnectionState: return self._state
Methods
async def abort(self, log_message: str = '')
-
Abort the connection
Removes the GameConnection object from the any associated Game object, and deletes references to Player and Game held by this object.
async def connect_to_host(self, peer: GameConnection)
-
Connect self to a given peer (host)
async def connect_to_peer(self, peer: GameConnection)
-
Connect two peers
async def disconnect_all_peers(self)
async def handle_action(self, command: str, args: list[typing.Any])
-
Handle GpgNetSend messages, wrapped in the JSON protocol
async def handle_ai_option(self, name: Any, key: Any, value: Any)
async def handle_bottleneck(self, *args: list[typing.Any])
-
Not sure what this command means. This is currently unused but included for documentation purposes.
async def handle_bottleneck_cleared(self, *args: list[typing.Any])
-
Not sure what this command means. This is currently unused but included for documentation purposes.
async def handle_chat(self, message: str)
-
Whenever the player sends a chat message during the game lobby.
async def handle_clear_slot(self, slot: Any)
async def handle_desync(self, *_args)
async def handle_disconnected(self, *args: list[typing.Any])
-
Not sure what this command means. This is currently unused but included for documentation purposes.
async def handle_enforce_rating(self)
async def handle_game_ended(self, *args: list[typing.Any])
-
Signals that the simulation has ended.
async def handle_game_full(self)
-
Sent when all game slots are full
async def handle_game_mods(self, mode: Any, args: list[typing.Any])
async def handle_game_option(self, key: str, value: Any)
async def handle_game_result(self, army: Any, result: Any)
async def handle_game_state(self, state: str)
-
Changes in game state
async def handle_ice_message(self, receiver_id: Any, ice_msg: str)
async def handle_json_stats(self, stats: str)
async def handle_launch_status(self, status: str)
-
Currently is sent with status
Rejected
if a matchmaker game failed to start due to players using differing game settings. async def handle_operation_complete(self, primary: Any, secondary: Any, delta: str)
-
Params
primary
: are primary mission objectives complete?secondary
: are secondary mission objectives complete?delta
: the time it took to complete the mission
async def handle_player_option(self, player_id: Any, key: Any, value: Any)
async def handle_rehost(self, *args: list[typing.Any])
-
Signals that the user has rehosted the game. This is currently unused but included for documentation purposes.
async def handle_teamkill_happened(self, gametime: Any, victim_id: Any, victim_name: str, teamkiller_id: Any, teamkiller_name: str)
-
Send automatically by the game whenever a teamkill happens. Takes the same parameters as TeamkillReport.
Params
gametime
: seconds of gametime when kill happenedvictim_id
: victim idvictim_name
: victim nickname (for debug purpose only)teamkiller_id
: teamkiller idteamkiller_name
: teamkiller nickname (for debug purpose only)
async def handle_teamkill_report(self, gametime: Any, reporter_id: Any, reporter_name: str, teamkiller_id: Any, teamkiller_name: str)
-
Sent when a player is teamkilled and clicks the 'Report' button.
Params
gametime
: seconds of gametime when kill happenedreporter_id
: reporter idreporter_name
: reporter nickname (for debug purpose only)teamkiller_id
: teamkiller idteamkiller_name
: teamkiller nickname (for debug purpose only)
def is_host(self) ‑> bool
async def on_connection_closed(self)
-
The connection is closed by the player.
async def on_connection_lost(self)
-
The connection is lost due to a disconnect from the lobby server.
async def send(self, message)
-
Send a game message to the client.
Errors
May raise
DisconnectedError
NOTE: When calling this on a connection other than
self
make sure to handleDisconnectedError
, otherwise failure to send the message will cause the caller to be disconnected as well. async def timeout_game_connection(self, timeout)
Inherited members