Compare commits

...

39 Commits

Author SHA1 Message Date
a11629f543 Readable error messages when OMM is unreachable when using cli 2023-06-17 18:27:30 +02:00
11f96a6069 Set short timeout to fail fast when the OMM is unreachable 2023-06-17 18:25:14 +02:00
421cc06d6c Add cli to flake and set default 2022-10-16 11:48:57 +02:00
aef2761f9f build documentation with flake too 2022-10-15 22:02:58 +02:00
607430ca69 add flake 2022-10-15 16:51:07 +02:00
f05ffdd86d Add cli to readme 2022-07-06 12:53:07 +02:00
ddb5194320 Add project script for cli 2022-07-06 12:50:16 +02:00
003b2ff367 Make ommcli part of the module 2022-07-06 12:49:57 +02:00
a7178b6f86 Add details to install 2022-07-06 01:46:32 +02:00
ee5bc790ea Add packaging information 2022-07-06 01:39:32 +02:00
f8c215d380 Make encryption part of the client library and make sipauth take plain text passwords 2022-06-19 16:21:29 +02:00
0df5286c0e Ignore if socket is blocking 2022-06-19 15:41:43 +02:00
000a00fec8 Add method to filter users 2022-06-19 15:40:28 +02:00
405f71f621 Add method to filter devices 2022-06-19 14:45:26 +02:00
82378e0bd7 Add user device realation changes 2022-06-17 00:08:40 +02:00
e85fa8ff29 Display type on cli 2022-06-16 23:45:01 +02:00
650dd842ce Use function docstring in cli usage 2022-06-16 23:39:56 +02:00
1d60043f37 Add method for detaching device and user 2022-06-16 23:13:54 +02:00
8849b8488c Introduce ability to attach users to devices 2022-06-16 23:01:04 +02:00
ac9ae74a0b Make the exception show you all unknown keys at once
In case a response type contains an unknown key, there are probably some 
more.
2022-06-16 23:00:22 +02:00
501be198c4 Allow ommsync mode to be used in cli 2022-06-16 22:40:01 +02:00
4796d1587b Improve exception message 2022-06-16 22:32:01 +02:00
deba76228b Add CreatePPUser message 2022-06-16 21:29:21 +02:00
428c4b8661 Add an interactive cli 2022-06-16 19:39:36 +02:00
93e0676147 Add infrastructure for dealing with encryption keys 2022-06-16 14:56:39 +02:00
2d8a1897f0 Add method for changing PP user num 2022-05-02 18:54:49 +02:00
e3f9fbf36b Allow modification of PPUser 2022-05-02 18:33:21 +02:00
36187c18b3 Fix message name quoting 2022-05-02 18:08:31 +02:00
2eb04e0428 Remove weird side effects that insert the values from previuos call 2022-05-02 18:07:15 +02:00
0a442e4ab5 Add child typing to all other existing message types 2022-05-02 00:41:46 +02:00
bb8dd03299 Implement EnumType 2022-05-02 00:14:24 +02:00
80eec844bf Update readme 2022-05-01 23:57:37 +02:00
bd68cbd7d8 Bootstrap child type casting 2022-05-01 23:55:18 +02:00
0e19927610 Restructuring api documentation 2022-05-01 22:43:52 +02:00
cf3c16c66a Refactor message classes
Message classes allow dynamic attribute access to message fields. Childs
get exposed through a dedicated class by attributes too. Message type
fields and childs have types that get enforces. None type is allowed
too while transitioning.
2022-05-01 22:32:08 +02:00
1752a9151d Add support for PP user 2022-05-01 13:27:56 +02:00
5365b942ce Add function to get all devices 2022-05-01 13:13:31 +02:00
7064d6f615 Rename get_pp_dev to get_device to match python-mitel feature naming 2022-05-01 12:53:46 +02:00
50d9b79ad0 Fix documentation 2022-05-01 12:50:59 +02:00
29 changed files with 1222 additions and 272 deletions

1
.gitignore vendored
View File

@@ -1,2 +1,3 @@
__pycache__
dist/
docs/_build

View File

