Module server.geoip_service
Manages the GeoIP database
Functions
def extract_file(tar: tarfile.TarFile, name: str) ‑> IO[bytes]
-
Expand source code
def extract_file(tar: tarfile.TarFile, name: str) -> IO[bytes]:
    """
    Open the database file stored in the tar archive for reading.

    The name of the folder that contains the file inside the archive is not
    necessarily known, so the member is located by checking whether its path
    ends with `name`. The first regular file that matches is used.

    # Errors
    Raises `TarError` if the tar archive does not contain the database file.
    """
    for member in tar.getmembers():
        if not member.name.endswith(name):
            continue
        if not member.isfile():
            continue

        handle = tar.extractfile(member)
        if handle is None:
            raise tarfile.TarError(
                "Tar archive did not contain the database file!"
            )
        return handle

    # Because the checksum was verified earlier, getting here should only be
    # possible if maxmind actually served us a bad file
    raise tarfile.TarError("Tar archive did not contain the database file!")
Helper for getting a file handle to the database file in the tar archive. This is needed because we don't necessarily know the name of its containing folder.
Errors
Raises
TarError
if the tar archive does not contain the database file.
Classes
class GeoIpService
-
Expand source code
@with_logger
class GeoIpService(Service):
    """
    Service for managing the GeoIp database. This includes an asyncio crontab
    which periodically checks if the current file is out of date. If it is,
    then the service will try to download a new file from the url in
    `server.config`.

    Provides an interface for getting data out of the database.
    """

    def __init__(self):
        self.refresh_file_path()
        # Keep `file_path` in sync when the configured path changes
        config.register_callback(
            "GEO_IP_DATABASE_PATH", self.refresh_file_path
        )

        # The currently open database reader, or None if nothing is loaded
        self.db = None
        # When `load_db` was last attempted, or None if it never was
        self.db_update_time = None

    def refresh_file_path(self):
        """
        Re-read the database file location from the config.
        """
        self.file_path = config.GEO_IP_DATABASE_PATH

    async def initialize(self) -> None:
        """
        Load the database if a file is present, download a fresh one if
        needed, and schedule the periodic update and file checks.
        """
        self.check_geoip_db_file_updated()

        await self.check_update_geoip_db()
        # crontab: min hour day month day_of_week
        # Run every Wednesday because GeoLite2 is updated every first Tuesday
        # of the month.
        self._update_cron = aiocron.crontab(
            "0 0 * * 3", func=self.check_update_geoip_db
        )
        self._check_file_timer = Timer(
            60 * 10, self.check_geoip_db_file_updated, start=True
        )

    def check_geoip_db_file_updated(self):
        """
        Checks if the local database file has been updated by a server admin
        and loads it if it has.
        """
        if not os.path.isfile(self.file_path):
            return

        if self.db is None:
            # We haven't loaded the file before
            self.load_db()
        else:
            assert self.db_update_time is not None
            # We have loaded the file, so check if it has been updated

            date_modified = datetime.fromtimestamp(
                os.path.getmtime(self.file_path)
            )
            if date_modified > self.db_update_time:
                self.load_db()

    async def check_update_geoip_db(self) -> None:
        """
        Check if the geoip database is old and update it if so.

        Does nothing when no license key is configured. A missing file
        triggers a download, a download timeout is only logged, and any other
        error is logged and re-raised. Unless an error was re-raised, the
        database is (re)loaded afterwards.
        """
        if not config.GEO_IP_LICENSE_KEY:
            self._logger.warning(
                "GEO_IP_LICENSE_KEY not set! Unable to download GeoIP database!"
            )
            return

        self._logger.debug("Checking if geoip database needs updating")
        try:
            date_modified = datetime.fromtimestamp(
                os.path.getmtime(self.file_path)
            )
            delta = datetime.now() - date_modified

            if delta.days > config.GEO_IP_DATABASE_MAX_AGE_DAYS:
                self._logger.info("Geoip database is out of date")
                await self.download_geoip_db()
        except FileNotFoundError:    # pragma: no cover
            self._logger.warning("Geoip database is missing...")
            await self.download_geoip_db()
        except asyncio.TimeoutError:    # pragma: no cover
            self._logger.warning(
                "Failed to download database file! "
                "Check the network connection and try again"
            )
        except Exception as e:    # pragma: no cover
            self._logger.exception(e)
            raise

        self.load_db()

    async def download_geoip_db(self) -> None:
        """
        Download the geoip database and write it to `self.file_path`.

        The archive is first downloaded to a temporary file. The destination
        is only opened for writing once the database has been located inside
        the archive, so an archive that cannot be read or that does not
        contain the database does NOT overwrite the old file.

        # Errors
        Raises `TarError` if the downloaded archive cannot be read or does not
        contain the database file. Errors raised by the download itself are
        propagated unchanged.
        """
        assert config.GEO_IP_LICENSE_KEY is not None

        self._logger.info("Downloading new geoip database")

        # Download new file to a temp location
        with TemporaryFile() as temp_file:
            await self._download_file(
                config.GEO_IP_DATABASE_URL,
                config.GEO_IP_LICENSE_KEY,
                temp_file
            )
            temp_file.seek(0)

            # Unzip the archive and overwrite the old file
            try:
                with tarfile.open(fileobj=temp_file, mode="r:gz") as tar:
                    # Locate the member BEFORE opening the destination:
                    # opening with "wb" truncates the old database right away
                    with extract_file(tar, "GeoLite2-Country.mmdb") as f_in:
                        with open(self.file_path, "wb") as f_out:
                            shutil.copyfileobj(f_in, f_out)
            except tarfile.TarError:    # pragma: no cover
                self._logger.warning("Failed to extract downloaded file!")
                raise
        self._logger.info("New database download complete")

    async def _download_file(
        self,
        url: str,
        license_key: str,
        fileobj: IO[bytes]
    ) -> None:
        """
        Download a file using aiohttp and write it to a file object. The md5
        checksum of the file is requested from the same url at the same time
        and compared against the bytes that were received.

        # Params
        - `url`: The url to download from
        - `license_key`: Sent as the `license_key` query parameter
        - `fileobj`: Binary file object that the downloaded bytes are
            written to

        # Errors
        Raises `Exception` if the md5 of the downloaded bytes does not match
        the checksum that was served.
        """

        chunk_size = 1024
        params = {
            "edition_id": "GeoLite2-Country",
            "license_key": license_key,
            "suffix": "tar.gz"
        }

        async def get_checksum(session):
            # The checksum is served under the same edition with an extra
            # ".md5" appended to the suffix
            async with session.get(url, params={
                **params,
                "suffix": params["suffix"] + ".md5"
            }, timeout=60 * 20) as resp:
                return await resp.text()

        async def get_db_file_with_checksum(session):
            # Hash the chunks as they are written so the file only needs to
            # be read once
            hasher = hashlib.md5()
            async with session.get(url, params=params, timeout=60 * 20) as resp:
                while True:
                    chunk = await resp.content.read(chunk_size)
                    if not chunk:
                        break
                    fileobj.write(chunk)
                    hasher.update(chunk)

            return hasher.hexdigest()

        async with aiohttp.ClientSession(raise_for_status=True) as session:
            checksum, our_hash = await asyncio.gather(
                get_checksum(session),
                get_db_file_with_checksum(session)
            )

        if checksum != our_hash:
            raise Exception(
                f"Hashes did not match! Expected {checksum} got {our_hash}"
            )

    def load_db(self) -> None:
        """
        Loads the database into memory.

        If the file cannot be opened the failure is logged and the previously
        loaded database (if any) stays in use.
        """
        # Set the time first, if the file is corrupted we don't need to try
        # loading it again anyways
        self.db_update_time = datetime.now()

        try:
            new_db = maxminddb.open_database(self.file_path)
        except (InvalidDatabaseError, OSError, ValueError):
            self._logger.exception(
                "Failed to load maxmind db! Maybe the download was interrupted"
            )
        else:
            # Only swap readers once the new one opened successfully
            if self.db is not None:
                self.db.close()

            self.db = new_db
            self._logger.info(
                "File loaded successfully from %s", self.file_path
            )

    def country(self, address: str) -> str:
        """
        Look up an ip address in the db and return its country code.

        Returns an empty string if no database is loaded, the address has no
        entry, or the entry has no country iso code.
        """
        default_value = ""
        if self.db is None:
            return default_value

        entry = self.db.get(address)
        if entry is None:
            return default_value

        return str(entry.get("country", {}).get("iso_code", default_value))

    async def shutdown(self):
        """
        Close the database reader if one is open.
        """
        if self.db is not None:
            self.db.close()
