Module server.protocol
Protocol format definitions
Every message sent between the lobby server and a client is expected to
deserialize to a python dictionary. The structure of that dictionary depends on
the application logic (see server.lobbyconnection).
This module defines the classes that handle the wire format, i.e. how messages are serialized to bytes and sent across the network.
Sub-modules
server.protocol.gpgnet
server.protocol.protocol
-
Protocol base class
server.protocol.qdatastream
-
DEPRECATED: Legacy QDataStream (UTF-16, BigEndian) encoded data format …
server.protocol.simple_json
-
A simple newline terminated JSON data format …
Classes
class DisconnectedError (*args, **kwargs)
-
Expand source code
class DisconnectedError(ConnectionError):
    """Signals that a protocol no longer has a connection to its remote end."""
For signaling that a protocol has lost connection to the remote.
Ancestors
- builtins.ConnectionError
- builtins.OSError
- builtins.Exception
- builtins.BaseException
class GpgNetClientProtocol
-
Expand source code
class GpgNetClientProtocol(metaclass=ABCMeta):
    """
    Client side of the GPGNet protocol.

    Concrete subclasses implement `send_gpgnet_message`; the `send_*`
    helpers are thin wrappers that fill in the command name.
    """

    def send_GameState(self, arguments: list[Union[int, str, bool]]) -> None:
        """
        Sent by the client when the state of LobbyComm changes
        """
        command = "GameState"
        self.send_gpgnet_message(command, arguments)

    @abstractmethod
    def send_gpgnet_message(
        self,
        command_id,
        arguments: list[Union[int, str, bool]]
    ) -> None:
        """
        Transmit the GPGNet command `command_id` with `arguments`.

        Abstract: how the message is delivered is up to the subclass.
        """
        pass  # pragma: no cover
Methods
def send_GameState(self, arguments: list[int | str | bool]) ‑> None
-
Expand source code
def send_GameState(self, arguments: list[Union[int, str, bool]]) -> None:
    """
    Sent by the client when the state of LobbyComm changes.

    Forwards `arguments` unchanged as a "GameState" GPGNet message.
    """
    command = "GameState"
    self.send_gpgnet_message(command, arguments)
Sent by the client when the state of LobbyComm changes
def send_gpgnet_message(self, command_id, arguments: list[int | str | bool]) ‑> None
-
Expand source code
@abstractmethod
def send_gpgnet_message(
    self,
    command_id,
    arguments: list[Union[int, str, bool]]
) -> None:
    """
    Transmit the GPGNet command `command_id` with `arguments`.

    Abstract: how the message is delivered is up to the subclass.
    """
    pass  # pragma: no cover
class GpgNetServerProtocol
-
Expand source code
class GpgNetServerProtocol(metaclass=ABCMeta):
    """
    Defines an interface for the server side GPGNet protocol

    Every `send_*` helper ends up in `send_gpgnet_message`, which wraps the
    command in a `{"command": ..., "args": [...]}` dict and passes it to the
    abstract `send` coroutine.
    """

    async def send_ConnectToPeer(self, player_name: str, player_uid: int, offer: bool):
        """
        Tells a client that has a listening LobbyComm instance to connect to
        the given peer

        # Params
        - `player_name`: Remote player name
        - `player_uid`: Remote player identifier
        - `offer`: Forwarded unchanged as the third message argument
          (NOTE(review): its meaning is not visible here; confirm with callers)
        """
        peer_args = [player_name, player_uid, offer]
        await self.send_gpgnet_message("ConnectToPeer", peer_args)

    async def send_JoinGame(self, remote_player_name: str, remote_player_uid: int):
        """
        Tells the game to join the given peer by ID
        """
        peer_args = [remote_player_name, remote_player_uid]
        await self.send_gpgnet_message("JoinGame", peer_args)

    async def send_HostGame(self, map_path: str):
        """
        Tells the game to start listening for incoming connections as a host

        # Params
        - `map_path`: Which scenario to use
        """
        # The path is always sent as a string, whatever object was passed in.
        scenario = str(map_path)
        await self.send_gpgnet_message("HostGame", [scenario])

    async def send_DisconnectFromPeer(self, id: int):
        """
        Instructs the game to disconnect from the peer given by id
        """
        await self.send_gpgnet_message("DisconnectFromPeer", [id])

    async def send_gpgnet_message(
        self,
        command_id: str,
        arguments: list[Union[int, str, bool]]
    ):
        """
        Wrap a GPGNet command in a message dict and pass it to `send`.
        """
        await self.send({"command": command_id, "args": arguments})

    @abstractmethod
    async def send(self, message):
        """
        Deliver an already-built message dict; implemented by subclasses.
        """
        pass  # pragma: no cover
