Module server.protocol.simple_json

Classes

class SimpleJsonProtocol (reader: asyncio.streams.StreamReader, writer: asyncio.streams.StreamWriter)
Expand source code
class SimpleJsonProtocol(Protocol):
    @staticmethod
    def encode_message(message: dict) -> bytes:
        return (json_encoder.encode(message) + "\n").encode()

    @staticmethod
    def decode_message(data: bytes) -> dict:
        return json.loads(data.strip())

    async def read_message(self) -> dict:
        line = await self.reader.readline()
        if not line:
            raise DisconnectedError()
        return SimpleJsonProtocol.decode_message(line)

Ancestors

Inherited members