Module server.protocol
Protocol format definitions
Every message sent between the lobby server and a client is expected to
deserialize to a python dictionary. The structure of that dictionary depends on
the application logic (see server.lobbyconnection).
This module defines the classes that handle the wire format, i.e. how messages are serialized to bytes and sent across the network.
Sub-modules
- server.protocol.gpgnet
- server.protocol.protocol
- 
Protocol base class 
- server.protocol.qdatastream
- 
DEPRECATED: Legacy QDataStream (UTF-16, BigEndian) encoded data format … 
- server.protocol.simple_json
- 
A simple newline terminated JSON data format … 
Classes
- class DisconnectedError (*args, **kwargs)
- 
Expand source codeclass DisconnectedError(ConnectionError): """For signaling that a protocol has lost connection to the remote."""For signaling that a protocol has lost connection to the remote. Ancestors- builtins.ConnectionError
- builtins.OSError
- builtins.Exception
- builtins.BaseException
 
- class GpgNetClientProtocol
- 
Expand source codeclass GpgNetClientProtocol(metaclass=ABCMeta): def send_GameState(self, arguments: list[Union[int, str, bool]]) -> None: """ Sent by the client when the state of LobbyComm changes """ self.send_gpgnet_message("GameState", arguments) @abstractmethod def send_gpgnet_message(self, command_id, arguments: list[Union[int, str, bool]]) -> None: pass # pragma: no coverMethods- def send_GameState(self, arguments: list[int | str | bool]) ‑> None
- 
Expand source codedef send_GameState(self, arguments: list[Union[int, str, bool]]) -> None: """ Sent by the client when the state of LobbyComm changes """ self.send_gpgnet_message("GameState", arguments)Sent by the client when the state of LobbyComm changes 
- def send_gpgnet_message(self, command_id, arguments: list[int | str | bool]) ‑> None
- 
Expand source code@abstractmethod def send_gpgnet_message(self, command_id, arguments: list[Union[int, str, bool]]) -> None: pass # pragma: no cover
 
- class GpgNetServerProtocol
- 
Expand source codeclass GpgNetServerProtocol(metaclass=ABCMeta): """ Defines an interface for the server side GPGNet protocol """ async def send_ConnectToPeer(self, player_name: str, player_uid: int, offer: bool): """ Tells a client that has a listening LobbyComm instance to connect to the given peer # Params - `player_name`: Remote player name - `player_uid`: Remote player identifier """ await self.send_gpgnet_message("ConnectToPeer", [player_name, player_uid, offer]) async def send_JoinGame(self, remote_player_name: str, remote_player_uid: int): """ Tells the game to join the given peer by ID """ await self.send_gpgnet_message("JoinGame", [remote_player_name, remote_player_uid]) async def send_HostGame(self, map_path: str): """ Tells the game to start listening for incoming connections as a host # Params - `map_path`: Which scenario to use """ await self.send_gpgnet_message("HostGame", [str(map_path)]) async def send_DisconnectFromPeer(self, id: int): """ Instructs the game to disconnect from the peer given by id """ await self.send_gpgnet_message("DisconnectFromPeer", [id]) async def send_gpgnet_message(self, command_id: str, arguments: list[Union[int, str, bool]]): message = {"command": command_id, "args": arguments} await self.send(message) @abstractmethod async def send(self, message): pass # pragma: no coverDefines an interface for the server side GPGNet protocol SubclassesMethods- async def send(self, message)
- 
Expand source code@abstractmethod async def send(self, message): pass # pragma: no cover
- async def send_ConnectToPeer(self, player_name: str, player_uid: int, offer: bool)
- 
Expand source codeasync def send_ConnectToPeer(self, player_name: str, player_uid: int, offer: bool): """ Tells a client that has a listening LobbyComm instance to connect to the given peer # Params - `player_name`: Remote player name - `player_uid`: Remote player identifier """ await self.send_gpgnet_message("ConnectToPeer", [player_name, player_uid, offer])
Tells a client that has a listening LobbyComm instance to connect to the given peer.
Params
- `player_name`: Remote player name
- `player_uid`: Remote player identifier
 
