Module server.control
Tiny http server for introspecting state
Classes
class ControlServer (lobby_server: ServerInstance)
-
Expand source code
@with_logger class ControlServer: def __init__( self, lobby_server: "ServerInstance", ): self.lobby_server = lobby_server self.game_service = lobby_server.services["game_service"] self.player_service = lobby_server.services["player_service"] self.host = None self.port = None self.app = web.Application() self.runner = web.AppRunner(self.app) self.app.add_routes([ web.get("/games", self.games), web.get("/players", self.players), ]) async def run_from_config(self) -> None: """ Initialize the http control server """ host = socket.gethostbyname(socket.gethostname()) port = config.CONTROL_SERVER_PORT await self.shutdown() await self.start(host, port) async def start(self, host: str, port: int) -> None: self.host = host self.port = port await self.runner.setup() self.site = web.TCPSite(self.runner, host, port) await self.site.start() self._logger.info( "Control server listening on http://%s:%s", host, port ) async def shutdown(self) -> None: await self.runner.cleanup() self.host = None self.port = None async def games(self, request) -> web.Response: return web.json_response([ game.to_dict() for game in self.game_service.all_games ]) async def players(self, request) -> web.Response: return web.json_response([ player.to_dict() for player in self.player_service.all_players ])
Methods
async def games(self, request) ‑> aiohttp.web_response.Response
-
Expand source code
async def games(self, request) -> web.Response: return web.json_response([ game.to_dict() for game in self.game_service.all_games ])
async def players(self, request) ‑> aiohttp.web_response.Response
-
Expand source code
async def players(self, request) -> web.Response: return web.json_response([ player.to_dict() for player in self.player_service.all_players ])
async def run_from_config(self) ‑> None
-
Expand source code
async def run_from_config(self) -> None: """ Initialize the http control server """ host = socket.gethostbyname(socket.gethostname()) port = config.CONTROL_SERVER_PORT await self.shutdown() await self.start(host, port)
Initialize the http control server
async def shutdown(self) ‑> None
-
Expand source code
async def shutdown(self) -> None: await self.runner.cleanup() self.host = None self.port = None
async def start(self, host: str, port: int) ‑> None
-
Expand source code
async def start(self, host: str, port: int) -> None: self.host = host self.port = port await self.runner.setup() self.site = web.TCPSite(self.runner, host, port) await self.site.start() self._logger.info( "Control server listening on http://%s:%s", host, port )