Compare commits

8 commits: 2adef5aa10 ... eae785064e

| SHA1 |
|---|
| eae785064e |
| 9fb1d81652 |
| fc952670bb |
| 88e67bcd3e |
| 6d3b85373e |
| bf638d84f3 |
| cf620b86bc |
| f7cb2ca86e |
```diff
@@ -6,51 +6,87 @@ import io
 import zipfile
 import datetime
 from config.db import db as config_db
+import threading
 
-def cleanup_value(v):
-    if int(v) == -999:
-        return None
-
-    return v
-
-conn = psycopg2.connect(config_db["uri"])
-cur = conn.cursor()
-
-cur.execute("SELECT min(dwd_last_update) FROM stations;")
-last_date = cur.fetchall()[0][0]
-print(last_date)
-
-if not last_date is None:
-    cur.execute("SELECT id, name, lat, lon, dwd_id, dwd_last_update FROM stations WHERE dwd_last_update = %s LIMIT 1;", [last_date])
-else:
-    cur.execute("SELECT id, name, lat, lon, dwd_id, dwd_last_update FROM stations WHERE dwd_last_update IS NULL LIMIT 1;")
-last_station = cur.fetchone()
-print(last_station)
-
-curr_station_id = last_station[0]
-curr_station_dwd_id = last_station[4]
-
-print(curr_station_dwd_id)
-r = requests.get('https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/daily/kl/recent/tageswerte_KL_' + str(curr_station_dwd_id) + '_akt.zip', stream=True)
-print(r.url)
-if r.status_code == 200:
-    zip = zipfile.ZipFile(io.BytesIO(r.content))
-    files = zip.namelist()
-    print(files)
-    files_climate = [f for f in files if f.startswith("produkt_klima_")]
-    print(files_climate)
-
-    buffer = []
-    is_first_line = True
-    for line in zip.open(files_climate[0]):
-        l = line.decode('utf-8').strip()
-        if is_first_line:
-            is_first_line = False
-            continue
-
-        b = {}
-        b["dwd_id"], l = l.strip().split(";", 1)
+
+class DwdScraper:
+
+    def __init__(self, worker=1):
+        self._worker = worker
+
+        self._tasks = {}
+        self._request_new_task = threading.Event()
+
+        self._close = True
+
+    def run(self):
+        for n in range(self._worker):
+            threading.Thread(target=self._run_scraper, kwargs={"id": n}, daemon=True).start()
+
+        while self._close:
+            self._request_new_task.wait()
+            self._request_new_task.clear()
+            print("Checking for waiting workers")
+            for id, t in list(self._tasks.items()):
+                if t["dwd_id"] is None:
+                    print("Worker {} needs a job".format(id))
+                    dwd_id = self.get_station_to_update()
+                    if dwd_id is not None:
+                        print("Worker {} got {}".format(id, dwd_id))
+                        self._tasks[id]["dwd_id"] = dwd_id
+                        self._tasks[id]["wait_for_dwd_id"].set()
+
+    def _run_scraper(self, id):
+        print("Worker {} spawned".format(id))
+
+        while self._close:
+            self._tasks[id] = {
+                "wait_for_dwd_id": threading.Event(),
+                "dwd_id": None
+            }
+            self._request_new_task.set()
+            print(id, "Waits for new task")
+            self._tasks[id]["wait_for_dwd_id"].wait()
+            print(id, "Got task {}".format(self._tasks[id]["dwd_id"]))
+
+            self._do_scrape(self._tasks[id]["dwd_id"])
+
+    def _do_scrape(self, dwd_id):
+        last_station = dwd_id
+
+        r = requests.get('https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/daily/kl/recent/tageswerte_KL_' + str(last_station) + '_akt.zip', stream=True)
+        print(dwd_id, "downloaded file")
+
+        r.raise_for_status()
+
+        zip = zipfile.ZipFile(io.BytesIO(r.content))
+        files = zip.namelist()
+        files_climate = [f for f in files if f.startswith("produkt_klima_")]
+
+        with psycopg2.connect(config_db["uri"]) as conn:
+            with conn.cursor() as cur:
+                cur.execute("SELECT id, name, lat, lon, dwd_id, dwd_last_update FROM stations WHERE dwd_id = %s;", [last_station])
+                current_station = cur.fetchone()
+
+                curr_station_id = current_station[0]
+                curr_station_dwd_id = current_station[4]
+
+                is_first_line = True
+                count = 0
+                for line in zip.open(files_climate[0]):
+                    l = line.decode('utf-8').strip()
+                    if is_first_line:
+                        is_first_line = False
+                        continue
+
+                    count += 1
+                    b = {}
+                    b["dwd_id"], l = l.strip().split(";", 1)
```
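The core of this change is a hand-rolled dispatcher/worker handshake: each worker parks an idle slot in `self._tasks` with a fresh `threading.Event`, raises `_request_new_task`, and blocks until `run` hands it a `dwd_id`. Because an `Event` coalesces repeated `set()` calls, the dispatcher rescans every task slot after each wake-up. (`self._close` starts as `True` and nothing in the diff ever clears it, so both loops effectively run forever; the sketch below adds an explicit shutdown signal instead.) A minimal standalone sketch of the same pattern, with made-up station IDs and a `time.sleep` stand-in for the actual download and import:

```python
import threading
import time

NUM_WORKERS = 2
JOBS = ["00044", "00073", "00078", "00091"]   # hypothetical DWD IDs

tasks = {}                            # worker id -> {"event": Event, "dwd_id": job or None}
request_new_task = threading.Event()

def worker(wid):
    while True:
        # register an idle slot, then wait to be handed a job (as in _run_scraper)
        tasks[wid] = {"event": threading.Event(), "dwd_id": None}
        request_new_task.set()
        tasks[wid]["event"].wait()
        job = tasks[wid]["dwd_id"]
        if job is None:               # used here as a shutdown signal
            return
        time.sleep(0.1)               # stand-in for download + database import
        print("worker", wid, "scraped", job)

threads = [threading.Thread(target=worker, args=(n,)) for n in range(NUM_WORKERS)]
for t in threads:
    t.start()

# dispatcher loop (as in run); one trailing None per worker tells it to exit
remaining = JOBS + [None] * NUM_WORKERS
while remaining:
    request_new_task.wait()           # sleep until some worker reports idle
    request_new_task.clear()
    for wid, t in list(tasks.items()):
        if not t["event"].is_set() and remaining:
            t["dwd_id"] = remaining.pop(0)
            t["event"].set()

for t in threads:
    t.join()
```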
```diff
@@ -59,52 +95,79 @@ if r.status_code == 200:
                     b["date"] = str(b["date"])
                     b["date"] = datetime.date(int(b["date"][0:4]), int(b["date"][4:6]), int(b["date"][6:8]))
                     b["qn_3"], l = l.strip().split(";", 1)
-                    b["qn_3"] = cleanup_value(int(b["qn_3"]))
+                    b["qn_3"] = self.cleanup_value(int(b["qn_3"]))
                     b["fx"], l = l.strip().split(";", 1)
-                    b["fx"] = cleanup_value(float(b["fx"]))
+                    b["fx"] = self.cleanup_value(float(b["fx"]))
                     b["fm"], l = l.strip().split(";", 1)
-                    b["fm"] = cleanup_value(float(b["fm"]))
+                    b["fm"] = self.cleanup_value(float(b["fm"]))
                     b["qn_4"], l = l.strip().split(";", 1)
-                    b["qn_4"] = cleanup_value(int(b["qn_4"]))
+                    b["qn_4"] = self.cleanup_value(int(b["qn_4"]))
                     b["rsk"], l = l.strip().split(";", 1)
-                    b["rsk"] = cleanup_value(float(b["rsk"]))
+                    b["rsk"] = self.cleanup_value(float(b["rsk"]))
                     b["rskf"], l = l.strip().split(";", 1)
-                    b["rskf"] = cleanup_value(int(b["rskf"]))
+                    b["rskf"] = self.cleanup_value(int(b["rskf"]))
                     b["sdk"], l = l.strip().split(";", 1)
-                    b["sdk"] = cleanup_value(float(b["sdk"]))
+                    b["sdk"] = self.cleanup_value(float(b["sdk"]))
                     b["shk_tag"], l = l.strip().split(";", 1)
-                    b["shk_tag"] = cleanup_value(float(b["shk_tag"]))
+                    b["shk_tag"] = self.cleanup_value(float(b["shk_tag"]))
                     b["nm"], l = l.strip().split(";", 1)
-                    b["nm"] = cleanup_value(float(b["nm"]))
+                    b["nm"] = self.cleanup_value(float(b["nm"]))
                     b["vpm"], l = l.strip().split(";", 1)
-                    b["vpm"] = cleanup_value(float(b["vpm"]))
+                    b["vpm"] = self.cleanup_value(float(b["vpm"]))
                     b["pm"], l = l.strip().split(";", 1)
-                    b["pm"] = cleanup_value(float(b["pm"]))
+                    b["pm"] = self.cleanup_value(float(b["pm"]))
                     b["tmk"], l = l.strip().split(";", 1)
-                    b["tmk"] = cleanup_value(float(b["tmk"]))
+                    b["tmk"] = self.cleanup_value(float(b["tmk"]))
                     b["upm"], l = l.strip().split(";", 1)
-                    b["upm"] = cleanup_value(float(b["upm"]))
+                    b["upm"] = self.cleanup_value(float(b["upm"]))
                     b["txk"], l = l.strip().split(";", 1)
-                    b["txk"] = cleanup_value(float(b["txk"]))
+                    b["txk"] = self.cleanup_value(float(b["txk"]))
                     b["tnk"], l = l.strip().split(";", 1)
-                    b["tnk"] = cleanup_value(float(b["tnk"]))
+                    b["tnk"] = self.cleanup_value(float(b["tnk"]))
                     b["tgk"], l = l.strip().split(";", 1)
-                    b["tgk"] = cleanup_value(float(b["tgk"]))
+                    b["tgk"] = self.cleanup_value(float(b["tgk"]))
 
-                    #print(b)
-                    print(curr_station_id, b["date"].isoformat())
 
                     cur.execute("SELECT id FROM climate WHERE station = %s AND date = %s; ", [curr_station_id, b["date"].isoformat()])
                     if cur.rowcount == 0:
                         cur.execute("INSERT INTO climate (station, date, qn_3, fx, fm, qn_4, rsk, rskf, sdk, shk_tag, nm, vpm, pm, tmk, upm, txk, tnk, tgk) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);",
                             [curr_station_id, b["date"].isoformat(), b["qn_3"], b["fx"], b["fm"], b["qn_4"], b["rsk"], b["rskf"], b["sdk"], b["shk_tag"], b["nm"], b["vpm"], b["pm"], b["tmk"], b["upm"], b["txk"], b["tnk"], b["tgk"]])
-                        print("imported")
-                    else:
-                        print("ignored")
-
-    cur.execute("UPDATE stations SET dwd_last_update = %s WHERE id = %s;", [datetime.datetime.today().isoformat(), curr_station_id])
-    conn.commit()
-
-    cur.close()
-    conn.close()
+
+                    if count % 20 == 0:
+                        print(dwd_id, "still updating")
+
+                print(dwd_id, "finished import")
+
+                cur.execute("UPDATE stations SET dwd_last_update = %s WHERE dwd_id = %s;", [datetime.datetime.today().isoformat(), last_station])
+                conn.commit()
+
+                print(dwd_id, "updated last change data")
+
+    def get_station_to_update(self):
+        """
+        Returns DWD ID of station that should get updated soon.
+        """
+
+        with psycopg2.connect(config_db["uri"]) as conn:
+            with conn.cursor() as cur:
+                cur.execute("SELECT dwd_id FROM stations ORDER BY dwd_last_update ASC LIMIT %s", [self._worker + 1])
+                stations = cur.fetchall()
+
+                #print(stations)
+
+                for station in stations:
+                    if station[0] not in [t["dwd_id"] for id, t in self._tasks.items()]:
+                        return station[0]
+
+                return None
+
+    def cleanup_value(self, v):
+        if int(v) == -999:
+            return None
+
+        return v
+
+
+if __name__ == "__main__":
+    DwdScraper(worker=8).run()
```
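Two details in this hunk are worth spelling out. First, `get_station_to_update` fetches the `self._worker + 1` least recently updated stations and skips any `dwd_id` a worker already holds, so two workers never scrape the same station at once. Second, the record parser peels one semicolon-separated field at a time with `split(";", 1)` and maps the DWD missing-value marker `-999` to `None`. The repetitive field-by-field code can be expressed as a table-driven loop; here is a sketch where the field names and converters mirror the diff but the sample data line is invented:

```python
import datetime

def cleanup_value(v):
    # DWD uses -999 as its missing-value marker
    if int(v) == -999:
        return None
    return v

# (name, converter) pairs in the column order used by the diff
FIELDS = [("qn_3", int), ("fx", float), ("fm", float), ("qn_4", int),
          ("rsk", float), ("rskf", int), ("sdk", float), ("shk_tag", float),
          ("nm", float), ("vpm", float), ("pm", float), ("tmk", float),
          ("upm", float), ("txk", float), ("tnk", float), ("tgk", float)]

def parse_kl_line(l):
    b = {}
    b["dwd_id"], l = l.strip().split(";", 1)
    date_raw, l = l.strip().split(";", 1)
    b["date"] = datetime.date(int(date_raw[0:4]), int(date_raw[4:6]), int(date_raw[6:8]))
    for name, conv in FIELDS:
        raw, l = l.strip().split(";", 1)
        b[name] = cleanup_value(conv(raw))
    return b

# made-up sample record in the daily KL layout ("eor" = end of record)
sample = "00044;20200101;10;   4.6;   2.9;3;   0.0;0;   0.000;   0;  7.8;  6.6;1017.5;   4.1;  88.00;   6.1;   2.3;   1.1;eor"
print(parse_kl_line(sample))
```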
```diff
@@ -76,10 +76,13 @@ if r.status_code == 200:
 cur = conn.cursor()
 
 for b in buffer:
+    print("importing", b["station_id"])
     cur.execute("SELECT * FROM stations WHERE dwd_id LIKE %s;", [str(b["station_id"])])
     if cur.rowcount == 0:
+        print("\t", "new station")
         cur.execute("INSERT INTO stations (name, lat, lon, dwd_id, state, sea_level) VALUES (%s, %s, %s, %s, %s, %s);", [b["name"], b["lat"], b["lon"], b["station_id"], b["state"], b["sea_level"]])
     else:
+        print("\t", "update station")
         cur.execute("UPDATE stations SET name = %s, lat = %s, lon = %s, state = %s, sea_level = %s WHERE dwd_id LIKE %s", [b["name"], b["lat"], b["lon"], b["state"], b["sea_level"], b["station_id"]])
 
 conn.commit()
```
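The loop this hunk instruments is a manual upsert: SELECT, then INSERT or UPDATE depending on `cur.rowcount`. On PostgreSQL 9.5 and later the same round trip can be collapsed into a single statement; a sketch assuming a UNIQUE constraint on `stations.dwd_id`, which the diff itself does not show:

```python
import psycopg2

from config.db import db as config_db

# Single-statement alternative to the SELECT + INSERT/UPDATE pair above.
# Assumes stations.dwd_id carries a UNIQUE constraint (not shown in the diff).
UPSERT = """
    INSERT INTO stations (name, lat, lon, dwd_id, state, sea_level)
    VALUES (%s, %s, %s, %s, %s, %s)
    ON CONFLICT (dwd_id) DO UPDATE
    SET name = EXCLUDED.name, lat = EXCLUDED.lat, lon = EXCLUDED.lon,
        state = EXCLUDED.state, sea_level = EXCLUDED.sea_level;
"""

def import_stations(buffer):
    with psycopg2.connect(config_db["uri"]) as conn:
        with conn.cursor() as cur:
            for b in buffer:
                cur.execute(UPSERT, [b["name"], b["lat"], b["lon"],
                                     str(b["station_id"]), b["state"], b["sea_level"]])
        conn.commit()
```

This variant also replaces the `LIKE` comparison on `dwd_id` with plain equality, which is what the original intends anyway since the station IDs contain no wildcard characters.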