Defines an interface for the server side GPGNet protocol
Subclasses
Methods
async def send(self, message)
-
Expand source code
@abstractmethod
async def send(self, message):
    """
    Deliver an already-built message; implemented by subclasses.
    """
    pass  # pragma: no cover
async def send_ConnectToPeer(self, player_name: str, player_uid: int, offer: bool)
-
Expand source code
async def send_ConnectToPeer(self, player_name: str, player_uid: int, offer: bool):
    """
    Tells a client that has a listening LobbyComm instance to connect to the
    given peer

    # Params
    - `player_name`: Remote player name
    - `player_uid`: Remote player identifier
    - `offer`: Forwarded unchanged as the third message argument
      (NOTE(review): its meaning is not visible here; confirm with callers)
    """
    peer_args = [player_name, player_uid, offer]
    await self.send_gpgnet_message("ConnectToPeer", peer_args)
Tells a client that has a listening LobbyComm instance to connect to the given peer
Params
player_name
: Remote player name
player_uid
: Remote player identifier
async def send_DisconnectFromPeer(self, id: int)
-
Expand source code
async def send_DisconnectFromPeer(self, id: int):
    """
    Instructs the game to disconnect from the peer given by id
    """
    peer_args = [id]
    await self.send_gpgnet_message("DisconnectFromPeer", peer_args)
Instructs the game to disconnect from the peer given by id
async def send_HostGame(self, map_path: str)
-
Expand source code
async def send_HostGame(self, map_path: str):
    """
    Tells the game to start listening for incoming connections as a host

    # Params
    - `map_path`: Which scenario to use
    """
    # The path is always sent as a string, whatever object was passed in.
    scenario = str(map_path)
    await self.send_gpgnet_message("HostGame", [scenario])
Tells the game to start listening for incoming connections as a host
Params
map_path
: Which scenario to use
async def send_JoinGame(self, remote_player_name: str, remote_player_uid: int)
-
Expand source code
async def send_JoinGame(self, remote_player_name: str, remote_player_uid: int):
    """
    Tells the game to join the given peer by ID
    """
    peer_args = [remote_player_name, remote_player_uid]
    await self.send_gpgnet_message("JoinGame", peer_args)
Tells the game to join the given peer by ID
async def send_gpgnet_message(self, command_id: str, arguments: list[int | str | bool])
-
Expand source code
async def send_gpgnet_message(
    self,
    command_id: str,
    arguments: list[Union[int, str, bool]]
):
    """
    Wrap a GPGNet command in a `{"command": ..., "args": ...}` dict and pass
    it to `send`.
    """
    await self.send({"command": command_id, "args": arguments})
