Module server.protocol
Protocol format definitions
Sub-modules
server.protocol.gpgnet
server.protocol.protocol
server.protocol.qdatastream
server.protocol.simple_json
Classes
class DisconnectedError (*args, **kwargs)
-
Expand source code
class DisconnectedError(ConnectionError):
    """
    Raised to signal that a protocol has lost its connection to the remote.
    """
For signaling that a protocol has lost connection to the remote.
Ancestors
- builtins.ConnectionError
- builtins.OSError
- builtins.Exception
- builtins.BaseException
class GpgNetClientProtocol
-
Expand source code
class GpgNetClientProtocol(metaclass=ABCMeta):
    """
    Defines an interface for the client side GPGNet protocol
    """

    def send_GameState(self, arguments: list[Union[int, str, bool]]) -> None:
        """
        Sent by the client when the state of LobbyComm changes

        # Params
        - `arguments`: Argument list forwarded unchanged with the
          `GameState` command
        """
        self.send_gpgnet_message("GameState", arguments)

    @abstractmethod
    def send_gpgnet_message(
        self,
        command_id: str,
        arguments: list[Union[int, str, bool]]
    ) -> None:
        """
        Deliver a single GPGNet command. Must be implemented by subclasses.

        # Params
        - `command_id`: Name of the command, e.g. `"GameState"`
        - `arguments`: Positional arguments of the command
        """
        pass  # pragma: no cover
Methods
def send_GameState(self, arguments: list[int | str | bool]) ‑> None
-
Expand source code
def send_GameState(self, arguments: list[Union[int, str, bool]]) -> None:
    """
    Sent by the client when the state of LobbyComm changes

    # Params
    - `arguments`: Argument list forwarded unchanged with the `GameState`
      command
    """
    command = "GameState"
    self.send_gpgnet_message(command, arguments)
Sent by the client when the state of LobbyComm changes
def send_gpgnet_message(self, command_id, arguments: list[int | str | bool]) ‑> None
-
Expand source code
@abstractmethod
def send_gpgnet_message(
    self,
    command_id: str,
    arguments: list[Union[int, str, bool]]
) -> None:
    """
    Deliver a single GPGNet command. Must be implemented by subclasses.

    # Params
    - `command_id`: Name of the command, e.g. `"GameState"`
    - `arguments`: Positional arguments of the command
    """
    pass  # pragma: no cover
class GpgNetServerProtocol
-
Expand source code
class GpgNetServerProtocol(metaclass=ABCMeta):
    """
    Defines an interface for the server side GPGNet protocol.

    Every `send_*` helper builds a `{"command": ..., "args": [...]}` dict and
    hands it to the abstract `send` coroutine.
    """

    async def send_ConnectToPeer(self, player_name: str, player_uid: int, offer: bool):
        """
        Tells a client that has a listening LobbyComm instance to connect to
        the given peer

        # Params
        - `player_name`: Remote player name
        - `player_uid`: Remote player identifier
        - `offer`: Forwarded as the third command argument
        """
        args = [player_name, player_uid, offer]
        await self.send_gpgnet_message("ConnectToPeer", args)

    async def send_JoinGame(self, remote_player_name: str, remote_player_uid: int):
        """
        Tells the game to join the given peer by ID

        # Params
        - `remote_player_name`: Remote player name
        - `remote_player_uid`: Remote player identifier
        """
        args = [remote_player_name, remote_player_uid]
        await self.send_gpgnet_message("JoinGame", args)

    async def send_HostGame(self, map_path: str):
        """
        Tells the game to start listening for incoming connections as a host

        # Params
        - `map_path`: Which scenario to use
        """
        # The path is always sent in its string form
        args = [str(map_path)]
        await self.send_gpgnet_message("HostGame", args)

    async def send_DisconnectFromPeer(self, id: int):
        """
        Instructs the game to disconnect from the peer given by id
        """
        args = [id]
        await self.send_gpgnet_message("DisconnectFromPeer", args)

    async def send_gpgnet_message(self, command_id: str, arguments: list[Union[int, str, bool]]):
        """
        Wrap a command and its arguments in a message dict and send it.

        # Params
        - `command_id`: Stored under the `"command"` key
        - `arguments`: Stored under the `"args"` key
        """
        await self.send({"command": command_id, "args": arguments})

    @abstractmethod
    async def send(self, message):
        """
        Deliver an already assembled message. Must be implemented by
        subclasses.
        """
        pass  # pragma: no cover
