Initial commit

This commit is contained in:
2020-04-08 11:14:45 +02:00
commit 68f52727ad
8 changed files with 527 additions and 0 deletions

View File

@@ -0,0 +1 @@
#!/usr/bin/env python3

View File

@@ -0,0 +1,4 @@
#!/usr/bin/env python3
# Database settings read by the scraper scripts via ``from config.db import db``.
# NOTE(review): the URI below looks like a placeholder (user/password/host);
# presumably it has to be replaced with real connection details per deployment.
db = {
    'uri': "postgresql://user:password@host:5432/database",
}

110
scraper/daily_import_climate.py Executable file
View File

@@ -0,0 +1,110 @@
#! /usr/bin/env python3
import psycopg2
import requests
import io
import zipfile
import datetime
from config.db import db as config_db
# Value columns that follow the station id and the date in a data row, in
# file order, each paired with the converter applied to its raw text.
CLIMATE_COLUMNS = (
    ("qn_3", int),
    ("fx", float),
    ("fm", float),
    ("qn_4", int),
    ("rsk", float),
    ("rskf", int),
    ("sdk", float),
    ("shk_tag", float),
    ("nm", float),
    ("vpm", float),
    ("pm", float),
    ("tmk", float),
    ("upm", float),
    ("txk", float),
    ("tnk", float),
    ("tgk", float),
)


def cleanup_value(v):
    """Return ``None`` when *v* is the ``-999`` placeholder, else *v* unchanged.

    The check goes through ``int(v)``, so ``-999`` and ``-999.0`` are both
    recognised.  Presumably -999 is the data set's "missing value" marker.
    """
    if int(v) == -999:
        return None
    return v


def parse_climate_line(line):
    """Parse one semicolon-separated data row into a dict.

    Args:
        line: Decoded text of a single data row (not the header row).

    Returns:
        A dict with ``dwd_id`` (str), ``date`` (``datetime.date``) and one
        entry per name in ``CLIMATE_COLUMNS``; ``-999`` values become ``None``.

    Raises:
        ValueError: If the row has too few fields or a field cannot be
            converted to its expected type.
    """
    fields = [field.strip() for field in line.split(";")]
    if len(fields) < 2 + len(CLIMATE_COLUMNS):
        raise ValueError("climate row has too few fields: " + repr(line))

    day = fields[1]
    row = {
        "dwd_id": fields[0],
        # The date is written as YYYYMMDD.
        "date": datetime.date(int(day[0:4]), int(day[4:6]), int(day[6:8])),
    }
    for (key, convert), raw in zip(CLIMATE_COLUMNS, fields[2:]):
        row[key] = cleanup_value(convert(raw))
    return row


def import_station_archive(cur, station_id, archive_bytes):
    """Insert every not-yet-stored day from a downloaded station archive.

    Args:
        cur: Open database cursor used for the lookups and inserts.
        station_id: Value of ``stations.id`` the rows are stored under.
        archive_bytes: Raw bytes of the downloaded zip archive.
    """
    with zipfile.ZipFile(io.BytesIO(archive_bytes)) as archive:
        files = archive.namelist()
        print(files)
        files_climate = [f for f in files if f.startswith("produkt_klima_")]
        print(files_climate)
        if not files_climate:
            # Nothing to import; avoid an IndexError on an unexpected archive.
            return

        with archive.open(files_climate[0]) as data_file:
            rows = iter(data_file)
            next(rows, None)  # the first line is the column header
            for raw_line in rows:
                text = raw_line.decode('utf-8').strip()
                if not text:
                    continue
                b = parse_climate_line(text)
                iso_date = b["date"].isoformat()
                print(station_id, iso_date)

                cur.execute("SELECT id FROM climate WHERE station = %s AND date = %s; ", [station_id, iso_date])
                if cur.rowcount == 0:
                    cur.execute(
                        "INSERT INTO climate (station, date, qn_3, fx, fm, qn_4, rsk, rskf, sdk, shk_tag, nm, vpm, pm, tmk, upm, txk, tnk, tgk) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);",
                        [station_id, iso_date] + [b[key] for key, _ in CLIMATE_COLUMNS],
                    )
                    print("imported")
                else:
                    print("ignored")