- async def send_DisconnectFromPeer(self, id: int)
- 
Expand source codeasync def send_DisconnectFromPeer(self, id: int): """ Instructs the game to disconnect from the peer given by id """ await self.send_gpgnet_message("DisconnectFromPeer", [id])Instructs the game to disconnect from the peer given by id 
- async def send_HostGame(self, map_path: str)
- 
Expand source codeasync def send_HostGame(self, map_path: str): """ Tells the game to start listening for incoming connections as a host # Params - `map_path`: Which scenario to use """ await self.send_gpgnet_message("HostGame", [str(map_path)])
Tells the game to start listening for incoming connections as a host.
Params
- `map_path`: Which scenario to use
 
- async def send_JoinGame(self, remote_player_name: str, remote_player_uid: int)
- 
Expand source codeasync def send_JoinGame(self, remote_player_name: str, remote_player_uid: int): """ Tells the game to join the given peer by ID """ await self.send_gpgnet_message("JoinGame", [remote_player_name, remote_player_uid])Tells the game to join the given peer by ID 
- async def send_gpgnet_message(self, command_id: str, arguments: list[int | str | bool])
- 
Expand source codeasync def send_gpgnet_message(self, command_id: str, arguments: list[Union[int, str, bool]]): message = {"command": command_id, "args": arguments} await self.send(message)
 
- class Protocol (reader: asyncio.streams.StreamReader, writer: asyncio.streams.StreamWriter)
- 
Expand source codeclass Protocol(metaclass=ABCMeta): def __init__(self, reader: StreamReader, writer: StreamWriter): self.reader = reader self.writer = writer # Force calls to drain() to only return once the data has been sent self.writer.transport.set_write_buffer_limits(high=0) @staticmethod @abstractmethod def encode_message(message: dict) -> bytes: """ Encode a message as raw bytes. Can be used along with `*_raw` methods. """ pass # pragma: no cover @staticmethod @abstractmethod def decode_message(data: bytes) -> dict: """ Decode a message from raw bytes. """ pass # pragma: no cover def is_connected(self) -> bool: """ Return whether or not the connection is still alive """ return not self.writer.is_closing() @abstractmethod async def read_message(self) -> dict: """ Asynchronously read a message from the stream # Returns The parsed message # Errors May raise `IncompleteReadError`. """ pass # pragma: no cover async def send_message(self, message: dict) -> None: """ Send a single message in the form of a dictionary # Errors May raise `DisconnectedError`. """ await self.send_raw(self.encode_message(message)) async def send_messages(self, messages: list[dict]) -> None: """ Send multiple messages in the form of a list of dictionaries. May be more optimal than sending a single message. # Errors May raise `DisconnectedError`. """ self.write_messages(messages) await self.drain() async def send_raw(self, data: bytes) -> None: """ Send raw bytes. Should generally not be used. # Errors May raise `DisconnectedError`. """ self.write_raw(data) await self.drain() def write_message(self, message: dict) -> None: """ Write a single message into the message buffer. Should be used when sending broadcasts or when sending messages that are triggered by incoming messages from other players. # Errors May raise `DisconnectedError`. 
""" if not self.is_connected(): raise DisconnectedError("Protocol is not connected!") self.write_raw(self.encode_message(message)) def write_messages(self, messages: list[dict]) -> None: """ Write multiple message into the message buffer. # Errors May raise `DisconnectedError`. """ metrics.sent_messages.labels(self.__class__.__name__).inc() if not self.is_connected(): raise DisconnectedError("Protocol is not connected!") self.writer.writelines([self.encode_message(msg) for msg in messages]) def write_raw(self, data: bytes) -> None: """ Write raw bytes into the message buffer. Should generally not be used. # Errors May raise `DisconnectedError`. """ metrics.sent_messages.labels(self.__class__.__name__).inc() if not self.is_connected(): raise DisconnectedError("Protocol is not connected!") self.writer.write(data) def abort(self) -> None: # SelectorTransport only self.writer.transport.abort() async def close(self) -> None: """ Close the underlying writer as soon as the buffer has emptied. # Errors Never raises. Any exceptions that occur while waiting to close are ignored. """ self.writer.close() with contextlib.suppress(Exception): await self.writer.wait_closed() @synchronizedmethod async def drain(self) -> None: """ Await the write buffer to empty. See StreamWriter.drain() # Errors Raises `DisconnectedError` if the client disconnects while waiting for the write buffer to empty. """ # Method needs to be synchronized as drain() cannot be called # concurrently by multiple coroutines: # http://bugs.python.org/issue29930. try: await self.writer.drain() except Exception as e: await self.close() raise DisconnectedError("Protocol connection lost!") from eSubclassesStatic methods- def decode_message(data: bytes) ‑> dict
- 
Expand source code@staticmethod @abstractmethod def decode_message(data: bytes) -> dict: """ Decode a message from raw bytes. """ pass # pragma: no coverDecode a message from raw bytes. 
- def encode_message(message: dict) ‑> bytes
- 
Expand source code@staticmethod @abstractmethod def encode_message(message: dict) -> bytes: """ Encode a message as raw bytes. Can be used along with `*_raw` methods. """ pass # pragma: no cover
Encode a message as raw bytes. Can be used along with `*_raw` methods.
 Methods- def abort(self) ‑> None