Defines an interface for the server side GPGNet protocol
Subclasses
Methods
async def send(self, message)
-
Expand source code
@abstractmethod
async def send(self, message):
    """
    Deliver an already assembled message. Must be implemented by subclasses.
    """
    pass  # pragma: no cover
async def send_ConnectToPeer(self, player_name: str, player_uid: int, offer: bool)
-
Expand source code
async def send_ConnectToPeer(self, player_name: str, player_uid: int, offer: bool):
    """
    Tells a client that has a listening LobbyComm instance to connect to the
    given peer

    # Params
    - `player_name`: Remote player name
    - `player_uid`: Remote player identifier
    - `offer`: Forwarded as the third command argument
    """
    args = [player_name, player_uid, offer]
    await self.send_gpgnet_message("ConnectToPeer", args)
Tells a client that has a listening LobbyComm instance to connect to the given peer
Params
player_name
: Remote player name
player_uid
: Remote player identifier
async def send_DisconnectFromPeer(self, id: int)
-
Expand source code
async def send_DisconnectFromPeer(self, id: int):
    """
    Instructs the game to disconnect from the peer given by id

    # Params
    - `id`: Peer identifier, sent as the only command argument
    """
    args = [id]
    await self.send_gpgnet_message("DisconnectFromPeer", args)
Instructs the game to disconnect from the peer given by id
async def send_HostGame(self, map_path: str)
-
Expand source code
async def send_HostGame(self, map_path: str):
    """
    Tells the game to start listening for incoming connections as a host

    # Params
    - `map_path`: Which scenario to use
    """
    # The path is always sent in its string form
    args = [str(map_path)]
    await self.send_gpgnet_message("HostGame", args)
Tells the game to start listening for incoming connections as a host
Params
map_path
: Which scenario to use
async def send_JoinGame(self, remote_player_name: str, remote_player_uid: int)
-
Expand source code
async def send_JoinGame(self, remote_player_name: str, remote_player_uid: int):
    """
    Tells the game to join the given peer by ID

    # Params
    - `remote_player_name`: Remote player name
    - `remote_player_uid`: Remote player identifier
    """
    args = [remote_player_name, remote_player_uid]
    await self.send_gpgnet_message("JoinGame", args)
Tells the game to join the given peer by ID
async def send_gpgnet_message(self, command_id: str, arguments: list[int | str | bool])
-
Expand source code
async def send_gpgnet_message(self, command_id: str, arguments: list[Union[int, str, bool]]):
    """
    Wrap a command and its arguments in a message dict and send it.

    # Params
    - `command_id`: Stored under the `"command"` key
    - `arguments`: Stored under the `"args"` key
    """
    await self.send({"command": command_id, "args": arguments})
