Module server.protocol

Protocol format definitions

Sub-modules

server.protocol.gpgnet
server.protocol.protocol
server.protocol.qdatastream
server.protocol.simple_json

Classes

class DisconnectedError (*args, **kwargs)
Expand source code
class DisconnectedError(ConnectionError):
    """For signaling that a protocol has lost connection to the remote."""

For signaling that a protocol has lost connection to the remote.

Ancestors

  • builtins.ConnectionError
  • builtins.OSError
  • builtins.Exception
  • builtins.BaseException
class GpgNetClientProtocol
Expand source code
class GpgNetClientProtocol(metaclass=ABCMeta):
    def send_GameState(self, arguments: list[Union[int, str, bool]]) -> None:
        """
        Sent by the client when the state of LobbyComm changes
        """
        self.send_gpgnet_message("GameState", arguments)

    @abstractmethod
    def send_gpgnet_message(self, command_id, arguments: list[Union[int, str, bool]]) -> None:
        pass  # pragma: no cover

Methods

def send_GameState(self, arguments: list[int | str | bool]) ‑> None
Expand source code
def send_GameState(self, arguments: list[Union[int, str, bool]]) -> None:
    """
    Sent by the client when the state of LobbyComm changes
    """
    self.send_gpgnet_message("GameState", arguments)

Sent by the client when the state of LobbyComm changes

def send_gpgnet_message(self, command_id, arguments: list[int | str | bool]) ‑> None
Expand source code
@abstractmethod
def send_gpgnet_message(self, command_id, arguments: list[Union[int, str, bool]]) -> None:
    pass  # pragma: no cover
class GpgNetServerProtocol
Expand source code
class GpgNetServerProtocol(metaclass=ABCMeta):
    """
    Defines an interface for the server side GPGNet protocol
    """
    async def send_ConnectToPeer(self, player_name: str, player_uid: int, offer: bool):
        """
        Tells a client that has a listening LobbyComm instance to connect to the
        given peer

        # Params
        - `player_name`: Remote player name
        - `player_uid`: Remote player identifier
        """
        await self.send_gpgnet_message("ConnectToPeer", [player_name, player_uid, offer])

    async def send_JoinGame(self, remote_player_name: str, remote_player_uid: int):
        """
        Tells the game to join the given peer by ID
        """
        await self.send_gpgnet_message("JoinGame", [remote_player_name, remote_player_uid])

    async def send_HostGame(self, map_path: str):
        """
        Tells the game to start listening for incoming connections as a host

        # Params
        - `map_path`: Which scenario to use
        """
        await self.send_gpgnet_message("HostGame", [str(map_path)])

    async def send_DisconnectFromPeer(self, id: int):
        """
        Instructs the game to disconnect from the peer given by id
        """
        await self.send_gpgnet_message("DisconnectFromPeer", [id])

    async def send_gpgnet_message(self, command_id: str, arguments: list[Union[int, str, bool]]):
        message = {"command": command_id, "args": arguments}
        await self.send(message)

    @abstractmethod
    async def send(self, message):
        pass  # pragma: no cover

Defines an interface for the server side GPGNet protocol

Subclasses

Methods

async def send(self, message)
Expand source code
@abstractmethod
async def send(self, message):
    pass  # pragma: no cover
async def send_ConnectToPeer(self, player_name: str, player_uid: int, offer: bool)
Expand source code
async def send_ConnectToPeer(self, player_name: str, player_uid: int, offer: bool):
    """
    Tells a client that has a listening LobbyComm instance to connect to the
    given peer

    # Params
    - `player_name`: Remote player name
    - `player_uid`: Remote player identifier
    """
    await self.send_gpgnet_message("ConnectToPeer", [player_name, player_uid, offer])

Tells a client that has a listening LobbyComm instance to connect to the given peer

Params

  • player_name: Remote player name
  • player_uid: Remote player identifier
async def send_DisconnectFromPeer(self, id: int)
Expand source code
async def send_DisconnectFromPeer(self, id: int):
    """
    Instructs the game to disconnect from the peer given by id
    """
    await self.send_gpgnet_message("DisconnectFromPeer", [id])

Instructs the game to disconnect from the peer given by id

async def send_HostGame(self, map_path: str)
Expand source code
async def send_HostGame(self, map_path: str):
    """
    Tells the game to start listening for incoming connections as a host

    # Params
    - `map_path`: Which scenario to use
    """
    await self.send_gpgnet_message("HostGame", [str(map_path)])

