Module server.oauth_service

Classes

class OAuthService
Expand source code
@with_logger
class OAuthService(Service, name="oauth_service"):
    """
    Service for managing the OAuth token logins and verification.

    Keeps an in-memory cache of the public keys published at
    `config.HYDRA_JWKS_URI` and uses them to verify login tokens.
    """

    def __init__(self):
        # Maps a key id (`kid`) to the key built from the matching jwk
        self.public_keys = {}
        # `time.monotonic()` value of the last fetch attempt, or None if no
        # fetch has been attempted yet
        self._last_key_fetch_time = None

    async def initialize(self) -> None:
        """
        Fetch the public keys once and schedule a periodic refresh
        """
        await self.retrieve_public_keys()
        # crontab: min hour day month day_of_week
        # Run every 10 minutes to update public keys.
        self._update_cron = aiocron.crontab(
            "*/10 * * * *", func=self.retrieve_public_keys
        )

    @synchronizedmethod
    async def get_public_keys(self) -> dict:
        """
        Return cached keys, or fetch them if they're missing

        # Returns
        The non-empty mapping of key id to public key.

        # Raises
        RuntimeError: If the cache is still empty afterwards, either because
        the fetch failed or because it was skipped by the rate limit.
        """
        if not self.public_keys:
            # Rate limit requests so we don't spam the endpoint when it's down
            # Compare against None explicitly: the timestamp is a float and
            # only "never attempted" should bypass the elapsed time check.
            if (
                self._last_key_fetch_time is None or
                time.monotonic() - self._last_key_fetch_time > 5
            ):
                await self.retrieve_public_keys()

            if not self.public_keys:
                raise RuntimeError("jwks could not be retrieved")

        return self.public_keys

    async def retrieve_public_keys(self) -> None:
        """
        Get the latest jwks from the hydra endpoint

        Errors are logged instead of raised. On failure the previously cached
        keys are left as they were.
        """
        # Record the attempt before the request so that failed attempts also
        # count towards the rate limit in `get_public_keys`
        self._last_key_fetch_time = time.monotonic()
        try:
            async with aiohttp.ClientSession(raise_for_status=True) as session:
                async with session.get(config.HYDRA_JWKS_URI) as resp:
                    jwks = await resp.json()
                    self.public_keys = {
                        jwk["kid"]: RSAAlgorithm.from_jwk(jwk)
                        for jwk in jwks["keys"]
                    }
            self._logger.info("Got public keys from %s", config.HYDRA_JWKS_URI)
        except Exception:
            # Deliberately broad: a failed refresh must not propagate to the
            # caller (startup, the cron job or a login request)
            self._logger.exception(
                "Unable to retrieve jwks, token login will be unavailable!"
            )

    async def get_player_id_from_token(self, token: str) -> int:
        """
        Decode the JWT to get the player_id

        # Params
        - `token`: The encoded JWT presented by the client.

        # Returns
        The `sub` claim of the verified token converted to an int.

        # Raises
        AuthenticationError: If the token can't be decoded or verified, its
        `kid` is unknown, a required claim is missing, `sub` is not an
        integer, or the `scp` claim does not contain "lobby".
        RuntimeError: If no public keys are available (see `get_public_keys`).
        """
        # Ensures that if we're missing the jwks we will try to fetch them on
        # each new login request. This way our login functionality will be
        # restored as soon as possible
        keys = await self.get_public_keys()
        try:
            # The header is only used to select the verification key, the
            # signature itself is checked by `jwt.decode` below
            kid = jwt.get_unverified_header(token)["kid"]
            key = keys[kid]
            decoded = jwt.decode(
                token,
                key=key,
                # Must be a list: with a plain string the allow-list check
                # would become a substring test
                algorithms=["RS256"],
                options={"verify_aud": False}
            )

            if "lobby" not in decoded["scp"]:
                raise AuthenticationError(
                    "Token does not have permission to login to the lobby server",
                    "token"
                )

            return int(decoded["sub"])
        except (InvalidTokenError, KeyError, ValueError) as e:
            raise AuthenticationError(
                "Token signature was invalid", "token"
            ) from e

Service for managing the OAuth token logins and verification.

Ancestors

Methods

async def get_player_id_from_token(self, token: str) ‑> int
Expand source code
async def get_player_id_from_token(self, token: str) -> int:
    """
    Decode the JWT to get the player_id

    # Params
    - `token`: The encoded JWT presented by the client.

    # Returns
    The `sub` claim of the verified token converted to an int.

    # Raises
    AuthenticationError: If the token can't be decoded or verified, its
    `kid` is unknown, a required claim is missing, `sub` is not an integer,
    or the `scp` claim does not contain "lobby".
    RuntimeError: Propagated from `get_public_keys` when no keys are cached.
    """
    # Ensures that if we're missing the jwks we will try to fetch them on
    # each new login request. This way our login functionality will be
    # restored as soon as possible
    keys = await self.get_public_keys()
    try:
        # The header is only used to select the verification key, the
        # signature itself is checked by `jwt.decode` below
        kid = jwt.get_unverified_header(token)["kid"]
        key = keys[kid]
        decoded = jwt.decode(
            token,
            key=key,
            # Must be a list: with a plain string the allow-list check would
            # become a substring test
            algorithms=["RS256"],
            options={"verify_aud": False}
        )

        if "lobby" not in decoded["scp"]:
            raise AuthenticationError(
                "Token does not have permission to login to the lobby server",
                "token"
            )

        return int(decoded["sub"])
    except (InvalidTokenError, KeyError, ValueError) as e:
        raise AuthenticationError(
            "Token signature was invalid", "token"
        ) from e

Decode the JWT to get the player_id

async def get_public_keys(self) ‑> dict
Expand source code
@synchronizedmethod
async def get_public_keys(self) -> dict:
    """
    Return cached keys, or fetch them if they're missing

    # Returns
    The non-empty mapping stored in `self.public_keys`.

    # Raises
    RuntimeError: If the cache is still empty afterwards, either because the
    fetch failed or because it was skipped by the rate limit.
    """
    if not self.public_keys:
        # Rate limit requests so we don't spam the endpoint when it's down
        # Compare against None explicitly: the timestamp is a float and only
        # "never attempted" should bypass the elapsed time check.
        if (
            self._last_key_fetch_time is None or
            time.monotonic() - self._last_key_fetch_time > 5
        ):
            await self.retrieve_public_keys()

        if not self.public_keys:
            raise RuntimeError("jwks could not be retrieved")

    return self.public_keys

Return cached keys, or fetch them if they're missing

async def retrieve_public_keys(self) ‑> None
Expand source code
async def retrieve_public_keys(self) -> None:
    """
    Refresh the cached jwks from the hydra endpoint

    Any error is logged rather than raised. On failure the keys that were
    cached before the call are left in place.
    """
    # Stamp the attempt first so that failed attempts are timestamped too
    self._last_key_fetch_time = time.monotonic()
    try:
        async with aiohttp.ClientSession(raise_for_status=True) as http:
            async with http.get(config.HYDRA_JWKS_URI) as response:
                payload = await response.json()
                keys_by_kid = {}
                for entry in payload["keys"]:
                    keys_by_kid[entry["kid"]] = RSAAlgorithm.from_jwk(entry)
                # Only swap the cache once every key has been parsed
                self.public_keys = keys_by_kid
        self._logger.info("Got public keys from %s", config.HYDRA_JWKS_URI)
    except Exception:
        self._logger.exception(
            "Unable to retrieve jwks, token login will be unavailable!"
        )

Get the latest jwks from the hydra endpoint

Inherited members