class Protocol (reader: asyncio.streams.StreamReader, writer: asyncio.streams.StreamWriter)
-
Expand source code
class Protocol(metaclass=ABCMeta):
    """
    Abstract base for message protocols layered on an asyncio
    `StreamReader` / `StreamWriter` pair. Subclasses provide the wire format
    by implementing `encode_message`, `decode_message` and `read_message`.
    """

    def __init__(self, reader: StreamReader, writer: StreamWriter):
        self.reader = reader
        self.writer = writer

        # Force calls to drain() to only return once the data has been sent
        transport = self.writer.transport
        transport.set_write_buffer_limits(high=0)

    @staticmethod
    @abstractmethod
    def encode_message(message: dict) -> bytes:
        """
        Encode a message as raw bytes. Can be used along with `*_raw` methods.
        """
        pass  # pragma: no cover

    @staticmethod
    @abstractmethod
    def decode_message(data: bytes) -> dict:
        """
        Decode a message from raw bytes.
        """
        pass  # pragma: no cover

    def is_connected(self) -> bool:
        """
        Return whether or not the connection is still alive
        """
        closing = self.writer.is_closing()
        return not closing

    @abstractmethod
    async def read_message(self) -> dict:
        """
        Asynchronously read a message from the stream

        # Returns
        The parsed message

        # Errors
        May raise `IncompleteReadError`.
        """
        pass  # pragma: no cover

    async def send_message(self, message: dict) -> None:
        """
        Send a single message in the form of a dictionary

        # Errors
        May raise `DisconnectedError`.
        """
        payload = self.encode_message(message)
        await self.send_raw(payload)

    async def send_messages(self, messages: list[dict]) -> None:
        """
        Send multiple messages in the form of a list of dictionaries.

        May be more optimal than sending a single message.

        # Errors
        May raise `DisconnectedError`.
        """
        self.write_messages(messages)
        await self.drain()

    async def send_raw(self, data: bytes) -> None:
        """
        Send raw bytes. Should generally not be used.

        # Errors
        May raise `DisconnectedError`.
        """
        self.write_raw(data)
        await self.drain()

    def write_message(self, message: dict) -> None:
        """
        Write a single message into the message buffer. Should be used when
        sending broadcasts or when sending messages that are triggered by
        incoming messages from other players.

        # Errors
        May raise `DisconnectedError`.
        """
        # Checked up front so nothing gets encoded for a dead connection
        if not self.is_connected():
            raise DisconnectedError("Protocol is not connected!")

        payload = self.encode_message(message)
        self.write_raw(payload)

    def write_messages(self, messages: list[dict]) -> None:
        """
        Write multiple message into the message buffer.

        # Errors
        May raise `DisconnectedError`.
        """
        # One increment per call, made before the connection check
        metrics.sent_messages.labels(self.__class__.__name__).inc()
        if not self.is_connected():
            raise DisconnectedError("Protocol is not connected!")

        encoded = [self.encode_message(msg) for msg in messages]
        self.writer.writelines(encoded)

    def write_raw(self, data: bytes) -> None:
        """
        Write raw bytes into the message buffer. Should generally not be used.

        # Errors
        May raise `DisconnectedError`.
        """
        # One increment per call, made before the connection check
        metrics.sent_messages.labels(self.__class__.__name__).inc()
        if not self.is_connected():
            raise DisconnectedError("Protocol is not connected!")

        self.writer.write(data)

    def abort(self) -> None:
        """
        Call `abort()` on the writer's transport.
        """
        # SelectorTransport only
        transport = self.writer.transport
        transport.abort()

    async def close(self) -> None:
        """
        Close the underlying writer as soon as the buffer has emptied.

        # Errors
        Never raises. Any exceptions that occur while waiting to close are
        ignored.
        """
        self.writer.close()
        try:
            await self.writer.wait_closed()
        except Exception:
            # Deliberately ignored, see the docstring
            pass

    @synchronizedmethod
    async def drain(self) -> None:
        """
        Await the write buffer to empty.
        See StreamWriter.drain()

        # Errors
        Raises `DisconnectedError` if the client disconnects while waiting for
        the write buffer to empty.
        """
        # Method needs to be synchronized as drain() cannot be called
        # concurrently by multiple coroutines:
        # http://bugs.python.org/issue29930.
        try:
            await self.writer.drain()
        except Exception as exc:
            # Close the connection before reporting the failure
            await self.close()
            raise DisconnectedError("Protocol connection lost!") from exc
Subclasses
Static methods
def decode_message(data: bytes) ‑> dict
-
Expand source code
@staticmethod
@abstractmethod
def decode_message(data: bytes) -> dict:
    """
    Decode a message from raw bytes. Must be implemented by subclasses.
    """
    pass  # pragma: no cover