class Protocol (reader: asyncio.streams.StreamReader, writer: asyncio.streams.StreamWriter)
-
Expand source code
class Protocol(metaclass=ABCMeta):
    """
    Base class for wire formats spoken over an asyncio reader/writer pair.

    Subclasses provide `encode_message`, `decode_message` and `read_message`;
    this class supplies the write / drain / close plumbing on top of them.
    """

    def __init__(self, reader: StreamReader, writer: StreamWriter):
        self.reader = reader
        self.writer = writer

        # Force calls to drain() to only return once the data has been sent
        self.writer.transport.set_write_buffer_limits(high=0)

    @staticmethod
    @abstractmethod
    def encode_message(message: dict) -> bytes:
        """
        Encode a message as raw bytes. Can be used along with `*_raw` methods.
        """
        pass  # pragma: no cover

    @staticmethod
    @abstractmethod
    def decode_message(data: bytes) -> dict:
        """
        Decode a message from raw bytes.
        """
        pass  # pragma: no cover

    def is_connected(self) -> bool:
        """
        Return whether or not the connection is still alive
        """
        closing = self.writer.is_closing()
        return not closing

    @abstractmethod
    async def read_message(self) -> dict:
        """
        Asynchronously read a message from the stream

        # Returns
        The parsed message

        # Errors
        May raise `IncompleteReadError`.
        """
        pass  # pragma: no cover

    async def send_message(self, message: dict) -> None:
        """
        Send a single message in the form of a dictionary

        # Errors
        May raise `DisconnectedError`.
        """
        payload = self.encode_message(message)
        await self.send_raw(payload)

    async def send_messages(self, messages: list[dict]) -> None:
        """
        Send multiple messages in the form of a list of dictionaries.

        May be more optimal than sending a single message.

        # Errors
        May raise `DisconnectedError`.
        """
        self.write_messages(messages)
        await self.drain()

    async def send_raw(self, data: bytes) -> None:
        """
        Send raw bytes. Should generally not be used.

        # Errors
        May raise `DisconnectedError`.
        """
        self.write_raw(data)
        await self.drain()

    def write_message(self, message: dict) -> None:
        """
        Write a single message into the message buffer. Should be used when
        sending broadcasts or when sending messages that are triggered by
        incoming messages from other players.

        # Errors
        May raise `DisconnectedError`.
        """
        # Checked here as well as in write_raw so that nothing is encoded or
        # counted for a connection that is already closing.
        if not self.is_connected():
            raise DisconnectedError("Protocol is not connected!")

        payload = self.encode_message(message)
        self.write_raw(payload)

    def write_messages(self, messages: list[dict]) -> None:
        """
        Write multiple messages into the message buffer.

        # Errors
        May raise `DisconnectedError`.
        """
        # NOTE(review): the counter is bumped once per call (not per message)
        # and before the connectivity check -- confirm this is intended.
        metrics.sent_messages.labels(self.__class__.__name__).inc()
        if not self.is_connected():
            raise DisconnectedError("Protocol is not connected!")

        encoded = [self.encode_message(msg) for msg in messages]
        self.writer.writelines(encoded)

    def write_raw(self, data: bytes) -> None:
        """
        Write raw bytes into the message buffer. Should generally not be used.

        # Errors
        May raise `DisconnectedError`.
        """
        # NOTE(review): counted before the connectivity check, so writes that
        # fail below are counted too -- confirm this is intended.
        metrics.sent_messages.labels(self.__class__.__name__).inc()
        if not self.is_connected():
            raise DisconnectedError("Protocol is not connected!")

        self.writer.write(data)

    def abort(self) -> None:
        """
        Abort the writer's underlying transport.
        """
        # SelectorTransport only
        transport = self.writer.transport
        transport.abort()

    async def close(self) -> None:
        """
        Close the underlying writer as soon as the buffer has emptied.

        # Errors
        Never raises. Any exceptions that occur while waiting to close are
        ignored.
        """
        self.writer.close()
        with contextlib.suppress(Exception):
            await self.writer.wait_closed()

    @synchronizedmethod
    async def drain(self) -> None:
        """
        Await the write buffer to empty.
        See StreamWriter.drain()

        # Errors
        Raises `DisconnectedError` if the client disconnects while waiting for
        the write buffer to empty.
        """
        # Method needs to be synchronized as drain() cannot be called
        # concurrently by multiple coroutines:
        # http://bugs.python.org/issue29930.
        try:
            await self.writer.drain()
        except Exception as exc:
            # Any failure while draining is treated as a lost connection.
            await self.close()
            raise DisconnectedError("Protocol connection lost!") from exc
Subclasses
Static methods
def decode_message(data: bytes) ‑> dict
-
Expand source code
@staticmethod
@abstractmethod
def decode_message(data: bytes) -> dict:
    """
    Decode a message from raw bytes.

    Abstract: each wire format supplies its own decoder.
    """
    pass  # pragma: no cover
Decode a message from raw bytes.
def encode_message(message: dict) ‑> bytes
-
Expand source code
@staticmethod
@abstractmethod
def encode_message(message: dict) -> bytes:
    """
    Encode a message as raw bytes. Can be used along with `*_raw` methods.

    Abstract: each wire format supplies its own encoder.
    """
    pass  # pragma: no cover