- 
Expand source codedef abort(self) -> None: # SelectorTransport only self.writer.transport.abort()
- async def close(self) ‑> None
- 
Expand source codeasync def close(self) -> None: """ Close the underlying writer as soon as the buffer has emptied. # Errors Never raises. Any exceptions that occur while waiting to close are ignored. """ self.writer.close() with contextlib.suppress(Exception): await self.writer.wait_closed()
Close the underlying writer as soon as the buffer has emptied.
Errors: Never raises. Any exceptions that occur while waiting to close are ignored.
- async def drain(self) ‑> None
- 
Expand source code@synchronizedmethod async def drain(self) -> None: """ Await the write buffer to empty. See StreamWriter.drain() # Errors Raises `DisconnectedError` if the client disconnects while waiting for the write buffer to empty. """ # Method needs to be synchronized as drain() cannot be called # concurrently by multiple coroutines: # http://bugs.python.org/issue29930. try: await self.writer.drain() except Exception as e: await self.close() raise DisconnectedError("Protocol connection lost!") from e
Await the write buffer to empty. See `StreamWriter.drain()`.
Errors: Raises `DisconnectedError` if the client disconnects while waiting for the write buffer to empty.
- def is_connected(self) ‑> bool
- 
Expand source codedef is_connected(self) -> bool: """ Return whether or not the connection is still alive """ return not self.writer.is_closing()Return whether or not the connection is still alive 
- async def read_message(self) ‑> dict
- 
Expand source code@abstractmethod async def read_message(self) -> dict: """ Asynchronously read a message from the stream # Returns The parsed message # Errors May raise `IncompleteReadError`. """ pass # pragma: no cover
Asynchronously read a message from the stream.
Returns: The parsed message.
Errors: May raise `IncompleteReadError`.
- async def send_message(self, message: dict) ‑> None
- 
Expand source codeasync def send_message(self, message: dict) -> None: """ Send a single message in the form of a dictionary # Errors May raise `DisconnectedError`. """ await self.send_raw(self.encode_message(message))
- async def send_messages(self, messages: list[dict]) ‑> None
- 
Expand source codeasync def send_messages(self, messages: list[dict]) -> None: """ Send multiple messages in the form of a list of dictionaries. May be more optimal than sending a single message. # Errors May raise `DisconnectedError`. """ self.write_messages(messages) await self.drain()
Send multiple messages in the form of a list of dictionaries. May be more optimal than sending a single message.
Errors: May raise `DisconnectedError`.
- async def send_raw(self, data: bytes) ‑> None
- 
Expand source codeasync def send_raw(self, data: bytes) -> None: """ Send raw bytes. Should generally not be used. # Errors May raise `DisconnectedError`. """ self.write_raw(data) await self.drain()
- def write_message(self, message: dict) ‑> None
- 
Expand source codedef write_message(self, message: dict) -> None: """ Write a single message into the message buffer. Should be used when sending broadcasts or when sending messages that are triggered by incoming messages from other players. # Errors May raise `DisconnectedError`. """ if not self.is_connected(): raise DisconnectedError("Protocol is not connected!") self.write_raw(self.encode_message(message))
Write a single message into the message buffer. Should be used when sending broadcasts or when sending messages that are triggered by incoming messages from other players.
Errors: May raise `DisconnectedError`.
- def write_messages(self, messages: list[dict]) ‑> None
- 
Expand source codedef write_messages(self, messages: list[dict]) -> None: """ Write multiple message into the message buffer. # Errors May raise `DisconnectedError`. """ metrics.sent_messages.labels(self.__class__.__name__).inc() if not self.is_connected(): raise DisconnectedError("Protocol is not connected!") self.writer.writelines([self.encode_message(msg) for msg in messages])
- def write_raw(self, data: bytes) ‑> None
- 
Expand source codedef write_raw(self, data: bytes) -> None: """ Write raw bytes into the message buffer. Should generally not be used. # Errors May raise `DisconnectedError`. """ metrics.sent_messages.labels(self.__class__.__name__).inc() if not self.is_connected(): raise DisconnectedError("Protocol is not connected!") self.writer.write(data)
Write raw bytes into the message buffer. Should generally not be used.
Errors: May raise `DisconnectedError`.
 