Decode a message from raw bytes.
def encode_message(message: dict) ‑> bytes
-
Expand source code
@staticmethod
@abstractmethod
def encode_message(message: dict) -> bytes:
    """
    Encode a message as raw bytes. Can be used along with `*_raw` methods.
    Must be implemented by subclasses.
    """
    pass  # pragma: no cover
Encode a message as raw bytes. Can be used along with `*_raw` methods.
Methods
def abort(self) ‑> None
-
Expand source code
def abort(self) -> None:
    """
    Call `abort()` on the writer's transport.
    """
    # SelectorTransport only
    transport = self.writer.transport
    transport.abort()
async def close(self) ‑> None
-
Expand source code
async def close(self) -> None:
    """
    Close the underlying writer as soon as the buffer has emptied.

    # Errors
    Never raises. Any exceptions that occur while waiting to close are
    ignored.
    """
    self.writer.close()
    try:
        await self.writer.wait_closed()
    except Exception:
        # Deliberately ignored, see the docstring
        pass
Close the underlying writer as soon as the buffer has emptied.
Errors
Never raises. Any exceptions that occur while waiting to close are ignored.
async def drain(self) ‑> None
-
Expand source code
@synchronizedmethod
async def drain(self) -> None:
    """
    Await the write buffer to empty.
    See StreamWriter.drain()

    # Errors
    Raises `DisconnectedError` if the client disconnects while waiting for
    the write buffer to empty.
    """
    # Method needs to be synchronized as drain() cannot be called
    # concurrently by multiple coroutines:
    # http://bugs.python.org/issue29930.
    try:
        await self.writer.drain()
    except Exception as exc:
        # Close the connection before reporting the failure
        await self.close()
        raise DisconnectedError("Protocol connection lost!") from exc
Await the write buffer to empty. See StreamWriter.drain()
Errors
Raises `DisconnectedError` if the client disconnects while waiting for the write buffer to empty.
def is_connected(self) ‑> bool
-
Expand source code
def is_connected(self) -> bool:
    """
    Return whether or not the connection is still alive, i.e. the writer is
    not closing.
    """
    closing = self.writer.is_closing()
    return not closing
Return whether or not the connection is still alive
async def read_message(self) ‑> dict
-
Expand source code
@abstractmethod
async def read_message(self) -> dict:
    """
    Asynchronously read a message from the stream. Must be implemented by
    subclasses.

    # Returns
    The parsed message

    # Errors
    May raise `IncompleteReadError`.
    """
    pass  # pragma: no cover
Asynchronously read a message from the stream
Returns
The parsed message
Errors
May raise `IncompleteReadError`.
async def send_message(self, message: dict) ‑> None
-
Expand source code
async def send_message(self, message: dict) -> None:
    """
    Send a single message in the form of a dictionary

    # Errors
    May raise `DisconnectedError`.
    """
    payload = self.encode_message(message)
    await self.send_raw(payload)
async def send_messages(self, messages: list[dict]) ‑> None
-
Expand source code
async def send_messages(self, messages: list[dict]) -> None:
    """
    Send multiple messages in the form of a list of dictionaries.

    May be more optimal than sending a single message.

    # Errors
    May raise `DisconnectedError`.
    """
    # Buffer everything first, then wait once for the buffer to empty
    self.write_messages(messages)
    await self.drain()
Send multiple messages in the form of a list of dictionaries.
May be more optimal than sending a single message.
Errors
May raise `DisconnectedError`.
async def send_raw(self, data: bytes) ‑> None
-
Expand source code
async def send_raw(self, data: bytes) -> None:
    """
    Send raw bytes. Should generally not be used.

    # Errors
    May raise `DisconnectedError`.
    """
    # Buffer the bytes, then wait for the buffer to empty
    self.write_raw(data)
    await self.drain()
def write_message(self, message: dict) ‑> None
-
Expand source code
def write_message(self, message: dict) -> None:
    """
    Write a single message into the message buffer. Should be used when
    sending broadcasts or when sending messages that are triggered by
    incoming messages from other players.

    # Errors
    May raise `DisconnectedError`.
    """
    # Checked up front so nothing gets encoded for a dead connection
    if not self.is_connected():
        raise DisconnectedError("Protocol is not connected!")

    payload = self.encode_message(message)
    self.write_raw(payload)
