commit 68f52727adfa8c36bc0e7a459224366f0b7faa78
Author: clerie
Date:   Wed Apr 8 11:14:45 2020 +0200

    Initial commit

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..0db3bed
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,142 @@
+# dwd-scraper custom
+scraper/config/db.py
+
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+pip-wheel-metadata/
+share/python-wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.nox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+*.py,cover
+.hypothesis/
+.pytest_cache/
+cover/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+local_settings.py
+db.sqlite3
+db.sqlite3-journal
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+.pybuilder/
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# IPython
+profile_default/
+ipython_config.py
+
+# pyenv
+# For a library or package, you might want to ignore these files since the code is
+# intended to run in multiple environments; otherwise, check them in:
+# .python-version
+
+# pipenv
+# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
+# However, in case of collaboration, if having platform-specific dependencies or dependencies
+# having no cross-platform support, pipenv may install dependencies that don't work, or not
+# install all needed dependencies.
+#Pipfile.lock
+
+# PEP 582; used by e.g. github.com/David-OConnor/pyflow
+__pypackages__/
+
+# Celery stuff
+celerybeat-schedule
+celerybeat.pid
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
+.dmypy.json
+dmypy.json
+
+# Pyre type checker
+.pyre/
+
+# pytype static type analyzer
+.pytype/
+
+# Cython debug symbols
+cython_debug/
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..6708866
--- /dev/null
+++ b/README.md
@@ -0,0 +1,30 @@
+# DWD Scraper
+Imports measurement data from the DWD OpenData program into a database.
+
+## Database
+PostgreSQL is used as the database; the schema is located in `sql/wetter.sql`.
+
+Database credentials must be placed in `scraper/config/db.py`; the expected format is documented in `scraper/config/db.py.example`.
+
+## Scraper
+First, the stations have to be imported.
+```
+./scraper/daily_import_stations.py
+```
+
+After that, the measurements for the imported stations can be read in.
+```
+./scraper/daily_import_climate.py
+```
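+
+## Setup and operation
+The database schema can be loaded with `psql`, for example (database name assumed):
+```
+psql -d wetter -f sql/wetter.sql
+```
+
+The import scripts are meant to run periodically. `daily_import_climate.py` only processes a single station per run (the one whose data is most out of date), so it has to be invoked once per station to work through the whole inventory. A possible cron setup, with assumed paths and schedule:
+```
+0 4 * * * /opt/dwd-scraper/scraper/daily_import_stations.py
+*/5 * * * * /opt/dwd-scraper/scraper/daily_import_climate.py
+```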
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..37ec460
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1 @@
+psycopg2-binary
diff --git a/scraper/config/__init__.py b/scraper/config/__init__.py
new file mode 100644
index 0000000..e5a0d9b
--- /dev/null
+++ b/scraper/config/__init__.py
@@ -0,0 +1 @@
+#!/usr/bin/env python3
diff --git a/scraper/config/db.py.example b/scraper/config/db.py.example
new file mode 100644
index 0000000..ee98e5b
--- /dev/null
+++ b/scraper/config/db.py.example
@@ -0,0 +1,4 @@
+#!/usr/bin/env python3
+
+db = {}
+db['uri'] = "postgresql://user:password@host:5432/database"
diff --git a/scraper/daily_import_climate.py b/scraper/daily_import_climate.py
new file mode 100755
index 0000000..17e516d
--- /dev/null
+++ b/scraper/daily_import_climate.py
@@ -0,0 +1,110 @@
+#!/usr/bin/env python3
+
+import psycopg2
+import requests
+import io
+import zipfile
+import datetime
+from config.db import db as config_db
+
+def cleanup_value(v):
+    # DWD files use -999 as a sentinel for missing values
+    if int(v) == -999:
+        return None
+
+    return v
+
+conn = psycopg2.connect(config_db["uri"])
+cur = conn.cursor()
+
+# Pick the station whose data is most out of date; stations that have
+# never been imported (dwd_last_update IS NULL) come first.
+cur.execute("SELECT id, name, lat, lon, dwd_id, dwd_last_update FROM stations ORDER BY dwd_last_update ASC NULLS FIRST LIMIT 1;")
+last_station = cur.fetchone()
+print(last_station)
+
+curr_station_id = last_station[0]
+curr_station_dwd_id = last_station[4]
+
+print(curr_station_dwd_id)
+r = requests.get('https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/daily/kl/recent/tageswerte_KL_' + str(curr_station_dwd_id) + '_akt.zip', stream=True)
+print(r.url)
+if r.status_code == 200:
+    zf = zipfile.ZipFile(io.BytesIO(r.content))
+    files = zf.namelist()
+    print(files)
+    files_climate = [f for f in files if f.startswith("produkt_klima_")]
+    print(files_climate)
+
+    is_first_line = True
+
+    for line in zf.open(files_climate[0]):
+        l = line.decode('utf-8').strip()
+        if is_first_line:
+            # skip the column header line
+            is_first_line = False
+            continue
+
+        b = {}
+
+        b["dwd_id"], l = l.strip().split(";", 1)
+        b["dwd_id"] = str(b["dwd_id"])
+        b["date"], l = l.strip().split(";", 1)
+        b["date"] = str(b["date"])
+        b["date"] = datetime.date(int(b["date"][0:4]), int(b["date"][4:6]), int(b["date"][6:8]))
+        b["qn_3"], l = l.strip().split(";", 1)
+        b["qn_3"] = cleanup_value(int(b["qn_3"]))
+        b["fx"], l = l.strip().split(";", 1)
+        b["fx"] = cleanup_value(float(b["fx"]))
+        b["fm"], l = l.strip().split(";", 1)
+        b["fm"] = cleanup_value(float(b["fm"]))
+        b["qn_4"], l = l.strip().split(";", 1)
+        b["qn_4"] = cleanup_value(int(b["qn_4"]))
+        b["rsk"], l = l.strip().split(";", 1)
+        b["rsk"] = cleanup_value(float(b["rsk"]))
+        b["rskf"], l = l.strip().split(";", 1)
+        b["rskf"] = cleanup_value(int(b["rskf"]))
+        b["sdk"], l = l.strip().split(";", 1)
+        b["sdk"] = cleanup_value(float(b["sdk"]))
+        b["shk_tag"], l = l.strip().split(";", 1)
+        b["shk_tag"] = cleanup_value(float(b["shk_tag"]))
+        b["nm"], l = l.strip().split(";", 1)
+        b["nm"] = cleanup_value(float(b["nm"]))
+        b["vpm"], l = l.strip().split(";", 1)
+        b["vpm"] = cleanup_value(float(b["vpm"]))
+        b["pm"], l = l.strip().split(";", 1)
+        b["pm"] = cleanup_value(float(b["pm"]))
+        b["tmk"], l = l.strip().split(";", 1)
+        b["tmk"] = cleanup_value(float(b["tmk"]))
+        b["upm"], l = l.strip().split(";", 1)
+        b["upm"] = cleanup_value(float(b["upm"]))
+        b["txk"], l = l.strip().split(";", 1)
+        b["txk"] = cleanup_value(float(b["txk"]))
+        b["tnk"], l = l.strip().split(";", 1)
+        b["tnk"] = cleanup_value(float(b["tnk"]))
+        b["tgk"], l = l.strip().split(";", 1)
+        b["tgk"] = cleanup_value(float(b["tgk"]))
+
+        #print(b)
+        print(curr_station_id, b["date"].isoformat())
+
+        cur.execute("SELECT id FROM climate WHERE station = %s AND date = %s;", [curr_station_id, b["date"].isoformat()])
+        if cur.rowcount == 0:
+            cur.execute("INSERT INTO climate (station, date, qn_3, fx, fm, qn_4, rsk, rskf, sdk, shk_tag, nm, vpm, pm, tmk, upm, txk, tnk, tgk) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);",
+                [curr_station_id, b["date"].isoformat(), b["qn_3"], b["fx"], b["fm"], b["qn_4"], b["rsk"], b["rskf"], b["sdk"], b["shk_tag"], b["nm"], b["vpm"], b["pm"], b["tmk"], b["upm"], b["txk"], b["tnk"], b["tgk"]])
+            print("imported")
+        else:
+            print("ignored")
+
+# Mark the station as updated even when no data set could be downloaded, so a
+# station without data does not block the rotation.
+cur.execute("UPDATE stations SET dwd_last_update = %s WHERE id = %s;", [datetime.datetime.today().isoformat(), curr_station_id])
+conn.commit()
+
+cur.close()
+conn.close()
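+
+# Note: each invocation imports data for exactly one station. To sweep the
+# whole station inventory once, run this script repeatedly, e.g. (the station
+# count is an assumption):
+#   for i in $(seq 1 500); do ./scraper/daily_import_climate.py; done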
diff --git a/scraper/daily_import_stations.py b/scraper/daily_import_stations.py
new file mode 100755
index 0000000..4131f91
--- /dev/null
+++ b/scraper/daily_import_stations.py
@@ -0,0 +1,91 @@
+#!/usr/bin/env python3
+
+import datetime
+import requests
+import psycopg2
+from config.db import db as config_db
+
+r = requests.get('https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/daily/kl/recent/KL_Tageswerte_Beschreibung_Stationen.txt', stream=True)
+
+print(r)
+
+if r.status_code == 200:
+
+    f = r.iter_lines(decode_unicode=True)
+
+    buffer = []
+
+    # the station list starts with two header lines
+    is_first_line = True
+    is_second_line = True
+
+    for l in f:
+        if l:
+            if is_first_line:
+                is_first_line = False
+                continue
+            if is_second_line:
+                is_second_line = False
+                continue
+
+            l = str(l)
+
+            b = {}
+
+            print(l)
+
+            b["station_id"], l = l.strip().split(" ", 1)
+            b["station_id"] = str(b["station_id"])
+            b["date_from"], l = l.strip().split(" ", 1)
+            b["date_from"] = int(b["date_from"])
+            b["date_to"], l = l.strip().split(" ", 1)
+            b["date_to"] = int(b["date_to"])
+            b["sea_level"], l = l.strip().split(" ", 1)
+            b["sea_level"] = int(b["sea_level"])
+            b["lat"], l = l.strip().split(" ", 1)
+            b["lat"] = float(b["lat"])
+            b["lon"], l = l.strip().split(" ", 1)
+            b["lon"] = float(b["lon"])
+            # station names may contain spaces, but state names never do,
+            # so split the state off from the right
+            b["name"], b["state"] = l.strip().rsplit(" ", 1)
+            b["name"] = b["name"].strip()
+
+            if b["date_to"] >= datetime.date.today().year * 10000:
+                # The OpenData program also contains stations that are no
+                # longer operated. We only import stations that have already
+                # delivered values in the current year.
+                print(b["station_id"])
+                print(b["date_from"])
+                print(b["date_to"])
+                print(b["sea_level"])
+                print(b["lat"])
+                print(b["lon"])
+                print(b["name"])
+                print(b["state"])
+                print()
+
+                buffer.append(b)
+            else:
+                print("ignore station " + str(b["station_id"]))
+                print()
+
+    print(buffer)
+
+    conn = psycopg2.connect(config_db['uri'])
+    cur = conn.cursor()
+
+    for b in buffer:
+        cur.execute("SELECT * FROM stations WHERE dwd_id = %s;", [str(b["station_id"])])
+        if cur.rowcount == 0:
+            cur.execute("INSERT INTO stations (name, lat, lon, dwd_id, state, sea_level) VALUES (%s, %s, %s, %s, %s, %s);", [b["name"], b["lat"], b["lon"], b["station_id"], b["state"], b["sea_level"]])
+        else:
+            cur.execute("UPDATE stations SET name = %s, lat = %s, lon = %s, state = %s, sea_level = %s WHERE dwd_id = %s;", [b["name"], b["lat"], b["lon"], b["state"], b["sea_level"], b["station_id"]])
+
+    conn.commit()
+
+    cur.close()
+    conn.close()
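+
+# The import is idempotent: existing rows are matched on dwd_id and updated
+# in place, so this script can safely be re-run from a daily cron job.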
diff --git a/sql/wetter.sql b/sql/wetter.sql
new file mode 100644
index 0000000..cd32788
--- /dev/null
+++ b/sql/wetter.sql
@@ -0,0 +1,175 @@
+--
+-- PostgreSQL database dump
+--
+
+-- Dumped from database version 11.7 (Debian 11.7-0+deb10u1)
+-- Dumped by pg_dump version 11.7 (Debian 11.7-0+deb10u1)
+
+SET statement_timeout = 0;
+SET lock_timeout = 0;
+SET idle_in_transaction_session_timeout = 0;
+SET client_encoding = 'UTF8';
+SET standard_conforming_strings = on;
+SELECT pg_catalog.set_config('search_path', '', false);
+SET check_function_bodies = false;
+SET xmloption = content;
+SET client_min_messages = warning;
+SET row_security = off;
+
+SET default_tablespace = '';
+
+SET default_with_oids = false;
+
+--
+-- Name: climate; Type: TABLE; Schema: public; Owner: wetter
+--
+
+CREATE TABLE public.climate (
+    id bigint NOT NULL,
+    station bigint NOT NULL,
+    date date NOT NULL,
+    qn_3 integer,
+    fx real,
+    fm real,
+    qn_4 integer,
+    rsk real,
+    rskf integer,
+    sdk real,
+    shk_tag real,
+    nm real,
+    vpm real,
+    pm real,
+    tmk real,
+    upm real,
+    txk real,
+    tnk real,
+    tgk real,
+    dwd_last_update timestamp without time zone
+);
+
+
+ALTER TABLE public.climate OWNER TO wetter;
+
+--
+-- Name: climate_id_seq; Type: SEQUENCE; Schema: public; Owner: wetter
+--
+
+CREATE SEQUENCE public.climate_id_seq
+    START WITH 1
+    INCREMENT BY 1
+    NO MINVALUE
+    NO MAXVALUE
+    CACHE 1;
+
+
+ALTER TABLE public.climate_id_seq OWNER TO wetter;
+
+--
+-- Name: climate_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: wetter
+--
+
+ALTER SEQUENCE public.climate_id_seq OWNED BY public.climate.id;
+
+
+--
+-- Name: climate_station_seq; Type: SEQUENCE; Schema: public; Owner: wetter
+--
+
+CREATE SEQUENCE public.climate_station_seq
+    START WITH 1
+    INCREMENT BY 1
+    NO MINVALUE
+    NO MAXVALUE
+    CACHE 1;
+
+
+ALTER TABLE public.climate_station_seq OWNER TO wetter;
+
+
+--
+-- Name: stations; Type: TABLE; Schema: public; Owner: wetter
+--
+
+CREATE TABLE public.stations (
+    id bigint NOT NULL,
+    name character varying(200) NOT NULL,
+    lat numeric(10,8) NOT NULL,
+    lon numeric(11,8) NOT NULL,
+    dwd_id character varying(5),
+    dwd_last_update timestamp without time zone,
+    state character varying(200),
+    sea_level integer
+);
+
+
+ALTER TABLE public.stations OWNER TO wetter;
+
+--
+-- Name: stations_id_seq; Type: SEQUENCE; Schema: public; Owner: wetter
+--
+
+CREATE SEQUENCE public.stations_id_seq
+    START WITH 1
+    INCREMENT BY 1
+    NO MINVALUE
+    NO MAXVALUE
+    CACHE 1;
+
+
+ALTER TABLE public.stations_id_seq OWNER TO wetter;
+
+--
+-- Name: stations_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: wetter
+--
+
+ALTER SEQUENCE public.stations_id_seq OWNED BY public.stations.id;
+
+
+--
+-- Name: climate id; Type: DEFAULT; Schema: public; Owner: wetter
+--
+
+ALTER TABLE ONLY public.climate ALTER COLUMN id SET DEFAULT nextval('public.climate_id_seq'::regclass);
+
+
+--
+-- Name: climate station; Type: DEFAULT; Schema: public; Owner: wetter
+--
+
+ALTER TABLE ONLY public.climate ALTER COLUMN station SET DEFAULT nextval('public.climate_station_seq'::regclass);
+
+
+--
+-- Name: stations id; Type: DEFAULT; Schema: public; Owner: wetter
+--
+
+ALTER TABLE ONLY public.stations ALTER COLUMN id SET DEFAULT nextval('public.stations_id_seq'::regclass);
+
+
+--
+-- Name: climate climate_pkey; Type: CONSTRAINT; Schema: public; Owner: wetter
+--
+
+ALTER TABLE ONLY public.climate
+    ADD CONSTRAINT climate_pkey PRIMARY KEY (id);
+
+
+--
+-- Name: stations stations_pkey; Type: CONSTRAINT; Schema: public; Owner: wetter
+--
+
+ALTER TABLE ONLY public.stations
+    ADD CONSTRAINT stations_pkey PRIMARY KEY (id);
+
+
+--
+-- Name: climate climate_station_fkey; Type: FK CONSTRAINT; Schema: public; Owner: wetter
+--
+
+ALTER TABLE ONLY public.climate
+    ADD CONSTRAINT climate_station_fkey FOREIGN KEY (station) REFERENCES public.stations(id);
+
+
+--
+-- PostgreSQL database dump complete
+--