Module server.protocol

Protocol format definitions

Sub-modules

server.protocol.gpgnet
server.protocol.protocol
server.protocol.qdatastream
server.protocol.simple_json

Classes

class DisconnectedError (*args, **kwargs)

For signaling that a protocol has lost connection to the remote.

Expand source code
class DisconnectedError(ConnectionError):
    """
    For signaling that a protocol has lost connection to the remote.

    Derives from the builtin `ConnectionError`, so handlers written for
    `ConnectionError` or `OSError` will also catch it.
    """

Ancestors

  • builtins.ConnectionError
  • builtins.OSError
  • builtins.Exception
  • builtins.BaseException
class GpgNetClientProtocol
Expand source code
class GpgNetClientProtocol(metaclass=ABCMeta):
    """
    Defines an interface for the client side GPGNet protocol

    Subclasses must implement `send_gpgnet_message`; the `send_*` helpers are
    built on top of it.
    """
    def send_GameState(self, arguments: list[Union[int, str, bool]]) -> None:
        """
        Sent by the client when the state of LobbyComm changes

        # Params
        - `arguments`: Forwarded unchanged as the arguments of the
        "GameState" command
        """
        self.send_gpgnet_message("GameState", arguments)

    @abstractmethod
    def send_gpgnet_message(self, command_id: str, arguments: list[Union[int, str, bool]]) -> None:
        """
        Send a single GPGNet command. Must be provided by subclasses.

        # Params
        - `command_id`: Name of the command, e.g. "GameState"
        - `arguments`: Arguments of the command
        """
        pass  # pragma: no cover

Methods

def send_GameState(self, arguments: list[typing.Union[int, str, bool]]) ‑> None

Sent by the client when the state of LobbyComm changes

def send_gpgnet_message(self, command_id, arguments: list[typing.Union[int, str, bool]]) ‑> None
class GpgNetServerProtocol

Defines an interface for the server side GPGNet protocol

Expand source code
class GpgNetServerProtocol(metaclass=ABCMeta):
    """
    Defines an interface for the server side GPGNet protocol

    Every `send_*` helper builds a GPGNet command and hands it to
    `send_gpgnet_message`, which in turn calls the abstract `send`.
    """
    async def send_ConnectToPeer(self, player_name: str, player_uid: int, offer: bool) -> None:
        """
        Tells a client that has a listening LobbyComm instance to connect to the
        given peer

        # Params
        - `player_name`: Remote player name
        - `player_uid`: Remote player identifier
        - `offer`: Forwarded as the third command argument. NOTE(review):
        presumably selects which side initiates the connection offer; confirm
        against the client implementation.
        """
        await self.send_gpgnet_message("ConnectToPeer", [player_name, player_uid, offer])

    async def send_JoinGame(self, remote_player_name: str, remote_player_uid: int) -> None:
        """
        Tells the game to join the given peer by ID

        # Params
        - `remote_player_name`: Name of the peer to join
        - `remote_player_uid`: Identifier of the peer to join
        """
        await self.send_gpgnet_message("JoinGame", [remote_player_name, remote_player_uid])

    async def send_HostGame(self, map_path: str) -> None:
        """
        Tells the game to start listening for incoming connections as a host

        # Params
        - `map_path`: Which scenario to use
        """
        # The path is coerced with str() so path-like values are sent as text
        await self.send_gpgnet_message("HostGame", [str(map_path)])

    async def send_DisconnectFromPeer(self, id: int) -> None:
        """
        Instructs the game to disconnect from the peer given by id
        """
        await self.send_gpgnet_message("DisconnectFromPeer", [id])

    async def send_gpgnet_message(self, command_id: str, arguments: list[Union[int, str, bool]]) -> None:
        """
        Wrap a command in a `{"command": ..., "args": ...}` dictionary and
        pass it to `send`.

        # Params
        - `command_id`: Name of the GPGNet command
        - `arguments`: Arguments of the command
        """
        message = {"command": command_id, "args": arguments}
        await self.send(message)

    @abstractmethod
    async def send(self, message):
        """
        Deliver an already assembled message. Must be provided by subclasses.

        # Params
        - `message`: The dictionary built by `send_gpgnet_message`
        """
        pass  # pragma: no cover

Subclasses

Methods

async def send(self, message)
async def send_ConnectToPeer(self, player_name: str, player_uid: int, offer: bool)

Tells a client that has a listening LobbyComm instance to connect to the given peer

Params

  • player_name: Remote player name
  • player_uid: Remote player identifier
async def send_DisconnectFromPeer(self, id: int)

Instructs the game to disconnect from the peer given by id

async def send_HostGame(self, map_path: str)

Tells the game to start listening for incoming connections as a host

Params

  • map_path: Which scenario to use