Tells the game to start listening for incoming connections as a host

Params

  • map_path: Which scenario to use
async def send_JoinGame(self, remote_player_name: str, remote_player_uid: int)
Expand source code
async def send_JoinGame(self, remote_player_name: str, remote_player_uid: int):
    """
    Tells the game to join the given peer by ID
    """
    await self.send_gpgnet_message("JoinGame", [remote_player_name, remote_player_uid])

Tells the game to join the given peer by ID

async def send_gpgnet_message(self, command_id: str, arguments: list[int | str | bool])
Expand source code
async def send_gpgnet_message(self, command_id: str, arguments: list[Union[int, str, bool]]):
    message = {"command": command_id, "args": arguments}
    await self.send(message)
class Protocol (reader: asyncio.streams.StreamReader, writer: asyncio.streams.StreamWriter)
Expand source code
class Protocol(metaclass=ABCMeta):
    def __init__(self, reader: StreamReader, writer: StreamWriter):
        self.reader = reader
        self.writer = writer
        # Force calls to drain() to only return once the data has been sent
        self.writer.transport.set_write_buffer_limits(high=0)

    @staticmethod
    @abstractmethod
    def encode_message(message: dict) -> bytes:
        """
        Encode a message as raw bytes. Can be used along with `*_raw` methods.
        """
        pass  # pragma: no cover

    @staticmethod
    @abstractmethod
    def decode_message(data: bytes) -> dict:
        """
        Decode a message from raw bytes.
        """
        pass  # pragma: no cover

    def is_connected(self) -> bool:
        """
        Return whether or not the connection is still alive
        """
        return not self.writer.is_closing()

    @abstractmethod
    async def read_message(self) -> dict:
        """
        Asynchronously read a message from the stream

        # Returns
        The parsed message

        # Errors
        May raise `IncompleteReadError`.
        """
        pass  # pragma: no cover

    async def send_message(self, message: dict) -> None:
        """
        Send a single message in the form of a dictionary

        # Errors
        May raise `DisconnectedError`.
        """
        await self.send_raw(self.encode_message(message))

    async def send_messages(self, messages: list[dict]) -> None:
        """
        Send multiple messages in the form of a list of dictionaries.

        May be more optimal than sending a single message.

        # Errors
        May raise `DisconnectedError`.
        """
        self.write_messages(messages)
        await self.drain()

    async def send_raw(self, data: bytes) -> None:
        """
        Send raw bytes. Should generally not be used.

        # Errors
        May raise `DisconnectedError`.
        """
        self.write_raw(data)
        await self.drain()

    def write_message(self, message: dict) -> None:
        """
        Write a single message into the message buffer. Should be used when
        sending broadcasts or when sending messages that are triggered by
        incoming messages from other players.

        # Errors
        May raise `DisconnectedError`.
        """
        if not self.is_connected():
            raise DisconnectedError("Protocol is not connected!")

        self.write_raw(self.encode_message(message))

    def write_messages(self, messages: list[dict]) -> None:
        """
        Write multiple message into the message buffer.

        # Errors
        May raise `DisconnectedError`.
        """
        metrics.sent_messages.labels(self.__class__.__name__).inc()
        if not self.is_connected():
            raise DisconnectedError("Protocol is not connected!")

        self.writer.writelines([self.encode_message(msg) for msg in messages])

    def write_raw(self, data: bytes) -> None:
        """
        Write raw bytes into the message buffer. Should generally not be used.

        # Errors
        May raise `DisconnectedError`.
        """
        metrics.sent_messages.labels(self.__class__.__name__).inc()
        if not self.is_connected():
            raise DisconnectedError("Protocol is not connected!")

        self.writer.write(data)

    def abort(self) -> None:
        # SelectorTransport only
        self.writer.transport.abort()

    async def close(self) -> None:
        """
        Close the underlying writer as soon as the buffer has emptied.

        # Errors
        Never raises. Any exceptions that occur while waiting to close are
        ignored.
        """
        self.writer.close()
        with contextlib.suppress(Exception):
            await self.writer.wait_closed()

    @synchronizedmethod
    async def drain(self) -> None:
        """
        Await the write buffer to empty.
        See StreamWriter.drain()

        # Errors
        Raises `DisconnectedError` if the client disconnects while waiting for
        the write buffer to empty.
        """
        # Method needs to be synchronized as drain() cannot be called
        # concurrently by multiple coroutines:
        # http://bugs.python.org/issue29930.
        try:
            await self.writer.drain()
        except Exception as e:
            await self.close()
            raise DisconnectedError("Protocol connection lost!") from e