def main():
    """Import the recent daily climate values of the next station in line.

    Picks one station (never-imported ones first, then the one with the
    oldest ``dwd_last_update``), downloads its archive, stores the days that
    are not in the ``climate`` table yet and stamps the station with the
    current time.
    """
    conn = psycopg2.connect(config_db["uri"])
    try:
        with conn.cursor() as cur:
            # min() skips NULLs, so selecting by min(dwd_last_update) would
            # never reach stations that have not been imported yet as soon as
            # one station carries a timestamp.  Ordering with NULLS FIRST
            # handles both cases in a single query.
            cur.execute("SELECT id, name, lat, lon, dwd_id, dwd_last_update FROM stations ORDER BY dwd_last_update ASC NULLS FIRST LIMIT 1;")
            last_station = cur.fetchone()
            print(last_station)
            if last_station is None:
                # Empty stations table: nothing to do.
                return

            curr_station_id = last_station[0]
            curr_station_dwd_id = last_station[4]
            print(curr_station_dwd_id)

            r = requests.get('https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/daily/kl/recent/tageswerte_KL_' + str(curr_station_dwd_id) + '_akt.zip')
            print(r.url)
            if r.status_code == 200:
                import_station_archive(cur, curr_station_id, r.content)

            # The timestamp is written even when the download failed, so a
            # station without an archive cannot block the rotation.
            cur.execute("UPDATE stations SET dwd_last_update = %s WHERE id = %s;", [datetime.datetime.today().isoformat(), curr_station_id])
            conn.commit()
    finally:
        conn.close()


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,88 @@
#!/usr/bin/env python3
import requests
import io
import psycopg2
from config.db import db as config_db
def parse_station_line(line):
    """Parse one data row of the station description table into a dict.

    The first six columns are whitespace-separated tokens.  The remainder
    holds the station name and the state; they are separated at the first
    run of two or more spaces so that names containing a single space stay
    intact.  When no such run exists, the remainder is split at its first
    space.

    Args:
        line: Text of a single data row (not one of the two header rows).

    Returns:
        A dict with the keys ``station_id``, ``date_from``, ``date_to``,
        ``sea_level``, ``lat``, ``lon``, ``name`` and ``state``.

    Raises:
        ValueError: If the row has too few columns or a numeric column
            cannot be converted.
    """
    station_id, date_from, date_to, sea_level, lat, lon, rest = line.split(None, 6)
    rest = rest.strip()
    name, separator, state = rest.partition("  ")
    if not separator:
        name, _, state = rest.partition(" ")
    return {
        "station_id": str(station_id),
        "date_from": int(date_from),
        "date_to": int(date_to),
        "sea_level": int(sea_level),
        "lat": float(lat),
        "lon": float(lon),
        "name": name.strip(),
        "state": state.strip(),
    }


def delivered_this_year(date_to, year=None):
    """Return True when *date_to* (an int in YYYYMMDD form) lies in *year* or later.

    Args:
        date_to: Last delivery date of a station as an integer, e.g. 20200406.
        year: Reference year; defaults to the current calendar year.
    """
    if year is None:
        import datetime

        year = datetime.date.today().year
    return date_to >= year * 10000


def main():
    """Download the station description table and upsert active stations.

    Stations are inserted into ``stations`` when their ``dwd_id`` is not
    present yet and updated otherwise.  Nothing is written when the download
    does not return HTTP 200.
    """
    r = requests.get('https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/daily/kl/recent/KL_Tageswerte_Beschreibung_Stationen.txt', stream=True)
    print(r)
    if r.status_code != 200:
        return

    buffer = []
    header_lines_left = 2  # the first two non-empty lines are skipped as header
    for l in r.iter_lines(decode_unicode=True):
        if not l:
            continue
        if header_lines_left:
            header_lines_left -= 1
            continue
        l = str(l)
        print(l)
        b = parse_station_line(l)
        if delivered_this_year(b["date_to"]):
            # The open data programme also lists stations that are no longer
            # operated.  Only stations that have already delivered values
            # this year are imported.
            for key in ("station_id", "date_from", "date_to", "sea_level", "lat", "lon", "name", "state"):
                print(b[key])
            print()
            buffer.append(b)
        else:
            print("ignore station " + str(b["station_id"]))
            print()
    print(buffer)

    conn = psycopg2.connect(config_db['uri'])
    try:
        with conn.cursor() as cur:
            for b in buffer:
                cur.execute("SELECT * FROM stations WHERE dwd_id LIKE %s;", [str(b["station_id"])])
                if cur.rowcount == 0:
                    cur.execute("INSERT INTO stations (name, lat, lon, dwd_id, state, sea_level) VALUES (%s, %s, %s, %s, %s, %s);", [b["name"], b["lat"], b["lon"], b["station_id"], b["state"], b["sea_level"]])
                else:
                    cur.execute("UPDATE stations SET name = %s, lat = %s, lon = %s, state = %s, sea_level = %s WHERE dwd_id LIKE %s", [b["name"], b["lat"], b["lon"], b["state"], b["sea_level"], b["station_id"]])
        conn.commit()
    finally:
        conn.close()


if __name__ == "__main__":
    main()