Compare commits

..

No commits in common. "82378e0bd72b207d219208ed7025cd8350ac2abf" and "2d8a1897f04f8f5e55b1cd77a6e94a9b43c6fb47" have entirely different histories.

8 changed files with 4 additions and 450 deletions

View File

@ -48,106 +48,10 @@ class OMMClient2:
m = messages.Open()
m.username = self._username
m.password = self._password
if self._ommsync:
m.UserDeviceSyncClient = "true"
m.UserDeviceSyncClient = self._ommsync
r = self.connection.request(m)
r.raise_on_error()
def attach_user_device(self, uid, ppn):
"""
Attach user to device
:param uid: User id
:param ppn: Device id
Requires ommsync=True
"""
t_u = types.PPUserType()
t_u.uid = uid
t_u.ppn = ppn
t_u.relType = types.PPRelTypeType("Dynamic")
t_d = types.PPDevType()
t_d.ppn = ppn
t_d.uid = uid
t_d.relType = types.PPRelTypeType("Dynamic")
m = messages.SetPP()
m.childs.user = [t_u]
m.childs.pp = [t_d]
r = self.connection.request(m)
r.raise_on_error()
if r.childs.user is None:
return None
return r.childs.user[0], r.childs.pp[0]
def create_user(self, num):
"""
Create PP user
:param num: User number
"""
t = types.PPUserType()
t.num = num
m = messages.CreatePPUser()
m.childs.user = [t]
r = self.connection.request(m)
r.raise_on_error()
if r.childs.user is None:
return None
return r.childs.user[0]
def detach_user_device(self, uid, ppn):
"""
Detach user from device
:param uid: User id
:param ppn: Device id
Requires ommsync=True
"""
t_u = types.PPUserType()
t_u.uid = uid
t_u.ppn = 0
t_u.relType = types.PPRelTypeType("Unbound")
t_d = types.PPDevType()
t_d.ppn = ppn
t_d.uid = 0
t_d.relType = types.PPRelTypeType("Unbound")
m = messages.SetPP()
m.childs.user = [t_u]
m.childs.pp = [t_d]
r = self.connection.request(m)
r.raise_on_error()
if r.childs.user is None:
return None
return r.childs.user[0], r.childs.pp[0]
def detach_user_device_by_user(self, uid):
"""
Detach user from device
This just requires the user id
:param uid: User id
Requires ommsync=True
"""
u = self.get_user(uid)
return self.detach_user_device(uid, u.ppn)
def detach_user_device_by_device(self, ppn):
"""
Detach user from device
This just requires the device id
:param ppn: Device id
Requires ommsync=True
"""
d = self.get_device(ppn)
return self.detach_user_device(d.uid, ppn)
def get_account(self, id):
"""
Get account
@ -201,15 +105,6 @@ class OMMClient2:
# Determine next possible ppn
next_ppn = int(pp.ppn) + 1
def get_publickey(self):
"""
Get public key for encrypted values
"""
m = messages.GetPublicKey()
r = self.connection.request(m)
r.raise_on_error()
return int(r.modulus, 16), int(r.exponent, 16)
def get_user(self, uid):
"""
Get PP user
@ -290,50 +185,7 @@ class OMMClient2:
m = messages.SetPPUser()
m.childs.user = [t]
r = self.connection.request(m)
r.raise_on_error()
if r.childs.user is None:
return None
return r.childs.user[0]
def set_user_relation_dynamic(self, uid):
"""
Set PP user to PP device relation to dynamic type
:param uid: User id
"""
m = messages.SetPPUserDevRelation()
m.uid = uid
m.relType = types.PPRelTypeType("Dynamic")
r = self.connection.request(m)
r.raise_on_error()
def set_user_relation_fixed(self, uid):
"""
Set PP user to PP device relation to fixed type
:param uid: User id
"""
m = messages.SetPPUserDevRelation()
m.uid = uid
m.relType = types.PPRelTypeType("Fixed")
r = self.connection.request(m)
r.raise_on_error()
def set_user_sipauth(self, uid, sipAuthId, sipPw):
"""
Set PP user sip credentials
:param uid: User id
:param sipAuthId: SIP user name
:param sipPw: Encrypted sip password
"""
t = types.PPUserType()
t.uid = uid
t.sipAuthId = sipAuthId
t.sipPw = sipPw
m = messages.SetPPUser()
m.childs.user = [t]
r = self.connection.request(m)
r.raise_on_error()
if r.childs.user is None:
return None

View File

