lib/wttr_srv.py refactoring

v2
Igor Chubin 5 years ago
parent e2cebf74f2
commit 0ac4790007

@ -10,11 +10,12 @@ import os
import time import time
from flask import render_template, send_file, make_response from flask import render_template, send_file, make_response
import format.png import fmt.png
import parse_query import parse_query
from translations import get_message, FULL_TRANSLATION, PARTIAL_TRANSLATION, SUPPORTED_LANGS from translations import get_message, FULL_TRANSLATION, PARTIAL_TRANSLATION, SUPPORTED_LANGS
from buttons import add_buttons from buttons import add_buttons
from globals import get_help_file, log, \ from globals import get_help_file, \
BASH_FUNCTION_FILE, TRANSLATION_FILE, LOG_FILE, \ BASH_FUNCTION_FILE, TRANSLATION_FILE, LOG_FILE, \
NOT_FOUND_LOCATION, \ NOT_FOUND_LOCATION, \
MALFORMED_RESPONSE_HTML_PAGE, \ MALFORMED_RESPONSE_HTML_PAGE, \
@ -22,7 +23,7 @@ from globals import get_help_file, log, \
MY_EXTERNAL_IP, QUERY_LIMITS MY_EXTERNAL_IP, QUERY_LIMITS
from location import is_location_blocked, location_processing from location import is_location_blocked, location_processing
from limits import Limits from limits import Limits
from wttr import get_wetter from view.wttr import get_wetter
from view.moon import get_moon from view.moon import get_moon
from view.line import wttr_line from view.line import wttr_line
@ -52,10 +53,8 @@ def show_text_file(name, lang):
.replace('SUPPORTED_LANGUAGES', ' '.join(SUPPORTED_LANGS)) .replace('SUPPORTED_LANGUAGES', ' '.join(SUPPORTED_LANGS))
return text return text
def client_ip_address(request): def _client_ip_address(request):
""" """Return client ip address for flask `request`.
Return client ip address for `request`.
Flask related
""" """
if request.headers.getlist("X-PNG-Query-For"): if request.headers.getlist("X-PNG-Query-For"):
@ -111,25 +110,25 @@ def _parse_language_header(header):
elif lang == 'en': elif lang == 'en':
yield None, lang_tuple[1] yield None, lang_tuple[1]
try: try:
return max(supported_langs(), key=lambda lang_tuple:lang_tuple[1])[0] return max(supported_langs(), key=lambda lang_tuple: lang_tuple[1])[0]
except ValueError: except ValueError:
return None return None
return _find_supported_language(_parse_accept_language(header)) return _find_supported_language(_parse_accept_language(header))
def get_answer_language_and_format(request): def get_answer_language_and_view(request):
""" """
Return preferred answer language based on Return preferred answer language based on
domain name, query arguments and headers domain name, query arguments and headers
""" """
lang = None lang = None
fmt = None view_name = None
hostname = request.headers['Host'] hostname = request.headers['Host']
if hostname != 'wttr.in' and hostname.endswith('.wttr.in'): if hostname != 'wttr.in' and hostname.endswith('.wttr.in'):
lang = hostname[:-8] lang = hostname[:-8]
if lang == "v2": if lang == "v2":
fmt = "v2" view_name = "v2"
lang = None lang = None
if 'lang' in request.args: if 'lang' in request.args:
@ -139,7 +138,7 @@ def get_answer_language_and_format(request):
if lang is None and header_accept_language: if lang is None and header_accept_language:
lang = _parse_language_header(header_accept_language) lang = _parse_language_header(header_accept_language)
return lang, fmt return lang, view_name
def get_output_format(request, query): def get_output_format(request, query):
""" """
@ -150,16 +149,14 @@ def get_output_format(request, query):
if 'format' in query: if 'format' in query:
return False return False
# FIXME
user_agent = request.headers.get('User-Agent', '').lower() user_agent = request.headers.get('User-Agent', '').lower()
if query.get('force-ansi'): if query.get('force-ansi'):
return False return False
html_output = not any(agent in user_agent for agent in PLAIN_TEXT_AGENTS) html_output = not any(agent in user_agent for agent in PLAIN_TEXT_AGENTS)
return html_output return html_output
def cyclic_location_selection(locations, period): def _cyclic_location_selection(locations, period):
""" """Return one of `locations` (: separated list)
Return one of `locations` (: separated list)
basing on the current time and query interval `period` basing on the current time and query interval `period`
""" """
@ -172,145 +169,200 @@ def cyclic_location_selection(locations, period):
except ValueError: except ValueError:
period = 1 period = 1
index = int(time.time())/period % len(locations) index = int(time.time()/period) % len(locations)
return locations[index] return locations[index]
def wttr(location, request): def _response(parsed_query, query, fast_mode=False):
"""Create response text based on `parsed_query` and `query` data.
If `fast_mode` is True, process only requests that can
be handled very fast (cached and static files).
""" """
Main rendering function, it processes incoming weather queries.
Depending on user agent it returns output in HTML or ANSI format.
Incoming data: answer = None
request.args cache_signature = cache.get_signature(
request.headers parsed_query["user_agent"],
request.remote_addr parsed_query["request_url"],
request.referrer parsed_query["ip_addr"],
request.query_string parsed_query["lang"])
""" answer = cache.get(cache_signature)
if parsed_query['orig_location'] in PLAIN_TEXT_PAGES:
answer = show_text_file(parsed_query['orig_location'], parsed_query['lang'])
if parsed_query['html_output']:
answer = render_template('index.html', body=answer)
if answer or fast_mode:
return answer
# at this point, we could not handle the query fast,
# so we handle it with all available logic
if parsed_query["view"] or 'format' in query:
response_text = wttr_line(
parsed_query['location'],
parsed_query['override_location_name'],
parsed_query['full_address'],
query,
parsed_query['lang'],
parsed_query['view'])
return cache.store(cache_signature, response_text)
if parsed_query.get('png_filename'):
options = {
'ip_addr': parsed_query['ip_addr'],
'lang': parsed_query['lang'],
'location': parsed_query['location']}
options.update(query)
cached_png_file = fmt.png.make_wttr_in_png(
parsed_query['png_filename'], options=options)
response = make_response(send_file(
cached_png_file,
attachment_filename=parsed_query['png_filename'],
mimetype='image/png'))
for key, value in {
'Cache-Control': 'no-cache, no-store, must-revalidate',
'Pragma': 'no-cache',
'Expires': '0',
}.items():
response.headers[key] = value
def _wrap_response(response_text, html_output):
response = make_response(response_text)
response.mimetype = 'text/html' if html_output else 'text/plain'
return response return response
if is_location_blocked(location): orig_location = parsed_query['orig_location']
return "" if orig_location and \
(orig_location.lower() == 'moon' or \
orig_location.lower().startswith('moon@')):
output = get_moon(
parsed_query['orig_location'],
html=parsed_query['html_output'],
lang=parsed_query['lang'],
query=query)
else:
output = get_wetter(
parsed_query['location'],
parsed_query['ip_addr'],
html=parsed_query['html_output'],
lang=parsed_query['lang'],
query=query,
location_name=parsed_query['override_location_name'],
full_address=parsed_query['full_address'],
url=parsed_query['request_url'],)
if query.get('days', '3') != '0' and not query.get('no-follow-line'):
if parsed_query['html_output']:
output = add_buttons(output)
else:
output += '\n' + get_message('FOLLOW_ME', parsed_query['lang']) + '\n'
ip_addr = client_ip_address(request) return cache.store(cache_signature, output)
# return output
try: def parse_request(location, request, query, fast_mode=False):
LIMITS.check_ip(ip_addr) """Parse request and provided extended information for the query,
except RuntimeError as exception: including location data, language, output format, view, etc.
return str(exception)
Incoming data:
`location` location name extracted from the query url
`request.args`
`request.headers`
`request.remote_addr`
`request.referrer`
`request.query_string`
`query` parsed command line arguments
Return: dictionary with parsed parameters
"""
png_filename = None png_filename = None
if location is not None and location.lower().endswith(".png"): if location is not None and location.lower().endswith(".png"):
png_filename = location png_filename = location
location = location[:-4] location = location[:-4]
if location and ':' in location and location[0] != ":":
location = _cyclic_location_selection(location, query.get('period', 1))
lang, fmt = get_answer_language_and_format(request) lang, _view = get_answer_language_and_view(request)
query = parse_query.parse_query(request.args)
html_output = get_output_format(request, query) html_output = get_output_format(request, query)
user_agent = request.headers.get('User-Agent', '').lower()
# generating cache signature
cache_signature = cache.get_signature(user_agent, request.url, ip_addr, lang)
answer = cache.get(cache_signature)
if answer:
return _wrap_response(answer, html_output)
if location in PLAIN_TEXT_PAGES: ip_addr = _client_ip_address(request)
help_ = show_text_file(location, lang) parsed_query = {
if html_output: 'ip_addr': ip_addr,
return _wrap_response(render_template('index.html', body=help_), html_output) 'user_agent': request.headers.get('User-Agent', '').lower(),
return _wrap_response(help_, html_output) 'lang': lang,
'view': _view,
if location and ':' in location: 'html_output': html_output,
location = cyclic_location_selection(location, query.get('period', 1)) 'orig_location': location,
'location': location,
orig_location = location 'request_url': request.url,
}
if not png_filename:
if not png_filename and not fast_mode:
location, override_location_name, full_address, country, query_source_location = \ location, override_location_name, full_address, country, query_source_location = \
location_processing(location, ip_addr) location_processing(location, ip_addr)
us_ip = query_source_location[1] == 'United States' and 'slack' not in user_agent us_ip = query_source_location[1] == 'United States' \
and 'slack' not in parsed_query['user_agent']
query = parse_query.metric_or_imperial(query, lang, us_ip=us_ip) query = parse_query.metric_or_imperial(query, lang, us_ip=us_ip)
# logging query
orig_location_utf8 = (orig_location or "")
location_utf8 = location
use_imperial = query.get('use_imperial', False)
log(" ".join(map(str,
[ip_addr, user_agent, orig_location_utf8, location_utf8, use_imperial, lang])))
if country and location != NOT_FOUND_LOCATION: if country and location != NOT_FOUND_LOCATION:
location = "%s,%s" % (location, country) location = "%s,%s" % (location, country)
# We are ready to return the answer parsed_query.update({
'location': location,
'override_location_name': override_location_name,
'full_address': full_address,
'country': country,
'query_source_location': query_source_location})
return parsed_query
def wttr(location, request):
"""Main rendering function, it processes incoming weather queries,
and depending on the User-Agent string and other paramters of the query
it returns output in HTMLi, ANSI or other format.
"""
def _wrap_response(response_text, html_output):
if not isinstance(response_text, str):
return response_text
response = make_response(response_text)
response.mimetype = 'text/html' if html_output else 'text/plain'
return response
if is_location_blocked(location):
return ""
try: try:
if fmt or 'format' in query: LIMITS.check_ip(_client_ip_address(request))
response_text = wttr_line( except RuntimeError as exception:
location, override_location_name, full_address, query, lang, fmt) return str(exception)
fmt = fmt or query.get('format')
response_text = cache.store(cache_signature, response_text) query = parse_query.parse_query(request.args)
return _wrap_response(
response_text,
html_output)
if png_filename:
options = {
'ip_addr': ip_addr,
'lang': lang,
'location': location}
options.update(query)
cached_png_file = format.png.make_wttr_in_png(png_filename, options=options)
response = make_response(send_file(cached_png_file,
attachment_filename=png_filename,
mimetype='image/png'))
for key, value in {
'Cache-Control': 'no-cache, no-store, must-revalidate',
'Pragma': 'no-cache',
'Expires': '0',
}.items():
response.headers[key] = value
# Trying to disable github caching
return response
if orig_location and (orig_location.lower() == 'moon' or orig_location.lower().startswith('moon@')):
output = get_moon(orig_location, html=html_output, lang=lang, query=query)
else:
output = get_wetter(location, ip_addr,
html=html_output,
lang=lang,
query=query,
location_name=override_location_name,
full_address=full_address,
url=request.url,
)
if query.get('days', '3') != '0' and not query.get('no-follow-line'):
if html_output:
output = add_buttons(output)
else:
#output += '\n' + get_message('NEW_FEATURE', lang).encode('utf-8')
output += '\n' + get_message('FOLLOW_ME', lang) + '\n'
return _wrap_response(output, html_output)
# first, we try to process the query as fast as possible
# (using the cache and static files),
# and only if "fast_mode" was unsuccessful,
# use the full track
parsed_query = parse_request(location, request, query, fast_mode=True)
response = _response(parsed_query, query, fast_mode=True)
try:
if not response:
parsed_query = parse_request(location, request, query)
response = _response(parsed_query, query)
# pylint: disable=broad-except
except Exception as exception: except Exception as exception:
# if 'Malformed response' in str(exception) \ logging.error("Exception has occured", exc_info=1)
# or 'API key has reached calls per day allowed limit' in str(exception): if parsed_query['html_output']:
logging.error("Exception has occured", exc_info=1) response = MALFORMED_RESPONSE_HTML_PAGE
if html_output: else:
return _wrap_response(MALFORMED_RESPONSE_HTML_PAGE, html_output) response = get_message('CAPACITY_LIMIT_REACHED', parsed_query['lang'])
return _wrap_response(get_message('CAPACITY_LIMIT_REACHED', lang), html_output) return _wrap_response(response, parsed_query['html_output'])
# logging.error("Exception has occured", exc_info=1)
# return "ERROR"
if __name__ == "__main__": if __name__ == "__main__":
import doctest import doctest

Loading…
Cancel
Save