@@ -2,6 +2,20 @@
Another attempt for a modern client library to the Mitel OM Application XML Interface.
## Install
Without any additional dependencies:
```
pip install "mitel_ommclient2 @ git+https://git.clerie.de/clerie/mitel_ommclient2.git@main"
```
Add dependencies to enable secret handling, if you need it.
```
pip install "mitel_ommclient2[crypt] @ git+https://git.clerie.de/clerie/mitel_ommclient2.git@main"
```
## Quicksart
Just some examples to give you an idea what this does.
@@ -16,11 +30,22 @@ c = mitel_ommclient2.OMMClient2("omm.local", "admin", "admin")
c.ping()
# Create custom messages
r = c.connection.request(mitel_ommclient2.messages.Ping(timeStamp=2342))
m = mitel_ommclient2.messages.Ping()
m.timeStamp = 2342
r = c.connection.request(m)
```
Consult class documentation for more in depth examples and options.
## Interactive CLI
The package installs a script called `ommclient2`.
This allowes basic interactive testing of the library.
```
ommclient2 --help
```
## Attribution
This software is inspired by `python-mitel` by Thomas and n-st.

7
docs/api/client.rst Normal file
View File

@@ -0,0 +1,7 @@
mitel\_ommclient2.client module
===============================
.. automodule:: mitel_ommclient2.client
:members:
:undoc-members:
:show-inheritance:

7
docs/api/connection.rst Normal file
View File

@@ -0,0 +1,7 @@
mitel\_ommclient2.connection module
===================================
.. automodule:: mitel_ommclient2.connection
:members:
:undoc-members:
:show-inheritance:

7
docs/api/exceptions.rst Normal file
View File

@@ -0,0 +1,7 @@
mitel\_ommclient2.exceptions module
===================================
.. automodule:: mitel_ommclient2.exceptions
:members:
:undoc-members:
:show-inheritance:

View File

@@ -6,4 +6,7 @@ API Documentation
.. toctree::
:maxdepth: 4
mitel_ommclient2
client
connection
exceptions
messages

View File

@@ -1,45 +1,32 @@
mitel\_ommclient2.messages package
==================================
Submodules
----------
mitel\_ommclient2.messages.getaccount module
--------------------------------------------
.. automodule:: mitel_ommclient2.messages
:members:
:undoc-members:
:show-inheritance:
.. automodule:: mitel_ommclient2.messages.getaccount
:members:
:undoc-members:
:show-inheritance:
mitel\_ommclient2.messages.getppdev module
------------------------------------------
.. automodule:: mitel_ommclient2.messages.getppdev
:members:
:undoc-members:
:show-inheritance:
mitel\_ommclient2.messages.open module
--------------------------------------
.. automodule:: mitel_ommclient2.messages.getppuser
:members:
:undoc-members:
:show-inheritance:
.. automodule:: mitel_ommclient2.messages.open
:members:
:undoc-members:
:show-inheritance:
mitel\_ommclient2.messages.ping module
--------------------------------------
.. automodule:: mitel_ommclient2.messages.ping
:members:
:undoc-members:
:show-inheritance:
Module contents
---------------
.. automodule:: mitel_ommclient2.messages
:members:
:undoc-members:
:show-inheritance:

View File

@@ -1,45 +0,0 @@
mitel\_ommclient2 package
=========================
Subpackages
-----------
.. toctree::
:maxdepth: 4
mitel_ommclient2.messages
Submodules
----------
mitel\_ommclient2.client module
-------------------------------
.. automodule:: mitel_ommclient2.client
:members:
:undoc-members:
:show-inheritance:
mitel\_ommclient2.connection module
-----------------------------------
.. automodule:: mitel_ommclient2.connection
:members:
:undoc-members:
:show-inheritance:
mitel\_ommclient2.exceptions module
-----------------------------------
.. automodule:: mitel_ommclient2.exceptions
:members:
:undoc-members:
:show-inheritance:
Module contents
---------------
.. automodule:: mitel_ommclient2
:members:
:undoc-members:
:show-inheritance:

View File

@@ -14,7 +14,7 @@ This is the documentation for mitel_ommclient2. To get started stick to the :doc
manual/client
manual/connection
manual/messages
api/modules
api/index
Indices and tables
==================

View File

@@ -15,7 +15,7 @@ to establish a transport to the API.
import mitel_ommclient2
conn = mitel_ommclient2..connection.Connection("omm.local")
conn = mitel_ommclient2.connection.Connection("omm.local")
To actually connect to the OMM, you need to call :func:`mitel_ommclient2.connection.Connection.connect`.
@@ -45,8 +45,8 @@ You hand over a Request object and receive a response object.
.. code-block:: python
>>> request = mitel_ommclient2.messages.Ping()
>>> r = conn.request(request)
>>> m = mitel_ommclient2.messages.Ping()
>>> r = conn.request(m)
>>> r.name
'PingResp'

View File

@@ -5,14 +5,13 @@ The API consists of three main message types: request, response and event. They
are represented by :class:`mitel_ommclient2.messages.Request`, :class:`mitel_ommclient2.messages.Response`
and events aren't supported yet.
There are several subclasses for each messages type, but they are just just overlays
to provide a conveinient interface to message content using attributes.
There are several subclasses for each messages type, which provide a conveinient
interface to message content using attributes.
Each message provides three attributes that define the central interfaces:
* name
* attrs
* childs
For each message you can access each field directly as class attributes.
There are two special attributes:
* name: returns the message name
* childs: allowes you to access childs by the child name as class attributes
Using messages
--------------
@@ -27,27 +26,20 @@ and hand it over to :func:`mitel_ommclient2.client.OMMClient2.request` or
my_time = int(time.time())
request = mitel_ommclient2.messages.Ping(timeStamp=my_time)
r = c.request(request)
m = mitel_ommclient2.messages.Ping()
m.timeStamp = my_time
r = c.request(m)
ping = r.timeStamp - my_time
Crafting your own messages
--------------------------
A more complex example
----------------------
It some cases your message class isn't implemented yet. Then you can craft the
message yourself.
This demonstrates how to access message childs.
.. code-block:: python
import time
my_time = int(time.time())
request = mitel_ommclient2.messages.Request("Ping")
request.attrs = {
"timeStamp": my_time,
}
r = c.request(request)
ping = r.attrs["timeStamp"] - my_time
m = messages.GetAccount()
m.id = id
r = self.connection.request(m)
return r.childs.account[0]

27
flake.lock generated Normal file
View File

@@ -0,0 +1,27 @@
{
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1665732960,
"narHash": "sha256-WBZ+uSHKFyjvd0w4inbm0cNExYTn8lpYFcHEes8tmec=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "4428e23312933a196724da2df7ab78eb5e67a88e",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"nixpkgs": "nixpkgs"
}
}
},
"root": "root",
"version": 7
}

49
flake.nix Normal file
View File

@@ -0,0 +1,49 @@
{
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
};
outputs = { self, nixpkgs, ... }: {
packages.x86_64-linux = let
pkgs = import nixpkgs {
system = "x86_64-linux";
};
in {
mitel-ommclient2 = pkgs.python3Packages.buildPythonPackage rec {
pname = "mitel-ommclient2";
version = "0.0.1";
src = ./.;
outputs = [
"out"
"doc"
];
nativeBuildInputs = [
pkgs.python3Packages.sphinxHook
];
format = "pyproject";
buildInputs = [ pkgs.python3Packages.hatchling ];
propagatedBuildInputs = [ pkgs.python3Packages.rsa ];
pythonImportsCheck = [ "mitel_ommclient2" ];
};
default = self.packages.x86_64-linux.mitel-ommclient2;
};
apps.x86_64-linux = {
ommclient2 = {
type = "app";
program = self.packages.x86_64-linux.mitel-ommclient2 + "/bin/ommclient2";
};
default = self.apps.x86_64-linux.ommclient2;
};
hydraJobs = {
inherit (self)
packages;
};
};
}

189
mitel_ommclient2/cli.py Executable file
View File

@@ -0,0 +1,189 @@
#!/usr/bin/env python3
import argparse
import getpass
import time
import traceback
from . import OMMClient2
from .exceptions import EAuth, ENoEnt
from .messages import GetAccount, Ping
# exit handling with argparse is a bit broken even with exit_on_error=False, so we hack this
def error_instead_exit(self, message):
raise argparse.ArgumentError(None, message)
argparse.ArgumentParser.error = error_instead_exit
def format_child_type(t):
return " {}\n{}".format(t.__class__.__name__, "\n".join(["{:<30} {}".format(key, value) for key, value in t._attrs.items()]))
def format_list(v):
return "\n\n\n\n".join(format_child_type(d) for d in v)
return fl
def main():
connect_parser = argparse.ArgumentParser(prog='ommclient2')
connect_parser.add_argument("-n", dest="hostname", default="127.0.0.1")
connect_parser.add_argument("-u", dest="username", default="omm")
connect_parser.add_argument("-p", dest="password")
connect_parser.add_argument("--ommsync", dest="ommsync", action='store_true', help="Log in with ommsync mode")
connect_parser.add_argument("subcommand", nargs="*")
args = connect_parser.parse_args()
hostname = args.hostname
username = args.username
password = args.password
ommsync = args.ommsync
subcommand = args.subcommand
if not password:
password = getpass.getpass(prompt="OMM password for {}@{}:".format(username, hostname))
try:
c = OMMClient2(hostname, username, password, ommsync=ommsync)
except EAuth:
print("Authentication failed")
exit(1)
except TimeoutError:
print("OMM unreachable")
exit(1)
parser = argparse.ArgumentParser(prog="ommclient2", add_help=False, exit_on_error=False)
subparsers = parser.add_subparsers()
def add_parser(command_name, func, format=None, args={}):
subp = subparsers.add_parser(command_name, help=func.__doc__.strip().split("\n")[0], description=func.__doc__)
if format is not None:
subp.set_defaults(func=func, format=format)
else:
subp.set_defaults(func=func)
for a, t in args.items():
subp.add_argument(a, type=t)
return subp
parser_exit = subparsers.add_parser("exit")
parser_exit.set_defaults(func=exit)
parser_get_account = add_parser("attach_user_device", func=c.attach_user_device, format=format_list, args={
"uid": int,
"ppn": int,
})
parser_get_account = add_parser("create_user", func=c.create_user, format=format_child_type, args={
"num": str,
})
parser_get_account = add_parser("detach_user_device", func=c.detach_user_device, format=format_list, args={
"uid": int,
"ppn": int,
})
parser_get_account = add_parser("detach_user_device_by_device", func=c.detach_user_device_by_device, format=format_list, args={
"ppn": int,
})
parser_get_account = add_parser("detach_user_device_by_user", func=c.detach_user_device_by_user, format=format_list, args={
"uid": int,
})
parser_get_account = add_parser("encrypt", func=c.encrypt, args={
"secret": str,
})
parser_get_account = add_parser("get_account", func=c.get_account, format=format_child_type, args={
"id": int,
})
parser_get_account = add_parser("get_device", func=c.get_device, format=format_child_type, args={
"ppn": int,
})
parser_get_account = add_parser("get_devices", func=c.get_devices, format=format_list)
parser_get_account = add_parser("get_publickey", func=c.get_publickey)
parser_get_account = add_parser("get_user", func=c.get_user, format=format_child_type, args={
"uid": int,
})
parser_get_account = add_parser("get_users", func=c.get_users, format=format_list)
parser_help = subparsers.add_parser("help")
parser_help.set_defaults(func=parser.format_help)
parser_ping = subparsers.add_parser("ping")
parser_ping.set_defaults(func=lambda *args, **kwargs: "pong" if c.ping(*args, **kwargs) else "error")
parser_get_account = add_parser("set_user_name", func=c.set_user_name, args={
"uid": int,
"name": str,
})
parser_get_account = add_parser("set_user_num", func=c.set_user_num, args={
"uid": int,
"num": str,
})
parser_get_account = add_parser("set_user_relation_dynamic", func=c.set_user_relation_dynamic, args={
"uid": int,
})
parser_get_account = add_parser("set_user_relation_fixed", func=c.set_user_relation_fixed, args={
"uid": int,
})
parser_get_account = add_parser("set_user_sipauth", func=c.set_user_sipauth, args={
"uid": int,
"sipAuthId": str,
"sipPw": str,
})
if subcommand:
try:
args = parser.parse_args(subcommand)
except argparse.ArgumentError as e:
print("argument error:", e.message)
exit(1)
v = dict(vars(args))
v.pop("func")
format = lambda r: r
if v.get("format") is not None:
format = v.get("format")
v.pop("format")
try:
r = args.func(**v)
except Exception as e:
print("".join(traceback.format_exception(type(e), e, e.__traceback__)))
exit(1)
print(format(r))
exit()
print("OMMClient")
parser.print_help()
while True:
i = input("> ").split()
try:
args = parser.parse_args(i)
except argparse.ArgumentError as e:
print("argument error:", e.message)
continue
v = dict(vars(args))
v.pop("func")
format = lambda r: r
if v.get("format") is not None:
format = v.get("format")
v.pop("format")
try:
r = args.func(**v)
except Exception as e:
print("".join(traceback.format_exception(type(e), e, e.__traceback__)))
continue
print(format(r))
if __name__ == "__main__":
main()

View File

@@ -1,7 +1,16 @@
#!/usr/bin/env python3
import base64
try:
# This is is only dependency not from the modules inlcuded in python by default, so we make it optional
import rsa
except ImportError:
rsa = None
from .connection import Connection
from . import exceptions
from . import messages
from . import types
class OMMClient2:
"""
@@ -43,9 +52,156 @@ class OMMClient2:
self.connection.connect()
# Login
r = self.connection.request(messages.Open(self._username, self._password, UserDeviceSyncClient=self._ommsync))
m = messages.Open()
m.username = self._username
m.password = self._password
if self._ommsync:
m.UserDeviceSyncClient = "true"
r = self.connection.request(m)
r.raise_on_error()
def attach_user_device(self, uid, ppn):
"""
Attach user to device
:param uid: User id
:param ppn: Device id
Requires ommsync=True
"""
t_u = types.PPUserType()
t_u.uid = uid
t_u.ppn = ppn
t_u.relType = types.PPRelTypeType("Dynamic")
t_d = types.PPDevType()
t_d.ppn = ppn
t_d.uid = uid
t_d.relType = types.PPRelTypeType("Dynamic")
m = messages.SetPP()
m.childs.user = [t_u]
m.childs.pp = [t_d]
r = self.connection.request(m)
r.raise_on_error()
if r.childs.user is None:
return None
return r.childs.user[0], r.childs.pp[0]
def create_user(self, num):
"""
Create PP user
:param num: User number
"""
t = types.PPUserType()
t.num = num
m = messages.CreatePPUser()
m.childs.user = [t]
r = self.connection.request(m)
r.raise_on_error()
if r.childs.user is None:
return None
return r.childs.user[0]
def detach_user_device(self, uid, ppn):
"""
Detach user from device
:param uid: User id
:param ppn: Device id
Requires ommsync=True
"""
t_u = types.PPUserType()
t_u.uid = uid
t_u.ppn = 0
t_u.relType = types.PPRelTypeType("Unbound")
t_d = types.PPDevType()
t_d.ppn = ppn
t_d.uid = 0
t_d.relType = types.PPRelTypeType("Unbound")
m = messages.SetPP()
m.childs.user = [t_u]
m.childs.pp = [t_d]
r = self.connection.request(m)
r.raise_on_error()
if r.childs.user is None:
return None
return r.childs.user[0], r.childs.pp[0]
def detach_user_device_by_user(self, uid):
"""
Detach user from device
This just requires the user id
:param uid: User id
Requires ommsync=True
"""
u = self.get_user(uid)
return self.detach_user_device(uid, u.ppn)
def detach_user_device_by_device(self, ppn):
"""
Detach user from device
This just requires the device id
:param ppn: Device id
Requires ommsync=True
"""
d = self.get_device(ppn)
return self.detach_user_device(d.uid, ppn)
def encrypt(self, secret):
"""
Encrypt secret for OMM
Required rsa module to be installed
:param secret: String to encrypt
"""
if rsa is None:
raise Exception("rsa module is required for excryption")
publickey = self.get_publickey()
pubkey = rsa.PublicKey(*publickey)
byte_secret = secret.encode('utf8')
byte_encrypt = rsa.encrypt(byte_secret, pubkey)
encrypt = base64.b64encode(byte_encrypt).decode("utf8")
return encrypt
def find_devices(self, filter):
"""
Get all devices matching a filter
:param filter: function taking one parameter which is a device, returns True to keep, False to discard
Usage::
>>> c.find_devices(lambda d: d.relType == mitel_ommclient2.types.PPRelTypeType("Unbound"))
"""
for d in self.get_devices():
if filter(d):
yield d
def find_users(self, filter):
"""
Get all users matching a filter
:param filter: function taking one parameter which is a user, returns True to keep, False to discard
Usage::
>>> c.find_users(lambda u: u.num.startswith("9998"))
"""
for u in self.get_users():
if filter(u):
yield u
def get_account(self, id):
"""
Get account
@@ -53,23 +209,97 @@ class OMMClient2:
:param id: User id
"""
r = self.connection.request(messages.GetAccount(id))
m = messages.GetAccount()
m.id = id
r = self.connection.request(m)
r.raise_on_error()
if r.account is None:
if r.childs.account is None:
return None
return r.account[0]
return r.childs.account[0]
def get_pp_dev(self, ppn):
def get_device(self, ppn):
"""
Get PP device
:param id: Device id
:param ppn: Device id
"""
r = self.connection.request(messages.GetPPDev(ppn))
m = messages.GetPPDev()
m.ppn = ppn
r = self.connection.request(m)
r.raise_on_error()
if r.pp is None:
if r.childs.pp is None:
return None
return r.pp[0]
return r.childs.pp[0]
def get_devices(self):
"""
Get all PP devices
"""
next_ppn = 0
while True:
m = messages.GetPPDev()
m.ppn = next_ppn
m.maxRecords = 20
r = self.connection.request(m)
try:
r.raise_on_error()
except exceptions.ENoEnt:
# No more devices to fetch
break
# Output all found devices
for pp in r.childs.pp:
yield pp
# Determine next possible ppn
next_ppn = int(pp.ppn) + 1
def get_publickey(self):
"""
Get public key for encrypted values
"""
m = messages.GetPublicKey()
r = self.connection.request(m)
r.raise_on_error()
return int(r.modulus, 16), int(r.exponent, 16)
def get_user(self, uid):
"""
Get PP user
:param uid: User id
"""
m = messages.GetPPUser()
m.uid = uid
r = self.connection.request(m)
r.raise_on_error()
if r.childs.user is None:
return None
return r.childs.user[0]
def get_users(self):
"""
Get all PP users
"""
next_uid = 0
while True:
m = messages.GetPPUser()
m.uid = next_uid
m.maxRecords = 20
r = self.connection.request(m)
try:
r.raise_on_error()
except exceptions.ENoEnt:
# No more devices to fetch
break
# Output all found devices
for user in r.childs.user:
yield user
# Determine next possible ppn
next_uid = int(user.uid) + 1
def ping(self):
"""
@@ -82,3 +312,83 @@ class OMMClient2:
if r.errCode is None:
return True
return False
def set_user_name(self, uid, name):
"""
Set PP user name
:param uid: User id
:param name: User name
"""
t = types.PPUserType()
t.uid = uid
t.name = name
m = messages.SetPPUser()
m.childs.user = [t]
r = self.connection.request(m)
r.raise_on_error()
if r.childs.user is None:
return None
return r.childs.user[0]
def set_user_num(self, uid, num):
"""
Set PP user number
:param uid: User id
:param num: User number
"""
t = types.PPUserType()
t.uid = uid
t.num = num
m = messages.SetPPUser()
m.childs.user = [t]
r = self.connection.request(m)
r.raise_on_error()
if r.childs.user is None:
return None
return r.childs.user[0]
def set_user_relation_dynamic(self, uid):
"""
Set PP user to PP device relation to dynamic type
:param uid: User id
"""
m = messages.SetPPUserDevRelation()
m.uid = uid
m.relType = types.PPRelTypeType("Dynamic")
r = self.connection.request(m)
r.raise_on_error()
def set_user_relation_fixed(self, uid):
"""
Set PP user to PP device relation to fixed type
:param uid: User id
"""
m = messages.SetPPUserDevRelation()
m.uid = uid
m.relType = types.PPRelTypeType("Fixed")
r = self.connection.request(m)
r.raise_on_error()
def set_user_sipauth(self, uid, sipAuthId, sipPw):
"""
Set PP user sip credentials
:param uid: User id
:param sipAuthId: SIP user name
:param sipPw: Plain text password
"""
t = types.PPUserType()
t.uid = uid
t.sipAuthId = sipAuthId
t.sipPw = self.encrypt(sipPw)
m = messages.SetPPUser()
m.childs.user = [t]
r = self.connection.request(m)
r.raise_on_error()
if r.childs.user is None:
return None
return r.childs.user[0]

View File

@@ -26,6 +26,7 @@ class Connection:
self._host = host
self._port = port
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.settimeout(2)
self._seq = 0 # state of the sequence number generator
self._requests = {} # waiting area for pending responses
@@ -55,8 +56,11 @@ class Connection:
if select.select([self._socket], [], []) != ([], [], []):
# wait for data availiable
while True:
# fill buffer with one message
data = self._socket.recv(1024)
try:
# fill buffer with one message
data = self._socket.recv(1024)
except BlockingIOError:
continue
if not data:
# buffer is empty

View File

@@ -3,47 +3,107 @@
from xml.dom.minidom import getDOMImplementation, parseString
from ..exceptions import exception_classes, OMResponseException
from ..types import cast_dict_to_childtype
class Request:
class Message:
"""
Request message class
:param name: Name of the message
:param seq: Unique sequence number to associate responses
Usage::
>>> req = Request("Ping")
"""
def __init__(self, name, seq=None):
self.name = name
self.attrs = {}
self.childs = {}
if seq is not None:
self.attrs["seq"] = seq
@property
def seq(self):
return self.attrs.get("seq")
@seq.setter
def seq(self, seq):
self.attrs["seq"] = seq
class Response:
"""
Response message class
Base message class
:param name: Name of the message
:param attrs: Message attributes
:param childs: Message children
"""
def __init__(self, name, attrs={}, childs={}):
# Fields defined by the base type class
BASE_FIELDS = {}
# Fields defined by subclasses
FIELDS = {}
# Child types
CHILDS = {}
# Fields dicts consist of the field name as name and the field type as value
# Use None if the field type is unknown, any type is allowed then
class Childs:
"""
Contains message childs
"""
CHILDS = {}
def __init__(self, child_types, child_dict):
self.CHILDS = child_types
self._child_dict = child_dict
def __getattr__(self, name):
if name in self.CHILDS.keys():
return self._child_dict.get(name)
else:
raise AttributeError()
def __setattr__(self, name, value):
if name in self.CHILDS.keys():
if not isinstance(value, list):
raise TypeError()
for v in value:
if self.CHILDS[name] is not None and type(v) != self.CHILDS[name]:
raise TypeError()
self._child_dict[name] = value
else:
object.__setattr__(self, name, value)
def __init__(self, name=None, attrs={}, childs={}):
self.name = name
self.attrs = attrs
self.childs = childs
if not self.name:
self.name = self.__class__.__name__
self._attrs = {} | attrs
self._childs = {} | childs
self.childs = self.Childs(self.CHILDS, self._childs)
def __getattr__(self, name):
fields = self.FIELDS | self.BASE_FIELDS
if name in fields.keys():
return self._attrs.get(name)
else:
raise AttributeError()
def __setattr__(self, name, value):
fields = self.FIELDS | self.BASE_FIELDS
if name in fields.keys():
if fields[name] is not None and type(value) != fields[name]:
raise TypeError()
self._attrs[name] = value
else:
object.__setattr__(self, name, value)
def __repr__(self):
return "{}({}, {}, {})".format(self.__class__.__name__, repr(self.name), repr(self._attrs), repr(self._childs))
class Request(Message):
"""
Request message type class
"""
BASE_FIELDS = {
"seq": int,
}
class Response(Message):
"""
Response message type class
"""
BASE_FIELDS = {
"seq": int,
"errCode": None,
"info": None,
"bad": None,
"maxLen": None,
}
def raise_on_error(self):
"""
@@ -62,30 +122,28 @@ class Response:
if self.errCode is not None:
raise exception_classes.get(self.errCode, OMResponseException)(response=self)
@property
def seq(self):
return int(self.attrs.get("seq"))
@property
def errCode(self):
return self.attrs.get("errCode")
REQUEST_TYPES = {}
RESPONSE_TYPES = {}
@property
def info(self):
return self.attrs.get("info")
def request_type(c):
REQUEST_TYPES[c.__name__] = c
return c
@property
def bad(self):
return self.attrs.get("bad")
@property
def maxLen(self):
return self.attrs.get("maxLen")
def response_type(c):
RESPONSE_TYPES[c.__name__] = c
return c
from .createppuser import CreatePPUser, CreatePPUserResp
from .getaccount import GetAccount, GetAccountResp
from .getppdev import GetPPDev, GetPPDevResp
from .getppuser import GetPPUser, GetPPUserResp
from .getpublickey import GetPublicKey, GetPublicKeyResp
from .open import Open, OpenResp
from .ping import Ping, PingResp
from .setpp import SetPP, SetPPResp
from .setppuser import SetPPUser, SetPPUserResp
from .setppuserdevrelation import SetPPUserDevRelation, SetPPUserDevRelationResp
def construct(request):
"""
@@ -95,29 +153,19 @@ def construct(request):
message = impl.createDocument(None, request.name, None)
root = message.documentElement
for k, v in request.attrs.items():
for k, v in request._attrs.items():
root.setAttribute(str(k), str(v))
for k, v in request.childs.items():
child = message.createElement(k)
if v is not None:
for c_k, c_v in v.items():
child.setAttribute(str(c_k), str(c_v))
root.appendChild(child)
for child_name, child_list in request._childs.items():
if child_list is not None:
for child_list_item in child_list:
child = message.createElement(child_name)
for child_item_key, child_item_value in child_list_item._attrs.items():
child.setAttribute(str(child_item_key), str(child_item_value))
root.appendChild(child)
return root.toxml()
def _response_type_by_name(name):
response_types = [
GetAccountResp,
GetPPDevResp,
PingResp,
]
response_types_dict = {r.__name__: r for r in response_types}
return response_types_dict.get(name, Response)
def parse(message):
message = parseString(message)
root = message.documentElement
@@ -126,9 +174,15 @@ def parse(message):
attrs = {}
childs = {}
response_type = RESPONSE_TYPES.get(name)
fields = response_type.FIELDS | response_type.BASE_FIELDS
for i in range(0, root.attributes.length):
item = root.attributes.item(i)
attrs[item.name] = item.value
if fields.get(item.name) is not None:
attrs[item.name] = fields[item.name](item.value)
else:
attrs[item.name] = item.value
child = root.firstChild
while child is not None:
@@ -138,6 +192,11 @@ def parse(message):
new_child[item.name] = item.value
childname = child.tagName
# cast dict into child type
if response_type.CHILDS.get(childname) is not None:
new_child = cast_dict_to_childtype(response_type.CHILDS[childname], new_child)
if childname in childs:
childs[childname].append(new_child)
else:
@@ -146,4 +205,4 @@ def parse(message):
child = child.nextSibling
return _response_type_by_name(name)(name, attrs, childs)
return response_type(name, attrs, childs)

View File

@@ -0,0 +1,18 @@
#!/usr/bin/env python3
from . import Request, Response, request_type, response_type
from ..types import PPUserType
@request_type
class CreatePPUser(Request):
CHILDS = {
"user": PPUserType,
}
@response_type
class CreatePPUserResp(Response):
CHILDS = {
"user": PPUserType,
}

View File

@@ -1,27 +1,19 @@
#!/usr/bin/env python3
from . import Request, Response
from . import Request, Response, request_type, response_type
from ..types import AccountType
@request_type
class GetAccount(Request):
def __init__(self, id, maxRecords=None, **kwargs):
super().__init__("GetAccount", **kwargs)
self.attrs["id"] = id
if maxRecords is not None:
self.attrs["maxRecords"] = maxRecords
@property
def id(self):
return self.attrs.get("id")
@property
def maxRecords(self):
return self.attrs.get("maxRecords")
FIELDS = {
"id": int,
"maxRecords": int,
}
@response_type
class GetAccountResp(Response):
@property
def account(self):
return self.childs.get("account")
CHILDS = {
"account": AccountType,
}

View File

@@ -1,27 +1,19 @@
#!/usr/bin/env python3
from . import Request, Response
from . import Request, Response, request_type, response_type
from ..types import PPDevType
@request_type
class GetPPDev(Request):
def __init__(self, ppn, maxRecords=None, **kwargs):
super().__init__("GetPPDev", **kwargs)
self.attrs["ppn"] = ppn
if maxRecords is not None:
self.attrs["maxRecords"] = maxRecords
@property
def ppn(self):
return self.attrs.get("ppn")
@property
def maxRecords(self):
return self.attrs.get("maxRecords")
FIELDS = {
"ppn": int,
"maxRecords": int,
}
@response_type
class GetPPDevResp(Response):
@property
def pp(self):
return self.childs.get("pp")
CHILDS = {
"pp": PPDevType,
}

View File

@@ -0,0 +1,19 @@
#!/usr/bin/env python3
from . import Request, Response, request_type, response_type
from ..types import PPUserType
@request_type
class GetPPUser(Request):
FIELDS = {
"uid": int,
"maxRecords": int,
}
@response_type
class GetPPUserResp(Response):
CHILDS = {
"user": PPUserType,
}

View File

@@ -0,0 +1,16 @@
#!/usr/bin/env python3
from . import Request, Response, request_type, response_type
@request_type
class GetPublicKey(Request):
pass
@response_type
class GetPublicKeyResp(Response):
FIELDS = {
"modulus": str,
"exponent": str,
}

View File

@@ -1,55 +1,23 @@
#!/usr/bin/env python3
from . import Request, Response
from . import Request, Response, request_type, response_type
@request_type
class Open(Request):
"""
Authenticate Message
FIELDS = {
"username": None,
"password": None,
"UserDeviceSyncClient": None,
}
Needs to be the first message on a new connection.
:param username: Username
:param password: Password
:param UserDeviceSyncClient: If True login as OMM-Sync client. Some operations in OMM-Sync mode might lead to destroy DECT paring.
"""
def __init__(self, username, password, UserDeviceSyncClient=False, **kwargs):
super().__init__("Open", **kwargs)
self.attrs["username"] = username
self.attrs["password"] = password
if UserDeviceSyncClient:
self.attrs["UserDeviceSyncClient"] = "true"
@property
def username(self):
return self.attrs.get("username")
@property
def password(self):
return self.attrs.get("password")
@property
def UserDeviceSyncClient(self):
return self.attrs.get("UserDeviceSyncClient")
@response_type
class OpenResp(Response):
@property
def protocolVersion(self):
return self.attrs.get("protocolVersion")
@property
def minPPSwVersion1(self):
return self.attrs.get("minPPSwVersion1")
@property
def minPPSwVersion2(self):
return self.attrs.get("minPPSwVersion2")
@property
def ommStbState(self):
return self.attrs.get("ommStbState")
@property
def publicKey(self):
return self.attrs.get("publicKey")
FIELDS = {
"protocolVersion": None,
"minPPSwVersion1": None,
"minPPSwVersion2": None,
"ommStbState": None,
"publicKey": None,
}

View File

@@ -1,20 +1,17 @@
#!/usr/bin/env python3
from . import Request, Response
from . import Request, Response, request_type, response_type
@request_type
class Ping(Request):
def __init__(self, timeStamp=None, **kwargs):
super().__init__("Ping", **kwargs)
FIELDS = {
"timeStamp": int,
}
if timeStamp is not None:
self.attrs["timeStamp"] = timeStamp
@property
def timeStamp(self):
return self.attrs.get("timeStamp")
@response_type
class PingResp(Response):
@property
def timeStamp(self):
return self.attrs.get("timeStamp")
FIELDS = {
"timeStamp": int,
}

View File

@@ -0,0 +1,20 @@
#!/usr/bin/env python3
from . import Request, Response, request_type, response_type
from ..types import PPDevType, PPUserType
@request_type
class SetPP(Request):
CHILDS = {
"pp": PPDevType,
"user": PPUserType,
}
@response_type
class SetPPResp(Response):
CHILDS = {
"pp": PPDevType,
"user": PPUserType,
}

View File

@@ -0,0 +1,18 @@
#!/usr/bin/env python3
from . import Request, Response, request_type, response_type
from ..types import PPUserType
@request_type
class SetPPUser(Request):
CHILDS = {
"user": PPUserType,
}
@response_type
class SetPPUserResp(Response):
CHILDS = {
"user": PPUserType,
}

View File

@@ -0,0 +1,20 @@
#!/usr/bin/env python3
from . import Request, Response, request_type, response_type
from ..types import PPRelTypeType
@request_type
class SetPPUserDevRelation(Request):
FIELDS = {
"uid": int,
"relType": PPRelTypeType,
}
@response_type
class SetPPUserDevRelationResp(Response):
FIELDS = {
"uid": int,
"relType": PPRelTypeType,
}

View File

@@ -0,0 +1,238 @@
#!/usr/bin/env python3
class ChildType:
"""
Base type class
:param name: Name of the message
:param attrs: Message attributes
:param childs: Message children
"""
FIELDS = {}
def __init__(self, attrs={}):
self._attrs = {}
if self.FIELDS is not None:
for k, v in attrs.items():
setattr(self, k, v)
else:
# don't check attrs for types we do have any information
self._attrs = attrs
def __getattr__(self, name):
if name in self.FIELDS.keys():
return self._attrs.get(name)
else:
raise AttributeError()
def __setattr__(self, name, value):
if name in self.FIELDS.keys():
if self.FIELDS[name] is not None and type(value) != self.FIELDS[name]:
raise TypeError()
self._attrs[name] = value
else:
object.__setattr__(self, name, value)
def __repr__(self):
return "{}({})".format(self.__class__.__name__, repr(self._attrs))
def cast_dict_to_childtype(t, d):
errors = {} # collect unknown keys
for k, v in d.items():
if k in t.FIELDS.keys():
if t.FIELDS[k] is not None and type(v) != t.FIELDS[k]:
d[k] = t.FIELDS[k](v)
else:
errors[k] = v
if errors != {}:
raise KeyError("The following keys are unknown for '{}': {}".format(t.__name__, errors))
return t(d)
class EnumType:
VALUES = [] # Allowed values
def __init__(self, s):
if self.VALUES is not None:
if s in self.VALUES:
self.value = s
else:
raise ValueError()
else:
self.value = s
def __str__(self):
return str(self.value)
def __repr__(self):
return "{}({})".format(self.__class__.__name__, repr(self.value))
def __eq__(self, other):
return isinstance(other, type(self)) and self.value == other.value
class CallForwardStateType(EnumType):
VALUES = [
"Off",
"Busy",
"NoAnswer",
"BusyNoAnswer",
"All",
]
class DECTSubscriptionStateType(EnumType):
VALUES = None
class LanguageType(EnumType):
VALUES = None
class MonitoringStateType(EnumType):
VALUES = None
class PPRelTypeType(EnumType):
VALUES = [
"Fixed",
"Dynamic",
"Unbound",
]
class AccountType(ChildType):
FIELDS = {
"id": int,
"username": str,
"password": str,
"oldPassword": str,
"permission": None,
"active": bool,
"aging": None,
"expire": int,
"state": str,
}
class PPDevType(ChildType):
FIELDS = {
"ppn": int,
"timeStamp": int,
"relType": PPRelTypeType,
"uid": int,
"ipei": str,
"ac": str,
"s": DECTSubscriptionStateType,
"uak": str,
"encrypt": bool,
"capMessaging": bool,
"capMessagingForInternalUse": bool,
"capEnhLocating": bool,
"capBluetooth": bool,
"ethAddr": str,
"hwType": str,
"ppProfileCapability": bool,
"ppDefaultProfileLoaded": bool,
"subscribeToPARIOnly": bool,
# undocumented
"ommId": str,
"ommIdAck": str,
"timeStampAdmin": int,
"timeStampRelation": int,
"timeStampRoaming": int,
"timeStampSubscription": int,
"autoCreate": bool,
"roaming": None, # value: 'RoamingComplete'
"modicType": str, # value: '01'
"locationData": str, # value: '000001000000'
"dectIeFixedId": str,
"subscriptionId": str,
"ppnSec": int,
}
class PPUserType(ChildType):
FIELDS = {
"uid": int,
"timeStamp": int,
"relType": PPRelTypeType,
"ppn": int,
"name": str,
"num": str,
"hierarchy1": str,
"hierarchy2": str,
"addId": str,
"pin": str,
"sipAuthId": str,
"sipPw": str,
"sosNum": str,
"voiceboxNum": str,
"manDownNum": str,
"forwardState": CallForwardStateType,
"forwardTime": int,
"forwardDest": str,
"langPP": LanguageType,
"holdRingBackTime": int,
"autoAnswer": str,
"microphoneMute": str,
"warningTone": str,
"allowBargeIn": str,
"callWaitingDisabled": bool,
"external": bool,
"trackingActive": bool,
"locatable": bool,
"BTlocatable": bool,
"BTsensitivity": str,
"locRight": bool,
"msgRight": bool,
"sendVcardRight": bool,
"recvVcardRight": bool,
"keepLocalPB": bool,
"vip": bool,
"sipRegisterCheck": bool,
"allowVideoStream": bool,
"conferenceServerType": str,
"conferenceServerURI": str,
"monitoringMode": str,
"CUS": MonitoringStateType,
"HAS": MonitoringStateType,
"HSS": MonitoringStateType,
"HRS": MonitoringStateType,
"HCS": MonitoringStateType,
"SRS": MonitoringStateType,
"SCS": MonitoringStateType,
"CDS": MonitoringStateType,
"HBS": MonitoringStateType,
"BTS": MonitoringStateType,
"SWS": MonitoringStateType,
"credentialPw": str,
"configurationDataLoaded": bool,
"ppData": str,
"ppProfileId": int,
"fixedSipPort": int,
"calculatedSipPort": int,
# undocumented
"uidSec": int,
"permanent": bool,
"lang": None,
"autoLogoutOnCharge": bool,
"hotDeskingSupport": bool,
"authenticateLogout": bool,
"useSIPUserName": None,
"useSIPUserAuthentication": None,
"serviceUserName": None,
"serviceAuthName": None,
"serviceAuthPassword": None,
"keyLockEnable": None,
"keyLockPin": None,
"keyLockTime": None,
"ppnOld": int,
"timeStampAdmin": int,
"timeStampRelation": int,
}

30
pyproject.toml Normal file
View File

@@ -0,0 +1,30 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "mitel_ommclient2"
version = "0.0.1"
authors = [
{ name="clerie", email="hallo@clerie.de" },
]
description = "Another attempt for a modern client library to the Mitel OM Application XML Interface."
readme = "README.md"
license = { file="LICENSE" }
requires-python = ">=3.7"
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
]
[project.optional-dependencies]
crypt = [
"rsa",
]
[project.scripts]
ommclient2 = "mitel_ommclient2.cli:main"
[project.urls]
"Source" = "https://git.clerie.de/clerie/mitel_ommclient2"