Encode a message as raw bytes. Can be used along with `*_raw` methods.
Methods
def abort(self) ‑> None
-
Expand source code
def abort(self) -> None:
    """
    Abort the writer's underlying transport.
    """
    # SelectorTransport only
    transport = self.writer.transport
    transport.abort()
async def close(self) ‑> None
-
Expand source code
async def close(self) -> None:
    """
    Close the underlying writer as soon as the buffer has emptied.

    # Errors
    Never raises. Any exceptions that occur while waiting to close are
    ignored.
    """
    stream = self.writer
    stream.close()
    # Best effort: a failure while waiting for the close is deliberately dropped.
    with contextlib.suppress(Exception):
        await stream.wait_closed()
Close the underlying writer as soon as the buffer has emptied.
Errors
Never raises. Any exceptions that occur while waiting to close are ignored.
async def drain(self) ‑> None
-
Expand source code
@synchronizedmethod
async def drain(self) -> None:
    """
    Await the write buffer to empty.
    See StreamWriter.drain()

    # Errors
    Raises `DisconnectedError` if the client disconnects while waiting for
    the write buffer to empty.
    """
    # Method needs to be synchronized as drain() cannot be called
    # concurrently by multiple coroutines:
    # http://bugs.python.org/issue29930.
    try:
        await self.writer.drain()
    except Exception as exc:
        # Any failure while draining is treated as a lost connection.
        await self.close()
        raise DisconnectedError("Protocol connection lost!") from exc
Await the write buffer to empty. See StreamWriter.drain()
Errors
Raises DisconnectedError if the client disconnects while waiting for the write buffer to empty.
def is_connected(self) ‑> bool
-
Expand source code
def is_connected(self) -> bool:
    """
    Return whether or not the connection is still alive

    The connection counts as alive while the writer is not closing.
    """
    closing = self.writer.is_closing()
    return not closing
Return whether or not the connection is still alive
async def read_message(self) ‑> dict
-
Expand source code
@abstractmethod
async def read_message(self) -> dict:
    """
    Asynchronously read a message from the stream

    # Returns
    The parsed message

    # Errors
    May raise `IncompleteReadError`.
    """
    pass  # pragma: no cover
Asynchronously read a message from the stream
Returns
The parsed message
Errors
May raise IncompleteReadError.
async def send_message(self, message: dict) ‑> None
-
Expand source code
async def send_message(self, message: dict) -> None:
    """
    Send a single message in the form of a dictionary

    # Errors
    May raise `DisconnectedError`.
    """
    payload = self.encode_message(message)
    await self.send_raw(payload)
async def send_messages(self, messages: list[dict]) ‑> None
-
Expand source code
async def send_messages(self, messages: list[dict]) -> None:
    """
    Send multiple messages in the form of a list of dictionaries.

    May be more optimal than sending a single message.

    # Errors
    May raise `DisconnectedError`.
    """
    # Buffer everything first, then wait once for the buffer to empty.
    self.write_messages(messages)
    await self.drain()
Send multiple messages in the form of a list of dictionaries.
May be more optimal than sending a single message.
Errors
May raise DisconnectedError.
async def send_raw(self, data: bytes) ‑> None
-
Expand source code
async def send_raw(self, data: bytes) -> None:
    """
    Send raw bytes. Should generally not be used.

    # Errors
    May raise `DisconnectedError`.
    """
    # Buffer the bytes, then wait for the buffer to empty.
    self.write_raw(data)
    await self.drain()
def write_message(self, message: dict) ‑> None
-
Expand source code
def write_message(self, message: dict) -> None:
    """
    Write a single message into the message buffer. Should be used when
    sending broadcasts or when sending messages that are triggered by
    incoming messages from other players.

    # Errors
    May raise `DisconnectedError`.
    """
    # Checked before encoding so nothing is encoded for a dead connection.
    if not self.is_connected():
        raise DisconnectedError("Protocol is not connected!")

    payload = self.encode_message(message)
    self.write_raw(payload)
