Module server.oauth_service

Classes

class OAuthService

Service for managing the OAuth token logins and verification.

Expand source code
@with_logger
class OAuthService(Service, name="oauth_service"):
    """
    Service for managing the OAuth token logins and verification.

    Keeps a cache of the public keys published at `config.HYDRA_JWKS_URI`
    (indexed by their `kid`) and uses them to verify the JWTs presented at
    login.
    """

    def __init__(self):
        # Maps a jwk's `kid` to the key built by `RSAAlgorithm.from_jwk`
        self.public_keys = {}
        # `time.monotonic()` reading taken at the start of the latest fetch
        # attempt, or None if no attempt has been made yet
        self._last_key_fetch_time = None
        # Periodic refresh job; created in `initialize`
        self._update_cron = None

    async def initialize(self) -> None:
        """
        Fetch the public keys once and schedule a periodic refresh.
        """
        await self.retrieve_public_keys()
        # crontab: min hour day month day_of_week
        # Run every 10 minutes to update public keys.
        self._update_cron = aiocron.crontab(
            "*/10 * * * *", func=self.retrieve_public_keys
        )

    # NOTE(review): `synchronizedmethod` is defined elsewhere; presumably it
    # serializes concurrent callers so only one fetch runs at a time - confirm.
    @synchronizedmethod
    async def get_public_keys(self) -> dict:
        """
        Return cached keys, or fetch them if they're missing

        # Returns
        The non-empty mapping of `kid` to public key.

        # Raises
        `RuntimeError` if no keys are cached and they could not be fetched
        (or a fetch was attempted less than 5 seconds ago).
        """
        if not self.public_keys:
            # Rate limit requests so we don't spam the endpoint when it's down.
            # Compare against None explicitly so that "never fetched" is not
            # confused with a falsy timestamp.
            if (
                self._last_key_fetch_time is None or
                time.monotonic() - self._last_key_fetch_time > 5
            ):
                await self.retrieve_public_keys()

            if not self.public_keys:
                raise RuntimeError("jwks could not be retrieved")

        return self.public_keys

    async def retrieve_public_keys(self) -> None:
        """
        Get the latest jwks from the hydra endpoint

        Any failure is logged and swallowed, in which case the previously
        cached keys (if any) are left untouched.
        """
        # Record the attempt before doing any I/O so that failed attempts
        # count towards the rate limit in `get_public_keys` too.
        self._last_key_fetch_time = time.monotonic()
        try:
            async with aiohttp.ClientSession(raise_for_status=True) as session:
                async with session.get(config.HYDRA_JWKS_URI) as resp:
                    jwks = await resp.json()
                    # The new mapping is built completely before it is
                    # assigned, so a malformed entry cannot leave the cache
                    # half updated.
                    self.public_keys = {
                        jwk["kid"]: RSAAlgorithm.from_jwk(jwk)
                        for jwk in jwks["keys"]
                    }
            self._logger.info("Got public keys from %s", config.HYDRA_JWKS_URI)
        except Exception:
            # Deliberately broad: this also runs from the cron job, where an
            # escaping exception would have nobody to handle it.
            self._logger.exception(
                "Unable to retrieve jwks, token login will be unavailable!"
            )

    async def get_player_id_from_token(self, token: str) -> int:
        """
        Decode the JWT to get the player_id

        # Params
        - `token`: The encoded JWT presented by the client.

        # Returns
        The token's `sub` claim converted to an int.

        # Raises
        - `AuthenticationError` if the token cannot be verified, is missing
          an expected field, or does not carry the `lobby` scope.
        - `RuntimeError` if the public keys are unavailable.
        """
        # Ensures that if we're missing the jwks we will try to fetch them on
        # each new login request. This way our login functionality will be
        # restored as soon as possible
        keys = await self.get_public_keys()
        try:
            # The header is read without verification only to select which
            # public key to verify the signature with.
            kid = jwt.get_unverified_header(token)["kid"]
            key = keys[kid]
            decoded = jwt.decode(
                token,
                key=key,
                # Must be a list: with a plain string the allowed-algorithm
                # check degrades into a substring test.
                algorithms=["RS256"],
                options={"verify_aud": False}
            )

            # NOTE(review): assumes `scp` is a list of scope names; if it
            # were a single string this would be a substring match - confirm.
            if "lobby" not in decoded["scp"]:
                raise AuthenticationError(
                    "Token does not have permission to login to the lobby server",
                    "token"
                )

            return int(decoded["sub"])
        except (InvalidTokenError, KeyError, ValueError):
            # KeyError covers an unknown `kid` or a missing header field or
            # claim; ValueError covers a non-numeric `sub`.
            raise AuthenticationError("Token signature was invalid", "token")

Ancestors

Methods

async def get_player_id_from_token(self, token: str) ‑> int

Decode the JWT to get the player_id

async def get_public_keys(self) ‑> dict

Return cached keys, or fetch them if they're missing

async def retrieve_public_keys(self) ‑> None

Get the latest jwks from the hydra endpoint

Inherited members