async def send_JoinGame(self, remote_player_name: str, remote_player_uid: int)

Tells the game to join the given peer by ID

async def send_gpgnet_message(self, command_id: str, arguments: list[typing.Union[int, str, bool]])
class Protocol (reader: asyncio.streams.StreamReader, writer: asyncio.streams.StreamWriter)
Expand source code
class Protocol(metaclass=ABCMeta):
    """
    Abstract base for message protocols running over an asyncio
    `StreamReader` / `StreamWriter` pair.

    Subclasses supply the wire format by implementing `encode_message`,
    `decode_message` and `read_message`. The `write_*` methods only put data
    into the write buffer, while the `send_*` methods additionally wait for
    the buffer to drain.
    """
    def __init__(self, reader: StreamReader, writer: StreamWriter):
        """
        Store the stream pair and configure the write buffer of the
        underlying transport.

        # Params
        - `reader`: Stream to read incoming data from
        - `writer`: Stream to write outgoing data to
        """
        self.reader = reader
        self.writer = writer
        # Force calls to drain() to only return once the data has been sent
        self.writer.transport.set_write_buffer_limits(high=0)

    @staticmethod
    @abstractmethod
    def encode_message(message: dict) -> bytes:
        """
        Encode a message as raw bytes. Can be used along with `*_raw` methods.
        """
        pass  # pragma: no cover

    @staticmethod
    @abstractmethod
    def decode_message(data: bytes) -> dict:
        """
        Decode a message from raw bytes.
        """
        pass  # pragma: no cover

    def is_connected(self) -> bool:
        """
        Return whether or not the connection is still alive
        """
        # "Alive" here means the writer has not started closing
        return not self.writer.is_closing()

    @abstractmethod
    async def read_message(self) -> dict:
        """
        Asynchronously read a message from the stream

        # Returns
        The parsed message

        # Errors
        May raise `IncompleteReadError`.
        """
        pass  # pragma: no cover

    async def send_message(self, message: dict) -> None:
        """
        Send a single message in the form of a dictionary

        # Errors
        May raise `DisconnectedError`.
        """
        await self.send_raw(self.encode_message(message))

    async def send_messages(self, messages: list[dict]) -> None:
        """
        Send multiple messages in the form of a list of dictionaries.

        All messages are written first and the buffer is drained once, which
        may be more efficient than sending each message individually.

        # Errors
        May raise `DisconnectedError`.
        """
        self.write_messages(messages)
        await self.drain()

    async def send_raw(self, data: bytes) -> None:
        """
        Send raw bytes. Should generally not be used.

        # Errors
        May raise `DisconnectedError`.
        """
        self.write_raw(data)
        await self.drain()

    def write_message(self, message: dict) -> None:
        """
        Write a single message into the message buffer. Should be used when
        sending broadcasts or when sending messages that are triggered by
        incoming messages from other players.

        # Errors
        May raise `DisconnectedError`.
        """
        # Checked here so that nothing is encoded for a closed connection;
        # write_raw performs the same check again.
        if not self.is_connected():
            raise DisconnectedError("Protocol is not connected!")

        self.write_raw(self.encode_message(message))

    def write_messages(self, messages: list[dict]) -> None:
        """
        Write multiple messages into the message buffer.

        # Errors
        May raise `DisconnectedError`.
        """
        # The metric is incremented once per call (not once per message) and
        # before the connection check.
        metrics.sent_messages.labels(self.__class__.__name__).inc()
        if not self.is_connected():
            raise DisconnectedError("Protocol is not connected!")

        self.writer.writelines([self.encode_message(msg) for msg in messages])

    def write_raw(self, data: bytes) -> None:
        """
        Write raw bytes into the message buffer. Should generally not be used.

        # Errors
        May raise `DisconnectedError`.
        """
        # The metric is incremented before the connection check, so attempts
        # on a closed connection are counted as well.
        metrics.sent_messages.labels(self.__class__.__name__).inc()
        if not self.is_connected():
            raise DisconnectedError("Protocol is not connected!")

        self.writer.write(data)

    def abort(self) -> None:
        """
        Abort the connection by calling `abort()` on the writer's transport.
        """
        # SelectorTransport only
        self.writer.transport.abort()

    async def close(self) -> None:
        """
        Close the underlying writer as soon as the buffer has emptied.

        # Errors
        Never raises. Any exceptions that occur while waiting to close are
        ignored.
        """
        self.writer.close()
        with contextlib.suppress(Exception):
            await self.writer.wait_closed()

    @synchronizedmethod
    async def drain(self) -> None:
        """
        Await the write buffer to empty.
        See StreamWriter.drain()

        # Errors
        Raises `DisconnectedError` if the client disconnects while waiting for
        the write buffer to empty.
        """
        # Method needs to be synchronized as drain() cannot be called
        # concurrently by multiple coroutines:
        # http://bugs.python.org/issue29930.
        try:
            await self.writer.drain()
        except Exception as e:
            # Any failure while draining is treated as a lost connection: the
            # writer is closed and the original error is kept as the cause.
            await self.close()
            raise DisconnectedError("Protocol connection lost!") from e