Service for managing the GeoIp database. This includes an asyncio crontab which periodically checks if the current file is out of date. If it is, then the service will try to download a new file from the url in
server.config
. Provides an interface for getting data out of the database.
Ancestors
Methods
def check_geoip_db_file_updated(self)
-
Expand source code
def check_geoip_db_file_updated(self):
    """
    Load the local database file if it has not been loaded yet, or reload it
    if it was modified (e.g. by a server admin) after the last load attempt.

    Does nothing if there is no file at `self.file_path`.
    """
    if not os.path.isfile(self.file_path):
        return

    if self.db is None:
        # Nothing is loaded yet, so there is no timestamp to compare against
        self.load_db()
        return

    assert self.db_update_time is not None
    # Something is loaded already, reload only if the file changed since then
    modified_at = datetime.fromtimestamp(os.path.getmtime(self.file_path))
    if modified_at > self.db_update_time:
        self.load_db()
Checks if the local database file has been updated by a server admin and loads it if it has.
async def check_update_geoip_db(self) ‑> None
-
Expand source code
async def check_update_geoip_db(self) -> None:
    """
    Download a new geoip database if the local file is older than the
    configured maximum age or is missing, then load the database.

    Does nothing when no license key is configured. A download timeout is
    only logged; any other unexpected error is logged and re-raised.
    """
    if not config.GEO_IP_LICENSE_KEY:
        self._logger.warning(
            "GEO_IP_LICENSE_KEY not set! Unable to download GeoIP database!"
        )
        return

    self._logger.debug("Checking if geoip database needs updating")
    try:
        modified_at = datetime.fromtimestamp(
            os.path.getmtime(self.file_path)
        )
        age = datetime.now() - modified_at
        is_stale = age.days > config.GEO_IP_DATABASE_MAX_AGE_DAYS

        if is_stale:
            self._logger.info("Geoip database is out of date")
            await self.download_geoip_db()
    except FileNotFoundError:    # pragma: no cover
        # No local copy yet, so fetch one
        self._logger.warning("Geoip database is missing...")
        await self.download_geoip_db()
    except asyncio.TimeoutError:    # pragma: no cover
        self._logger.warning(
            "Failed to download database file! "
            "Check the network connection and try again"
        )
    except Exception as e:    # pragma: no cover
        self._logger.exception(e)
        raise

    self.load_db()
