Compare commits

...

8 Commits

3 changed files with 303 additions and 13 deletions

View File

@@ -1,35 +1,73 @@
from . import Mu5001Tool from . import Mu5001Tool
from .prometheus_exporter import prometheus_exporter
import argparse import argparse
from pathlib import Path
from pprint import pprint from pprint import pprint
parser = argparse.ArgumentParser(prog="mu5001tool") parser = argparse.ArgumentParser(prog="mu5001tool")
parser.add_argument("--stok", dest="stok", help="Initial session token to use for commands") parser.add_argument("--stok", dest="stok", help="Initial session token to use for commands")
parser.add_argument("--password", dest="password", help="Password for authentication against the device") parser.add_argument("--password", dest="password", help="Password for authentication against the device")
parser.add_argument("--password-file", dest="password_file", type=Path, help="Password for authentication against the device, passed as path to file")
subparsers = parser.add_subparsers()
def run_status(m):
if m.is_logged_in():
print("Is logged in")
else:
print("Is not logged in")
pprint(m.status())
pprint(m.network_information())
pprint(m.apn_info())
pprint(m.state_information())
sp_status = subparsers.add_parser("status", help="General modem status information")
sp_status.set_defaults(func=run_status)
def run_prometheus_exporter(m, listen_port):
prometheus_exporter(m, listen_port)
sp_prometheus_exporter = subparsers.add_parser("prometheus-exporter", help="Serve metrics as prometheus exporter")
sp_prometheus_exporter.set_defaults(func=run_prometheus_exporter)
sp_prometheus_exporter.add_argument("--listen-port", dest="listen_port", type=int, default=9242, help="Port for service webserver")
def main(): def main():
args = parser.parse_args() args = parser.parse_args()
h = Mu5001Tool() if "func" not in args:
parser.print_help()
exit()
function_arguments = dict(vars(args))
function_arguments.pop("func")
function_arguments.pop("stok")
function_arguments.pop("password")
function_arguments.pop("password_file")
m = Mu5001Tool()
if args.stok is not None: if args.stok is not None:
h.set_stok(args.stok) m.set_stok(args.stok)
if args.password is not None: if args.password is not None:
print(h.login(args.password)) m.set_password(args.password)
if h.is_logged_in(): if args.password_file is not None:
print("Is logged in") password = args.password_file.read_text().strip()
else: m.set_password(password)
print("Is not logged in")
pprint(h.status()) if args.password is not None or args.password_file is not None:
m.login()
pprint(h.network_information()) args.func(m=m, **function_arguments)
pprint(h.apn_info())
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@@ -3,8 +3,23 @@ import hashlib
import requests import requests
import urllib import urllib
class Mu5001ToolException(Exception):
pass
class FailedToLogin(Mu5001ToolException):
pass
class InvalidPassword(FailedToLogin):
pass
class AccountLocked(FailedToLogin):
pass
class Mu5001Tool: class Mu5001Tool:
def __init__(self, host="http://192.168.0.1"): def __init__(self, host="http://192.168.0.1"):
self._password = None
self._password_invalid = False
self.host = host self.host = host
self.session = requests.Session() self.session = requests.Session()
@@ -36,6 +51,9 @@ class Mu5001Tool:
"cmd": "LD", "cmd": "LD",
}).get("LD") }).get("LD")
def set_password(self, password):
self._password = password
def hashed_login_password(self, password): def hashed_login_password(self, password):
ld = self.get_ld() ld = self.get_ld()
@@ -44,12 +62,77 @@ class Mu5001Tool:
return login_sha256(login_sha256(password) + ld) return login_sha256(login_sha256(password) + ld)
def login(self, password): def is_login_possible(self):
return self.set_cmd_process({ data = self.get_cmd_process({
"multi_data": "1",
"cmd": "login_lock_time,psw_fail_num_str,loginfo",
})
if data.get("loginfo") == "ok":
# Already logged in
return False
# How many password attempts left before locking
psw_fail_num_str = data.get("psw_fail_num_str")
if psw_fail_num_str == "":
psw_fail_num_str = None
else:
psw_fail_num_str = int(psw_fail_num_str)
if psw_fail_num_str > 0:
return True
# How long login is blocked
login_lock_time = data.get("login_lock_time")
if login_lock_time == "":
login_lock_time = None
else:
login_lock_time = int(login_lock_time)
# Login is blocked
return False
def login_if_possible(self):
if self._password is None:
return
if self._password_invalid:
return
if not self.is_login_possible():
return
try:
self.login()
except InvalidPassword as e:
self._password_invalid = True
def login(self, password=None):
if password is None:
password = self._password
if password is None:
raise ValueError("No password provided")
data = self.set_cmd_process({
"goformId": "LOGIN", "goformId": "LOGIN",
"password": self.hashed_login_password(password), "password": self.hashed_login_password(password),
}) })
result = data.get("result")
if result == "3":
raise InvalidPassword("Invalid password")
if result == "1":
raise AccountLocked("Account currently locked")
if result != "0":
raise FailedToLogin("Failed to log in")
def logoff(self): def logoff(self):
return self.set_cmd_process({"goformId": "LOGOFF"}) return self.set_cmd_process({"goformId": "LOGOFF"})
@@ -167,3 +250,9 @@ class Mu5001Tool:
"multi_data": 1, "multi_data": 1,
"cmd": "network_type,rssi,lte_rssi,rscp,lte_rsrp,Z5g_snr,Z5g_rsrp,ZCELLINFO_band,Z5g_dlEarfcn,lte_ca_pcell_arfcn,lte_ca_pcell_band,lte_ca_scell_band,lte_ca_pcell_bandwidth,lte_ca_scell_info,lte_ca_scell_bandwidth,wan_lte_ca,lte_pci,Z5g_CELL_ID,Z5g_SINR,cell_id,wan_lte_ca,lte_ca_pcell_band,lte_ca_pcell_bandwidth,lte_ca_scell_band,lte_ca_scell_bandwidth,lte_ca_pcell_arfcn,lte_ca_scell_arfcn,lte_multi_ca_scell_info,wan_active_band,nr5g_pci,nr5g_action_band,nr5g_cell_id,lte_snr,ecio,wan_active_channel,nr5g_action_channel", "cmd": "network_type,rssi,lte_rssi,rscp,lte_rsrp,Z5g_snr,Z5g_rsrp,ZCELLINFO_band,Z5g_dlEarfcn,lte_ca_pcell_arfcn,lte_ca_pcell_band,lte_ca_scell_band,lte_ca_pcell_bandwidth,lte_ca_scell_info,lte_ca_scell_bandwidth,wan_lte_ca,lte_pci,Z5g_CELL_ID,Z5g_SINR,cell_id,wan_lte_ca,lte_ca_pcell_band,lte_ca_pcell_bandwidth,lte_ca_scell_band,lte_ca_scell_bandwidth,lte_ca_pcell_arfcn,lte_ca_scell_arfcn,lte_multi_ca_scell_info,wan_active_band,nr5g_pci,nr5g_action_band,nr5g_cell_id,lte_snr,ecio,wan_active_channel,nr5g_action_channel",
}) })
def state_information(self):
return self.get_cmd_process({
"multi_data": 1,
"cmd": "modem_main_state,pin_status,opms_wan_mode,opms_wan_auto_mode,loginfo,new_version_state,current_upgrade_state,is_mandatory,wifi_dfs_status,battery_value,ppp_dial_conn_fail_counter,dhcp_wan_status,wifi_chip1_ssid1_auth_mode,wifi_chip2_ssid1_auth_mode,signalbar,network_type,network_provider,ppp_status,simcard_roam,spn_name_data,spn_b1_flag,spn_b2_flag,wifi_onoff_state,wifi_chip1_ssid1_ssid,wifi_chip2_ssid1_ssid,wan_lte_ca,monthly_rx_bytes,monthly_tx_bytes,pppoe_status,dhcp_wan_status,static_wan_status,rmcc,rmnc,mdm_mcc,battery_charg_type,external_charging_flag,mode_main_state,battery_temp,EX_SSID1,sta_ip_status,EX_wifi_profile,m_ssid_enable,RadioOff,wifi_chip1_ssid1_access_sta_num,wifi_chip2_ssid1_access_sta_num,lan_ipaddr,station_mac,wifi_access_sta_num,battery_charging,battery_vol_percent,battery_pers,realtime_tx_bytes,realtime_rx_bytes,realtime_time,realtime_tx_thrpt,realtime_rx_thrpt,monthly_time,date_month,data_volume_limit_switch,data_volume_limit_size,data_volume_alert_percent,data_volume_limit_unit,roam_setting_option,upg_roam_switch,ssid,wifi_enable,wifi_5g_enable,check_web_conflict,dial_mode,ppp_dial_conn_fail_counter,privacy_read_flag,is_night_mode,vpn_conn_status,wan_connect_status,sms_received_flag,sts_received_flag,sms_unread_num,wifi_chip1_ssid2_access_sta_num,wifi_chip2_ssid2_access_sta_num",
})