Subclasses

Static methods

def decode_message(data: bytes) ‑> dict

Decode a message from raw bytes.

def encode_message(message: dict) ‑> bytes

Encode a message as raw bytes. Can be used along with *_raw methods.

Methods

def abort(self) ‑> None
async def close(self) ‑> None

Close the underlying writer as soon as the buffer has emptied.

Errors

Never raises. Any exceptions that occur while waiting to close are ignored.

async def drain(self) ‑> None

Await the write buffer to empty. See StreamWriter.drain()

Errors

Raises DisconnectedError if the client disconnects while waiting for the write buffer to empty.

def is_connected(self) ‑> bool

Return whether or not the connection is still alive

async def read_message(self) ‑> dict

Asynchronously read a message from the stream

Returns

The parsed message

Errors

May raise IncompleteReadError.

async def send_message(self, message: dict) ‑> None

Send a single message in the form of a dictionary

Errors

May raise DisconnectedError.

async def send_messages(self, messages: list[dict]) ‑> None

Send multiple messages in the form of a list of dictionaries.

May be more efficient than sending each message individually.

Errors

May raise DisconnectedError.

async def send_raw(self, data: bytes) ‑> None

Send raw bytes. Should generally not be used.

Errors

May raise DisconnectedError.

def write_message(self, message: dict) ‑> None

Write a single message into the message buffer. Should be used when sending broadcasts or when sending messages that are triggered by incoming messages from other players.

Errors

May raise DisconnectedError.

def write_messages(self, messages: list[dict]) ‑> None

Write multiple messages into the message buffer.

Errors

May raise DisconnectedError.

def write_raw(self, data: bytes) ‑> None

Write raw bytes into the message buffer. Should generally not be used.

Errors

May raise DisconnectedError.

class QDataStreamProtocol (reader: asyncio.streams.StreamReader, writer: asyncio.streams.StreamWriter)

Implements the legacy QDataStream-based encoding scheme

