Module server.protocol.qdatastream
Classes
class QDataStreamProtocol (reader: asyncio.streams.StreamReader, writer: asyncio.streams.StreamWriter)
-
Implements the legacy QDataStream-based encoding scheme
Expand source code
@with_logger
class QDataStreamProtocol(Protocol):
    """
    Implements the legacy QDataStream-based encoding scheme

    As implemented below, a message on the wire is a *block*: a 4 byte
    big-endian unsigned length followed by that many payload bytes. The
    payload is a sequence of serialized QStrings, each one a 4 byte
    big-endian unsigned byte count followed by the UTF-16BE encoded text.
    """

    @staticmethod
    def read_qstring(buffer: bytes, pos: int = 0) -> tuple[int, str]:
        """
        Parse a serialized QString from buffer (a bytes-like object) at the
        given position.

        # Returns
        The new buffer position (absolute, i.e. `pos` is already included)
        and the decoded message.

        # Errors
        Raises `ValueError` if fewer than 4 bytes are available for the
        length prefix, or if the declared length exceeds the bytes that
        remain. Invalid UTF-16 data raises `UnicodeDecodeError`, which is
        itself a `ValueError`.
        """
        chunk = buffer[pos:pos + 4]
        # Explicit check instead of `assert`: asserts are stripped under -O,
        # and this validates data that arrives from the network.
        if len(chunk) != 4:
            raise ValueError(
                "Malformed QString: Expected 4 byte length prefix but got {}. Entire buffer: {}"
                .format(len(chunk), base64.b64encode(buffer)))
        (size, ) = struct.unpack("!I", chunk)

        body_start = pos + 4
        # A memoryview slice has the same length as `buffer[body_start:]`
        # but does not copy the remainder of the buffer.
        available = len(memoryview(buffer)[body_start:])
        if available < size:
            raise ValueError(
                "Malformed QString: Claims length {} but actually {}. Entire buffer: {}"
                .format(size, available, base64.b64encode(buffer)))

        body_end = body_start + size
        return body_end, buffer[body_start:body_end].decode("UTF-16BE")

    @staticmethod
    def pack_qstring(message: str) -> bytes:
        """
        Serialize a string as a QString: unsigned 32-bit big-endian byte
        count followed by the UTF-16BE encoded text.
        """
        encoded = message.encode("UTF-16BE")
        # Unsigned, to match what `read_qstring` and `pack_block` use.
        return struct.pack("!I", len(encoded)) + encoded

    @staticmethod
    def pack_block(block: bytes) -> bytes:
        """
        Prefix `block` with its length as an unsigned 32-bit big-endian
        integer.
        """
        return struct.pack("!I", len(block)) + block

    @staticmethod
    def read_block(data: bytes):
        """
        Lazily yield every QString contained in the block payload `data`.

        Parsing stops once 4 or fewer bytes remain, so trailing bytes that
        cannot hold more than a bare length prefix are ignored (this
        includes an empty QString at the very end of the payload).

        # Errors
        Propagates `ValueError` from `read_qstring` on malformed data.
        """
        buffer_pos = 0
        # Compare lengths arithmetically rather than slicing, which would
        # copy the rest of the payload on every iteration.
        while len(data) - buffer_pos > 4:
            buffer_pos, msg = QDataStreamProtocol.read_qstring(data, buffer_pos)
            yield msg

    @staticmethod
    def pack_message(*args: str) -> bytes:
        """
        For sending a bunch of QStrings packed together in a 'block'

        # Errors
        Raises `NotImplementedError` if any argument is not a `str`.
        """
        parts = []
        for arg in args:
            if not isinstance(arg, str):
                raise NotImplementedError("Only string serialization is supported")
            parts.append(QDataStreamProtocol.pack_qstring(arg))

        return QDataStreamProtocol.pack_block(b"".join(parts))

    @staticmethod
    def encode_message(message: dict) -> bytes:
        """
        Encodes a python object as a block of QStrings

        Messages whose `command` is `ping` or `pong` are answered with the
        `PING_MSG` / `PONG_MSG` constants; everything else is JSON encoded
        and packed as a single QString.
        """
        command = message.get("command")
        if command == "ping":
            return PING_MSG
        elif command == "pong":
            return PONG_MSG

        return QDataStreamProtocol.pack_message(json_encoder.encode(message))

    @staticmethod
    def decode_message(data: bytes) -> dict:
        """
        Decode a block payload into a message dictionary.

        The first QString is either the literal `PING` / `PONG` or a JSON
        document. Every further QString is merged into the message if it is
        JSON that `dict.update` accepts, and is otherwise appended verbatim
        to the `legacy` list of the message.

        # Errors
        Raises `ValueError` if the first QString is malformed or is not
        valid JSON. Malformed trailing QStrings are ignored.
        """
        _, action = QDataStreamProtocol.read_qstring(data)
        if action in ("PING", "PONG"):
            return {"command": action.lower()}

        message = json.loads(action)
        try:
            for part in QDataStreamProtocol.read_block(data):
                if part == action:
                    # Already parsed into `message` above; no need to
                    # decode the same JSON a second time.
                    continue
                try:
                    message.update(json.loads(part))
                except (ValueError, TypeError):
                    # Not JSON, or JSON that cannot be merged into a dict.
                    if "legacy" not in message:
                        message["legacy"] = []
                    message["legacy"].append(part)
        except (KeyError, ValueError):
            # Deliberate best effort: a malformed trailing QString must not
            # discard the parts that were already decoded.
            pass
        return message

    async def read_message(self):
        """
        Read a message from the stream

        Reads the 4 byte block length, then the block itself, and returns
        the decoded message.

        # Errors
        Raises `DisconnectedError` if the stream is at EOF and no partial
        data was read. Raises `IncompleteReadError` on malformed stream.
        """
        try:
            (length, ) = struct.unpack("!I", await self.reader.readexactly(4))
            # NOTE(review): `length` comes straight from the peer and is not
            # bounded here - confirm that a size limit is enforced elsewhere.
            block = await self.reader.readexactly(length)
        except IncompleteReadError as e:
            if self.reader.at_eof() and not e.partial:
                raise DisconnectedError() from e
            # Otherwise reraise
            raise

        return QDataStreamProtocol.decode_message(block)
Ancestors
Static methods
def encode_message(message: dict) ‑> bytes
-
Encodes a python object as a block of QStrings
def pack_block(block: bytes) ‑> bytes
def pack_message(*args: str) ‑> bytes
-
For sending a bunch of QStrings packed together in a 'block'
def pack_qstring(message: str) ‑> bytes
def read_block(data)
def read_qstring(buffer: bytes, pos: int = 0) ‑> tuple[int, str]
-
Parse a serialized QString from buffer (a bytes-like object) at the given position.
Requires `len(buffer[pos:]) >= 4`. The returned position is absolute: `pos` is already included in it.
Returns
The new buffer position and the message.
Methods
async def read_message(self)
-
Read a message from the stream
Errors
Raises `IncompleteReadError` on a malformed stream, or `DisconnectedError` when the stream is at EOF and no partial data was read.
Inherited members