View File

@@ -0,0 +1,163 @@
from . import Mu5001Tool
from http.server import BaseHTTPRequestHandler, HTTPServer, HTTPStatus
from pprint import pprint
import socket
import time
import traceback
class HTTPServerV6(HTTPServer):
address_family = socket.AF_INET6
def make_prometheus_exporter_request_handler(m):
class PrometheusExporterRequestHandler(BaseHTTPRequestHandler):
def do_GET(self, head_only=False):
if self.path == "/":
self.make_response(
"Prometheus Exporter for MU5001",
head_only=head_only
)
elif self.path == "/metrics":
try:
self.make_response(
self.export(),
head_only=head_only
)
except Exception as e:
traceback.print_exc()
self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR, "Failed to fetch metrics")
else:
self.send_error(HTTPStatus.NOT_FOUND, "File not found")
def do_HEAD(self):
self.do_GET(head_only=True)
def make_response(self, content, head_only=False):
encoded = content.encode("utf-8")
self.send_response(HTTPStatus.OK)
self.send_header("Content-Type", "text/plain; charset=utf-8")
self.send_header("Conten-Length", str(len(encoded)))
self.end_headers()
if not head_only:
self.wfile.write(encoded)
def export(self):
cmds_as_metric_value = [
"battery_charging",
"battery_pers",
"battery_temp",
"battery_value",
"battery_vol_percent",
"data_volume_alert_percent",
"data_volume_limit_size",
"data_volume_limit_switch",
"dhcp_wan_status",
"lte_ca_pcell_band",
"lte_ca_pcell_bandwidth",
"lte_ca_scell_band",
"lte_ca_scell_bandwidth",
"lte_rsrp",
"lte_rssi",
"lte_snr",
"mdm_mcc",
"monthly_rx_bytes",
"monthly_time",
"monthly_tx_bytes",
"nr5g_action_channel",
"pin_status",
"ppp_dial_conn_fail_counter",
"realtime_rx_bytes",
"realtime_rx_thrpt",
"realtime_time",
"realtime_tx_bytes",
"realtime_tx_thrpt",
"rmcc",
"rmnc",
"signalbar",
"sms_unread_num",
"wan_active_channel",
"wifi_5g_enable",
"wifi_access_sta_num",
"wifi_chip1_ssid1_access_sta_num",
"wifi_chip1_ssid2_access_sta_num",
"wifi_chip2_ssid1_access_sta_num",
"wifi_chip2_ssid2_access_sta_num",
"wifi_onoff_state",
]
cmds_as_metric_label = [
"battery_charg_type",
"data_volume_limit_unit",
"dial_mode",
"loginfo",
"mode_main_state",
"modem_main_state",
"network_provider",
"network_type",
"opms_wan_auto_mode",
"opms_wan_mode",
"ppp_status",
"roam_setting_option",
"simcard_roam",
"spn_name_data",
"wan_connect_status",
"wan_lte_ca",
"wifi_chip1_ssid1_auth_mode",
"wifi_chip1_ssid1_ssid",
"wifi_chip2_ssid1_auth_mode",
"wifi_chip2_ssid1_ssid",
"cell_id",
"lte_pci",
"nr5g_action_band",
"nr5g_pci",
"wan_active_band",
]
data = m.get_cmd_process({
"multi_data": 1,
"cmd": ",".join(cmds_as_metric_value + cmds_as_metric_label),
})
out = []
if data.get("loginfo") == "ok":
out.append("mu5001tool_logged_in 1")
else:
out.append("mu5001tool_logged_in 0")
m.login_if_possible()
for cmd in cmds_as_metric_value:
d = data.get(cmd)
if d is None or d == "":
continue
try:
v = int(d)
except ValueError as e:
try:
v = float(d)
except ValueError as e:
raise ValueError(f"cmd: {cmd}: {e}")
out.append(f"mu5001_{cmd} {v}")
for cmd in cmds_as_metric_label:
d = data.get(cmd)
if d is None or d == "":
continue
out.append(f"mu5001_{cmd}{{{cmd}=\"{d}\"}} 1")
out.sort()
return "\n".join(out)
return PrometheusExporterRequestHandler
def prometheus_exporter(m, listen_port):
with HTTPServerV6(("::1", listen_port), make_prometheus_exporter_request_handler(m)) as httpd:
print("Starting prometheus exporter on http://[{}]:{}".format(*httpd.socket.getsockname()[:2]))
httpd.serve_forever()