1
0
Fork 0

feat(sipgate-balance-exporter): add script

This commit is contained in:
Jannik 2022-07-28 14:25:00 +02:00
parent 3966204fce
commit b2676162e7
2 changed files with 94 additions and 0 deletions

View File

@ -56,6 +56,18 @@ with lib;
script = "${pkgs.python3}/bin/python ${./waldbrandgefahrenstufen-exporter.py}";
};
systemd.services.sipgate-balance-exporter = {
description = "Sipgate Balance Exporter";
wantedBy = [ "multi-user.target" ];
serviceConfig = {
DynamicUser = "yes";
Environment = "SIPGATE_TOKEN_PATH=/var/src/secrets/sipgate-balance/apitoken";
};
script = "${pkgs.python3}/bin/python ${./sipgate-balance-exporter.py}";
};
services.prometheus.alertmanager = {
enable = true;
listenAddress = "[::1]";
@ -271,6 +283,18 @@ with lib;
}
];
}
{
job_name = "sipgate-balance";
scrape_interval = "120s";
scrape_timeout = "20s";
static_configs = [
{
targets = [
"[::1]:9243"
];
}
];
}
{
job_name = "snmp";
scrape_interval = "120s";

View File

@ -0,0 +1,70 @@
#!/usr/bin/env python3
from http.server import HTTPServer, BaseHTTPRequestHandler, HTTPStatus
import os
import socket
import json
from urllib.request import Request, urlopen
import xml.etree.ElementTree as ET
from pathlib import Path
class HTTPServerV6(HTTPServer):
address_family = socket.AF_INET6
TOKEN = Path(os.environ.get("SIPGATE_TOKEN_PATH", "apitoken")).read_text().strip()
class ExporterRequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
if self.path == "/":
self.make_response("sipgate Guthaben für den Trunk")
elif self.path == "/metrics":
self.export()
else:
self.send_error(HTTPStatus.NOT_FOUND, "File not found")
return
def do_HEAD(self):
if self.path == "/":
self.make_response("sipgate Guthaben für den Trunk", head_only=True)
elif self.path == "/metrics":
self.export(head_only=True)
else:
self.send_error(HTTPStatus.NOT_FOUND, "File not found")
return
def export(self, head_only=False):
req = Request("https://api.sipgate.com/v2/balance")
req.add_header("accept", "application/json")
req.add_header("authorization", "Basic " + TOKEN)
webURL = urlopen(req)
data = webURL.read()
encoding = webURL.info().get_content_charset('utf-8')
d = json.loads(data.decode(encoding))
print(d)
balance = d["amount"] / 10000
currency = d["currency"]
res = "sipgate_balance{{currency=\"{}\"}} {}".format(currency, balance)
self.make_response(res, head_only=head_only)
def make_response(self, content, head_only=False):
encoded = content.encode("utf-8")
self.send_response(HTTPStatus.OK)
self.send_header("Content-Type", "text/plain; charset=utf-8")
self.send_header("Conten-Length", str(len(encoded)))
self.end_headers()
if not head_only:
self.wfile.write(encoded)
def run():
with HTTPServerV6(("::1", 9243), ExporterRequestHandler) as httpd:
print("Starting sipgate Balance Exporter on http://[{}]:{}".format(*httpd.socket.getsockname()[:2]))
httpd.serve_forever()
if __name__ == "__main__":
run()