Retry after failing DECT tasks

This commit is contained in:
clerie 2023-06-17 20:18:54 +02:00
parent ff480cb8b5
commit 0b30cb2bc7
2 changed files with 160 additions and 91 deletions

View File

@ -10,6 +10,7 @@ logger = logging.getLogger("fieldpoc.dect")
class Dect: class Dect:
def __init__(self, fp): def __init__(self, fp):
self.fp = fp self.fp = fp
self.c = None # OOMClient2 not initialized at the beginning
def _init_client(self): def _init_client(self):
self.c = mitel_ommclient2.OMMClient2( self.c = mitel_ommclient2.OMMClient2(
@ -35,7 +36,6 @@ class Dect:
"dialout_allowed": False, "dialout_allowed": False,
} }
def get_temp_number(self): def get_temp_number(self):
current_temp_extension = 0 current_temp_extension = 0
used_temp_extensions = [num[len(self.temp_num_prefix):] for num, ext in self.fp.temp_extensions.items()] used_temp_extensions = [num[len(self.temp_num_prefix):] for num, ext in self.fp.temp_extensions.items()]
@ -55,102 +55,171 @@ class Dect:
self.c.set_user_sipauth(u.uid, num, self.get_sip_password_for_number(num)) self.c.set_user_sipauth(u.uid, num, self.get_sip_password_for_number(num))
return u return u
def sync(self):
logger.info("syncing")
# load DECT extensions from configuration
extensions = self.fp.extensions.extensions_by_type("dect")
# accessible by extension number
extensions_by_num = {e.num: e for e in extensions}
# accessible by device IPEI, for devices with static IPEI in configuration
extensions_by_ipei = {e._c['dect_ipei']: e for _, e in extensions_by_num.items() if e._c.get('dect_ipei')}
# signal if new extensions got created
created_tmp_ext = False
# collect users
users_by_ext = {}
users_by_uid = {}
# check all users on OMM
for user in self.c.get_users():
# check user on OMM against fieldpoc configuration
e = extensions_by_num.get(user.num)
# user in OMM, but no DECT extension in configuration
if not e:
# the user might have a temporary extension assigned, collect it
if user.num.startswith(next(self.fp.extensions.extensions_by_type("temp")).num):
users_by_ext[user.num] = user
users_by_uid[user.uid] = user
# we have no configuration for the user
else:
# TODO: delete in omm
pass
continue
# update user in OMM if name of extension changed
elif e._c['name'] != user.name:
self.c.set_user_name(user.uid, e._c['name'])
if e._c.get('dect_ipei') and user.relType != mitel_ommclient2.types.PPRelTypeType("Unbound"):
d = self.c.get_device(user.ppn)
if d.ipei != e._c['dect_ipei']:
logger.debug(f"Detaching {user} {d}")
self.c.detach_user_device(user.uid, user.ppn)
self.c.set_user_sipauth(user.uid, e.num, self.get_sip_password_for_number(e.num))
users_by_ext[user.num] = user
users_by_uid[user.uid] = user
# check all devices on OMM
for d in self.c.get_devices():
# find extension in fieldpoc configuration with static IPEI of the device on OMM
e = extensions_by_ipei.get(d.ipei)
# in case the IPEI is statically assigned to an extension
if e:
# find user on OMM configured for the selectied extension
u = users_by_ext.get(e.num)
# bind the user to the device if user exist and device is not bound to any user
if u and d.relType == mitel_ommclient2.types.PPRelTypeType("Unbound"):
logger.debug(f'Binding user for {d}')
self.c.attach_user_device(u.uid, d.ppn)
self.c.set_user_relation_fixed(u.uid)
# fix user settings on OMM if device on OMM is already connected to a user
elif d.relType != mitel_ommclient2.types.PPRelTypeType("Unbound"):
ui = users_by_uid.get(d.uid)
if ui.num != e.num:
logger.debug(f'User for {d} has wrong number')
if self.fp.temp_extensions.get(ui.num):
self.fp.temp_extensions.pop(ui.num)
self.c.set_user_num(d.uid, e.num)
self.c.set_user_sipauth(d.uid, e.num, self.get_sip_password_for_number(e.num))
self.c.set_user_name(user.uid, e._c['name'])
# create a new user on OMM if none exist and bind it to not bound device on OMM
else:
logger.debug(f'Creating and binding user for {d}')
user = self.create_and_bind_user(d, e.num)
self.c.set_user_name(user.uid, e._c['name'])
# assign any unbound device without static assigned IPEI a temporary extension
elif d.relType == mitel_ommclient2.types.PPRelTypeType("Unbound"):
temp_num = self.get_temp_number()
logger.debug(f'Creating and binding tmp-user for {d}: {temp_num}')
user = self.create_and_bind_user(d, temp_num)
self.c.set_user_name(user.uid, f"Temp {temp_num[4:]}")
self.fp.temp_extensions[temp_num] = {
"name": f"Temp {temp_num[4:]}",
"type": "dect",
"trunk": False,
"dialout_allowed": False,
}
created_tmp_ext = True
# update rounting when new extensions got created
if created_tmp_ext:
self.fp.queues['routing'].put({"type": "sync"})
def claim_extension(self, current_extension, token):
new_extension = None
# find an extension that can be claimed with the provided token
for ext in self.fp.extensions.extensions_by_type("dect"):
if ext._c.get('dect_claim_token') == token:
new_extension = ext
break
if e:
# find user on OMM related to the current extension
user = next(self.c.find_users(lambda u: u.num == current_extension))
# if the current extension is a temporary extension, free it up again
if self.fp.temp_extensions.get(user.num):
self.fp.temp_extensions.pop(user.num)
# assign new extension to the user on OMM
self.c.set_user_num(user.uid, new_extension.num)
self.c.set_user_sipauth(user.uid, e.num, self.get_sip_password_for_number(new_extension.num))
self.c.set_user_name(user.uid, new_extension._c['name'])
def run(self): def run(self):
logger.info("initialising connection to OMM") logger.info("starting dect thread")
self._init_client()
self.load_temp_extensions()
while True: while True:
msg = self.fp.queues["dect"].get() msg = self.fp.queues["dect"].get()
self.fp.queues["dect"].task_done() self.fp.queues["dect"].task_done()
if msg.get("type") == "stop": if msg.get("type") == "stop":
self.restart_loop = False
logger.info("stopped")
break break
elif msg.get("type") == "sync":
logger.info("syncing")
extensions = self.fp.extensions.extensions_by_type("dect") else:
extensions_by_num = {e.num: e for e in extensions} try:
extensions_by_ipei = {e._c['dect_ipei']: e for _, e in extensions_by_num.items() if e._c.get('dect_ipei')} # setup connection to OMM
created_tmp_ext = False if self.c is None:
logger.info("initialising connection to OMM")
self._init_client()
self.load_temp_extensions()
users_by_ext = {} # handle tasks
users_by_uid = {} if msg.get("type") == "sync":
for user in self.c.get_users(): self.sync()
e = extensions_by_num.get(user.num)
if not e: elif msg.get("type") == "claim":
# user in omm, but not as dect in nerd self.claim_extenstion(msg.get("extension"), msg.get("token")[4:])
if user.num.startswith(next(self.fp.extensions.extensions_by_type("temp")).num):
users_by_ext[user.num] = user except:
users_by_uid[user.uid] = user logger.exception("task failed")
else:
# reset connection to OMM
if self.c is not None:
try:
self.c.connection.close()
except:
pass pass
# TODO: delete in omm self.c = None
continue
elif e._c['name'] != user.name:
self.c.set_user_name(user.uid, e._c['name'])
if e._c.get('dect_ipei') and user.relType != mitel_ommclient2.types.PPRelTypeType("Unbound"):
d = self.c.get_device(user.ppn)
if d.ipei != e._c['dect_ipei']:
logger.debug(f"Detaching {user} {d}")
self.c.detach_user_device(user.uid, user.ppn)
self.c.set_user_sipauth(user.uid, e.num, self.get_sip_password_for_number(e.num)) logger.info("requeue failed task")
users_by_ext[user.num] = user self.fp.queues["dect"].put(msg)
users_by_uid[user.uid] = user
for d in self.c.get_devices(): if self.c is not None:
e = extensions_by_ipei.get(d.ipei) self.c.connection.close()
if e:
# device is in nerd
u = users_by_ext.get(e.num)
if u and d.relType == mitel_ommclient2.types.PPRelTypeType("Unbound"):
logger.debug(f'Binding user for {d}')
self.c.attach_user_device(u.uid, d.ppn)
self.c.set_user_relation_fixed(u.uid)
elif d.relType != mitel_ommclient2.types.PPRelTypeType("Unbound"):
ui = users_by_uid.get(d.uid)
if ui.num != e.num:
logger.debug(f'User for {d} has wrong number')
if self.fp.temp_extensions.get(ui.num):
self.fp.temp_extensions.pop(ui.num)
self.c.set_user_num(d.uid, e.num)
self.c.set_user_sipauth(d.uid, e.num, self.get_sip_password_for_number(e.num))
self.c.set_user_name(user.uid, e._c['name'])
else:
logger.debug(f'Creating and binding user for {d}')
user = self.create_and_bind_user(d, e.num)
self.c.set_user_name(user.uid, e._c['name'])
elif d.relType == mitel_ommclient2.types.PPRelTypeType("Unbound"):
temp_num = self.get_temp_number()
logger.debug(f'Creating and binding tmp-user for {d}: {temp_num}')
user = self.create_and_bind_user(d, temp_num)
self.c.set_user_name(user.uid, f"Temp {temp_num[4:]}")
self.fp.temp_extensions[temp_num] = {
"name": f"Temp {temp_num[4:]}",
"type": "dect",
"trunk": False,
"dialout_allowed": False,
}
created_tmp_ext = True
if created_tmp_ext:
self.fp.queues['routing'].put({"type": "sync"})
elif msg.get("type") == "claim":
e = None
for ext in self.fp.extensions.extensions_by_type("dect"):
if ext._c.get('dect_claim_token') and ext._c['dect_claim_token'] == msg.get("token")[4:]:
e = ext
break
if e:
user = next(self.c.find_users(lambda u: u.num == msg.get("extension")))
if self.fp.temp_extensions.get(user.num):
self.fp.temp_extensions.pop(user.num)
self.c.set_user_num(user.uid, e.num)
self.c.set_user_sipauth(user.uid, e.num, self.get_sip_password_for_number(e.num))
self.c.set_user_name(user.uid, e._c['name'])
self.c.connection.close()

View File

@ -7,11 +7,11 @@
] ]
}, },
"locked": { "locked": {
"lastModified": 1665864178, "lastModified": 1687019250,
"narHash": "sha256-dZ4YVEn4E29ubYqDSuNfVIuOnHEZ4sZGUuODdQsvtaU=", "narHash": "sha256-cN9ZuQ/1irnoYg013v1ZDn15MHcFXhxILGhRNDGd794=",
"ref": "refs/heads/main", "ref": "refs/heads/main",
"rev": "aef2761f9fba079f33f41337b443350cb314eeef", "rev": "a11629f543a8b43451cecc46600a78cbb6af015a",
"revCount": 67, "revCount": 70,
"type": "git", "type": "git",
"url": "https://git.clerie.de/clerie/mitel_ommclient2.git" "url": "https://git.clerie.de/clerie/mitel_ommclient2.git"
}, },