Compare commits

...

11 Commits

Author SHA1 Message Date
clerie 82378e0bd7 Add user device realation changes 2022-06-17 00:08:40 +02:00
clerie e85fa8ff29 Display type on cli 2022-06-16 23:45:01 +02:00
clerie 650dd842ce Use function docstring in cli usage 2022-06-16 23:39:56 +02:00
clerie 1d60043f37 Add method for detaching device and user 2022-06-16 23:13:54 +02:00
clerie 8849b8488c Introduce ability to attach users to devices 2022-06-16 23:01:04 +02:00
clerie ac9ae74a0b Make the exception show you all unknown keys at once
In case a response type contains an unknown key, there are probably some 
more.
2022-06-16 23:00:22 +02:00
clerie 501be198c4 Allow ommsync mode to be used in cli 2022-06-16 22:40:01 +02:00
clerie 4796d1587b Improve exception message 2022-06-16 22:32:01 +02:00
clerie deba76228b Add CreatePPUser message 2022-06-16 21:29:21 +02:00
clerie 428c4b8661 Add an interactive cli 2022-06-16 19:39:36 +02:00
clerie 93e0676147 Add infrastructure for dealing with encryption keys 2022-06-16 14:56:39 +02:00
8 changed files with 450 additions and 4 deletions

View File

@ -48,10 +48,106 @@ class OMMClient2:
m = messages.Open()
m.username = self._username
m.password = self._password
m.UserDeviceSyncClient = self._ommsync
if self._ommsync:
m.UserDeviceSyncClient = "true"
r = self.connection.request(m)
r.raise_on_error()
def attach_user_device(self, uid, ppn):
"""
Attach user to device
:param uid: User id
:param ppn: Device id
Requires ommsync=True
"""
t_u = types.PPUserType()
t_u.uid = uid
t_u.ppn = ppn
t_u.relType = types.PPRelTypeType("Dynamic")
t_d = types.PPDevType()
t_d.ppn = ppn
t_d.uid = uid
t_d.relType = types.PPRelTypeType("Dynamic")
m = messages.SetPP()
m.childs.user = [t_u]
m.childs.pp = [t_d]
r = self.connection.request(m)
r.raise_on_error()
if r.childs.user is None:
return None
return r.childs.user[0], r.childs.pp[0]
def create_user(self, num):
"""
Create PP user
:param num: User number
"""
t = types.PPUserType()
t.num = num
m = messages.CreatePPUser()
m.childs.user = [t]
r = self.connection.request(m)
r.raise_on_error()
if r.childs.user is None:
return None
return r.childs.user[0]
def detach_user_device(self, uid, ppn):
"""
Detach user from device
:param uid: User id
:param ppn: Device id
Requires ommsync=True
"""
t_u = types.PPUserType()
t_u.uid = uid
t_u.ppn = 0
t_u.relType = types.PPRelTypeType("Unbound")
t_d = types.PPDevType()
t_d.ppn = ppn
t_d.uid = 0
t_d.relType = types.PPRelTypeType("Unbound")
m = messages.SetPP()
m.childs.user = [t_u]
m.childs.pp = [t_d]
r = self.connection.request(m)
r.raise_on_error()
if r.childs.user is None:
return None
return r.childs.user[0], r.childs.pp[0]
def detach_user_device_by_user(self, uid):
"""
Detach user from device
This just requires the user id
:param uid: User id
Requires ommsync=True
"""
u = self.get_user(uid)
return self.detach_user_device(uid, u.ppn)
def detach_user_device_by_device(self, ppn):
"""
Detach user from device
This just requires the device id
:param ppn: Device id
Requires ommsync=True
"""
d = self.get_device(ppn)
return self.detach_user_device(d.uid, ppn)
def get_account(self, id):
"""
Get account
@ -105,6 +201,15 @@ class OMMClient2:
# Determine next possible ppn
next_ppn = int(pp.ppn) + 1
def get_publickey(self):
"""
Get public key for encrypted values
"""
m = messages.GetPublicKey()
r = self.connection.request(m)
r.raise_on_error()
return int(r.modulus, 16), int(r.exponent, 16)
def get_user(self, uid):
"""
Get PP user
@ -185,7 +290,50 @@ class OMMClient2:
m = messages.SetPPUser()
m.childs.user = [t]
r = self.connection.request(m)
r.raise_on_error()
if r.childs.user is None:
return None
return r.childs.user[0]
def set_user_relation_dynamic(self, uid):
"""
Set PP user to PP device relation to dynamic type
:param uid: User id
"""
m = messages.SetPPUserDevRelation()
m.uid = uid
m.relType = types.PPRelTypeType("Dynamic")
r = self.connection.request(m)
r.raise_on_error()
def set_user_relation_fixed(self, uid):
"""
Set PP user to PP device relation to fixed type
:param uid: User id
"""
m = messages.SetPPUserDevRelation()
m.uid = uid
m.relType = types.PPRelTypeType("Fixed")
r = self.connection.request(m)
r.raise_on_error()
def set_user_sipauth(self, uid, sipAuthId, sipPw):
"""
Set PP user sip credentials
:param uid: User id
:param sipAuthId: SIP user name
:param sipPw: Encrypted sip password
"""
t = types.PPUserType()
t.uid = uid
t.sipAuthId = sipAuthId
t.sipPw = sipPw
m = messages.SetPPUser()
m.childs.user = [t]
r = self.connection.request(m)
r.raise_on_error()
if r.childs.user is None:
return None

