Establish sql connection only after data got scraped successfully

This commit is contained in:
clerie 2022-02-28 17:31:10 +01:00
parent fc952670bb
commit 9fb1d81652

View File

@ -13,6 +13,18 @@ class DwdScraper:
last_station = self.get_station_to_update() last_station = self.get_station_to_update()
print("checking station", last_station) print("checking station", last_station)
print(last_station)
r = requests.get('https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/daily/kl/recent/tageswerte_KL_' + str(last_station) + '_akt.zip', stream=True)
print(r.url)
if r.status_code != 200:
return
zip = zipfile.ZipFile(io.BytesIO(r.content))
files = zip.namelist()
print(files)
files_climate = [f for f in files if f.startswith("produkt_klima_")]
print(files_climate)
with psycopg2.connect(config_db["uri"]) as conn: with psycopg2.connect(config_db["uri"]) as conn:
with conn.cursor() as cur: with conn.cursor() as cur:
@ -22,77 +34,64 @@ class DwdScraper:
curr_station_id = current_station[0] curr_station_id = current_station[0]
curr_station_dwd_id = current_station[4] curr_station_dwd_id = current_station[4]
print(curr_station_dwd_id) is_first_line = True
r = requests.get('https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/daily/kl/recent/tageswerte_KL_' + str(curr_station_dwd_id) + '_akt.zip', stream=True)
print(r.url)
if r.status_code == 200:
zip = zipfile.ZipFile(io.BytesIO(r.content))
files = zip.namelist()
print(files)
files_climate = [f for f in files if f.startswith("produkt_klima_")]
print(files_climate)
for line in zip.open(files_climate[0]):
l = line.decode('utf-8').strip()
if is_first_line:
is_first_line = False
continue
buffer = [] b = {}
is_first_line = True b["dwd_id"], l = l.strip().split(";", 1)
b["dwd_id"] = str(b["dwd_id"])
b["date"], l = l.strip().split(";", 1)
b["date"] = str(b["date"])
b["date"] = datetime.date(int(b["date"][0:4]), int(b["date"][4:6]), int(b["date"][6:8]))
b["qn_3"], l = l.strip().split(";", 1)
b["qn_3"] = self.cleanup_value(int(b["qn_3"]))
b["fx"], l = l.strip().split(";", 1)
b["fx"] = self.cleanup_value(float(b["fx"]))
b["fm"], l = l.strip().split(";", 1)
b["fm"] = self.cleanup_value(float(b["fm"]))
b["qn_4"], l = l.strip().split(";", 1)
b["qn_4"] = self.cleanup_value(int(b["qn_4"]))
b["rsk"], l = l.strip().split(";", 1)
b["rsk"] = self.cleanup_value(float(b["rsk"]))
b["rskf"], l = l.strip().split(";", 1)
b["rskf"] = self.cleanup_value(int(b["rskf"]))
b["sdk"], l = l.strip().split(";", 1)
b["sdk"] = self.cleanup_value(float(b["sdk"]))
b["shk_tag"], l = l.strip().split(";", 1)
b["shk_tag"] = self.cleanup_value(float(b["shk_tag"]))
b["nm"], l = l.strip().split(";", 1)
b["nm"] = self.cleanup_value(float(b["nm"]))
b["vpm"], l = l.strip().split(";", 1)
b["vpm"] = self.cleanup_value(float(b["vpm"]))
b["pm"], l = l.strip().split(";", 1)
b["pm"] = self.cleanup_value(float(b["pm"]))
b["tmk"], l = l.strip().split(";", 1)
b["tmk"] = self.cleanup_value(float(b["tmk"]))
b["upm"], l = l.strip().split(";", 1)
b["upm"] = self.cleanup_value(float(b["upm"]))
b["txk"], l = l.strip().split(";", 1)
b["txk"] = self.cleanup_value(float(b["txk"]))
b["tnk"], l = l.strip().split(";", 1)
b["tnk"] = self.cleanup_value(float(b["tnk"]))
b["tgk"], l = l.strip().split(";", 1)
b["tgk"] = self.cleanup_value(float(b["tgk"]))
for line in zip.open(files_climate[0]): #print(b)
l = line.decode('utf-8').strip() print(curr_station_id, b["date"].isoformat())
if is_first_line:
is_first_line = False
continue
b = {} cur.execute("SELECT id FROM climate WHERE station = %s AND date = %s; ", [curr_station_id, b["date"].isoformat()])
if cur.rowcount == 0:
b["dwd_id"], l = l.strip().split(";", 1) cur.execute("INSERT INTO climate (station, date, qn_3, fx, fm, qn_4, rsk, rskf, sdk, shk_tag, nm, vpm, pm, tmk, upm, txk, tnk, tgk) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);",
b["dwd_id"] = str(b["dwd_id"]) [curr_station_id, b["date"].isoformat(), b["qn_3"], b["fx"], b["fm"], b["qn_4"], b["rsk"], b["rskf"], b["sdk"], b["shk_tag"], b["nm"], b["vpm"], b["pm"], b["tmk"], b["upm"], b["txk"], b["tnk"], b["tgk"]])
b["date"], l = l.strip().split(";", 1) print("imported")
b["date"] = str(b["date"]) else:
b["date"] = datetime.date(int(b["date"][0:4]), int(b["date"][4:6]), int(b["date"][6:8])) print("ignored")
b["qn_3"], l = l.strip().split(";", 1)
b["qn_3"] = self.cleanup_value(int(b["qn_3"]))
b["fx"], l = l.strip().split(";", 1)
b["fx"] = self.cleanup_value(float(b["fx"]))
b["fm"], l = l.strip().split(";", 1)
b["fm"] = self.cleanup_value(float(b["fm"]))
b["qn_4"], l = l.strip().split(";", 1)
b["qn_4"] = self.cleanup_value(int(b["qn_4"]))
b["rsk"], l = l.strip().split(";", 1)
b["rsk"] = self.cleanup_value(float(b["rsk"]))
b["rskf"], l = l.strip().split(";", 1)
b["rskf"] = self.cleanup_value(int(b["rskf"]))
b["sdk"], l = l.strip().split(";", 1)
b["sdk"] = self.cleanup_value(float(b["sdk"]))
b["shk_tag"], l = l.strip().split(";", 1)
b["shk_tag"] = self.cleanup_value(float(b["shk_tag"]))
b["nm"], l = l.strip().split(";", 1)
b["nm"] = self.cleanup_value(float(b["nm"]))
b["vpm"], l = l.strip().split(";", 1)
b["vpm"] = self.cleanup_value(float(b["vpm"]))
b["pm"], l = l.strip().split(";", 1)
b["pm"] = self.cleanup_value(float(b["pm"]))
b["tmk"], l = l.strip().split(";", 1)
b["tmk"] = self.cleanup_value(float(b["tmk"]))
b["upm"], l = l.strip().split(";", 1)
b["upm"] = self.cleanup_value(float(b["upm"]))
b["txk"], l = l.strip().split(";", 1)
b["txk"] = self.cleanup_value(float(b["txk"]))
b["tnk"], l = l.strip().split(";", 1)
b["tnk"] = self.cleanup_value(float(b["tnk"]))
b["tgk"], l = l.strip().split(";", 1)
b["tgk"] = self.cleanup_value(float(b["tgk"]))
#print(b)
print(curr_station_id, b["date"].isoformat())
cur.execute("SELECT id FROM climate WHERE station = %s AND date = %s; ", [curr_station_id, b["date"].isoformat()])
if cur.rowcount == 0:
cur.execute("INSERT INTO climate (station, date, qn_3, fx, fm, qn_4, rsk, rskf, sdk, shk_tag, nm, vpm, pm, tmk, upm, txk, tnk, tgk) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);",
[curr_station_id, b["date"].isoformat(), b["qn_3"], b["fx"], b["fm"], b["qn_4"], b["rsk"], b["rskf"], b["sdk"], b["shk_tag"], b["nm"], b["vpm"], b["pm"], b["tmk"], b["upm"], b["txk"], b["tnk"], b["tgk"]])
print("imported")
else:
print("ignored")
cur.execute("UPDATE stations SET dwd_last_update = %s WHERE dwd_id = %s;", [datetime.datetime.today().isoformat(), last_station]) cur.execute("UPDATE stations SET dwd_last_update = %s WHERE dwd_id = %s;", [datetime.datetime.today().isoformat(), last_station])
conn.commit() conn.commit()