Write a single message into the message buffer. Should be used when sending broadcasts or when sending messages that are triggered by incoming messages from other players.
Errors
May raise `DisconnectedError`.
def write_messages(self, messages: list[dict]) ‑> None
-
Expand source code
def write_messages(self, messages: list[dict]) -> None:
    """
    Write multiple message into the message buffer.

    # Errors
    May raise `DisconnectedError`.
    """
    # One increment per call, made before the connection check
    metrics.sent_messages.labels(self.__class__.__name__).inc()
    if not self.is_connected():
        raise DisconnectedError("Protocol is not connected!")

    encoded = [self.encode_message(msg) for msg in messages]
    self.writer.writelines(encoded)
def write_raw(self, data: bytes) ‑> None
-
Expand source code
def write_raw(self, data: bytes) -> None:
    """
    Write raw bytes into the message buffer. Should generally not be used.

    # Errors
    May raise `DisconnectedError`.
    """
    # One increment per call, made before the connection check
    metrics.sent_messages.labels(self.__class__.__name__).inc()
    if not self.is_connected():
        raise DisconnectedError("Protocol is not connected!")

    self.writer.write(data)
Write raw bytes into the message buffer. Should generally not be used.
Errors
May raise `DisconnectedError`.
class QDataStreamProtocol (reader: asyncio.streams.StreamReader, writer: asyncio.streams.StreamWriter)
-
Expand source code
@with_logger
class QDataStreamProtocol(Protocol):
    """
    Implements the legacy QDataStream-based encoding scheme
    """

    @staticmethod
    def read_qstring(buffer: bytes, pos: int = 0) -> tuple[int, str]:
        """
        Parse a serialized QString from buffer (A bytes like object) at given
        position. `pos` is expected to be a non-negative offset.

        # Returns
        The new buffer position and the message.

        # Errors
        Raises `ValueError` if fewer than 4 bytes are available for the length
        prefix, or if the prefix claims more bytes than the buffer holds.
        """
        start = pos + 4
        header = buffer[pos:start]
        # Explicit check instead of `assert`, which is stripped under -O
        if len(header) != 4:
            raise ValueError(
                "Malformed QString: Incomplete length prefix at position {}. Entire buffer: {}"
                .format(pos, base64.b64encode(buffer)))

        (size, ) = struct.unpack("!I", header)
        end = start + size
        # Only the claimed payload is sliced, not the whole remaining buffer
        payload = buffer[start:end]
        if len(payload) < size:
            raise ValueError(
                "Malformed QString: Claims length {} but actually {}. Entire buffer: {}"
                .format(size, len(buffer) - start, base64.b64encode(buffer)))
        return end, payload.decode("UTF-16BE")

    @staticmethod
    def pack_qstring(message: str) -> bytes:
        """
        Serialize a string as a 4 byte big-endian length followed by the
        UTF-16BE encoded text.
        """
        encoded = message.encode("UTF-16BE")
        # Unsigned length, matching the "!I" used when reading
        return struct.pack("!I", len(encoded)) + encoded

    @staticmethod
    def pack_block(block: bytes) -> bytes:
        """
        Prefix `block` with its length as a 4 byte big-endian unsigned int.
        """
        return struct.pack("!I", len(block)) + block

    @staticmethod
    def read_block(data):
        """
        Yield each QString contained in `data` in order.

        Stops once 4 or fewer bytes remain, so such a tail is not parsed.
        """
        buffer_pos = 0
        # Compare lengths instead of slicing the tail on every iteration
        while len(data) - buffer_pos > 4:
            buffer_pos, msg = QDataStreamProtocol.read_qstring(data, buffer_pos)
            yield msg

    @staticmethod
    def pack_message(*args: str) -> bytes:
        """
        For sending a bunch of QStrings packed together in a 'block'

        # Errors
        Raises `NotImplementedError` if any argument is not a `str`.
        """
        msg = bytearray()
        for arg in args:
            if not isinstance(arg, str):
                raise NotImplementedError("Only string serialization is supported")

            msg += QDataStreamProtocol.pack_qstring(arg)
        return QDataStreamProtocol.pack_block(msg)

    @staticmethod
    def encode_message(message: dict) -> bytes:
        """
        Encodes a python object as a block of QStrings
        """
        command = message.get("command")
        # ping / pong use prebuilt messages instead of JSON
        if command == "ping":
            return PING_MSG
        elif command == "pong":
            return PONG_MSG

        return QDataStreamProtocol.pack_message(json_encoder.encode(message))

    @staticmethod
    def decode_message(data: bytes) -> dict:
        """
        Decode a block of QStrings into a message.

        The first QString is either `PING` / `PONG` or a JSON document. Any
        further QString that parses as JSON is merged into the message; the
        ones that do not are collected under the `"legacy"` key.
        """
        _, action = QDataStreamProtocol.read_qstring(data)
        if action in ("PING", "PONG"):
            return {"command": action.lower()}

        message = json.loads(action)
        try:
            for part in QDataStreamProtocol.read_block(data):
                try:
                    message_part = json.loads(part)
                    # The block is re-read from the start; skip the first part
                    if part != action:
                        message.update(message_part)
                except (ValueError, TypeError):
                    if "legacy" not in message:
                        message["legacy"] = []
                    message["legacy"].append(part)
        except (KeyError, ValueError):
            # Best effort: keep whatever was decoded before the failure
            pass
        return message

    async def read_message(self):
        """
        Read a message from the stream

        # Errors
        Raises `DisconnectedError` if the stream is at EOF and no partial data
        was read. Raises `IncompleteReadError` on malformed stream.
        """
        try:
            header = await self.reader.readexactly(4)
            (length, ) = struct.unpack("!I", header)
            block = await self.reader.readexactly(length)
        except IncompleteReadError as e:
            if self.reader.at_eof() and not e.partial:
                raise DisconnectedError() from e
            # Otherwise reraise
            raise

        return QDataStreamProtocol.decode_message(block)