Check if the geoip database is old and update it if so.
def country(self, address: str) ‑> str
-
Expand source code
def country(self, address: str) -> str:
    """
    Return the country iso code stored in the db for an ip address.

    An empty string is returned when no database is loaded, when the address
    has no entry, or when the entry carries no country iso code.
    """
    fallback = ""

    record = self.db.get(address) if self.db is not None else None
    if record is None:
        return fallback

    country_info = record.get("country", {})
    return str(country_info.get("iso_code", fallback))
Look up an ip address in the db and return its country code.
async def download_geoip_db(self) ‑> None
-
Expand source code
async def download_geoip_db(self) -> None:
    """
    Download the geoip database and write it to `self.file_path`.

    The archive is first downloaded to a temporary file. The destination is
    only opened for writing once the database has been located inside the
    archive, so an archive that cannot be read or that does not contain the
    database does NOT overwrite the old file.

    # Errors
    Raises `TarError` if the downloaded archive cannot be read or does not
    contain the database file. Errors raised by the download itself are
    propagated unchanged.
    """
    assert config.GEO_IP_LICENSE_KEY is not None

    self._logger.info("Downloading new geoip database")

    # Download new file to a temp location
    with TemporaryFile() as temp_file:
        await self._download_file(
            config.GEO_IP_DATABASE_URL,
            config.GEO_IP_LICENSE_KEY,
            temp_file
        )
        temp_file.seek(0)

        # Unzip the archive and overwrite the old file
        try:
            with tarfile.open(fileobj=temp_file, mode="r:gz") as tar:
                # Locate the member BEFORE opening the destination: opening
                # with "wb" truncates the old database right away
                with extract_file(tar, "GeoLite2-Country.mmdb") as f_in:
                    with open(self.file_path, "wb") as f_out:
                        shutil.copyfileobj(f_in, f_out)
        except tarfile.TarError:    # pragma: no cover
            self._logger.warning("Failed to extract downloaded file!")
            raise
    self._logger.info("New database download complete")
Download the geoip database to a file. If the downloaded file is not a valid gzip file, then it does NOT overwrite the old file.
def load_db(self) ‑> None
-
Expand source code
def load_db(self) -> None:
    """
    Open the database file and make it the active database.

    When the file cannot be opened the failure is logged and whatever
    database was loaded before stays in use.
    """
    # Record the attempt up front: if the file is corrupted there is no
    # point in trying to load the very same file again
    self.db_update_time = datetime.now()

    try:
        reader = maxminddb.open_database(self.file_path)
    except (InvalidDatabaseError, OSError, ValueError):
        self._logger.exception(
            "Failed to load maxmind db! Maybe the download was interrupted"
        )
        return

    # The new reader opened fine, so the old one can be released
    if self.db is not None:
        self.db.close()

    self.db = reader
    self._logger.info("File loaded successfully from %s", self.file_path)
Loads the database into memory.
def refresh_file_path(self)
-
Expand source code
def refresh_file_path(self):
    """
    Re-read the location of the database file from the config.
    """
    configured_path = config.GEO_IP_DATABASE_PATH
    self.file_path = configured_path
Inherited members