Module server.protocol.protocol
Protocol base class
Classes
class DisconnectedError (*args, **kwargs)-
Expand source code
class DisconnectedError(ConnectionError): """For signaling that a protocol has lost connection to the remote."""For signaling that a protocol has lost connection to the remote.
Ancestors
- builtins.ConnectionError
- builtins.OSError
- builtins.Exception
- builtins.BaseException
class Protocol (reader: asyncio.streams.StreamReader, writer: asyncio.streams.StreamWriter)-
Expand source code
class Protocol(metaclass=ABCMeta): def __init__(self, reader: StreamReader, writer: StreamWriter): self.reader = reader self.writer = writer # Force calls to drain() to only return once the data has been sent self.writer.transport.set_write_buffer_limits(high=0) @staticmethod @abstractmethod def encode_message(message: dict) -> bytes: """ Encode a message as raw bytes. Can be used along with `*_raw` methods. """ pass # pragma: no cover @staticmethod @abstractmethod def decode_message(data: bytes) -> dict: """ Decode a message from raw bytes. """ pass # pragma: no cover def is_connected(self) -> bool: """ Return whether or not the connection is still alive """ return not self.writer.is_closing() @abstractmethod async def read_message(self) -> dict: """ Asynchronously read a message from the stream # Returns The parsed message # Errors May raise `IncompleteReadError`. """ pass # pragma: no cover async def send_message(self, message: dict) -> None: """ Send a single message in the form of a dictionary # Errors May raise `DisconnectedError`. """ await self.send_raw(self.encode_message(message)) async def send_messages(self, messages: list[dict]) -> None: """ Send multiple messages in the form of a list of dictionaries. May be more optimal than sending a single message. # Errors May raise `DisconnectedError`. """ self.write_messages(messages) await self.drain() async def send_raw(self, data: bytes) -> None: """ Send raw bytes. Should generally not be used. # Errors May raise `DisconnectedError`. """ self.write_raw(data) await self.drain() def write_message(self, message: dict) -> None: """ Write a single message into the message buffer. Should be used when sending broadcasts or when sending messages that are triggered by incoming messages from other players. # Errors May raise `DisconnectedError`. """ if not self.is_connected(): raise DisconnectedError("Protocol is not connected!") self.write_raw(self.encode_message(message)) def write_messages(self, messages: list[dict]) -> None: """ Write multiple message into the message buffer. # Errors May raise `DisconnectedError`. """ metrics.sent_messages.labels(self.__class__.__name__).inc() if not self.is_connected(): raise DisconnectedError("Protocol is not connected!") self.writer.writelines([self.encode_message(msg) for msg in messages]) def write_raw(self, data: bytes) -> None: """ Write raw bytes into the message buffer. Should generally not be used. # Errors May raise `DisconnectedError`. """ metrics.sent_messages.labels(self.__class__.__name__).inc() if not self.is_connected(): raise DisconnectedError("Protocol is not connected!") self.writer.write(data) def abort(self) -> None: # SelectorTransport only self.writer.transport.abort() async def close(self) -> None: """ Close the underlying writer as soon as the buffer has emptied. # Errors Never raises. Any exceptions that occur while waiting to close are ignored. """ self.writer.close() with contextlib.suppress(Exception): await self.writer.wait_closed() @synchronizedmethod async def drain(self) -> None: """ Await the write buffer to empty. See StreamWriter.drain() # Errors Raises `DisconnectedError` if the client disconnects while waiting for the write buffer to empty. """ # Method needs to be synchronized as drain() cannot be called # concurrently by multiple coroutines: # http://bugs.python.org/issue29930. try: await self.writer.drain() except Exception as e: await self.close() raise DisconnectedError("Protocol connection lost!") from eSubclasses
Static methods
def decode_message(data: bytes) ‑> dict-
Expand source code
@staticmethod @abstractmethod def decode_message(data: bytes) -> dict: """ Decode a message from raw bytes. """ pass # pragma: no coverDecode a message from raw bytes.
def encode_message(message: dict) ‑> bytes-
Expand source code
@staticmethod @abstractmethod def encode_message(message: dict) -> bytes: """ Encode a message as raw bytes. Can be used along with `*_raw` methods. """ pass # pragma: no coverEncode a message as raw bytes. Can be used along with
*_rawmethods.
Methods
def abort(self) ‑> None-
Expand source code
def abort(self) -> None: # SelectorTransport only self.writer.transport.abort() async def close(self) ‑> None-
Expand source code
async def close(self) -> None: """ Close the underlying writer as soon as the buffer has emptied. # Errors Never raises. Any exceptions that occur while waiting to close are ignored. """ self.writer.close() with contextlib.suppress(Exception): await self.writer.wait_closed()Close the underlying writer as soon as the buffer has emptied.
Errors
Never raises. Any exceptions that occur while waiting to close are ignored.
async def drain(self) ‑> None-
Expand source code
@synchronizedmethod async def drain(self) -> None: """ Await the write buffer to empty. See StreamWriter.drain() # Errors Raises `DisconnectedError` if the client disconnects while waiting for the write buffer to empty. """ # Method needs to be synchronized as drain() cannot be called # concurrently by multiple coroutines: # http://bugs.python.org/issue29930. try: await self.writer.drain() except Exception as e: await self.close() raise DisconnectedError("Protocol connection lost!") from eAwait the write buffer to empty. See StreamWriter.drain()
Errors
Raises
DisconnectedErrorif the client disconnects while waiting for the write buffer to empty. def is_connected(self) ‑> bool-
Expand source code
def is_connected(self) -> bool: """ Return whether or not the connection is still alive """ return not self.writer.is_closing()Return whether or not the connection is still alive
async def read_message(self) ‑> dict-
Expand source code
@abstractmethod async def read_message(self) -> dict: """ Asynchronously read a message from the stream # Returns The parsed message # Errors May raise `IncompleteReadError`. """ pass # pragma: no coverAsynchronously read a message from the stream
Returns
The parsed message
Errors
May raise
IncompleteReadError. async def send_message(self, message: dict) ‑> None-
Expand source code
async def send_message(self, message: dict) -> None: """ Send a single message in the form of a dictionary # Errors May raise `DisconnectedError`. """ await self.send_raw(self.encode_message(message)) async def send_messages(self, messages: list[dict]) ‑> None-
Expand source code
async def send_messages(self, messages: list[dict]) -> None: """ Send multiple messages in the form of a list of dictionaries. May be more optimal than sending a single message. # Errors May raise `DisconnectedError`. """ self.write_messages(messages) await self.drain()Send multiple messages in the form of a list of dictionaries.
May be more optimal than sending a single message.
Errors
May raise
DisconnectedError. async def send_raw(self, data: bytes) ‑> None-
Expand source code
async def send_raw(self, data: bytes) -> None: """ Send raw bytes. Should generally not be used. # Errors May raise `DisconnectedError`. """ self.write_raw(data) await self.drain() def write_message(self, message: dict) ‑> None-
Expand source code
def write_message(self, message: dict) -> None: """ Write a single message into the message buffer. Should be used when sending broadcasts or when sending messages that are triggered by incoming messages from other players. # Errors May raise `DisconnectedError`. """ if not self.is_connected(): raise DisconnectedError("Protocol is not connected!") self.write_raw(self.encode_message(message))Write a single message into the message buffer. Should be used when sending broadcasts or when sending messages that are triggered by incoming messages from other players.
Errors
May raise
DisconnectedError. def write_messages(self, messages: list[dict]) ‑> None-
Expand source code
def write_messages(self, messages: list[dict]) -> None: """ Write multiple message into the message buffer. # Errors May raise `DisconnectedError`. """ metrics.sent_messages.labels(self.__class__.__name__).inc() if not self.is_connected(): raise DisconnectedError("Protocol is not connected!") self.writer.writelines([self.encode_message(msg) for msg in messages]) def write_raw(self, data: bytes) ‑> None-
Expand source code
def write_raw(self, data: bytes) -> None: """ Write raw bytes into the message buffer. Should generally not be used. # Errors May raise `DisconnectedError`. """ metrics.sent_messages.labels(self.__class__.__name__).inc() if not self.is_connected(): raise DisconnectedError("Protocol is not connected!") self.writer.write(data)Write raw bytes into the message buffer. Should generally not be used.
Errors
May raise
DisconnectedError.