Implements the legacy QDataStream-based encoding scheme
Ancestors
Static methods
def encode_message(message: dict) ‑> bytes
-
Expand source code
@staticmethod
def encode_message(message: dict) -> bytes:
    """
    Encodes a python object as a block of QStrings
    """
    command = message.get("command")
    # ping / pong use prebuilt messages instead of JSON
    if command == "ping":
        return PING_MSG
    if command == "pong":
        return PONG_MSG

    payload = json_encoder.encode(message)
    return QDataStreamProtocol.pack_message(payload)
Encodes a python object as a block of QStrings
def pack_block(block: bytes) ‑> bytes
-
Expand source code
@staticmethod
def pack_block(block: bytes) -> bytes:
    """
    Prefix `block` with its length as a 4 byte big-endian unsigned int.
    """
    header = struct.pack("!I", len(block))
    return header + block
def pack_message(*args: str) ‑> bytes
-
Expand source code
@staticmethod
def pack_message(*args: str) -> bytes:
    """
    For sending a bunch of QStrings packed together in a 'block'

    # Errors
    Raises `NotImplementedError` if any argument is not a `str`.
    """
    body = bytearray()
    for item in args:
        if isinstance(item, str):
            body += QDataStreamProtocol.pack_qstring(item)
            continue

        raise NotImplementedError("Only string serialization is supported")
    return QDataStreamProtocol.pack_block(body)
For sending a bunch of QStrings packed together in a 'block'
def pack_qstring(message: str) ‑> bytes
-
Expand source code
@staticmethod
def pack_qstring(message: str) -> bytes:
    """
    Serialize a string as a 4 byte big-endian length followed by the
    UTF-16BE encoded text.

    # Params
    - `message`: The text to serialize

    # Returns
    The length prefix and the encoded text as one `bytes` object.
    """
    encoded = message.encode("UTF-16BE")
    # Unsigned length: the reading side of this format unpacks it with "!I"
    return struct.pack("!I", len(encoded)) + encoded
