From 0b30cb2bc7332b2379f7b9b8a0f242d94176aaef Mon Sep 17 00:00:00 2001 From: clerie Date: Sat, 17 Jun 2023 20:18:54 +0200 Subject: [PATCH] Retry after failing DECT tasks --- fieldpoc/dect.py | 243 ++++++++++++++++++++++++++++++----------------- flake.lock | 8 +- 2 files changed, 160 insertions(+), 91 deletions(-) diff --git a/fieldpoc/dect.py b/fieldpoc/dect.py index 0fd995c..fa63c63 100644 --- a/fieldpoc/dect.py +++ b/fieldpoc/dect.py @@ -10,6 +10,7 @@ logger = logging.getLogger("fieldpoc.dect") class Dect: def __init__(self, fp): self.fp = fp + self.c = None # OOMClient2 not initialized at the beginning def _init_client(self): self.c = mitel_ommclient2.OMMClient2( @@ -35,7 +36,6 @@ class Dect: "dialout_allowed": False, } - def get_temp_number(self): current_temp_extension = 0 used_temp_extensions = [num[len(self.temp_num_prefix):] for num, ext in self.fp.temp_extensions.items()] @@ -55,102 +55,171 @@ class Dect: self.c.set_user_sipauth(u.uid, num, self.get_sip_password_for_number(num)) return u + def sync(self): + logger.info("syncing") + + # load DECT extensions from configuration + extensions = self.fp.extensions.extensions_by_type("dect") + # accessible by extension number + extensions_by_num = {e.num: e for e in extensions} + # accessible by device IPEI, for devices with static IPEI in configuration + extensions_by_ipei = {e._c['dect_ipei']: e for _, e in extensions_by_num.items() if e._c.get('dect_ipei')} + + # signal if new extensions got created + created_tmp_ext = False + + # collect users + users_by_ext = {} + users_by_uid = {} + + # check all users on OMM + for user in self.c.get_users(): + # check user on OMM against fieldpoc configuration + e = extensions_by_num.get(user.num) + + # user in OMM, but no DECT extension in configuration + if not e: + # the user might have a temporary extension assigned, collect it + if user.num.startswith(next(self.fp.extensions.extensions_by_type("temp")).num): + users_by_ext[user.num] = user + users_by_uid[user.uid] = user + + # we have no configuration for the user + else: + # TODO: delete in omm + pass + + continue + + # update user in OMM if name of extension changed + elif e._c['name'] != user.name: + self.c.set_user_name(user.uid, e._c['name']) + + + if e._c.get('dect_ipei') and user.relType != mitel_ommclient2.types.PPRelTypeType("Unbound"): + d = self.c.get_device(user.ppn) + if d.ipei != e._c['dect_ipei']: + logger.debug(f"Detaching {user} {d}") + self.c.detach_user_device(user.uid, user.ppn) + + self.c.set_user_sipauth(user.uid, e.num, self.get_sip_password_for_number(e.num)) + users_by_ext[user.num] = user + users_by_uid[user.uid] = user + + # check all devices on OMM + for d in self.c.get_devices(): + + # find extension in fieldpoc configuration with static IPEI of the device on OMM + e = extensions_by_ipei.get(d.ipei) + + # in case the IPEI is statically assigned to an extension + if e: + # find user on OMM configured for the selectied extension + u = users_by_ext.get(e.num) + + # bind the user to the device if user exist and device is not bound to any user + if u and d.relType == mitel_ommclient2.types.PPRelTypeType("Unbound"): + logger.debug(f'Binding user for {d}') + self.c.attach_user_device(u.uid, d.ppn) + self.c.set_user_relation_fixed(u.uid) + + # fix user settings on OMM if device on OMM is already connected to a user + elif d.relType != mitel_ommclient2.types.PPRelTypeType("Unbound"): + ui = users_by_uid.get(d.uid) + if ui.num != e.num: + logger.debug(f'User for {d} has wrong number') + if self.fp.temp_extensions.get(ui.num): + self.fp.temp_extensions.pop(ui.num) + self.c.set_user_num(d.uid, e.num) + self.c.set_user_sipauth(d.uid, e.num, self.get_sip_password_for_number(e.num)) + self.c.set_user_name(user.uid, e._c['name']) + + # create a new user on OMM if none exist and bind it to not bound device on OMM + else: + logger.debug(f'Creating and binding user for {d}') + user = self.create_and_bind_user(d, e.num) + self.c.set_user_name(user.uid, e._c['name']) + + # assign any unbound device without static assigned IPEI a temporary extension + elif d.relType == mitel_ommclient2.types.PPRelTypeType("Unbound"): + temp_num = self.get_temp_number() + logger.debug(f'Creating and binding tmp-user for {d}: {temp_num}') + user = self.create_and_bind_user(d, temp_num) + self.c.set_user_name(user.uid, f"Temp {temp_num[4:]}") + self.fp.temp_extensions[temp_num] = { + "name": f"Temp {temp_num[4:]}", + "type": "dect", + "trunk": False, + "dialout_allowed": False, + } + created_tmp_ext = True + + # update rounting when new extensions got created + if created_tmp_ext: + self.fp.queues['routing'].put({"type": "sync"}) + + def claim_extension(self, current_extension, token): + new_extension = None + + # find an extension that can be claimed with the provided token + for ext in self.fp.extensions.extensions_by_type("dect"): + if ext._c.get('dect_claim_token') == token: + new_extension = ext + break + + if e: + # find user on OMM related to the current extension + user = next(self.c.find_users(lambda u: u.num == current_extension)) + + # if the current extension is a temporary extension, free it up again + if self.fp.temp_extensions.get(user.num): + self.fp.temp_extensions.pop(user.num) + + # assign new extension to the user on OMM + self.c.set_user_num(user.uid, new_extension.num) + self.c.set_user_sipauth(user.uid, e.num, self.get_sip_password_for_number(new_extension.num)) + self.c.set_user_name(user.uid, new_extension._c['name']) + def run(self): - logger.info("initialising connection to OMM") - self._init_client() - self.load_temp_extensions() + logger.info("starting dect thread") while True: msg = self.fp.queues["dect"].get() self.fp.queues["dect"].task_done() if msg.get("type") == "stop": + self.restart_loop = False + logger.info("stopped") break - elif msg.get("type") == "sync": - logger.info("syncing") - extensions = self.fp.extensions.extensions_by_type("dect") - extensions_by_num = {e.num: e for e in extensions} - extensions_by_ipei = {e._c['dect_ipei']: e for _, e in extensions_by_num.items() if e._c.get('dect_ipei')} - created_tmp_ext = False + else: + try: + # setup connection to OMM + if self.c is None: + logger.info("initialising connection to OMM") + self._init_client() + self.load_temp_extensions() - users_by_ext = {} - users_by_uid = {} - for user in self.c.get_users(): - e = extensions_by_num.get(user.num) - if not e: - # user in omm, but not as dect in nerd - if user.num.startswith(next(self.fp.extensions.extensions_by_type("temp")).num): - users_by_ext[user.num] = user - users_by_uid[user.uid] = user - else: + # handle tasks + if msg.get("type") == "sync": + self.sync() + + elif msg.get("type") == "claim": + self.claim_extenstion(msg.get("extension"), msg.get("token")[4:]) + + except: + logger.exception("task failed") + + # reset connection to OMM + if self.c is not None: + try: + self.c.connection.close() + except: pass - # TODO: delete in omm - continue - elif e._c['name'] != user.name: - self.c.set_user_name(user.uid, e._c['name']) - if e._c.get('dect_ipei') and user.relType != mitel_ommclient2.types.PPRelTypeType("Unbound"): - d = self.c.get_device(user.ppn) - if d.ipei != e._c['dect_ipei']: - logger.debug(f"Detaching {user} {d}") - self.c.detach_user_device(user.uid, user.ppn) + self.c = None - self.c.set_user_sipauth(user.uid, e.num, self.get_sip_password_for_number(e.num)) - users_by_ext[user.num] = user - users_by_uid[user.uid] = user + logger.info("requeue failed task") + self.fp.queues["dect"].put(msg) - for d in self.c.get_devices(): - e = extensions_by_ipei.get(d.ipei) - if e: - # device is in nerd - u = users_by_ext.get(e.num) - if u and d.relType == mitel_ommclient2.types.PPRelTypeType("Unbound"): - logger.debug(f'Binding user for {d}') - self.c.attach_user_device(u.uid, d.ppn) - self.c.set_user_relation_fixed(u.uid) - elif d.relType != mitel_ommclient2.types.PPRelTypeType("Unbound"): - ui = users_by_uid.get(d.uid) - if ui.num != e.num: - logger.debug(f'User for {d} has wrong number') - if self.fp.temp_extensions.get(ui.num): - self.fp.temp_extensions.pop(ui.num) - self.c.set_user_num(d.uid, e.num) - self.c.set_user_sipauth(d.uid, e.num, self.get_sip_password_for_number(e.num)) - self.c.set_user_name(user.uid, e._c['name']) - else: - logger.debug(f'Creating and binding user for {d}') - user = self.create_and_bind_user(d, e.num) - self.c.set_user_name(user.uid, e._c['name']) - - elif d.relType == mitel_ommclient2.types.PPRelTypeType("Unbound"): - temp_num = self.get_temp_number() - logger.debug(f'Creating and binding tmp-user for {d}: {temp_num}') - user = self.create_and_bind_user(d, temp_num) - self.c.set_user_name(user.uid, f"Temp {temp_num[4:]}") - self.fp.temp_extensions[temp_num] = { - "name": f"Temp {temp_num[4:]}", - "type": "dect", - "trunk": False, - "dialout_allowed": False, - } - created_tmp_ext = True - - if created_tmp_ext: - self.fp.queues['routing'].put({"type": "sync"}) - - elif msg.get("type") == "claim": - e = None - for ext in self.fp.extensions.extensions_by_type("dect"): - if ext._c.get('dect_claim_token') and ext._c['dect_claim_token'] == msg.get("token")[4:]: - e = ext - break - - if e: - user = next(self.c.find_users(lambda u: u.num == msg.get("extension"))) - if self.fp.temp_extensions.get(user.num): - self.fp.temp_extensions.pop(user.num) - self.c.set_user_num(user.uid, e.num) - self.c.set_user_sipauth(user.uid, e.num, self.get_sip_password_for_number(e.num)) - self.c.set_user_name(user.uid, e._c['name']) - - self.c.connection.close() + if self.c is not None: + self.c.connection.close() diff --git a/flake.lock b/flake.lock index 9b18302..b3c5418 100644 --- a/flake.lock +++ b/flake.lock @@ -7,11 +7,11 @@ ] }, "locked": { - "lastModified": 1665864178, - "narHash": "sha256-dZ4YVEn4E29ubYqDSuNfVIuOnHEZ4sZGUuODdQsvtaU=", + "lastModified": 1687019250, + "narHash": "sha256-cN9ZuQ/1irnoYg013v1ZDn15MHcFXhxILGhRNDGd794=", "ref": "refs/heads/main", - "rev": "aef2761f9fba079f33f41337b443350cb314eeef", - "revCount": 67, + "rev": "a11629f543a8b43451cecc46600a78cbb6af015a", + "revCount": 70, "type": "git", "url": "https://git.clerie.de/clerie/mitel_ommclient2.git" },