From cf620b86bcb54d605b2ba4b580847a84d5515ac7 Mon Sep 17 00:00:00 2001 From: clerie Date: Mon, 28 Feb 2022 17:02:01 +0100 Subject: [PATCH] Move DwdScraper to a class --- scraper/daily_import_climate.py | 190 ++++++++++++++++---------------- 1 file changed, 98 insertions(+), 92 deletions(-) diff --git a/scraper/daily_import_climate.py b/scraper/daily_import_climate.py index 17e516d..935c8bf 100755 --- a/scraper/daily_import_climate.py +++ b/scraper/daily_import_climate.py @@ -7,104 +7,110 @@ import zipfile import datetime from config.db import db as config_db -def cleanup_value(v): - if int(v) == -999: - return None +class DwdScraper: - return v + def run(self): + def cleanup_value(v): + if int(v) == -999: + return None -conn = psycopg2.connect(config_db["uri"]) -cur = conn.cursor() + return v -cur.execute("SELECT min(dwd_last_update) FROM stations;") -last_date = cur.fetchall()[0][0] -print(last_date) + conn = psycopg2.connect(config_db["uri"]) + cur = conn.cursor() -if not last_date is None: - cur.execute("SELECT id, name, lat, lon, dwd_id, dwd_last_update FROM stations WHERE dwd_last_update = %s LIMIT 1;", [last_date]) -else: - cur.execute("SELECT id, name, lat, lon, dwd_id, dwd_last_update FROM stations WHERE dwd_last_update IS NULL LIMIT 1;") -last_station = cur.fetchone() -print(last_station) + cur.execute("SELECT min(dwd_last_update) FROM stations;") + last_date = cur.fetchall()[0][0] + print(last_date) -curr_station_id = last_station[0] -curr_station_dwd_id = last_station[4] - -print(curr_station_dwd_id) -r = requests.get('https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/daily/kl/recent/tageswerte_KL_' + str(curr_station_dwd_id) + '_akt.zip', stream=True) -print(r.url) -if r.status_code == 200: - zip = zipfile.ZipFile(io.BytesIO(r.content)) - files = zip.namelist() - print(files) - files_climate = [f for f in files if f.startswith("produkt_klima_")] - print(files_climate) - - - buffer = [] - - is_first_line = True - - for line in zip.open(files_climate[0]): - l = line.decode('utf-8').strip() - if is_first_line: - is_first_line = False - continue - - b = {} - - b["dwd_id"], l = l.strip().split(";", 1) - b["dwd_id"] = str(b["dwd_id"]) - b["date"], l = l.strip().split(";", 1) - b["date"] = str(b["date"]) - b["date"] = datetime.date(int(b["date"][0:4]), int(b["date"][4:6]), int(b["date"][6:8])) - b["qn_3"], l = l.strip().split(";", 1) - b["qn_3"] = cleanup_value(int(b["qn_3"])) - b["fx"], l = l.strip().split(";", 1) - b["fx"] = cleanup_value(float(b["fx"])) - b["fm"], l = l.strip().split(";", 1) - b["fm"] = cleanup_value(float(b["fm"])) - b["qn_4"], l = l.strip().split(";", 1) - b["qn_4"] = cleanup_value(int(b["qn_4"])) - b["rsk"], l = l.strip().split(";", 1) - b["rsk"] = cleanup_value(float(b["rsk"])) - b["rskf"], l = l.strip().split(";", 1) - b["rskf"] = cleanup_value(int(b["rskf"])) - b["sdk"], l = l.strip().split(";", 1) - b["sdk"] = cleanup_value(float(b["sdk"])) - b["shk_tag"], l = l.strip().split(";", 1) - b["shk_tag"] = cleanup_value(float(b["shk_tag"])) - b["nm"], l = l.strip().split(";", 1) - b["nm"] = cleanup_value(float(b["nm"])) - b["vpm"], l = l.strip().split(";", 1) - b["vpm"] = cleanup_value(float(b["vpm"])) - b["pm"], l = l.strip().split(";", 1) - b["pm"] = cleanup_value(float(b["pm"])) - b["tmk"], l = l.strip().split(";", 1) - b["tmk"] = cleanup_value(float(b["tmk"])) - b["upm"], l = l.strip().split(";", 1) - b["upm"] = cleanup_value(float(b["upm"])) - b["txk"], l = l.strip().split(";", 1) - b["txk"] = cleanup_value(float(b["txk"])) - b["tnk"], l = l.strip().split(";", 1) - b["tnk"] = cleanup_value(float(b["tnk"])) - b["tgk"], l = l.strip().split(";", 1) - b["tgk"] = cleanup_value(float(b["tgk"])) - - #print(b) - print(curr_station_id, b["date"].isoformat()) - - cur.execute("SELECT id FROM climate WHERE station = %s AND date = %s; ", [curr_station_id, b["date"].isoformat()]) - if cur.rowcount == 0: - cur.execute("INSERT INTO climate (station, date, qn_3, fx, fm, qn_4, rsk, rskf, sdk, shk_tag, nm, vpm, pm, tmk, upm, txk, tnk, tgk) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);", - [curr_station_id, b["date"].isoformat(), b["qn_3"], b["fx"], b["fm"], b["qn_4"], b["rsk"], b["rskf"], b["sdk"], b["shk_tag"], b["nm"], b["vpm"], b["pm"], b["tmk"], b["upm"], b["txk"], b["tnk"], b["tgk"]]) - print("imported") + if not last_date is None: + cur.execute("SELECT id, name, lat, lon, dwd_id, dwd_last_update FROM stations WHERE dwd_last_update = %s LIMIT 1;", [last_date]) else: - print("ignored") + cur.execute("SELECT id, name, lat, lon, dwd_id, dwd_last_update FROM stations WHERE dwd_last_update IS NULL LIMIT 1;") + last_station = cur.fetchone() + print(last_station) -cur.execute("UPDATE stations SET dwd_last_update = %s WHERE id = %s;", [datetime.datetime.today().isoformat(), curr_station_id]) -conn.commit() + curr_station_id = last_station[0] + curr_station_dwd_id = last_station[4] + + print(curr_station_dwd_id) + r = requests.get('https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/daily/kl/recent/tageswerte_KL_' + str(curr_station_dwd_id) + '_akt.zip', stream=True) + print(r.url) + if r.status_code == 200: + zip = zipfile.ZipFile(io.BytesIO(r.content)) + files = zip.namelist() + print(files) + files_climate = [f for f in files if f.startswith("produkt_klima_")] + print(files_climate) -cur.close() -conn.close() + buffer = [] + + is_first_line = True + + for line in zip.open(files_climate[0]): + l = line.decode('utf-8').strip() + if is_first_line: + is_first_line = False + continue + + b = {} + + b["dwd_id"], l = l.strip().split(";", 1) + b["dwd_id"] = str(b["dwd_id"]) + b["date"], l = l.strip().split(";", 1) + b["date"] = str(b["date"]) + b["date"] = datetime.date(int(b["date"][0:4]), int(b["date"][4:6]), int(b["date"][6:8])) + b["qn_3"], l = l.strip().split(";", 1) + b["qn_3"] = cleanup_value(int(b["qn_3"])) + b["fx"], l = l.strip().split(";", 1) + b["fx"] = cleanup_value(float(b["fx"])) + b["fm"], l = l.strip().split(";", 1) + b["fm"] = cleanup_value(float(b["fm"])) + b["qn_4"], l = l.strip().split(";", 1) + b["qn_4"] = cleanup_value(int(b["qn_4"])) + b["rsk"], l = l.strip().split(";", 1) + b["rsk"] = cleanup_value(float(b["rsk"])) + b["rskf"], l = l.strip().split(";", 1) + b["rskf"] = cleanup_value(int(b["rskf"])) + b["sdk"], l = l.strip().split(";", 1) + b["sdk"] = cleanup_value(float(b["sdk"])) + b["shk_tag"], l = l.strip().split(";", 1) + b["shk_tag"] = cleanup_value(float(b["shk_tag"])) + b["nm"], l = l.strip().split(";", 1) + b["nm"] = cleanup_value(float(b["nm"])) + b["vpm"], l = l.strip().split(";", 1) + b["vpm"] = cleanup_value(float(b["vpm"])) + b["pm"], l = l.strip().split(";", 1) + b["pm"] = cleanup_value(float(b["pm"])) + b["tmk"], l = l.strip().split(";", 1) + b["tmk"] = cleanup_value(float(b["tmk"])) + b["upm"], l = l.strip().split(";", 1) + b["upm"] = cleanup_value(float(b["upm"])) + b["txk"], l = l.strip().split(";", 1) + b["txk"] = cleanup_value(float(b["txk"])) + b["tnk"], l = l.strip().split(";", 1) + b["tnk"] = cleanup_value(float(b["tnk"])) + b["tgk"], l = l.strip().split(";", 1) + b["tgk"] = cleanup_value(float(b["tgk"])) + + #print(b) + print(curr_station_id, b["date"].isoformat()) + + cur.execute("SELECT id FROM climate WHERE station = %s AND date = %s; ", [curr_station_id, b["date"].isoformat()]) + if cur.rowcount == 0: + cur.execute("INSERT INTO climate (station, date, qn_3, fx, fm, qn_4, rsk, rskf, sdk, shk_tag, nm, vpm, pm, tmk, upm, txk, tnk, tgk) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);", + [curr_station_id, b["date"].isoformat(), b["qn_3"], b["fx"], b["fm"], b["qn_4"], b["rsk"], b["rskf"], b["sdk"], b["shk_tag"], b["nm"], b["vpm"], b["pm"], b["tmk"], b["upm"], b["txk"], b["tnk"], b["tgk"]]) + print("imported") + else: + print("ignored") + + cur.execute("UPDATE stations SET dwd_last_update = %s WHERE id = %s;", [datetime.datetime.today().isoformat(), curr_station_id]) + conn.commit() + + + cur.close() + conn.close() + +if __name__ == "__main__": + DwdScraper().run()