def read_block(data)
-
Expand source code
@staticmethod
def read_block(data):
    """
    Yield each QString contained in `data` in order.

    Stops once 4 or fewer bytes remain, so such a tail is not parsed.

    # Errors
    Propagates whatever `read_qstring` raises for a malformed entry.
    """
    buffer_pos = 0
    # Compare lengths instead of slicing the tail on every iteration, which
    # copied the remaining buffer once per string
    while len(data) - buffer_pos > 4:
        buffer_pos, msg = QDataStreamProtocol.read_qstring(data, buffer_pos)
        yield msg
def read_qstring(buffer: bytes, pos: int = 0) ‑> tuple[int, str]
-
Expand source code
@staticmethod
def read_qstring(buffer: bytes, pos: int = 0) -> tuple[int, str]:
    """
    Parse a serialized QString from buffer (A bytes like object) at given
    position. `pos` is expected to be a non-negative offset.

    # Returns
    The new buffer position and the message.

    # Errors
    Raises `ValueError` if fewer than 4 bytes are available for the length
    prefix, or if the prefix claims more bytes than the buffer holds.
    """
    start = pos + 4
    header = buffer[pos:start]
    # Explicit check instead of `assert`, which is stripped under -O
    if len(header) != 4:
        raise ValueError(
            "Malformed QString: Incomplete length prefix at position {}. Entire buffer: {}"
            .format(pos, base64.b64encode(buffer)))

    (size, ) = struct.unpack("!I", header)
    end = start + size
    # Only the claimed payload is sliced, not the whole remaining buffer
    payload = buffer[start:end]
    if len(payload) < size:
        raise ValueError(
            "Malformed QString: Claims length {} but actually {}. Entire buffer: {}"
            .format(size, len(buffer) - start, base64.b64encode(buffer)))
    return end, payload.decode("UTF-16BE")
Parse a serialized QString from buffer (A bytes like object) at given position.
Requires `len(buffer[pos:]) >= 4`. Pos is added to buffer_pos.
Returns
The new buffer position and the message.
Methods
async def read_message(self)
-
Expand source code
async def read_message(self):
    """
    Read a message from the stream

    # Returns
    The message decoded by `QDataStreamProtocol.decode_message`.

    # Errors
    Raises `DisconnectedError` if the stream is at EOF and no partial data
    was read. Raises `IncompleteReadError` on malformed stream.
    """
    try:
        # 4 byte big-endian length prefix, then the block itself
        header = await self.reader.readexactly(4)
        (length, ) = struct.unpack("!I", header)
        block = await self.reader.readexactly(length)
    except IncompleteReadError as e:
        if self.reader.at_eof() and not e.partial:
            # Chain explicitly so the original read error stays attached
            raise DisconnectedError() from e
        # Otherwise reraise
        raise

    return QDataStreamProtocol.decode_message(block)
Read a message from the stream
Errors
Raises `IncompleteReadError` on malformed stream.
Inherited members
class SimpleJsonProtocol (reader: asyncio.streams.StreamReader, writer: asyncio.streams.StreamWriter)
-
Expand source code
class SimpleJsonProtocol(Protocol):
    """
    Protocol that exchanges one JSON document per line.
    """

    @staticmethod
    def encode_message(message: dict) -> bytes:
        """
        Encode a message as JSON text followed by a newline.
        """
        line = json_encoder.encode(message) + "\n"
        return line.encode()

    @staticmethod
    def decode_message(data: bytes) -> dict:
        """
        Parse a message from JSON bytes, ignoring surrounding whitespace.
        """
        stripped = data.strip()
        return json.loads(stripped)

    async def read_message(self) -> dict:
        """
        Read one line from the stream and decode it.

        # Errors
        Raises `DisconnectedError` if the reader returns no data.
        """
        line = await self.reader.readline()
        if line:
            return SimpleJsonProtocol.decode_message(line)

        raise DisconnectedError()
Ancestors
Inherited members