Subclasses

Static methods

def decode_message(data: bytes) ‑> dict
Expand source code
@staticmethod
@abstractmethod
def decode_message(data: bytes) -> dict:
    """
    Decode a message from raw bytes.
    """
    pass  # pragma: no cover

Decode a message from raw bytes.

def encode_message(message: dict) ‑> bytes
Expand source code
@staticmethod
@abstractmethod
def encode_message(message: dict) -> bytes:
    """
    Encode a message as raw bytes. Can be used along with `*_raw` methods.
    """
    pass  # pragma: no cover

Encode a message as raw bytes. Can be used along with *_raw methods.

Methods

def abort(self) ‑> None
Expand source code
def abort(self) -> None:
    # SelectorTransport only
    self.writer.transport.abort()
async def close(self) ‑> None
Expand source code
async def close(self) -> None:
    """
    Close the underlying writer as soon as the buffer has emptied.

    # Errors
    Never raises. Any exceptions that occur while waiting to close are
    ignored.
    """
    self.writer.close()
    with contextlib.suppress(Exception):
        await self.writer.wait_closed()

Close the underlying writer as soon as the buffer has emptied.

Errors

Never raises. Any exceptions that occur while waiting to close are ignored.

async def drain(self) ‑> None
Expand source code
@synchronizedmethod
async def drain(self) -> None:
    """
    Await the write buffer to empty.
    See StreamWriter.drain()

    # Errors
    Raises `DisconnectedError` if the client disconnects while waiting for
    the write buffer to empty.
    """
    # Method needs to be synchronized as drain() cannot be called
    # concurrently by multiple coroutines:
    # http://bugs.python.org/issue29930.
    try:
        await self.writer.drain()
    except Exception as e:
        await self.close()
        raise DisconnectedError("Protocol connection lost!") from e

Await the write buffer to empty. See StreamWriter.drain()

Errors

Raises DisconnectedError if the client disconnects while waiting for the write buffer to empty.

def is_connected(self) ‑> bool
Expand source code
def is_connected(self) -> bool:
    """
    Return whether or not the connection is still alive
    """
    return not self.writer.is_closing()

Return whether or not the connection is still alive

async def read_message(self) ‑> dict
Expand source code
@abstractmethod
async def read_message(self) -> dict:
    """
    Asynchronously read a message from the stream

    # Returns
    The parsed message

    # Errors
    May raise `IncompleteReadError`.
    """
    pass  # pragma: no cover

Asynchronously read a message from the stream

Returns

The parsed message

Errors

May raise IncompleteReadError.

async def send_message(self, message: dict) ‑> None
Expand source code
async def send_message(self, message: dict) -> None:
    """
    Send a single message in the form of a dictionary

    # Errors
    May raise `DisconnectedError`.
    """
    await self.send_raw(self.encode_message(message))

Send a single message in the form of a dictionary

Errors

May raise DisconnectedError.

async def send_messages(self, messages: list[dict]) ‑> None
Expand source code
async def send_messages(self, messages: list[dict]) -> None:
    """
    Send multiple messages in the form of a list of dictionaries.

    May be more optimal than sending a single message.

    # Errors
    May raise `DisconnectedError`.
    """
    self.write_messages(messages)
    await self.drain()

Send multiple messages in the form of a list of dictionaries.

May be more optimal than sending a single message.

Errors

May raise DisconnectedError.

async def send_raw(self, data: bytes) ‑> None
Expand source code
async def send_raw(self, data: bytes) -> None:
    """
    Send raw bytes. Should generally not be used.

    # Errors
    May raise `DisconnectedError`.
    """
    self.write_raw(data)
    await self.drain()

Send raw bytes. Should generally not be used.

Errors

May raise DisconnectedError.

def write_message(self, message: dict) ‑> None
Expand source code
def write_message(self, message: dict) -> None:
    """
    Write a single message into the message buffer. Should be used when
    sending broadcasts or when sending messages that are triggered by
    incoming messages from other players.

    # Errors
    May raise `DisconnectedError`.
    """
    if not self.is_connected():
        raise DisconnectedError("Protocol is not connected!")

    self.write_raw(self.encode_message(message))