Expand source code
@with_logger
class QDataStreamProtocol(Protocol):
    """
    Implements the legacy QDataStream-based encoding scheme

    A message travels as a "block": a 4 byte big-endian unsigned length
    followed by that many payload bytes. The payload is a sequence of
    QStrings, each one a 4 byte big-endian unsigned byte length followed by
    the text encoded as UTF-16BE.
    """

    @staticmethod
    def read_qstring(buffer: bytes, pos: int = 0) -> tuple[int, str]:
        """
        Parse a serialized QString from buffer (A bytes like object) at given
        position.

        Requires `len(buffer[pos:]) >= 4` and a non-negative `pos`.

        # Params
        - `buffer`: The bytes to parse
        - `pos`: Offset into `buffer` at which the QString starts

        # Returns
        The new buffer position and the message. The position is absolute,
        i.e. it already includes `pos`.

        # Errors
        Raises `ValueError` if the length prefix claims more bytes than the
        buffer holds.
        """
        header_end = pos + 4
        chunk = buffer[pos:header_end]
        assert len(chunk) == 4

        (size, ) = struct.unpack("!I", chunk)
        # Compute the remaining byte count arithmetically. Slicing the tail
        # just to measure it would copy the rest of the buffer on every call.
        available = len(buffer) - header_end
        if available < size:
            raise ValueError(
                "Malformed QString: Claims length {} but actually {}. Entire buffer: {}"
                .format(size, available, base64.b64encode(buffer)))
        string_end = header_end + size
        return string_end, buffer[header_end:string_end].decode("UTF-16BE")

    @staticmethod
    def pack_qstring(message: str) -> bytes:
        """
        Serialize a string as a QString: length prefix plus UTF-16BE data.
        """
        encoded = message.encode("UTF-16BE")
        # Unsigned prefix, matching what `read_qstring` and `pack_block` use
        return struct.pack("!I", len(encoded)) + encoded

    @staticmethod
    def pack_block(block: bytes) -> bytes:
        """
        Prefix a payload with its length as a 4 byte big-endian unsigned int.
        """
        return struct.pack("!I", len(block)) + block

    @staticmethod
    def read_block(data):
        """
        Lazily yield every QString contained in `data`.

        Iteration stops once 4 or fewer bytes remain.

        # Errors
        May raise `ValueError` (from `read_qstring`) on malformed data.
        """
        buffer_pos = 0
        total = len(data)
        # `read_qstring` never returns a position past the end of the data, so
        # the remaining length can be computed without slicing.
        while total - buffer_pos > 4:
            buffer_pos, msg = QDataStreamProtocol.read_qstring(data, buffer_pos)
            yield msg

    @staticmethod
    def pack_message(*args: str) -> bytes:
        """
        For sending a bunch of QStrings packed together in a 'block'

        # Errors
        Raises `NotImplementedError` if any argument is not a `str`.
        """
        msg = bytearray()
        for arg in args:
            if not isinstance(arg, str):
                raise NotImplementedError("Only string serialization is supported")

            msg += QDataStreamProtocol.pack_qstring(arg)
        return QDataStreamProtocol.pack_block(msg)

    @staticmethod
    def encode_message(message: dict) -> bytes:
        """
        Encodes a python object as a block of QStrings

        Messages whose "command" is "ping" or "pong" are answered with the
        module-level `PING_MSG` / `PONG_MSG` constants; everything else is
        JSON encoded and packed as a single QString.
        """
        command = message.get("command")
        if command == "ping":
            return PING_MSG
        elif command == "pong":
            return PONG_MSG

        return QDataStreamProtocol.pack_message(json_encoder.encode(message))

    @staticmethod
    def decode_message(data: bytes) -> dict:
        """
        Decode the payload of a block (without its length prefix).

        The first QString is the action. "PING" and "PONG" become
        `{"command": "ping"}` / `{"command": "pong"}`. Any other action is
        parsed as JSON and forms the message. Every further QString that
        parses as JSON is merged into the message; the ones that do not are
        collected, in order, under the "legacy" key.
        """
        _, action = QDataStreamProtocol.read_qstring(data)
        if action in ("PING", "PONG"):
            return {"command": action.lower()}

        message = json.loads(action)
        try:
            for part in QDataStreamProtocol.read_block(data):
                try:
                    message_part = json.loads(part)
                    # The action itself is yielded again as the first part
                    if part != action:
                        message.update(message_part)
                except (ValueError, TypeError):
                    if "legacy" not in message:
                        message["legacy"] = []
                    message["legacy"].append(part)
        except (KeyError, ValueError):
            # Best effort: malformed trailing data ends parsing and whatever
            # has been decoded so far is returned.
            pass
        return message

    async def read_message(self) -> dict:
        """
        Read a message from the stream

        # Returns
        The decoded message

        # Errors
        Raises `DisconnectedError` if the stream reached EOF before any byte
        of the pending read arrived.
        Raises `IncompleteReadError` on malformed stream.
        """
        try:
            (length, ) = struct.unpack("!I", await self.reader.readexactly(4))
            block = await self.reader.readexactly(length)
        except IncompleteReadError as e:
            if self.reader.at_eof() and not e.partial:
                raise DisconnectedError() from e
            # Otherwise reraise
            raise

        return QDataStreamProtocol.decode_message(block)

Ancestors

Static methods

def encode_message(message: dict) ‑> bytes

Encodes a python object as a block of QStrings

def pack_block(block: bytes) ‑> bytes
def pack_message(*args: str) ‑> bytes

For sending a bunch of QStrings packed together in a 'block'

def pack_qstring(message: str) ‑> bytes
def read_block(data)
def read_qstring(buffer: bytes, pos: int = 0) ‑> tuple[int, str]

Parse a serialized QString from buffer (A bytes like object) at given position.

Requires len(buffer[pos:]) >= 4.

The returned buffer position is absolute: it already includes pos.

Returns

The new buffer position and the message.

Methods

async def read_message(self)

Read a message from the stream

Errors

Raises IncompleteReadError on malformed stream.

Inherited members

class SimpleJsonProtocol (reader: asyncio.streams.StreamReader, writer: asyncio.streams.StreamWriter)
Expand source code
class SimpleJsonProtocol(Protocol):
    """
    Newline delimited JSON: every message is one JSON document on its own
    line.
    """

    @staticmethod
    def encode_message(message: dict) -> bytes:
        """
        Serialize a message as JSON text terminated by a newline.
        """
        serialized = json_encoder.encode(message)
        terminated = serialized + "\n"
        return terminated.encode()

    @staticmethod
    def decode_message(data: bytes) -> dict:
        """
        Parse one line of JSON, ignoring surrounding whitespace.
        """
        payload = data.strip()
        return json.loads(payload)

    async def read_message(self) -> dict:
        """
        Read the next line from the stream and decode it.

        # Errors
        Raises `DisconnectedError` when the reader returns no data.
        """
        raw_line = await self.reader.readline()
        if raw_line:
            return SimpleJsonProtocol.decode_message(raw_line)
        raise DisconnectedError()

Ancestors

Inherited members