Write a single message into the message buffer. Should be used when sending broadcasts or when sending messages that are triggered by incoming messages from other players.
Errors
May raise DisconnectedError.
def write_messages(self, messages: list[dict]) ‑> None
-
Expand source code
def write_messages(self, messages: list[dict]) -> None:
    """
    Write multiple messages into the message buffer.

    # Errors
    May raise `DisconnectedError`.
    """
    # NOTE(review): the counter is bumped once per call (not per message)
    # and before the connectivity check -- confirm this is intended.
    metrics.sent_messages.labels(self.__class__.__name__).inc()
    if not self.is_connected():
        raise DisconnectedError("Protocol is not connected!")

    encoded = [self.encode_message(msg) for msg in messages]
    self.writer.writelines(encoded)
def write_raw(self, data: bytes) ‑> None
-
Expand source code
def write_raw(self, data: bytes) -> None:
    """
    Write raw bytes into the message buffer. Should generally not be used.

    # Errors
    May raise `DisconnectedError`.
    """
    # NOTE(review): counted before the connectivity check, so writes that
    # fail below are counted too -- confirm this is intended.
    metrics.sent_messages.labels(self.__class__.__name__).inc()
    if not self.is_connected():
        raise DisconnectedError("Protocol is not connected!")

    stream = self.writer
    stream.write(data)
Write raw bytes into the message buffer. Should generally not be used.
Errors
May raise DisconnectedError.
class QDataStreamProtocol (reader: asyncio.streams.StreamReader, writer: asyncio.streams.StreamWriter)
-
Expand source code
@with_logger
class QDataStreamProtocol(Protocol):
    """
    Implements the legacy QDataStream-based encoding scheme

    A message is a "block": a 4-byte big-endian length followed by one or
    more QStrings, each itself a 4-byte big-endian byte length followed by
    UTF-16BE text.
    """
    _logger: ClassVar[logging.Logger]

    @staticmethod
    def read_qstring(buffer: bytes, pos: int = 0) -> tuple[int, str]:
        """
        Parse a serialized QString from buffer (A bytes like object) at given
        position.

        Requires `len(buffer[pos:]) >= 4`.

        Pos is added to buffer_pos.

        # Returns
        The new buffer position and the message.

        # Errors
        Raises `ValueError` if the length prefix claims more bytes than the
        buffer holds.
        """
        chunk = buffer[pos:pos + 4]
        # Same slice semantics as `len(buffer[pos + 4:])`, but measured through
        # a memoryview so the tail is not copied for every string parsed.
        available = len(memoryview(buffer)[pos + 4:])
        # NOTE(review): precondition check only; it is stripped under `-O`.
        assert len(chunk) == 4

        (size, ) = struct.unpack("!I", chunk)
        if available < size:
            raise ValueError(
                f"Malformed QString: Claims length {size} "
                f"but actually {available}. "
                f"Entire buffer: {base64.b64encode(buffer).decode()}",
            )
        return size + pos + 4, (buffer[pos + 4:pos + 4 + size]).decode("UTF-16BE")

    @staticmethod
    def pack_qstring(message: str) -> bytes:
        """
        Serialize `message` as a QString: byte length prefix + UTF-16BE text.
        """
        encoded = message.encode("UTF-16BE")
        # Unsigned prefix, matching what read_qstring and pack_block use.
        return struct.pack("!I", len(encoded)) + encoded

    @staticmethod
    def pack_block(block: bytes) -> bytes:
        """
        Prefix `block` with its length as a 4-byte big-endian unsigned int.
        """
        return struct.pack("!I", len(block)) + block

    @staticmethod
    def read_block(data):
        """
        Yield every QString found in `data`, in order.

        Parsing stops once 4 or fewer bytes remain. A malformed string makes
        the generator raise `ValueError` (from `read_qstring`).
        """
        buffer_pos = 0
        # Arithmetic instead of `len(data[buffer_pos:])`: same result, but no
        # copy of the remaining buffer on every iteration.
        while len(data) - buffer_pos > 4:
            buffer_pos, msg = QDataStreamProtocol.read_qstring(data, buffer_pos)
            yield msg

    @staticmethod
    def pack_message(*args: str) -> bytes:
        """
        For sending a bunch of QStrings packed together in a 'block'

        # Errors
        Raises `NotImplementedError` if any argument is not a `str`.
        """
        msg = bytearray()

        for arg in args:
            if not isinstance(arg, str):
                raise NotImplementedError("Only string serialization is supported")

            msg += QDataStreamProtocol.pack_qstring(arg)

        return QDataStreamProtocol.pack_block(msg)

    @staticmethod
    def encode_message(message: dict) -> bytes:
        """
        Encodes a python object as a block of QStrings
        """
        command = message.get("command")
        # ping / pong use pre-built constant payloads instead of JSON.
        if command == "ping":
            return PING_MSG
        elif command == "pong":
            return PONG_MSG

        return QDataStreamProtocol.pack_message(json_encoder.encode(message))

    @staticmethod
    def decode_message(data: bytes) -> dict:
        """
        Decode the contents of one block into a message dict.

        The first QString is either the literal `PING` / `PONG` or a JSON
        document. Any further QStrings are merged into the message when they
        parse as JSON, and collected under the `"legacy"` key otherwise.
        """
        _, action = QDataStreamProtocol.read_qstring(data)
        if action in ("PING", "PONG"):
            return {"command": action.lower()}

        message = json.loads(action)
        try:
            for part in QDataStreamProtocol.read_block(data):
                try:
                    message_part = json.loads(part)
                    # read_block yields the first string again; skip re-merging it.
                    if part != action:
                        message.update(message_part)
                except (ValueError, TypeError):
                    if "legacy" not in message:
                        message["legacy"] = []
                    message["legacy"].append(part)
        except (KeyError, ValueError):
            # Best effort: a malformed trailing string ends parsing but keeps
            # whatever was decoded so far.
            pass
        return message

    async def read_message(self) -> dict:
        """
        Read a message from the stream

        # Errors
        Raises `DisconnectedError` if the stream reached EOF before any byte
        of the next read arrived.
        Raises `IncompleteReadError` on malformed stream.
        """
        try:
            length, *_ = struct.unpack("!I", await self.reader.readexactly(4))
            block = await self.reader.readexactly(length)
        except IncompleteReadError as e:
            if self.reader.at_eof() and not e.partial:
                raise DisconnectedError() from e
            # Otherwise reraise
            raise

        return QDataStreamProtocol.decode_message(block)
