From 1b25e16a3751528a5e889c1e6bd6717ddd720d0a Mon Sep 17 00:00:00 2001 From: Igor Chubin Date: Mon, 16 Nov 2020 19:52:10 +0100 Subject: [PATCH] Minor code cleanup --- lib/location.py | 90 +++++++++++++++++++++++-------------------------- 1 file changed, 42 insertions(+), 48 deletions(-) diff --git a/lib/location.py b/lib/location.py index 4edda69..6b79b4d 100644 --- a/lib/location.py +++ b/lib/location.py @@ -31,38 +31,30 @@ Exports: location_processing is_location_blocked """ + from __future__ import print_function -import sys -import os import json +import os import socket -import requests +import sys + import geoip2.database import pycountry +import requests from globals import GEOLITE, GEOLOCATOR_SERVICE, IP2LCACHE, IP2LOCATION_KEY, NOT_FOUND_LOCATION, \ ALIASES, BLACKLIST, IATA_CODES_FILE, IPLOCATION_ORDER, IPINFO_TOKEN GEOIP_READER = geoip2.database.Reader(GEOLITE) - COUNTRY_MAP = {"Russian Federation": "Russia"} def _debug_log(s): with open("/tmp/debug.log", "a") as f: f.write(s+"\n") -def ascii_only(string): - "Check if `string` contains only ASCII symbols" - try: - for _ in range(5): - string = string.encode('utf-8') - return True - except UnicodeDecodeError: - return False - -def is_ip(ip_addr): +def _is_ip(ip_addr): """ Check if `ip_addr` looks like an IP Address """ @@ -80,7 +72,8 @@ def is_ip(ip_addr): except socket.error: return False -def location_normalize(location): + +def _location_normalize(location): """ Normalize location name `location` """ @@ -94,8 +87,7 @@ def location_normalize(location): return location - -def geolocator(location): +def _geolocator(location): """ Return a GPS pair for specified `location` or None if nothing can be found @@ -120,7 +112,7 @@ def geolocator(location): return None -def ipcachewrite(ip_addr, location): +def _ipcachewrite(ip_addr, location): """ Write a retrieved ip+location into cache Can stress some filesystems after long term use, see https://stackoverflow.com/questions/466521/how-many-files-can-i-put-in-a-directory @@ -132,7 +124,8 @@ def ipcachewrite(ip_addr, location): file.write(location[3] + ';' + location[2] + ';' + location[1] + ';' + location[0] + ';' + location[4] + ';' + location[5]) # like ip2location format -def ipcache(ip_addr): + +def _ipcache(ip_addr): """ Retrieve a location from cache by ip addr Returns a triple of (CITY, REGION, COUNTRY) or None TODO: When cache becomes more robust, transition to using latlong @@ -152,7 +145,7 @@ def ipcache(ip_addr): return None -def ip2location(ip_addr): +def _ip2location(ip_addr): """Convert IP address `ip_addr` to a location name""" # if IP2LOCATION_KEY is not set, do not query, # because the query wont be processed anyway @@ -171,7 +164,7 @@ def ip2location(ip_addr): return city, region, country, ccode, lat, long -def ipinfo(ip_addr): +def _ipinfo(ip_addr): if not IPINFO_TOKEN: return None try: @@ -191,7 +184,7 @@ def ipinfo(ip_addr): return city, region, country, ccode, lat, long -def geoip(ip_addr): +def _geoip(ip_addr): try: response = GEOIP_READER.city(ip_addr) city, region, country, ccode, lat, long = response.city.name, response.subdivisions.name, response.country.name, response.country.iso_code, response.location.latitude, response.location.longitude @@ -200,17 +193,18 @@ def geoip(ip_addr): return city, region, country, ccode, lat, long -def workaround(country): +def _country_name_workaround(country): # workaround for strange bug with the country name # maybe some other countries has this problem too country = COUNTRY_MAP.get(country) or country return country -def get_location(ip_addr): + +def _get_location(ip_addr): """ Return location triple (CITY, REGION, COUNTRY) for `ip_addr` """ - location = ipcache(ip_addr) + location = _ipcache(ip_addr) if location: return location @@ -218,20 +212,20 @@ def get_location(ip_addr): # (CITY, REGION, COUNTRY, CCODE, LAT, LONG) for method in IPLOCATION_ORDER: if method == 'geoip': - location = geoip(ip_addr) + location = _geoip(ip_addr) elif method == 'ip2location': - location = ip2location(ip_addr) + location = _ip2location(ip_addr) elif method == 'ipinfo': - location = ipinfo(ip_addr) + location = _ipinfo(ip_addr) else: print("ERROR: invalid iplocation method specified: %s" % method) if location is not None: break if location is not None and all(location): - ipcachewrite(ip_addr, location) + _ipcachewrite(ip_addr, location) # cache write used to happen before workaround, preserve that - location[2] = workaround(location[2]) + location[2] = _country_name_workaround(location[2]) return location[:3] # city, region, country # ccode is cached but not needed for location @@ -249,15 +243,15 @@ def get_location(ip_addr): return NOT_FOUND_LOCATION, None, None -def location_canonical_name(location): +def _location_canonical_name(location): "Find canonical name for `location`" - location = location_normalize(location) + location = _location_normalize(location) if location.lower() in LOCATION_ALIAS: return LOCATION_ALIAS[location.lower()] return location -def load_aliases(aliases_filename): +def _load_aliases(aliases_filename): """ Load aliases from the aliases file """ @@ -269,10 +263,10 @@ def load_aliases(aliases_filename): except AttributeError: from_, to_ = line.split(':', 1) - aliases_db[location_normalize(from_)] = location_normalize(to_) + aliases_db[_location_normalize(from_)] = _location_normalize(to_) return aliases_db -def load_iata_codes(iata_codes_filename): +def _load_iata_codes(iata_codes_filename): """ Load IATA codes from the IATA codes file """ @@ -282,9 +276,9 @@ def load_iata_codes(iata_codes_filename): result.append(line.strip()) return set(result) -LOCATION_ALIAS = load_aliases(ALIASES) +LOCATION_ALIAS = _load_aliases(ALIASES) LOCATION_BLACK_LIST = [x.strip() for x in open(BLACKLIST, 'r').readlines()] -IATA_CODES = load_iata_codes(IATA_CODES_FILE) +IATA_CODES = _load_iata_codes(IATA_CODES_FILE) def is_location_blocked(location): """ @@ -294,7 +288,7 @@ def is_location_blocked(location): return location is not None and location.lower() in LOCATION_BLACK_LIST -def get_hemisphere(location): +def _get_hemisphere(location): """ Return hemisphere of the location (True = North, False = South). Assume North and return True if location can't be found. @@ -302,11 +296,12 @@ def get_hemisphere(location): if all(location): location_string = ", ".join(location) - geolocation = geolocator(location_string) + geolocation = _geolocator(location_string) if geolocation is None: return True return geolocation["latitude"] > 0 + def location_processing(location, ip_addr): """ """ @@ -326,7 +321,7 @@ def location_processing(location, ip_addr): if location and location.lstrip('~ ').startswith('@'): try: - location, region, country = get_location( + location, region, country = _get_location( socket.gethostbyname( location.lstrip('~ ')[1:])) location = '~' + location @@ -336,20 +331,20 @@ def location_processing(location, ip_addr): except: location, region, country = NOT_FOUND_LOCATION, None, None - query_source_location = get_location(ip_addr) + query_source_location = _get_location(ip_addr) # For moon queries, hemisphere must be found # True for North, False for South hemisphere = False if location is not None and (location.lower()+"@").startswith("moon@"): - hemisphere = get_hemisphere(query_source_location) + hemisphere = _get_hemisphere(query_source_location) country = None if not location or location == 'MyLocation': location = ip_addr - if is_ip(location): - location, region, country = get_location(location) + if _is_ip(location): + location, region, country = _get_location(location) # location is just city here # here too @@ -360,7 +355,7 @@ def location_processing(location, ip_addr): hide_full_address = not force_show_full_address if location and not location.startswith('~'): - tmp_location = location_canonical_name(location) + tmp_location = _location_canonical_name(location) if tmp_location != location: override_location_name = location location = tmp_location @@ -368,7 +363,7 @@ def location_processing(location, ip_addr): # up to this point it is possible that the name # contains some unicode symbols # here we resolve them - if location is not None: # and not ascii_only(location): + if location is not None: location = "~" + location.lstrip('~ ') if not override_location_name: override_location_name = location.lstrip('~') @@ -377,7 +372,7 @@ def location_processing(location, ip_addr): # location = '~%s' % location if location is not None and not location.startswith("~-,") and location.startswith('~'): - geolocation = geolocator(location_canonical_name(location[1:])) + geolocation = _geolocator(_location_canonical_name(location[1:])) if geolocation is not None: if not override_location_name: override_location_name = location[1:].replace('+', ' ') @@ -390,7 +385,6 @@ def location_processing(location, ip_addr): else: location = NOT_FOUND_LOCATION #location[1:] - return location, \ override_location_name, \ full_address, \