Write a single message into the message buffer. Should be used when sending broadcasts or when sending messages that are triggered by incoming messages from other players.

Errors

May raise DisconnectedError.

def write_messages(self, messages: list[dict]) ‑> None
Expand source code
def write_messages(self, messages: list[dict]) -> None:
    """
    Write multiple message into the message buffer.

    # Errors
    May raise `DisconnectedError`.
    """
    metrics.sent_messages.labels(self.__class__.__name__).inc()
    if not self.is_connected():
        raise DisconnectedError("Protocol is not connected!")

    self.writer.writelines([self.encode_message(msg) for msg in messages])

Write multiple message into the message buffer.

Errors

May raise DisconnectedError.

def write_raw(self, data: bytes) ‑> None
Expand source code
def write_raw(self, data: bytes) -> None:
    """
    Write raw bytes into the message buffer. Should generally not be used.

    # Errors
    May raise `DisconnectedError`.
    """
    metrics.sent_messages.labels(self.__class__.__name__).inc()
    if not self.is_connected():
        raise DisconnectedError("Protocol is not connected!")

    self.writer.write(data)

Write raw bytes into the message buffer. Should generally not be used.

Errors

May raise DisconnectedError.

class QDataStreamProtocol (reader: asyncio.streams.StreamReader, writer: asyncio.streams.StreamWriter)
Expand source code
@with_logger
class QDataStreamProtocol(Protocol):
    """
    Implements the legacy QDataStream-based encoding scheme
    """

    @staticmethod
    def read_qstring(buffer: bytes, pos: int = 0) -> tuple[int, str]:
        """
        Parse a serialized QString from buffer (A bytes like object) at given
        position.

        Requires `len(buffer[pos:]) >= 4`.

        Pos is added to buffer_pos.

        # Returns
        The new buffer position and the message.
        """
        chunk = buffer[pos:pos + 4]
        rest = buffer[pos + 4:]
        assert len(chunk) == 4

        (size, ) = struct.unpack("!I", chunk)
        if len(rest) < size:
            raise ValueError(
                "Malformed QString: Claims length {} but actually {}. Entire buffer: {}"
                .format(size, len(rest), base64.b64encode(buffer)))
        return size + pos + 4, (buffer[pos + 4:pos + 4 + size]).decode("UTF-16BE")

    @staticmethod
    def pack_qstring(message: str) -> bytes:
        encoded = message.encode("UTF-16BE")
        return struct.pack("!i", len(encoded)) + encoded

    @staticmethod
    def pack_block(block: bytes) -> bytes:
        return struct.pack("!I", len(block)) + block

    @staticmethod
    def read_block(data):
        buffer_pos = 0
        while len(data[buffer_pos:]) > 4:
            buffer_pos, msg = QDataStreamProtocol.read_qstring(data, buffer_pos)
            yield msg

    @staticmethod
    def pack_message(*args: str) -> bytes:
        """
        For sending a bunch of QStrings packed together in a 'block'
        """
        msg = bytearray()
        for arg in args:
            if not isinstance(arg, str):
                raise NotImplementedError("Only string serialization is supported")

            msg += QDataStreamProtocol.pack_qstring(arg)
        return QDataStreamProtocol.pack_block(msg)

    @staticmethod
    def encode_message(message: dict) -> bytes:
        """
        Encodes a python object as a block of QStrings
        """
        command = message.get("command")
        if command == "ping":
            return PING_MSG
        elif command == "pong":
            return PONG_MSG

        return QDataStreamProtocol.pack_message(json_encoder.encode(message))

    @staticmethod
    def decode_message(data: bytes) -> dict:
        _, action = QDataStreamProtocol.read_qstring(data)
        if action in ("PING", "PONG"):
            return {"command": action.lower()}

        message = json.loads(action)
        try:
            for part in QDataStreamProtocol.read_block(data):
                try:
                    message_part = json.loads(part)
                    if part != action:
                        message.update(message_part)
                except (ValueError, TypeError):
                    if "legacy" not in message:
                        message["legacy"] = []
                    message["legacy"].append(part)
        except (KeyError, ValueError):
            pass
        return message

    async def read_message(self):
        """
        Read a message from the stream

        # Errors
        Raises `IncompleteReadError` on malformed stream.
        """
        try:
            length, *_ = struct.unpack("!I", await self.reader.readexactly(4))
            block = await self.reader.readexactly(length)
        except IncompleteReadError as e:
            if self.reader.at_eof() and not e.partial:
                raise DisconnectedError()
            # Otherwise reraise
            raise

        return QDataStreamProtocol.decode_message(block)