Implements the legacy QDataStream-based encoding scheme
Ancestors
Static methods
def encode_message(message: dict) ‑> bytes
-
Expand source code
@staticmethod
def encode_message(message: dict) -> bytes:
    """
    Encodes a python object as a block of QStrings

    Messages whose `"command"` is `"ping"` or `"pong"` are answered with the
    pre-built `PING_MSG` / `PONG_MSG` payloads; everything else is JSON
    encoded and packed as a single-string block.
    """
    command = message.get("command")
    if command == "ping":
        return PING_MSG
    if command == "pong":
        return PONG_MSG

    serialized = json_encoder.encode(message)
    return QDataStreamProtocol.pack_message(serialized)
Encodes a python object as a block of QStrings
def pack_block(block: bytes) ‑> bytes
-
Expand source code
@staticmethod
def pack_block(block: bytes) -> bytes:
    """
    Prefix `block` with its length as a 4-byte big-endian unsigned int.
    """
    header = struct.pack("!I", len(block))
    return header + block
def pack_message(*args: str) ‑> bytes
-
Expand source code
@staticmethod
def pack_message(*args: str) -> bytes:
    """
    For sending a bunch of QStrings packed together in a 'block'

    # Errors
    Raises `NotImplementedError` if any argument is not a `str`.
    """
    payload = bytearray()

    for text in args:
        if isinstance(text, str):
            payload += QDataStreamProtocol.pack_qstring(text)
            continue
        raise NotImplementedError("Only string serialization is supported")

    return QDataStreamProtocol.pack_block(payload)
For sending a bunch of QStrings packed together in a 'block'
def pack_qstring(message: str) ‑> bytes
-
Expand source code
@staticmethod
def pack_qstring(message: str) -> bytes:
    """
    Serialize `message` as a QString.

    # Returns
    A 4-byte big-endian byte length followed by the UTF-16BE encoded text.
    """
    encoded = message.encode("UTF-16BE")
    # Unsigned length prefix ("!I"), matching how the prefix is read back and
    # how block lengths are written; the signed form produced identical bytes
    # for lengths below 2**31 and raised struct.error above that.
    return struct.pack("!I", len(encoded)) + encoded
