Module server.protocol.protocol
Classes
class DisconnectedError (*args, **kwargs)
-
Expand source code
class DisconnectedError(ConnectionError): """For signaling that a protocol has lost connection to the remote."""
For signaling that a protocol has lost connection to the remote.
Ancestors
- builtins.ConnectionError
- builtins.OSError
- builtins.Exception
- builtins.BaseException
class Protocol (reader: asyncio.streams.StreamReader, writer: asyncio.streams.StreamWriter)
-
Expand source code
class Protocol(metaclass=ABCMeta): def __init__(self, reader: StreamReader, writer: StreamWriter): self.reader = reader self.writer = writer # Force calls to drain() to only return once the data has been sent self.writer.transport.set_write_buffer_limits(high=0) @staticmethod @abstractmethod def encode_message(message: dict) -> bytes: """ Encode a message as raw bytes. Can be used along with `*_raw` methods. """ pass # pragma: no cover @staticmethod @abstractmethod def decode_message(data: bytes) -> dict: """ Decode a message from raw bytes. """ pass # pragma: no cover def is_connected(self) -> bool: """ Return whether or not the connection is still alive """ return not self.writer.is_closing() @abstractmethod async def read_message(self) -> dict: """ Asynchronously read a message from the stream # Returns The parsed message # Errors May raise `IncompleteReadError`. """ pass # pragma: no cover async def send_message(self, message: dict) -> None: """ Send a single message in the form of a dictionary # Errors May raise `DisconnectedError`. """ await self.send_raw(self.encode_message(message)) async def send_messages(self, messages: list[dict]) -> None: """ Send multiple messages in the form of a list of dictionaries. May be more optimal than sending a single message. # Errors May raise `DisconnectedError`. """ self.write_messages(messages) await self.drain() async def send_raw(self, data: bytes) -> None: """ Send raw bytes. Should generally not be used. # Errors May raise `DisconnectedError`. """ self.write_raw(data) await self.drain() def write_message(self, message: dict) -> None: """ Write a single message into the message buffer. Should be used when sending broadcasts or when sending messages that are triggered by incoming messages from other players. # Errors May raise `DisconnectedError`. """ if not self.is_connected(): raise DisconnectedError("Protocol is not connected!") self.write_raw(self.encode_message(message)) def write_messages(self, messages: list[dict]) -> None: """ Write multiple message into the message buffer. # Errors May raise `DisconnectedError`. """ metrics.sent_messages.labels(self.__class__.__name__).inc() if not self.is_connected(): raise DisconnectedError("Protocol is not connected!") self.writer.writelines([self.encode_message(msg) for msg in messages]) def write_raw(self, data: bytes) -> None: """ Write raw bytes into the message buffer. Should generally not be used. # Errors May raise `DisconnectedError`. """ metrics.sent_messages.labels(self.__class__.__name__).inc() if not self.is_connected(): raise DisconnectedError("Protocol is not connected!") self.writer.write(data) def abort(self) -> None: # SelectorTransport only self.writer.transport.abort() async def close(self) -> None: """ Close the underlying writer as soon as the buffer has emptied. # Errors Never raises. Any exceptions that occur while waiting to close are ignored. """ self.writer.close() with contextlib.suppress(Exception): await self.writer.wait_closed() @synchronizedmethod async def drain(self) -> None: """ Await the write buffer to empty. See StreamWriter.drain() # Errors Raises `DisconnectedError` if the client disconnects while waiting for the write buffer to empty. """ # Method needs to be synchronized as drain() cannot be called # concurrently by multiple coroutines: # http://bugs.python.org/issue29930. try: await self.writer.drain() except Exception as e: await self.close() raise DisconnectedError("Protocol connection lost!") from e
Subclasses
Static methods
def decode_message(data: bytes) ‑> dict
-
Expand source code
@staticmethod @abstractmethod def decode_message(data: bytes) -> dict: """ Decode a message from raw bytes. """ pass # pragma: no cover
Decode a message from raw bytes.
def encode_message(message: dict) ‑> bytes
-
Expand source code
@staticmethod @abstractmethod def encode_message(message: dict) -> bytes: """ Encode a message as raw bytes. Can be used along with `*_raw` methods. """ pass # pragma: no cover
Encode a message as raw bytes. Can be used along with
*_raw
methods.
Methods
def abort(self) ‑> None
-
Expand source code
def abort(self) -> None: # SelectorTransport only self.writer.transport.abort()
async def close(self) ‑> None
-
Expand source code
async def close(self) -> None: """ Close the underlying writer as soon as the buffer has emptied. # Errors Never raises. Any exceptions that occur while waiting to close are ignored. """ self.writer.close() with contextlib.suppress(Exception): await self.writer.wait_closed()
Close the underlying writer as soon as the buffer has emptied.
Errors
Never raises. Any exceptions that occur while waiting to close are ignored.
async def drain(self) ‑> None
-
Expand source code
@synchronizedmethod async def drain(self) -> None: """ Await the write buffer to empty. See StreamWriter.drain() # Errors Raises `DisconnectedError` if the client disconnects while waiting for the write buffer to empty. """ # Method needs to be synchronized as drain() cannot be called # concurrently by multiple coroutines: # http://bugs.python.org/issue29930. try: await self.writer.drain() except Exception as e: await self.close() raise DisconnectedError("Protocol connection lost!") from e
Await the write buffer to empty. See StreamWriter.drain()
Errors
Raises
DisconnectedError
if the client disconnects while waiting for the write buffer to empty. def is_connected(self) ‑> bool
-
Expand source code
def is_connected(self) -> bool: """ Return whether or not the connection is still alive """ return not self.writer.is_closing()
Return whether or not the connection is still alive
async def read_message(self) ‑> dict
-
Expand source code
@abstractmethod async def read_message(self) -> dict: """ Asynchronously read a message from the stream # Returns The parsed message # Errors May raise `IncompleteReadError`. """ pass # pragma: no cover
Asynchronously read a message from the stream
Returns
The parsed message
Errors
May raise
IncompleteReadError
. async def send_message(self, message: dict) ‑> None
-
Expand source code
async def send_message(self, message: dict) -> None: """ Send a single message in the form of a dictionary # Errors May raise `DisconnectedError`. """ await self.send_raw(self.encode_message(message))
async def send_messages(self, messages: list[dict]) ‑> None
-
Expand source code
async def send_messages(self, messages: list[dict]) -> None: """ Send multiple messages in the form of a list of dictionaries. May be more optimal than sending a single message. # Errors May raise `DisconnectedError`. """ self.write_messages(messages) await self.drain()
Send multiple messages in the form of a list of dictionaries.
May be more optimal than sending a single message.
Errors
May raise
DisconnectedError
. async def send_raw(self, data: bytes) ‑> None
-
Expand source code
async def send_raw(self, data: bytes) -> None: """ Send raw bytes. Should generally not be used. # Errors May raise `DisconnectedError`. """ self.write_raw(data) await self.drain()
def write_message(self, message: dict) ‑> None
-
Expand source code
def write_message(self, message: dict) -> None: """ Write a single message into the message buffer. Should be used when sending broadcasts or when sending messages that are triggered by incoming messages from other players. # Errors May raise `DisconnectedError`. """ if not self.is_connected(): raise DisconnectedError("Protocol is not connected!") self.write_raw(self.encode_message(message))
Write a single message into the message buffer. Should be used when sending broadcasts or when sending messages that are triggered by incoming messages from other players.
Errors
May raise
DisconnectedError
. def write_messages(self, messages: list[dict]) ‑> None
-
Expand source code
def write_messages(self, messages: list[dict]) -> None: """ Write multiple message into the message buffer. # Errors May raise `DisconnectedError`. """ metrics.sent_messages.labels(self.__class__.__name__).inc() if not self.is_connected(): raise DisconnectedError("Protocol is not connected!") self.writer.writelines([self.encode_message(msg) for msg in messages])
def write_raw(self, data: bytes) ‑> None
-
Expand source code
def write_raw(self, data: bytes) -> None: """ Write raw bytes into the message buffer. Should generally not be used. # Errors May raise `DisconnectedError`. """ metrics.sent_messages.labels(self.__class__.__name__).inc() if not self.is_connected(): raise DisconnectedError("Protocol is not connected!") self.writer.write(data)
Write raw bytes into the message buffer. Should generally not be used.
Errors
May raise
DisconnectedError
.