Implements the legacy QDataStream-based encoding scheme

Ancestors

Static methods

def encode_message(message: dict) ‑> bytes
Expand source code
@staticmethod
def encode_message(message: dict) -> bytes:
    """
    Encodes a python object as a block of QStrings
    """
    command = message.get("command")
    if command == "ping":
        return PING_MSG
    elif command == "pong":
        return PONG_MSG

    return QDataStreamProtocol.pack_message(json_encoder.encode(message))

Encodes a python object as a block of QStrings

def pack_block(block: bytes) ‑> bytes
Expand source code
@staticmethod
def pack_block(block: bytes) -> bytes:
    return struct.pack("!I", len(block)) + block
def pack_message(*args: str) ‑> bytes
Expand source code
@staticmethod
def pack_message(*args: str) -> bytes:
    """
    For sending a bunch of QStrings packed together in a 'block'
    """
    msg = bytearray()
    for arg in args:
        if not isinstance(arg, str):
            raise NotImplementedError("Only string serialization is supported")

        msg += QDataStreamProtocol.pack_qstring(arg)
    return QDataStreamProtocol.pack_block(msg)

For sending a bunch of QStrings packed together in a 'block'

def pack_qstring(message: str) ‑> bytes
Expand source code
@staticmethod
def pack_qstring(message: str) -> bytes:
    encoded = message.encode("UTF-16BE")
    return struct.pack("!i", len(encoded)) + encoded
def read_block(data)
Expand source code
@staticmethod
def read_block(data):
    buffer_pos = 0
    while len(data[buffer_pos:]) > 4:
        buffer_pos, msg = QDataStreamProtocol.read_qstring(data, buffer_pos)
        yield msg
def read_qstring(buffer: bytes, pos: int = 0) ‑> tuple[int, str]
Expand source code
@staticmethod
def read_qstring(buffer: bytes, pos: int = 0) -> tuple[int, str]:
    """
    Parse a serialized QString from buffer (A bytes like object) at given
    position.

    Requires `len(buffer[pos:]) >= 4`.

    Pos is added to buffer_pos.

    # Returns
    The new buffer position and the message.
    """
    chunk = buffer[pos:pos + 4]
    rest = buffer[pos + 4:]
    assert len(chunk) == 4

    (size, ) = struct.unpack("!I", chunk)
    if len(rest) < size:
        raise ValueError(
            "Malformed QString: Claims length {} but actually {}. Entire buffer: {}"
            .format(size, len(rest), base64.b64encode(buffer)))
    return size + pos + 4, (buffer[pos + 4:pos + 4 + size]).decode("UTF-16BE")

Parse a serialized QString from buffer (A bytes like object) at given position.

Requires len(buffer[pos:]) >= 4.

Pos is added to buffer_pos.

Returns

The new buffer position and the message.

Methods

async def read_message(self)
Expand source code
async def read_message(self):
    """
    Read a message from the stream

    # Errors
    Raises `IncompleteReadError` on malformed stream.
    """
    try:
        length, *_ = struct.unpack("!I", await self.reader.readexactly(4))
        block = await self.reader.readexactly(length)
    except IncompleteReadError as e:
        if self.reader.at_eof() and not e.partial:
            raise DisconnectedError()
        # Otherwise reraise
        raise

    return QDataStreamProtocol.decode_message(block)

Read a message from the stream

Errors

Raises IncompleteReadError on malformed stream.

Inherited members

class SimpleJsonProtocol (reader: asyncio.streams.StreamReader, writer: asyncio.streams.StreamWriter)
Expand source code
class SimpleJsonProtocol(Protocol):
    @staticmethod
    def encode_message(message: dict) -> bytes:
        return (json_encoder.encode(message) + "\n").encode()

    @staticmethod
    def decode_message(data: bytes) -> dict:
        return json.loads(data.strip())

    async def read_message(self) -> dict:
        line = await self.reader.readline()
        if not line:
            raise DisconnectedError()
        return SimpleJsonProtocol.decode_message(line)

Ancestors

Inherited members