Module server.protocol.qdatastream

Classes

class QDataStreamProtocol (reader: asyncio.streams.StreamReader, writer: asyncio.streams.StreamWriter)
Expand source code
@with_logger
class QDataStreamProtocol(Protocol):
    """
    Implements the legacy QDataStream-based encoding scheme
    """

    @staticmethod
    def read_qstring(buffer: bytes, pos: int = 0) -> tuple[int, str]:
        """
        Parse a serialized QString from buffer (A bytes like object) at given
        position.

        Requires `len(buffer[pos:]) >= 4`.

        Pos is added to buffer_pos.

        # Returns
        The new buffer position and the message.
        """
        chunk = buffer[pos:pos + 4]
        rest = buffer[pos + 4:]
        assert len(chunk) == 4

        (size, ) = struct.unpack("!I", chunk)
        if len(rest) < size:
            raise ValueError(
                "Malformed QString: Claims length {} but actually {}. Entire buffer: {}"
                .format(size, len(rest), base64.b64encode(buffer)))
        return size + pos + 4, (buffer[pos + 4:pos + 4 + size]).decode("UTF-16BE")

    @staticmethod
    def pack_qstring(message: str) -> bytes:
        encoded = message.encode("UTF-16BE")
        return struct.pack("!i", len(encoded)) + encoded

    @staticmethod
    def pack_block(block: bytes) -> bytes:
        return struct.pack("!I", len(block)) + block

    @staticmethod
    def read_block(data):
        buffer_pos = 0
        while len(data[buffer_pos:]) > 4:
            buffer_pos, msg = QDataStreamProtocol.read_qstring(data, buffer_pos)
            yield msg

    @staticmethod
    def pack_message(*args: str) -> bytes:
        """
        For sending a bunch of QStrings packed together in a 'block'
        """
        msg = bytearray()
        for arg in args:
            if not isinstance(arg, str):
                raise NotImplementedError("Only string serialization is supported")

            msg += QDataStreamProtocol.pack_qstring(arg)
        return QDataStreamProtocol.pack_block(msg)

    @staticmethod
    def encode_message(message: dict) -> bytes:
        """
        Encodes a python object as a block of QStrings
        """
        command = message.get("command")
        if command == "ping":
            return PING_MSG
        elif command == "pong":
            return PONG_MSG

        return QDataStreamProtocol.pack_message(json_encoder.encode(message))

    @staticmethod
    def decode_message(data: bytes) -> dict:
        _, action = QDataStreamProtocol.read_qstring(data)
        if action in ("PING", "PONG"):
            return {"command": action.lower()}

        message = json.loads(action)
        try:
            for part in QDataStreamProtocol.read_block(data):
                try:
                    message_part = json.loads(part)
                    if part != action:
                        message.update(message_part)
                except (ValueError, TypeError):
                    if "legacy" not in message:
                        message["legacy"] = []
                    message["legacy"].append(part)
        except (KeyError, ValueError):
            pass
        return message

    async def read_message(self):
        """
        Read a message from the stream

        # Errors
        Raises `IncompleteReadError` on malformed stream.
        """
        try:
            length, *_ = struct.unpack("!I", await self.reader.readexactly(4))
            block = await self.reader.readexactly(length)
        except IncompleteReadError as e:
            if self.reader.at_eof() and not e.partial:
                raise DisconnectedError()
            # Otherwise reraise
            raise

        return QDataStreamProtocol.decode_message(block)

Implements the legacy QDataStream-based encoding scheme

Ancestors

Static methods

def encode_message(message: dict) ‑> bytes
Expand source code
@staticmethod
def encode_message(message: dict) -> bytes:
    """
    Encodes a python object as a block of QStrings
    """
    command = message.get("command")
    if command == "ping":
        return PING_MSG
    elif command == "pong":
        return PONG_MSG

    return QDataStreamProtocol.pack_message(json_encoder.encode(message))

Encodes a python object as a block of QStrings

def pack_block(block: bytes) ‑> bytes
Expand source code
@staticmethod
def pack_block(block: bytes) -> bytes:
    return struct.pack("!I", len(block)) + block
def pack_message(*args: str) ‑> bytes
Expand source code
@staticmethod
def pack_message(*args: str) -> bytes:
    """
    For sending a bunch of QStrings packed together in a 'block'
    """
    msg = bytearray()
    for arg in args:
        if not isinstance(arg, str):
            raise NotImplementedError("Only string serialization is supported")

        msg += QDataStreamProtocol.pack_qstring(arg)
    return QDataStreamProtocol.pack_block(msg)

For sending a bunch of QStrings packed together in a 'block'

def pack_qstring(message: str) ‑> bytes
Expand source code
@staticmethod
def pack_qstring(message: str) -> bytes:
    encoded = message.encode("UTF-16BE")
    return struct.pack("!i", len(encoded)) + encoded
def read_block(data)
Expand source code
@staticmethod
def read_block(data):
    buffer_pos = 0
    while len(data[buffer_pos:]) > 4:
        buffer_pos, msg = QDataStreamProtocol.read_qstring(data, buffer_pos)
        yield msg
def read_qstring(buffer: bytes, pos: int = 0) ‑> tuple[int, str]
Expand source code
@staticmethod
def read_qstring(buffer: bytes, pos: int = 0) -> tuple[int, str]:
    """
    Parse a serialized QString from buffer (A bytes like object) at given
    position.

    Requires `len(buffer[pos:]) >= 4`.

    Pos is added to buffer_pos.

    # Returns
    The new buffer position and the message.
    """
    chunk = buffer[pos:pos + 4]
    rest = buffer[pos + 4:]
    assert len(chunk) == 4

    (size, ) = struct.unpack("!I", chunk)
    if len(rest) < size:
        raise ValueError(
            "Malformed QString: Claims length {} but actually {}. Entire buffer: {}"
            .format(size, len(rest), base64.b64encode(buffer)))
    return size + pos + 4, (buffer[pos + 4:pos + 4 + size]).decode("UTF-16BE")

Parse a serialized QString from buffer (A bytes like object) at given position.

Requires len(buffer[pos:]) >= 4.

Pos is added to buffer_pos.

Returns

The new buffer position and the message.

Methods

async def read_message(self)
Expand source code
async def read_message(self):
    """
    Read a message from the stream

    # Errors
    Raises `IncompleteReadError` on malformed stream.
    """
    try:
        length, *_ = struct.unpack("!I", await self.reader.readexactly(4))
        block = await self.reader.readexactly(length)
    except IncompleteReadError as e:
        if self.reader.at_eof() and not e.partial:
            raise DisconnectedError()
        # Otherwise reraise
        raise

    return QDataStreamProtocol.decode_message(block)

Read a message from the stream

Errors

Raises IncompleteReadError on malformed stream.

Inherited members