diff --git a/scraper/daily_import_climate.py b/scraper/daily_import_climate.py index 35b83a1..c78cb58 100755 --- a/scraper/daily_import_climate.py +++ b/scraper/daily_import_climate.py @@ -6,24 +6,65 @@ import io import zipfile import datetime from config.db import db as config_db +import threading class DwdScraper: - def run(self): - last_station = self.get_station_to_update() - print("checking station", last_station) + def __init__(self, worker=1): + self._worker = worker + + self._tasks = {} + self._request_new_task = threading.Event() + + self._close = True + + def run(self): + for n in range(self._worker): + threading.Thread(target=self._run_scraper, kwargs={"id": n}, daemon=True).start() + + while self._close: + self._request_new_task.wait() + self._request_new_task.clear() + print("Checking for waiting workers") + for id, t in list(self._tasks.items()): + if t["dwd_id"] is None: + print("Worker {} needs a job".format(id)) + dwd_id = self.get_station_to_update() + if dwd_id is not None: + print("Worker {} got {}".format(id, dwd_id)) + self._tasks[id]["dwd_id"] = dwd_id + self._tasks[id]["wait_for_dwd_id"].set() + + + def _run_scraper(self, id): + + print("Worker {} spawned".format(id)) + + while self._close: + self._tasks[id] = { + "wait_for_dwd_id": threading.Event(), + "dwd_id": None + } + self._request_new_task.set() + print(id, "Waits for new task") + self._tasks[id]["wait_for_dwd_id"].wait() + print(id, "Got task {}".format(self._tasks[id]["dwd_id"])) + + self._do_scrape(self._tasks[id]["dwd_id"]) + + + def _do_scrape(self, dwd_id): + last_station = dwd_id - print(last_station) r = requests.get('https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/daily/kl/recent/tageswerte_KL_' + str(last_station) + '_akt.zip', stream=True) - print(r.url) - if r.status_code != 200: - return + print(dwd_id, "downloaded file") + + r.raise_for_status() zip = zipfile.ZipFile(io.BytesIO(r.content)) files = zip.namelist() - print(files) + files_climate = [f for f in files if f.startswith("produkt_klima_")] - print(files_climate) with psycopg2.connect(config_db["uri"]) as conn: with conn.cursor() as cur: @@ -36,12 +77,16 @@ class DwdScraper: is_first_line = True + count = 0 + for line in zip.open(files_climate[0]): l = line.decode('utf-8').strip() if is_first_line: is_first_line = False continue + count += 1 + b = {} b["dwd_id"], l = l.strip().split(";", 1) @@ -82,20 +127,22 @@ class DwdScraper: b["tgk"], l = l.strip().split(";", 1) b["tgk"] = self.cleanup_value(float(b["tgk"])) - #print(b) - print(curr_station_id, b["date"].isoformat()) cur.execute("SELECT id FROM climate WHERE station = %s AND date = %s; ", [curr_station_id, b["date"].isoformat()]) if cur.rowcount == 0: cur.execute("INSERT INTO climate (station, date, qn_3, fx, fm, qn_4, rsk, rskf, sdk, shk_tag, nm, vpm, pm, tmk, upm, txk, tnk, tgk) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);", [curr_station_id, b["date"].isoformat(), b["qn_3"], b["fx"], b["fm"], b["qn_4"], b["rsk"], b["rskf"], b["sdk"], b["shk_tag"], b["nm"], b["vpm"], b["pm"], b["tmk"], b["upm"], b["txk"], b["tnk"], b["tgk"]]) - print("imported") - else: - print("ignored") + + if count % 20 == 0: + print(dwd_id, "still updating") + + print(dwd_id, "finished import") cur.execute("UPDATE stations SET dwd_last_update = %s WHERE dwd_id = %s;", [datetime.datetime.today().isoformat(), last_station]) conn.commit() + print(dwd_id, "updated last change data") + def get_station_to_update(self): """ Returns DWD ID of station that should get updated soon. @@ -104,16 +151,17 @@ class DwdScraper: with psycopg2.connect(config_db["uri"]) as conn: with conn.cursor() as cur: - cur.execute("SELECT min(dwd_last_update) FROM stations;") - last_date = cur.fetchall()[0][0] + cur.execute("SELECT dwd_id FROM stations ORDER BY dwd_last_update ASC LIMIT %s", [self._worker + 1]) - if not last_date is None: - cur.execute("SELECT id, name, lat, lon, dwd_id, dwd_last_update FROM stations WHERE dwd_last_update = %s LIMIT 1;", [last_date]) - else: - cur.execute("SELECT id, name, lat, lon, dwd_id, dwd_last_update FROM stations WHERE dwd_last_update IS NULL LIMIT 1;") - last_station = cur.fetchone() + stations = cur.fetchall() - return last_station[4] + #print(stations) + + for station in stations: + if station[0] not in [t["dwd_id"] for id, t in self._tasks.items()]: + return station[0] + + return None def cleanup_value(self, v): if int(v) == -999: @@ -122,4 +170,4 @@ class DwdScraper: return v if __name__ == "__main__": - DwdScraper().run() + DwdScraper(worker=8).run()