#! /usr/bin/env python3
"""Import recent daily climate observations from DWD open data into the database."""
import contextlib
import datetime
import io
import threading
import traceback
import zipfile

import psycopg2
import requests

from config.db import db as config_db


class DwdScraper:
    """Coordinator and worker threads that import DWD daily climate files.

    ``run`` starts ``worker`` daemon threads.  Each worker registers an empty
    task slot in ``self._tasks`` and sets ``self._request_new_task``; the
    coordinator loop in ``run`` then hands it the DWD id returned by
    ``get_station_to_update``.  The worker downloads that station's zip file,
    inserts rows that are not yet in the ``climate`` table and stamps the
    station's ``dwd_last_update``.
    """

    _URL_TEMPLATE = (
        'https://opendata.dwd.de/climate_environment/CDC/observations_germany/'
        'climate/daily/kl/recent/tageswerte_KL_{}_akt.zip'
    )

    # Seconds to wait for the DWD server before giving up on a download.
    _REQUEST_TIMEOUT = 60

    # Value columns in the order they appear in a data line (after station id
    # and date) and in the INSERT statement below.
    _VALUE_FIELDS = (
        "qn_3", "fx", "fm", "qn_4", "rsk", "rskf", "sdk", "shk_tag",
        "nm", "vpm", "pm", "tmk", "upm", "txk", "tnk", "tgk",
    )

    # Columns parsed with int(); every other value column is parsed with float().
    _INT_FIELDS = frozenset(("qn_3", "qn_4", "rskf"))

    _INSERT_SQL = (
        "INSERT INTO climate (station, date, qn_3, fx, fm, qn_4, rsk, rskf, "
        "sdk, shk_tag, nm, vpm, pm, tmk, upm, txk, tnk, tgk) "
        "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, "
        "%s, %s, %s, %s, %s, %s, %s, %s, %s);"
    )

    def __init__(self, worker=1):
        """Create a scraper that will use ``worker`` parallel worker threads."""
        self._worker = worker
        # worker id -> {"wait_for_dwd_id": Event, "dwd_id": assigned id or None}
        self._tasks = {}
        self._request_new_task = threading.Event()
        # Despite its name, True means "keep running"; nothing visible in this
        # class ever sets it to False.
        self._close = True
        # DWD ids whose import raised; they are skipped for the rest of the run.
        self._failed = set()
        self._failed_lock = threading.Lock()

    def run(self):
        """Start the worker threads and assign stations to idle workers.

        Loops for as long as ``self._close`` is true, blocking between passes
        until a worker asks for a task.
        """
        for n in range(self._worker):
            # Positional argument: works whatever the parameter is called in
            # an overriding ``_run_scraper``.
            threading.Thread(target=self._run_scraper, args=(n,), daemon=True).start()

        while self._close:
            self._request_new_task.wait()
            # Clear before scanning, so a request raised while scanning
            # triggers another pass instead of being lost.
            self._request_new_task.clear()
            print("Checking for waiting workers")
            # Snapshot: workers insert into self._tasks concurrently.
            for worker_id, task in list(self._tasks.items()):
                if task["dwd_id"] is not None:
                    continue
                print("Worker {} needs a job".format(worker_id))
                dwd_id = self.get_station_to_update()
                if dwd_id is None:
                    continue
                print("Worker {} got {}".format(worker_id, dwd_id))
                task["dwd_id"] = dwd_id
                task["wait_for_dwd_id"].set()

    def _run_scraper(self, worker_id):
        """Worker loop: request a station, wait for the assignment, import it.

        A failing import is logged and its station is remembered in
        ``self._failed``; the worker itself keeps running.
        """
        print("Worker {} spawned".format(worker_id))
        while self._close:
            task = {
                "wait_for_dwd_id": threading.Event(),
                "dwd_id": None
            }
            self._tasks[worker_id] = task
            self._request_new_task.set()
            print(worker_id, "Waits for new task")
            task["wait_for_dwd_id"].wait()
            dwd_id = task["dwd_id"]
            print(worker_id, "Got task {}".format(dwd_id))
            try:
                self._do_scrape(dwd_id)
            except Exception:
                # Thread boundary: an uncaught exception would end this worker
                # while its slot kept the station marked as busy forever.
                print(worker_id, "Failed to import {}".format(dwd_id))
                traceback.print_exc()
                # Recorded before the slot is replaced at the top of the loop,
                # so the station is never offered again in between.
                with self._failed_lock:
                    self._failed.add(dwd_id)

    def _do_scrape(self, dwd_id):
        """Download the zip file for ``dwd_id`` and import its climate rows.

        Rows whose (station, date) already exists in ``climate`` are left
        untouched.  On success the station's ``dwd_last_update`` is set to the
        current local time.  All database changes are made in one transaction.

        Raises:
            requests.RequestException: download failed or timed out.
            LookupError: no row in ``stations`` has this ``dwd_id``.
            ValueError: the archive has no ``produkt_klima_`` file, or a data
                line cannot be parsed.
        """
        response = requests.get(self._URL_TEMPLATE.format(dwd_id),
                                timeout=self._REQUEST_TIMEOUT)
        response.raise_for_status()
        print(dwd_id, "downloaded file")

        with zipfile.ZipFile(io.BytesIO(response.content)) as archive:
            climate_files = [name for name in archive.namelist()
                             if name.startswith("produkt_klima_")]
            if not climate_files:
                raise ValueError(
                    "no produkt_klima_ file in archive of station {}".format(dwd_id))

            # ``with conn`` only commits or rolls back the transaction;
            # ``closing`` is what actually closes the connection.
            with contextlib.closing(psycopg2.connect(config_db["uri"])) as conn, \
                    conn, conn.cursor() as cur:
                cur.execute("SELECT id, name, lat, lon, dwd_id, dwd_last_update FROM stations WHERE dwd_id = %s;", [dwd_id])
                station_row = cur.fetchone()
                if station_row is None:
                    raise LookupError("no station with dwd_id {}".format(dwd_id))
                station_id = station_row[0]

                with archive.open(climate_files[0]) as data:
                    next(data, None)  # first line is the column header
                    for count, raw_line in enumerate(data, start=1):
                        text = raw_line.decode('utf-8').strip()
                        if not text:
                            continue
                        row = self._parse_line(text)
                        iso_date = row["date"].isoformat()
                        cur.execute("SELECT id FROM climate WHERE station = %s AND date = %s; ", [station_id, iso_date])
                        if cur.rowcount == 0:
                            values = [row[field] for field in self._VALUE_FIELDS]
                            cur.execute(self._INSERT_SQL, [station_id, iso_date] + values)
                        if count % 20 == 0:
                            print(dwd_id, "still updating")

                print(dwd_id, "finished import")
                cur.execute("UPDATE stations SET dwd_last_update = %s WHERE dwd_id = %s;", [datetime.datetime.today().isoformat(), dwd_id])
        print(dwd_id, "updated last change data")

    def _parse_line(self, line):
        """Parse one ``;``-separated data line into a dict.

        The result maps ``"date"`` to a ``datetime.date`` and every name in
        ``_VALUE_FIELDS`` to its number, or to ``None`` where the line holds
        the -999 marker.

        Raises:
            ValueError: the line has too few fields or a field is not numeric.
        """
        parts = [part.strip() for part in line.split(";")]
        # Station id, date, the value columns and at least one trailing field
        # after the last value (the original parser required that separator).
        if len(parts) < len(self._VALUE_FIELDS) + 3:
            raise ValueError("unexpected data line: {!r}".format(line))

        raw_date = parts[1]
        row = {
            "date": datetime.date(int(raw_date[0:4]), int(raw_date[4:6]), int(raw_date[6:8])),
        }
        for field, raw_value in zip(self._VALUE_FIELDS, parts[2:]):
            convert = int if field in self._INT_FIELDS else float
            row[field] = self.cleanup_value(convert(raw_value))
        return row

    def get_station_to_update(self):
        """
        Returns DWD ID of station that should get updated soon.

        That is the station with the oldest ``dwd_last_update`` that is neither
        assigned to a worker right now nor recorded as failed; ``None`` if the
        query yields no such station.
        """
        # Snapshot: workers insert into self._tasks concurrently.
        skipped = {task["dwd_id"] for task in list(self._tasks.values())}
        with self._failed_lock:
            skipped |= self._failed
            failed_count = len(self._failed)

        # One more row than can be taken by busy workers and failed stations.
        limit = self._worker + 1 + failed_count
        with contextlib.closing(psycopg2.connect(config_db["uri"])) as conn, \
                conn, conn.cursor() as cur:
            cur.execute("SELECT dwd_id FROM stations ORDER BY dwd_last_update ASC LIMIT %s", [limit])
            stations = cur.fetchall()

        for station in stations:
            if station[0] not in skipped:
                return station[0]
        return None

    def cleanup_value(self, v):
        """Return ``None`` if ``v`` is the -999 missing-value marker, else ``v``."""
        # Compare the value itself: truncating with int() first would also
        # discard e.g. -999.5.
        if v == -999:
            return None
        return v


if __name__ == "__main__":
    DwdScraper(worker=8).run()