- class QDataStreamProtocol (reader: asyncio.streams.StreamReader, writer: asyncio.streams.StreamWriter)
- 
Expand source code@with_logger class QDataStreamProtocol(Protocol): """ Implements the legacy QDataStream-based encoding scheme """ _logger: ClassVar[logging.Logger] @staticmethod def read_qstring(buffer: bytes, pos: int = 0) -> tuple[int, str]: """ Parse a serialized QString from buffer (A bytes like object) at given position. Requires `len(buffer[pos:]) >= 4`. Pos is added to buffer_pos. # Returns The new buffer position and the message. """ chunk = buffer[pos:pos + 4] rest = buffer[pos + 4:] assert len(chunk) == 4 (size, ) = struct.unpack("!I", chunk) if len(rest) < size: raise ValueError( f"Malformed QString: Claims length {size} " f"but actually {len(rest)}. " f"Entire buffer: {base64.b64encode(buffer).decode()}", ) return size + pos + 4, (buffer[pos + 4:pos + 4 + size]).decode("UTF-16BE") @staticmethod def pack_qstring(message: str) -> bytes: encoded = message.encode("UTF-16BE") return struct.pack("!i", len(encoded)) + encoded @staticmethod def pack_block(block: bytes) -> bytes: return struct.pack("!I", len(block)) + block @staticmethod def read_block(data): buffer_pos = 0 while len(data[buffer_pos:]) > 4: buffer_pos, msg = QDataStreamProtocol.read_qstring(data, buffer_pos) yield msg @staticmethod def pack_message(*args: str) -> bytes: """ For sending a bunch of QStrings packed together in a 'block' """ msg = bytearray() for arg in args: if not isinstance(arg, str): raise NotImplementedError("Only string serialization is supported") msg += QDataStreamProtocol.pack_qstring(arg) return QDataStreamProtocol.pack_block(msg) @staticmethod def encode_message(message: dict) -> bytes: """ Encodes a python object as a block of QStrings """ command = message.get("command") if command == "ping": return PING_MSG elif command == "pong": return PONG_MSG return QDataStreamProtocol.pack_message(json_encoder.encode(message)) @staticmethod def decode_message(data: bytes) -> dict: _, action = QDataStreamProtocol.read_qstring(data) if action in ("PING", "PONG"): return 
{"command": action.lower()} message = json.loads(action) try: for part in QDataStreamProtocol.read_block(data): try: message_part = json.loads(part) if part != action: message.update(message_part) except (ValueError, TypeError): if "legacy" not in message: message["legacy"] = [] message["legacy"].append(part) except (KeyError, ValueError): pass return message async def read_message(self): """ Read a message from the stream # Errors Raises `IncompleteReadError` on malformed stream. """ try: length, *_ = struct.unpack("!I", await self.reader.readexactly(4)) block = await self.reader.readexactly(length) except IncompleteReadError as e: if self.reader.at_eof() and not e.partial: raise DisconnectedError() # Otherwise reraise raise return QDataStreamProtocol.decode_message(block)Implements the legacy QDataStream-based encoding scheme AncestorsStatic methods- def encode_message(message: dict) ‑> bytes
- 
Expand source code@staticmethod def encode_message(message: dict) -> bytes: """ Encodes a python object as a block of QStrings """ command = message.get("command") if command == "ping": return PING_MSG elif command == "pong": return PONG_MSG return QDataStreamProtocol.pack_message(json_encoder.encode(message))Encodes a python object as a block of QStrings 
- def pack_block(block: bytes) ‑> bytes
- 
Expand source code@staticmethod def pack_block(block: bytes) -> bytes: return struct.pack("!I", len(block)) + block
- def pack_message(*args: str) ‑> bytes
- 
Expand source code@staticmethod def pack_message(*args: str) -> bytes: """ For sending a bunch of QStrings packed together in a 'block' """ msg = bytearray() for arg in args: if not isinstance(arg, str): raise NotImplementedError("Only string serialization is supported") msg += QDataStreamProtocol.pack_qstring(arg) return QDataStreamProtocol.pack_block(msg)For sending a bunch of QStrings packed together in a 'block' 
- def pack_qstring(message: str) ‑> bytes
- 
Expand source code@staticmethod def pack_qstring(message: str) -> bytes: encoded = message.encode("UTF-16BE") return struct.pack("!i", len(encoded)) + encoded
- def read_block(data)
- 
Expand source code@staticmethod def read_block(data): buffer_pos = 0 while len(data[buffer_pos:]) > 4: buffer_pos, msg = QDataStreamProtocol.read_qstring(data, buffer_pos) yield msg
- def read_qstring(buffer: bytes, pos: int = 0) ‑> tuple[int, str]
- 
Expand source code@staticmethod def read_qstring(buffer: bytes, pos: int = 0) -> tuple[int, str]: """ Parse a serialized QString from buffer (A bytes like object) at given position. Requires `len(buffer[pos:]) >= 4`. Pos is added to buffer_pos. # Returns The new buffer position and the message. """ chunk = buffer[pos:pos + 4] rest = buffer[pos + 4:] assert len(chunk) == 4 (size, ) = struct.unpack("!I", chunk) if len(rest) < size: raise ValueError( f"Malformed QString: Claims length {size} " f"but actually {len(rest)}. " f"Entire buffer: {base64.b64encode(buffer).decode()}", ) return size + pos + 4, (buffer[pos + 4:pos + 4 + size]).decode("UTF-16BE")
Parse a serialized QString from buffer (a bytes-like object) at the given position. Requires `len(buffer[pos:]) >= 4`. Pos is added to buffer_pos.
Returns: The new buffer position and the message.
 Methods- async def read_message(self)
- 
Expand source codeasync def read_message(self): """ Read a message from the stream # Errors Raises `IncompleteReadError` on malformed stream. """ try: length, *_ = struct.unpack("!I", await self.reader.readexactly(4)) block = await self.reader.readexactly(length) except IncompleteReadError as e: if self.reader.at_eof() and not e.partial: raise DisconnectedError() # Otherwise reraise raise return QDataStreamProtocol.decode_message(block)
Read a message from the stream.
Errors: Raises `IncompleteReadError` on malformed stream.
 Inherited members
- class SimpleJsonProtocol (reader: asyncio.streams.StreamReader, writer: asyncio.streams.StreamWriter)
- 
Expand source codeclass SimpleJsonProtocol(Protocol): @staticmethod def encode_message(message: dict) -> bytes: return (json_encoder.encode(message) + "\n").encode() @staticmethod def decode_message(data: bytes) -> dict: return json.loads(data.strip()) async def read_message(self) -> dict: line = await self.reader.readline() if not line: raise DisconnectedError() return SimpleJsonProtocol.decode_message(line)AncestorsInherited members