View File

@ -134,12 +134,16 @@ def response_type(c):
RESPONSE_TYPES[c.__name__] = c
return c
from .createppuser import CreatePPUser, CreatePPUserResp
from .getaccount import GetAccount, GetAccountResp
from .getppdev import GetPPDev, GetPPDevResp
from .getppuser import GetPPUser, GetPPUserResp
from .getpublickey import GetPublicKey, GetPublicKeyResp
from .open import Open, OpenResp
from .ping import Ping, PingResp
from .setpp import SetPP, SetPPResp
from .setppuser import SetPPUser, SetPPUserResp
from .setppuserdevrelation import SetPPUserDevRelation, SetPPUserDevRelationResp
def construct(request):
"""

View File

@ -0,0 +1,18 @@
#!/usr/bin/env python3
from . import Request, Response, request_type, response_type
from ..types import PPUserType
@request_type
class CreatePPUser(Request):
CHILDS = {
"user": PPUserType,
}
@response_type
class CreatePPUserResp(Response):
CHILDS = {
"user": PPUserType,
}

View File

@ -0,0 +1,16 @@
#!/usr/bin/env python3
from . import Request, Response, request_type, response_type
@request_type
class GetPublicKey(Request):
pass
@response_type
class GetPublicKeyResp(Response):
FIELDS = {
"modulus": str,
"exponent": str,
}

View File

@ -0,0 +1,20 @@
#!/usr/bin/env python3
from . import Request, Response, request_type, response_type
from ..types import PPDevType, PPUserType
@request_type
class SetPP(Request):
CHILDS = {
"pp": PPDevType,
"user": PPUserType,
}
@response_type
class SetPPResp(Response):
CHILDS = {
"pp": PPDevType,
"user": PPUserType,
}

View File

@ -0,0 +1,20 @@
#!/usr/bin/env python3
from . import Request, Response, request_type, response_type
from ..types import PPRelTypeType
@request_type
class SetPPUserDevRelation(Request):
FIELDS = {
"uid": int,
"relType": PPRelTypeType,
}
@response_type
class SetPPUserDevRelationResp(Response):
FIELDS = {
"uid": int,
"relType": PPRelTypeType,
}

View File

@ -39,12 +39,16 @@ class ChildType:
return "{}({})".format(self.__class__.__name__, repr(self._attrs))
def cast_dict_to_childtype(t, d):
errors = {} # collect unknown keys
for k, v in d.items():
if k in t.FIELDS.keys():
if t.FIELDS[k] is not None and type(v) != t.FIELDS[k]:
d[k] = t.FIELDS[k](v)
else:
raise KeyError()
errors[k] = v
if errors != {}:
raise KeyError("The following keys are unknown for '{}': {}".format(t.__name__, errors))
return t(d)
@ -92,7 +96,11 @@ class MonitoringStateType(EnumType):
class PPRelTypeType(EnumType):
VALUES = None
VALUES = [
"Fixed",
"Dynamic",
"Unbound",
]
class AccountType(ChildType):
@ -130,6 +138,18 @@ class PPDevType(ChildType):
"ppDefaultProfileLoaded": bool,
"subscribeToPARIOnly": bool,
# undocumented
"ommId": str,
"ommIdAck": str,
"timeStampAdmin": int,
"timeStampRelation": int,
"timeStampRoaming": int,
"timeStampSubscription": int,
"autoCreate": bool,
"roaming": None, # value: 'RoamingComplete'
"modicType": str, # value: '01'
"locationData": str, # value: '000001000000'
"dectIeFixedId": str,
"subscriptionId": str,
"ppnSec": int,
}
@ -209,4 +229,7 @@ class PPUserType(ChildType):
"keyLockEnable": None,
"keyLockPin": None,
"keyLockTime": None,
"ppnOld": int,
"timeStampAdmin": int,
"timeStampRelation": int,
}

197
ommcli Executable file
View File

@ -0,0 +1,197 @@
#!/usr/bin/env python3
from mitel_ommclient2 import OMMClient2
from mitel_ommclient2.exceptions import ENoEnt
from mitel_ommclient2.messages import GetAccount, Ping
import time
import argparse
import base64
import getpass
import traceback
try:
# This is is only dependency not from the modules inlcuded in python by default, so we make it optional
import rsa
except ImportError:
rsa = None
# exit handling with argparse is a bit broken even with exit_on_error=False, so we hack this
def error_instead_exit(self, message):
raise argparse.ArgumentError(None, message)
argparse.ArgumentParser.error = error_instead_exit
def format_child_type(t):
return " {}\n{}".format(t.__class__.__name__, "\n".join(["{:<30} {}".format(key, value) for key, value in t._attrs.items()]))
def format_list(v):
return "\n\n\n\n".join(format_child_type(d) for d in v)
return fl
if __name__ == "__main__":
connect_parser = argparse.ArgumentParser(prog='ommclient2')
connect_parser.add_argument("-n", dest="hostname", default="127.0.0.1")
connect_parser.add_argument("-u", dest="username", default="omm")
connect_parser.add_argument("-p", dest="password")
connect_parser.add_argument("--ommsync", dest="ommsync", action='store_true', help="Log in with ommsync mode")
connect_parser.add_argument("subcommand", nargs="*")
args = connect_parser.parse_args()
hostname = args.hostname
username = args.username
password = args.password
ommsync = args.ommsync
subcommand = args.subcommand
if not password:
password = getpass.getpass(prompt="OMM password for {}@{}:".format(username, hostname))
c = OMMClient2(hostname, username, password, ommsync=ommsync)
def encrypt(secret):
if rsa is None:
raise Exception("rsa module is required for excryption")
publickey = c.get_publickey()
pubkey = rsa.PublicKey(*publickey)
byte_secret = secret.encode('utf8')
byte_encrypt = rsa.encrypt(byte_secret, pubkey)
encrypt = base64.b64encode(byte_encrypt).decode("utf8")
return encrypt
parser = argparse.ArgumentParser(prog="ommclient2", add_help=False, exit_on_error=False)
subparsers = parser.add_subparsers()
def add_parser(command_name, func, format=None, args={}):
subp = subparsers.add_parser(command_name, help=func.__doc__.strip().split("\n")[0], description=func.__doc__)
if format is not None:
subp.set_defaults(func=func, format=format)
else:
subp.set_defaults(func=func)
for a, t in args.items():
subp.add_argument(a, type=t)
return subp
parser_get_account = subparsers.add_parser("encrypt")
parser_get_account.add_argument("secret")
parser_get_account.set_defaults(func=encrypt)
parser_exit = subparsers.add_parser("exit")
parser_exit.set_defaults(func=exit)
parser_get_account = add_parser("attach_user_device", func=c.attach_user_device, format=format_list, args={
"uid": int,
"ppn": int,
})
parser_get_account = add_parser("create_user", func=c.create_user, format=format_child_type, args={
"num": str,
})
parser_get_account = add_parser("detach_user_device", func=c.detach_user_device, format=format_list, args={
"uid": int,
"ppn": int,
})
parser_get_account = add_parser("detach_user_device_by_device", func=c.detach_user_device_by_device, format=format_list, args={
"ppn": int,
})
parser_get_account = add_parser("detach_user_device_by_user", func=c.detach_user_device_by_user, format=format_list, args={
"uid": int,
})
parser_get_account = add_parser("get_account", func=c.get_account, format=format_child_type, args={
"id": int,
})
parser_get_account = add_parser("get_device", func=c.get_device, format=format_child_type, args={
"ppn": int,
})
parser_get_account = add_parser("get_devices", func=c.get_devices, format=format_list)
parser_get_account = add_parser("get_publickey", func=c.get_publickey)
parser_get_account = add_parser("get_user", func=c.get_user, format=format_child_type, args={
"uid": int,
})
parser_get_account = add_parser("get_users", func=c.get_users, format=format_list)
parser_help = subparsers.add_parser("help")
parser_help.set_defaults(func=parser.format_help)
parser_ping = subparsers.add_parser("ping")
parser_ping.set_defaults(func=lambda *args, **kwargs: "pong" if c.ping(*args, **kwargs) else "error")
parser_get_account = add_parser("set_user_name", func=c.set_user_name, args={
"uid": int,
"name": str,
})
parser_get_account = add_parser("set_user_num", func=c.set_user_num, args={
"uid": int,
"num": str,
})
parser_get_account = add_parser("set_user_relation_dynamic", func=c.set_user_relation_dynamic, args={
"uid": int,
})
parser_get_account = add_parser("set_user_relation_fixed", func=c.set_user_relation_fixed, args={
"uid": int,
})
parser_get_account = add_parser("set_user_sipauth", func=c.set_user_sipauth, args={
"uid": int,
"sipAuthId": str,
"sipPw": str,
})
if subcommand:
try:
args = parser.parse_args(subcommand)
except argparse.ArgumentError as e:
print("argument error:", e.message)
exit(1)
v = dict(vars(args))
v.pop("func")
format = lambda r: r
if v.get("format") is not None:
format = v.get("format")
v.pop("format")
try:
r = args.func(**v)
except Exception as e:
print("".join(traceback.format_exception(type(e), e, e.__traceback__)))
exit(1)
print(format(r))
exit()
print("OMMClient")
parser.print_help()
while True:
i = input("> ").split()
try:
args = parser.parse_args(i)
except argparse.ArgumentError as e:
print("argument error:", e.message)
continue
v = dict(vars(args))
v.pop("func")
format = lambda r: r
if v.get("format") is not None:
format = v.get("format")
v.pop("format")
try:
r = args.func(**v)
except Exception as e:
print("".join(traceback.format_exception(type(e), e, e.__traceback__)))
continue
print(format(r))