def read_block(data)
-
Expand source code
@staticmethod
def read_block(data):
    """
    Yield every QString found in `data`, in order.

    Parsing stops once 4 or fewer bytes remain. Whatever
    `QDataStreamProtocol.read_qstring` raises for a malformed string
    propagates out of the generator.
    """
    buffer_pos = 0
    # Arithmetic instead of `len(data[buffer_pos:])`: same result, but no copy
    # of the remaining buffer on every iteration.
    while len(data) - buffer_pos > 4:
        buffer_pos, msg = QDataStreamProtocol.read_qstring(data, buffer_pos)
        yield msg
def read_qstring(buffer: bytes, pos: int = 0) ‑> tuple[int, str]
-
Expand source code
@staticmethod
def read_qstring(buffer: bytes, pos: int = 0) -> tuple[int, str]:
    """
    Parse a serialized QString from buffer (A bytes like object) at given
    position.

    Requires `len(buffer[pos:]) >= 4`.

    Pos is added to buffer_pos.

    # Returns
    The new buffer position and the message.

    # Errors
    Raises `ValueError` if the length prefix claims more bytes than the
    buffer holds.
    """
    chunk = buffer[pos:pos + 4]
    # Same slice semantics as `len(buffer[pos + 4:])`, but measured through a
    # memoryview so the tail is not copied for every string parsed.
    available = len(memoryview(buffer)[pos + 4:])
    # NOTE(review): precondition check only; it is stripped under `-O`.
    assert len(chunk) == 4

    (size, ) = struct.unpack("!I", chunk)
    if available < size:
        raise ValueError(
            f"Malformed QString: Claims length {size} "
            f"but actually {available}. "
            f"Entire buffer: {base64.b64encode(buffer).decode()}",
        )
    return size + pos + 4, (buffer[pos + 4:pos + 4 + size]).decode("UTF-16BE")
Parse a serialized QString from buffer (A bytes like object) at given position.
Requires len(buffer[pos:]) >= 4. Pos is added to buffer_pos.
Returns
The new buffer position and the message.
Methods
async def read_message(self)
-
Expand source code
async def read_message(self) -> dict:
    """
    Read a message from the stream

    Reads a 4-byte big-endian length, then that many bytes, and decodes the
    block with `QDataStreamProtocol.decode_message`.

    # Errors
    Raises `DisconnectedError` if the stream reached EOF before any byte of
    the next read arrived.
    Raises `IncompleteReadError` on malformed stream.
    """
    try:
        length, *_ = struct.unpack("!I", await self.reader.readexactly(4))
        block = await self.reader.readexactly(length)
    except IncompleteReadError as e:
        # EOF with nothing partially read is reported as a disconnect; the
        # original error is kept as the cause.
        if self.reader.at_eof() and not e.partial:
            raise DisconnectedError() from e
        # Otherwise reraise
        raise

    return QDataStreamProtocol.decode_message(block)
Read a message from the stream
Errors
Raises IncompleteReadError on malformed stream.
Inherited members
class SimpleJsonProtocol (reader: asyncio.streams.StreamReader, writer: asyncio.streams.StreamWriter)
-
Expand source code
class SimpleJsonProtocol(Protocol):
    """
    Newline terminated JSON wire format: one JSON document per line.
    """

    @staticmethod
    def encode_message(message: dict) -> bytes:
        """
        Serialize `message` with `json_encoder` and terminate it with a newline.
        """
        serialized = json_encoder.encode(message) + "\n"
        return serialized.encode()

    @staticmethod
    def decode_message(data: bytes) -> dict:
        """
        Parse one line of JSON, ignoring surrounding whitespace.
        """
        stripped = data.strip()
        return json.loads(stripped)

    async def read_message(self) -> dict:
        """
        Read and decode the next line from the stream.

        # Errors
        Raises `DisconnectedError` when the reader returns no data.
        """
        line = await self.reader.readline()
        if line:
            return SimpleJsonProtocol.decode_message(line)
        raise DisconnectedError()
Ancestors
Inherited members