@ -134,16 +134,12 @@ def response_type(c):
RESPONSE_TYPES[c.__name__] = c
return c
from .createppuser import CreatePPUser, CreatePPUserResp
from .getaccount import GetAccount, GetAccountResp
from .getppdev import GetPPDev, GetPPDevResp
from .getppuser import GetPPUser, GetPPUserResp
from .getpublickey import GetPublicKey, GetPublicKeyResp
from .open import Open, OpenResp
from .ping import Ping, PingResp
from .setpp import SetPP, SetPPResp
from .setppuser import SetPPUser, SetPPUserResp
from .setppuserdevrelation import SetPPUserDevRelation, SetPPUserDevRelationResp
def construct(request):
"""

View File

@ -1,18 +0,0 @@
#!/usr/bin/env python3
from . import Request, Response, request_type, response_type
from ..types import PPUserType
@request_type
class CreatePPUser(Request):
CHILDS = {
"user": PPUserType,
}
@response_type
class CreatePPUserResp(Response):
CHILDS = {
"user": PPUserType,
}

View File

@ -1,16 +0,0 @@
#!/usr/bin/env python3
from . import Request, Response, request_type, response_type
@request_type
class GetPublicKey(Request):
pass
@response_type
class GetPublicKeyResp(Response):
FIELDS = {
"modulus": str,
"exponent": str,
}

View File

@ -1,20 +0,0 @@
#!/usr/bin/env python3
from . import Request, Response, request_type, response_type
from ..types import PPDevType, PPUserType
@request_type
class SetPP(Request):
CHILDS = {
"pp": PPDevType,
"user": PPUserType,
}
@response_type
class SetPPResp(Response):
CHILDS = {
"pp": PPDevType,
"user": PPUserType,
}

View File

@ -1,20 +0,0 @@
#!/usr/bin/env python3
from . import Request, Response, request_type, response_type
from ..types import PPRelTypeType
@request_type
class SetPPUserDevRelation(Request):
FIELDS = {
"uid": int,
"relType": PPRelTypeType,
}
@response_type
class SetPPUserDevRelationResp(Response):
FIELDS = {
"uid": int,
"relType": PPRelTypeType,
}

View File

@ -39,16 +39,12 @@ class ChildType:
return "{}({})".format(self.__class__.__name__, repr(self._attrs))
def cast_dict_to_childtype(t, d):
errors = {} # collect unknown keys
for k, v in d.items():
if k in t.FIELDS.keys():
if t.FIELDS[k] is not None and type(v) != t.FIELDS[k]:
d[k] = t.FIELDS[k](v)
else:
errors[k] = v
if errors != {}:
raise KeyError("The following keys are unknown for '{}': {}".format(t.__name__, errors))
raise KeyError()
return t(d)
@ -96,11 +92,7 @@ class MonitoringStateType(EnumType):
class PPRelTypeType(EnumType):
VALUES = [
"Fixed",
"Dynamic",
"Unbound",
]
VALUES = None
class AccountType(ChildType):
@ -138,18 +130,6 @@ class PPDevType(ChildType):
"ppDefaultProfileLoaded": bool,
"subscribeToPARIOnly": bool,
# undocumented
"ommId": str,
"ommIdAck": str,
"timeStampAdmin": int,
"timeStampRelation": int,
"timeStampRoaming": int,
"timeStampSubscription": int,
"autoCreate": bool,
"roaming": None, # value: 'RoamingComplete'
"modicType": str, # value: '01'
"locationData": str, # value: '000001000000'
"dectIeFixedId": str,
"subscriptionId": str,
"ppnSec": int,
}
@ -229,7 +209,4 @@ class PPUserType(ChildType):
"keyLockEnable": None,
"keyLockPin": None,
"keyLockTime": None,
"ppnOld": int,
"timeStampAdmin": int,
"timeStampRelation": int,
}

197
ommcli
View File

@ -1,197 +0,0 @@
#!/usr/bin/env python3
from mitel_ommclient2 import OMMClient2
from mitel_ommclient2.exceptions import ENoEnt
from mitel_ommclient2.messages import GetAccount, Ping
import time
import argparse
import base64
import getpass
import traceback
try:
# This is is only dependency not from the modules inlcuded in python by default, so we make it optional
import rsa
except ImportError:
rsa = None
# exit handling with argparse is a bit broken even with exit_on_error=False, so we hack this
def error_instead_exit(self, message):
raise argparse.ArgumentError(None, message)
argparse.ArgumentParser.error = error_instead_exit
def format_child_type(t):
return " {}\n{}".format(t.__class__.__name__, "\n".join(["{:<30} {}".format(key, value) for key, value in t._attrs.items()]))
def format_list(v):
return "\n\n\n\n".join(format_child_type(d) for d in v)
return fl
if __name__ == "__main__":
connect_parser = argparse.ArgumentParser(prog='ommclient2')
connect_parser.add_argument("-n", dest="hostname", default="127.0.0.1")
connect_parser.add_argument("-u", dest="username", default="omm")
connect_parser.add_argument("-p", dest="password")
connect_parser.add_argument("--ommsync", dest="ommsync", action='store_true', help="Log in with ommsync mode")
connect_parser.add_argument("subcommand", nargs="*")
args = connect_parser.parse_args()
hostname = args.hostname
username = args.username
password = args.password
ommsync = args.ommsync
subcommand = args.subcommand
if not password:
password = getpass.getpass(prompt="OMM password for {}@{}:".format(username, hostname))
c = OMMClient2(hostname, username, password, ommsync=ommsync)
def encrypt(secret):
if rsa is None:
raise Exception("rsa module is required for excryption")
publickey = c.get_publickey()
pubkey = rsa.PublicKey(*publickey)
byte_secret = secret.encode('utf8')
byte_encrypt = rsa.encrypt(byte_secret, pubkey)
encrypt = base64.b64encode(byte_encrypt).decode("utf8")
return encrypt
parser = argparse.ArgumentParser(prog="ommclient2", add_help=False, exit_on_error=False)
subparsers = parser.add_subparsers()
def add_parser(command_name, func, format=None, args={}):
subp = subparsers.add_parser(command_name, help=func.__doc__.strip().split("\n")[0], description=func.__doc__)
if format is not None:
subp.set_defaults(func=func, format=format)
else:
subp.set_defaults(func=func)
for a, t in args.items():
subp.add_argument(a, type=t)
return subp
parser_get_account = subparsers.add_parser("encrypt")
parser_get_account.add_argument("secret")
parser_get_account.set_defaults(func=encrypt)
parser_exit = subparsers.add_parser("exit")
parser_exit.set_defaults(func=exit)
parser_get_account = add_parser("attach_user_device", func=c.attach_user_device, format=format_list, args={
"uid": int,
"ppn": int,
})
parser_get_account = add_parser("create_user", func=c.create_user, format=format_child_type, args={
"num": str,
})
parser_get_account = add_parser("detach_user_device", func=c.detach_user_device, format=format_list, args={
"uid": int,
"ppn": int,
})
parser_get_account = add_parser("detach_user_device_by_device", func=c.detach_user_device_by_device, format=format_list, args={
"ppn": int,
})
parser_get_account = add_parser("detach_user_device_by_user", func=c.detach_user_device_by_user, format=format_list, args={
"uid": int,
})
parser_get_account = add_parser("get_account", func=c.get_account, format=format_child_type, args={
"id": int,
})
parser_get_account = add_parser("get_device", func=c.get_device, format=format_child_type, args={
"ppn": int,
})
parser_get_account = add_parser("get_devices", func=c.get_devices, format=format_list)
parser_get_account = add_parser("get_publickey", func=c.get_publickey)
parser_get_account = add_parser("get_user", func=c.get_user, format=format_child_type, args={
"uid": int,
})
parser_get_account = add_parser("get_users", func=c.get_users, format=format_list)
parser_help = subparsers.add_parser("help")
parser_help.set_defaults(func=parser.format_help)
parser_ping = subparsers.add_parser("ping")
parser_ping.set_defaults(func=lambda *args, **kwargs: "pong" if c.ping(*args, **kwargs) else "error")
parser_get_account = add_parser("set_user_name", func=c.set_user_name, args={
"uid": int,
"name": str,
})
parser_get_account = add_parser("set_user_num", func=c.set_user_num, args={
"uid": int,
"num": str,
})
parser_get_account = add_parser("set_user_relation_dynamic", func=c.set_user_relation_dynamic, args={
"uid": int,
})
parser_get_account = add_parser("set_user_relation_fixed", func=c.set_user_relation_fixed, args={
"uid": int,
})
parser_get_account = add_parser("set_user_sipauth", func=c.set_user_sipauth, args={
"uid": int,
"sipAuthId": str,
"sipPw": str,
})
if subcommand:
try:
args = parser.parse_args(subcommand)
except argparse.ArgumentError as e:
print("argument error:", e.message)
exit(1)
v = dict(vars(args))
v.pop("func")
format = lambda r: r
if v.get("format") is not None:
format = v.get("format")
v.pop("format")
try:
r = args.func(**v)
except Exception as e:
print("".join(traceback.format_exception(type(e), e, e.__traceback__)))
exit(1)
print(format(r))
exit()
print("OMMClient")
parser.print_help()
while True:
i = input("> ").split()
try:
args = parser.parse_args(i)
except argparse.ArgumentError as e:
print("argument error:", e.message)
continue
v = dict(vars(args))
v.pop("func")
format = lambda r: r
if v.get("format") is not None:
format = v.get("format")
v.pop("format")
try:
r = args.func(**v)
except Exception as e:
print("".join(traceback.format_exception(type(e), e, e.__traceback__)))
continue
print(format(r))