Compare commits

..

62 Commits

Author SHA1 Message Date
a11629f543 Readable error messages when OMM is unreachable when using cli 2023-06-17 18:27:30 +02:00
11f96a6069 Set short timeout to fail fast when the OMM is unreachable 2023-06-17 18:25:14 +02:00
421cc06d6c Add cli to flake and set default 2022-10-16 11:48:57 +02:00
aef2761f9f build documentation with flake too 2022-10-15 22:02:58 +02:00
607430ca69 add flake 2022-10-15 16:51:07 +02:00
f05ffdd86d Add cli to readme 2022-07-06 12:53:07 +02:00
ddb5194320 Add project script for cli 2022-07-06 12:50:16 +02:00
003b2ff367 Make ommcli part of the module 2022-07-06 12:49:57 +02:00
a7178b6f86 Add details to install 2022-07-06 01:46:32 +02:00
ee5bc790ea Add packaging information 2022-07-06 01:39:32 +02:00
f8c215d380 Make encryption part of the client library and make sipauth take plain text passwords 2022-06-19 16:21:29 +02:00
0df5286c0e Ignore if socket is blocking 2022-06-19 15:41:43 +02:00
000a00fec8 Add method to filter users 2022-06-19 15:40:28 +02:00
405f71f621 Add method to filter devices 2022-06-19 14:45:26 +02:00
82378e0bd7 Add user device realation changes 2022-06-17 00:08:40 +02:00
e85fa8ff29 Display type on cli 2022-06-16 23:45:01 +02:00
650dd842ce Use function docstring in cli usage 2022-06-16 23:39:56 +02:00
1d60043f37 Add method for detaching device and user 2022-06-16 23:13:54 +02:00
8849b8488c Introduce ability to attach users to devices 2022-06-16 23:01:04 +02:00
ac9ae74a0b Make the exception show you all unknown keys at once
In case a response type contains an unknown key, there are probably some 
more.
2022-06-16 23:00:22 +02:00
501be198c4 Allow ommsync mode to be used in cli 2022-06-16 22:40:01 +02:00
4796d1587b Improve exception message 2022-06-16 22:32:01 +02:00
deba76228b Add CreatePPUser message 2022-06-16 21:29:21 +02:00
428c4b8661 Add an interactive cli 2022-06-16 19:39:36 +02:00
93e0676147 Add infrastructure for dealing with encryption keys 2022-06-16 14:56:39 +02:00
2d8a1897f0 Add method for changing PP user num 2022-05-02 18:54:49 +02:00
e3f9fbf36b Allow modification of PPUser 2022-05-02 18:33:21 +02:00
36187c18b3 Fix message name quoting 2022-05-02 18:08:31 +02:00
2eb04e0428 Remove weird side effects that insert the values from previuos call 2022-05-02 18:07:15 +02:00
0a442e4ab5 Add child typing to all other existing message types 2022-05-02 00:41:46 +02:00
bb8dd03299 Implement EnumType 2022-05-02 00:14:24 +02:00
80eec844bf Update readme 2022-05-01 23:57:37 +02:00
bd68cbd7d8 Bootstrap child type casting 2022-05-01 23:55:18 +02:00
0e19927610 Restructuring api documentation 2022-05-01 22:43:52 +02:00
cf3c16c66a Refactor message classes
Message classes allow dynamic attribute access to message fields. Childs
get exposed through a dedicated class by attributes too. Message type
fields and childs have types that get enforces. None type is allowed
too while transitioning.
2022-05-01 22:32:08 +02:00
1752a9151d Add support for PP user 2022-05-01 13:27:56 +02:00
5365b942ce Add function to get all devices 2022-05-01 13:13:31 +02:00
7064d6f615 Rename get_pp_dev to get_device to match python-mitel feature naming 2022-05-01 12:53:46 +02:00
50d9b79ad0 Fix documentation 2022-05-01 12:50:59 +02:00
49a4ecc07f Remove DictRequest type 2022-04-02 21:23:43 +02:00
7076eb1543 Expose connection object directly instead of proxying the request method 2022-04-02 21:10:05 +02:00
8f4a7b1f6d Introduce the different classes a bit more. 2022-01-17 14:00:47 +01:00
f05a985c5e Adding some documentation 2022-01-17 13:52:56 +01:00
e6f93150a0 Update API documentation 2022-01-17 12:29:14 +01:00
8d2e94f5f3 Move last components of session layer to client and remove session completely 2022-01-17 12:20:54 +01:00
80fb8cd7cc Replace send and recv in connection with a concurrent synchronous implementation of request 2022-01-17 00:09:16 +01:00
8501aab7c3 Handle message parsing in connection 2022-01-16 22:29:18 +01:00
1365226c00 Use thread for receiving data 2022-01-16 22:17:28 +01:00
a8d9afe3d1 Add ommsync mode 2022-01-16 13:58:38 +01:00
f1c340fb3d Add GetPPDev message 2022-01-16 13:52:08 +01:00
1b0a5caa1b Pass childs to childs attribute 2022-01-16 13:47:06 +01:00
c63ecf7781 Fix GetAccountResp data parsing 2022-01-06 23:58:59 +01:00
5d8affbb5f Actually pass response to exception object 2022-01-06 23:56:35 +01:00
9e1e7c3377 Fix data read loop from connection 2022-01-06 23:55:36 +01:00
a87893199c Use right var name 2022-01-06 23:55:01 +01:00
5c565bc62c Use request wrapper for session login 2022-01-06 23:54:40 +01:00
d14fb10ddb Import Open messages 2022-01-06 23:53:52 +01:00
3617494917 Default to unencrypted connection because the required old ssl protocols aren't supported anymore 2022-01-06 23:52:24 +01:00
f62579ce90 Add GetAccount 2022-01-06 18:13:48 +01:00
496b915c9d Stop on failed login 2022-01-06 17:59:12 +01:00
9f9df79ab8 Introduce exception handling 2022-01-06 17:28:19 +01:00
72f148f7b9 Add license and attribution 2022-01-06 16:20:46 +01:00
35 changed files with 1745 additions and 328 deletions

1
.gitignore vendored
View File

@ -1,2 +1,3 @@
__pycache__ __pycache__
dist/
docs/_build docs/_build

21
LICENSE Normal file
View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2022 clerie
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -1,6 +1,20 @@
# Mitel OMMClient2 # Mitel OMMClient2
Another attempt for a client library to the Mitel OM Application XML Interface. Another attempt for a modern client library to the Mitel OM Application XML Interface.
## Install
Without any additional dependencies:
```
pip install "mitel_ommclient2 @ git+https://git.clerie.de/clerie/mitel_ommclient2.git@main"
```
Add dependencies to enable secret handling, if you need it.
```
pip install "mitel_ommclient2[crypt] @ git+https://git.clerie.de/clerie/mitel_ommclient2.git@main"
```
## Quicksart ## Quicksart
@ -16,10 +30,22 @@ c = mitel_ommclient2.OMMClient2("omm.local", "admin", "admin")
c.ping() c.ping()
# Create custom messages # Create custom messages
r = c.session.request(mitel_ommclient2.messages.Ping(timeStamp=2342)) m = mitel_ommclient2.messages.Ping()
m.timeStamp = 2342
# Craft your own request, if it is not implemented yet r = c.connection.request(m)
r = c.session.request(mitel_ommclient2.messages.DictRequest("Ping", {"timeStamp": 2342}))
``` ```
Consult class documentation for more in depth examples and options. Consult class documentation for more in depth examples and options.
## Interactive CLI
The package installs a script called `ommclient2`.
This allowes basic interactive testing of the library.
```
ommclient2 --help
```
## Attribution
This software is inspired by `python-mitel` by Thomas and n-st.

7
docs/api/client.rst Normal file
View File

@ -0,0 +1,7 @@
mitel\_ommclient2.client module
===============================
.. automodule:: mitel_ommclient2.client
:members:
:undoc-members:
:show-inheritance:

7
docs/api/connection.rst Normal file
View File

@ -0,0 +1,7 @@
mitel\_ommclient2.connection module
===================================
.. automodule:: mitel_ommclient2.connection
:members:
:undoc-members:
:show-inheritance:

7
docs/api/exceptions.rst Normal file
View File

@ -0,0 +1,7 @@
mitel\_ommclient2.exceptions module
===================================
.. automodule:: mitel_ommclient2.exceptions
:members:
:undoc-members:
:show-inheritance:

12
docs/api/index.rst Normal file
View File

@ -0,0 +1,12 @@
.. _api:
API Documentation
=================
.. toctree::
:maxdepth: 4
client
connection
exceptions
messages

View File

@ -1,29 +1,32 @@
mitel\_ommclient2.messages package mitel\_ommclient2.messages package
================================== ==================================
Submodules .. automodule:: mitel_ommclient2.messages
---------- :members:
:undoc-members:
:show-inheritance:
mitel\_ommclient2.messages.open module .. automodule:: mitel_ommclient2.messages.getaccount
-------------------------------------- :members:
:undoc-members:
:show-inheritance:
.. automodule:: mitel_ommclient2.messages.getppdev
:members:
:undoc-members:
:show-inheritance:
.. automodule:: mitel_ommclient2.messages.getppuser
:members:
:undoc-members:
:show-inheritance:
.. automodule:: mitel_ommclient2.messages.open .. automodule:: mitel_ommclient2.messages.open
:members: :members:
:undoc-members: :undoc-members:
:show-inheritance: :show-inheritance:
mitel\_ommclient2.messages.ping module
--------------------------------------
.. automodule:: mitel_ommclient2.messages.ping .. automodule:: mitel_ommclient2.messages.ping
:members: :members:
:undoc-members: :undoc-members:
:show-inheritance: :show-inheritance:
Module contents
---------------
.. automodule:: mitel_ommclient2.messages
:members:
:undoc-members:
:show-inheritance:

View File

@ -1,45 +0,0 @@
mitel\_ommclient2 package
=========================
Subpackages
-----------
.. toctree::
:maxdepth: 4
mitel_ommclient2.messages
Submodules
----------
mitel\_ommclient2.client module
-------------------------------
.. automodule:: mitel_ommclient2.client
:members:
:undoc-members:
:show-inheritance:
mitel\_ommclient2.connection module
-----------------------------------
.. automodule:: mitel_ommclient2.connection
:members:
:undoc-members:
:show-inheritance:
mitel\_ommclient2.session module
--------------------------------
.. automodule:: mitel_ommclient2.session
:members:
:undoc-members:
:show-inheritance:
Module contents
---------------
.. automodule:: mitel_ommclient2
:members:
:undoc-members:
:show-inheritance:

View File

@ -1,9 +0,0 @@
.. _api:
mitel_ommclient2
================
.. toctree::
:maxdepth: 4
mitel_ommclient2

View File

@ -6,11 +6,15 @@
Welcome to Mitel OMMClient2's documentation! Welcome to Mitel OMMClient2's documentation!
============================================ ============================================
This is the documentation for mitel_ommclient2. To get started stick to the :doc:`manual/client`.
.. toctree:: .. toctree::
:maxdepth: 2 :maxdepth: 2
:caption: Contents:
:ref:`api` manual/client
manual/connection
manual/messages
api/index
Indices and tables Indices and tables
================== ==================

43
docs/manual/client.rst Normal file
View File

@ -0,0 +1,43 @@
Client Usage Manual
===================
This should give you an introduction on how to use :class:`mitel_ommclient2.client.OMMClient2`.
If you are not interested in using the abstraction layer, even though it is recommended,
have a look at :doc:`/manual/connection` .
Creating a client
-----------------
To start with this client, you need login credentials for your OMM. The permissions
you have using this client are the ones assinged to the user you login with.
You are required to specify at least the host to connect to, a username and a corresponding
password. For further options see :class:`mitel_ommclient2.client.OMMClient2`.
.. code-block:: python
import mitel_ommclient2
c = mitel_ommclient2.OMMClient2("omm.local", "admin", "password")
Creating this object will directly connect to the API of the corresponding host.
Failure in connection or authenticating will raise an excaption.
Using the API
-------------
:class:`mitel_ommclient2.client.OMMClient2` ships with several mathods that wraps and
validate common API requests. See class documentation to get and overview and options.
.. code-block:: python
c.ping()
Making custom requests
----------------------
:class:`mitel_ommclient2.client.OMMClient2` holds its :class:`mitel_ommclient2.connection.Connection`
in the connection attribute.
Use :func:`mitel_ommclient2.connection.Connection.request` directly for making some custom requests.
See :doc:`/manual/connection` about using this.

View File

@ -0,0 +1,74 @@
Connection Usage Manual
=======================
This manual documents the underlying connation infrastructure. If you just wanna
use the API, please see :doc:`/manual/client` . Ideally you don't use to use this
class directly, except for expanding the clients functionality.
Using connections
-----------------
The :class:`mitel_ommclient2.connection.Connection` requires just host and port
to establish a transport to the API.
.. code-block:: python
import mitel_ommclient2
conn = mitel_ommclient2.connection.Connection("omm.local")
To actually connect to the OMM, you need to call :func:`mitel_ommclient2.connection.Connection.connect`.
.. code-block:: python
conn.connect()
This establishes a connections and spawns a thread that reads new messages from
the connection.
Please use :func:`mitel_ommclient2.connection.Connection.close` when finishing
with talking to the API.
.. code-block:: python
conn.close()
This stops the thread and closes the connection.
Making requests
---------------
:func:`mitel_ommclient2.connection.Connection.request` provides a synchronous way
to work with the asynchronous API of the OMM.
You hand over a Request object and receive a response object.
.. code-block:: python
>>> m = mitel_ommclient2.messages.Ping()
>>> r = conn.request(m)
>>> r.name
'PingResp'
Request will generate an internal sequence number and attach this to you request
object. After sending you request to the OMM it will wait for a response with the
corresponding sequence number. Please note: Even though you can set your own sequence
number in the request object, it will be overridden by :func:`mitel_ommclient2.connection.Connection.request`.
The response object will contain the sequence number generated by :func:`mitel_ommclient2.connection.Connection.request`
and not the one set by your own.
See :doc:`/manual/messages` on how to work with message objects.
Authenticate
------------
Before you can send general requests, you need to authenticate youself agains the
OMM. The only allowed message on a new connection is :func:`mitel_ommclient2.messages.Open`.
.. code-block:: python
>>> r = conn.request(mitel_ommclient2.messages.Open("username", "password"))
>>> r.raise_on_error()
If this throws no exception, login is was successful and you can send other requests.
If your authentication request failed, you can just send a new Open message to try again.

45
docs/manual/messages.rst Normal file
View File

@ -0,0 +1,45 @@
Message Usage Manual
====================
The API consists of three main message types: request, response and event. They
are represented by :class:`mitel_ommclient2.messages.Request`, :class:`mitel_ommclient2.messages.Response`
and events aren't supported yet.
There are several subclasses for each messages type, which provide a conveinient
interface to message content using attributes.
For each message you can access each field directly as class attributes.
There are two special attributes:
* name: returns the message name
* childs: allowes you to access childs by the child name as class attributes
Using messages
--------------
Just choose one of several message classes of :module:`mitel_ommclient2.messages`
and hand it over to :func:`mitel_ommclient2.client.OMMClient2.request` or
:func:`mitel_ommclient2.connection.Connection.request`.
.. code-block:: python
import time
my_time = int(time.time())
m = mitel_ommclient2.messages.Ping()
m.timeStamp = my_time
r = c.request(m)
ping = r.timeStamp - my_time
A more complex example
----------------------
This demonstrates how to access message childs.
.. code-block:: python
m = messages.GetAccount()
m.id = id
r = self.connection.request(m)
return r.childs.account[0]

27
flake.lock Normal file
View File

@ -0,0 +1,27 @@
{
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1665732960,
"narHash": "sha256-WBZ+uSHKFyjvd0w4inbm0cNExYTn8lpYFcHEes8tmec=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "4428e23312933a196724da2df7ab78eb5e67a88e",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"nixpkgs": "nixpkgs"
}
}
},
"root": "root",
"version": 7
}

49
flake.nix Normal file
View File

@ -0,0 +1,49 @@
{
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
};
outputs = { self, nixpkgs, ... }: {
packages.x86_64-linux = let
pkgs = import nixpkgs {
system = "x86_64-linux";
};
in {
mitel-ommclient2 = pkgs.python3Packages.buildPythonPackage rec {
pname = "mitel-ommclient2";
version = "0.0.1";
src = ./.;
outputs = [
"out"
"doc"
];
nativeBuildInputs = [
pkgs.python3Packages.sphinxHook
];
format = "pyproject";
buildInputs = [ pkgs.python3Packages.hatchling ];
propagatedBuildInputs = [ pkgs.python3Packages.rsa ];
pythonImportsCheck = [ "mitel_ommclient2" ];
};
default = self.packages.x86_64-linux.mitel-ommclient2;
};
apps.x86_64-linux = {
ommclient2 = {
type = "app";
program = self.packages.x86_64-linux.mitel-ommclient2 + "/bin/ommclient2";
};
default = self.apps.x86_64-linux.ommclient2;
};
hydraJobs = {
inherit (self)
packages;
};
};
}

View File

@ -3,6 +3,5 @@
from . import client from . import client
from . import connection from . import connection
from . import messages from . import messages
from . import session
from .client import OMMClient2 from .client import OMMClient2

189
mitel_ommclient2/cli.py Executable file
View File

@ -0,0 +1,189 @@
#!/usr/bin/env python3
import argparse
import getpass
import time
import traceback
from . import OMMClient2
from .exceptions import EAuth, ENoEnt
from .messages import GetAccount, Ping
# exit handling with argparse is a bit broken even with exit_on_error=False, so we hack this
def error_instead_exit(self, message):
raise argparse.ArgumentError(None, message)
argparse.ArgumentParser.error = error_instead_exit
def format_child_type(t):
return " {}\n{}".format(t.__class__.__name__, "\n".join(["{:<30} {}".format(key, value) for key, value in t._attrs.items()]))
def format_list(v):
return "\n\n\n\n".join(format_child_type(d) for d in v)
return fl
def main():
connect_parser = argparse.ArgumentParser(prog='ommclient2')
connect_parser.add_argument("-n", dest="hostname", default="127.0.0.1")
connect_parser.add_argument("-u", dest="username", default="omm")
connect_parser.add_argument("-p", dest="password")
connect_parser.add_argument("--ommsync", dest="ommsync", action='store_true', help="Log in with ommsync mode")
connect_parser.add_argument("subcommand", nargs="*")
args = connect_parser.parse_args()
hostname = args.hostname
username = args.username
password = args.password
ommsync = args.ommsync
subcommand = args.subcommand
if not password:
password = getpass.getpass(prompt="OMM password for {}@{}:".format(username, hostname))
try:
c = OMMClient2(hostname, username, password, ommsync=ommsync)
except EAuth:
print("Authentication failed")
exit(1)
except TimeoutError:
print("OMM unreachable")
exit(1)
parser = argparse.ArgumentParser(prog="ommclient2", add_help=False, exit_on_error=False)
subparsers = parser.add_subparsers()
def add_parser(command_name, func, format=None, args={}):
subp = subparsers.add_parser(command_name, help=func.__doc__.strip().split("\n")[0], description=func.__doc__)
if format is not None:
subp.set_defaults(func=func, format=format)
else:
subp.set_defaults(func=func)
for a, t in args.items():
subp.add_argument(a, type=t)
return subp
parser_exit = subparsers.add_parser("exit")
parser_exit.set_defaults(func=exit)
parser_get_account = add_parser("attach_user_device", func=c.attach_user_device, format=format_list, args={
"uid": int,
"ppn": int,
})
parser_get_account = add_parser("create_user", func=c.create_user, format=format_child_type, args={
"num": str,
})
parser_get_account = add_parser("detach_user_device", func=c.detach_user_device, format=format_list, args={
"uid": int,
"ppn": int,
})
parser_get_account = add_parser("detach_user_device_by_device", func=c.detach_user_device_by_device, format=format_list, args={
"ppn": int,
})
parser_get_account = add_parser("detach_user_device_by_user", func=c.detach_user_device_by_user, format=format_list, args={
"uid": int,
})
parser_get_account = add_parser("encrypt", func=c.encrypt, args={
"secret": str,
})
parser_get_account = add_parser("get_account", func=c.get_account, format=format_child_type, args={
"id": int,
})
parser_get_account = add_parser("get_device", func=c.get_device, format=format_child_type, args={
"ppn": int,
})
parser_get_account = add_parser("get_devices", func=c.get_devices, format=format_list)
parser_get_account = add_parser("get_publickey", func=c.get_publickey)
parser_get_account = add_parser("get_user", func=c.get_user, format=format_child_type, args={
"uid": int,
})
parser_get_account = add_parser("get_users", func=c.get_users, format=format_list)
parser_help = subparsers.add_parser("help")
parser_help.set_defaults(func=parser.format_help)
parser_ping = subparsers.add_parser("ping")
parser_ping.set_defaults(func=lambda *args, **kwargs: "pong" if c.ping(*args, **kwargs) else "error")
parser_get_account = add_parser("set_user_name", func=c.set_user_name, args={
"uid": int,
"name": str,
})
parser_get_account = add_parser("set_user_num", func=c.set_user_num, args={
"uid": int,
"num": str,
})
parser_get_account = add_parser("set_user_relation_dynamic", func=c.set_user_relation_dynamic, args={
"uid": int,
})
parser_get_account = add_parser("set_user_relation_fixed", func=c.set_user_relation_fixed, args={
"uid": int,
})
parser_get_account = add_parser("set_user_sipauth", func=c.set_user_sipauth, args={
"uid": int,
"sipAuthId": str,
"sipPw": str,
})
if subcommand:
try:
args = parser.parse_args(subcommand)
except argparse.ArgumentError as e:
print("argument error:", e.message)
exit(1)
v = dict(vars(args))
v.pop("func")
format = lambda r: r
if v.get("format") is not None:
format = v.get("format")
v.pop("format")
try:
r = args.func(**v)
except Exception as e:
print("".join(traceback.format_exception(type(e), e, e.__traceback__)))
exit(1)
print(format(r))
exit()
print("OMMClient")
parser.print_help()
while True:
i = input("> ").split()
try:
args = parser.parse_args(i)
except argparse.ArgumentError as e:
print("argument error:", e.message)
continue
v = dict(vars(args))
v.pop("func")
format = lambda r: r
if v.get("format") is not None:
format = v.get("format")
v.pop("format")
try:
r = args.func(**v)
except Exception as e:
print("".join(traceback.format_exception(type(e), e, e.__traceback__)))
continue
print(format(r))
if __name__ == "__main__":
main()

View File

@ -1,7 +1,16 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from .session import Session import base64
try:
# This is is only dependency not from the modules inlcuded in python by default, so we make it optional
import rsa
except ImportError:
rsa = None
from .connection import Connection
from . import exceptions
from . import messages from . import messages
from . import types
class OMMClient2: class OMMClient2:
""" """
@ -13,29 +22,284 @@ class OMMClient2:
:param host: Hostname or IP address of the OMM :param host: Hostname or IP address of the OMM
:param username: Username :param username: Username
:param password: Password :param password: Password
:param session: A :class:`mitel_ommclient2.session.Session` object :param port: Port where to access the API, if None, use default value
:param ommsync: If True login as OMM-Sync client. Some operations in OMM-Sync mode might lead to destroy DECT paring.
Usage:: Usage::
>>> c = OMMClient2("omm.local", "admin", "admin") >>> c = OMMClient2("omm.local", "admin", "admin")
>>> c.ping() >>> c.ping()
Use session for not implemented features:: Use request to send custom messages::
>>> r = s.session.request(mitel_ommclient2.messages.Ping()) >>> r = s.connection.request(mitel_ommclient2.messages.Ping())
To get more contol over the connection handling, initialize
:class:`mitel_ommclient2.session.Session` manually::
>>> s = mitel_ommclient2.session.Session("omm.local", "admin", "admin", port=12345)
>>> c = OMMClient2(session=s)
""" """
def __init__(self, host=None, username=None, password=None, session=None): def __init__(self, host, username, password, port=None, ommsync=False):
if session is None: self._host = host
self.session = Session(host, username, password) self._username = username
else: self._password = password
self.session = session self._port = port
self._ommsync = ommsync
# prepare connect arguments
kwargs = {}
if self._port is not None:
kwargs["port"] = self._port
# Connect
self.connection = Connection(self._host, **kwargs)
self.connection.connect()
# Login
m = messages.Open()
m.username = self._username
m.password = self._password
if self._ommsync:
m.UserDeviceSyncClient = "true"
r = self.connection.request(m)
r.raise_on_error()
def attach_user_device(self, uid, ppn):
"""
Attach user to device
:param uid: User id
:param ppn: Device id
Requires ommsync=True
"""
t_u = types.PPUserType()
t_u.uid = uid
t_u.ppn = ppn
t_u.relType = types.PPRelTypeType("Dynamic")
t_d = types.PPDevType()
t_d.ppn = ppn
t_d.uid = uid
t_d.relType = types.PPRelTypeType("Dynamic")
m = messages.SetPP()
m.childs.user = [t_u]
m.childs.pp = [t_d]
r = self.connection.request(m)
r.raise_on_error()
if r.childs.user is None:
return None
return r.childs.user[0], r.childs.pp[0]
def create_user(self, num):
"""
Create PP user
:param num: User number
"""
t = types.PPUserType()
t.num = num
m = messages.CreatePPUser()
m.childs.user = [t]
r = self.connection.request(m)
r.raise_on_error()
if r.childs.user is None:
return None
return r.childs.user[0]
def detach_user_device(self, uid, ppn):
"""
Detach user from device
:param uid: User id
:param ppn: Device id
Requires ommsync=True
"""
t_u = types.PPUserType()
t_u.uid = uid
t_u.ppn = 0
t_u.relType = types.PPRelTypeType("Unbound")
t_d = types.PPDevType()
t_d.ppn = ppn
t_d.uid = 0
t_d.relType = types.PPRelTypeType("Unbound")
m = messages.SetPP()
m.childs.user = [t_u]
m.childs.pp = [t_d]
r = self.connection.request(m)
r.raise_on_error()
if r.childs.user is None:
return None
return r.childs.user[0], r.childs.pp[0]
def detach_user_device_by_user(self, uid):
"""
Detach user from device
This just requires the user id
:param uid: User id
Requires ommsync=True
"""
u = self.get_user(uid)
return self.detach_user_device(uid, u.ppn)
def detach_user_device_by_device(self, ppn):
"""
Detach user from device
This just requires the device id
:param ppn: Device id
Requires ommsync=True
"""
d = self.get_device(ppn)
return self.detach_user_device(d.uid, ppn)
def encrypt(self, secret):
"""
Encrypt secret for OMM
Required rsa module to be installed
:param secret: String to encrypt
"""
if rsa is None:
raise Exception("rsa module is required for excryption")
publickey = self.get_publickey()
pubkey = rsa.PublicKey(*publickey)
byte_secret = secret.encode('utf8')
byte_encrypt = rsa.encrypt(byte_secret, pubkey)
encrypt = base64.b64encode(byte_encrypt).decode("utf8")
return encrypt
def find_devices(self, filter):
"""
Get all devices matching a filter
:param filter: function taking one parameter which is a device, returns True to keep, False to discard
Usage::
>>> c.find_devices(lambda d: d.relType == mitel_ommclient2.types.PPRelTypeType("Unbound"))
"""
for d in self.get_devices():
if filter(d):
yield d
def find_users(self, filter):
"""
Get all users matching a filter
:param filter: function taking one parameter which is a user, returns True to keep, False to discard
Usage::
>>> c.find_users(lambda u: u.num.startswith("9998"))
"""
for u in self.get_users():
if filter(u):
yield u
def get_account(self, id):
"""
Get account
:param id: User id
"""
m = messages.GetAccount()
m.id = id
r = self.connection.request(m)
r.raise_on_error()
if r.childs.account is None:
return None
return r.childs.account[0]
def get_device(self, ppn):
"""
Get PP device
:param ppn: Device id
"""
m = messages.GetPPDev()
m.ppn = ppn
r = self.connection.request(m)
r.raise_on_error()
if r.childs.pp is None:
return None
return r.childs.pp[0]
def get_devices(self):
"""
Get all PP devices
"""
next_ppn = 0
while True:
m = messages.GetPPDev()
m.ppn = next_ppn
m.maxRecords = 20
r = self.connection.request(m)
try:
r.raise_on_error()
except exceptions.ENoEnt:
# No more devices to fetch
break
# Output all found devices
for pp in r.childs.pp:
yield pp
# Determine next possible ppn
next_ppn = int(pp.ppn) + 1
def get_publickey(self):
"""
Get public key for encrypted values
"""
m = messages.GetPublicKey()
r = self.connection.request(m)
r.raise_on_error()
return int(r.modulus, 16), int(r.exponent, 16)
def get_user(self, uid):
"""
Get PP user
:param uid: User id
"""
m = messages.GetPPUser()
m.uid = uid
r = self.connection.request(m)
r.raise_on_error()
if r.childs.user is None:
return None
return r.childs.user[0]
def get_users(self):
"""
Get all PP users
"""
next_uid = 0
while True:
m = messages.GetPPUser()
m.uid = next_uid
m.maxRecords = 20
r = self.connection.request(m)
try:
r.raise_on_error()
except exceptions.ENoEnt:
# No more devices to fetch
break
# Output all found devices
for user in r.childs.user:
yield user
# Determine next possible ppn
next_uid = int(user.uid) + 1
def ping(self): def ping(self):
""" """
@ -44,7 +308,87 @@ class OMMClient2:
Returns `True` when response is received. Returns `True` when response is received.
""" """
r = self.session.request(messages.Ping()) r = self.connection.request(messages.Ping())
if r.errCode is None: if r.errCode is None:
return True return True
return False return False
def set_user_name(self, uid, name):
"""
Set PP user name
:param uid: User id
:param name: User name
"""
t = types.PPUserType()
t.uid = uid
t.name = name
m = messages.SetPPUser()
m.childs.user = [t]
r = self.connection.request(m)
r.raise_on_error()
if r.childs.user is None:
return None
return r.childs.user[0]
def set_user_num(self, uid, num):
"""
Set PP user number
:param uid: User id
:param num: User number
"""
t = types.PPUserType()
t.uid = uid
t.num = num
m = messages.SetPPUser()
m.childs.user = [t]
r = self.connection.request(m)
r.raise_on_error()
if r.childs.user is None:
return None
return r.childs.user[0]
def set_user_relation_dynamic(self, uid):
"""
Set PP user to PP device relation to dynamic type
:param uid: User id
"""
m = messages.SetPPUserDevRelation()
m.uid = uid
m.relType = types.PPRelTypeType("Dynamic")
r = self.connection.request(m)
r.raise_on_error()
def set_user_relation_fixed(self, uid):
"""
Set PP user to PP device relation to fixed type
:param uid: User id
"""
m = messages.SetPPUserDevRelation()
m.uid = uid
m.relType = types.PPRelTypeType("Fixed")
r = self.connection.request(m)
r.raise_on_error()
def set_user_sipauth(self, uid, sipAuthId, sipPw):
"""
Set PP user sip credentials
:param uid: User id
:param sipAuthId: SIP user name
:param sipPw: Plain text password
"""
t = types.PPUserType()
t.uid = uid
t.sipAuthId = sipAuthId
t.sipPw = self.encrypt(sipPw)
m = messages.SetPPUser()
m.childs.user = [t]
r = self.connection.request(m)
r.raise_on_error()
if r.childs.user is None:
return None
return r.childs.user[0]

View File

@ -1,8 +1,12 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import queue
import select
import socket import socket
import ssl import ssl
import threading
from . import messages
class Connection: class Connection:
""" """
@ -22,7 +26,12 @@ class Connection:
self._host = host self._host = host
self._port = port self._port = port
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._recv_buffer = "" self._socket.settimeout(2)
self._seq = 0 # state of the sequence number generator
self._requests = {} # waiting area for pending responses
self._close = False
def connect(self): def connect(self):
""" """
@ -30,46 +39,107 @@ class Connection:
""" """
self._socket.connect((self._host, self._port)) self._socket.connect((self._host, self._port))
self._socket.setblocking(False)
def send(self, message): threading.Thread(target=self._receive_loop, daemon=True).start()
def _receive_loop(self):
""" """
Sends message string Receives messages from socket and associates them to the responding request
:param message: Message string This function is intended to be executed in thread.
""" """
recv_buffer = b""
while not self._close:
if select.select([self._socket], [], []) != ([], [], []):
# wait for data availiable
while True:
try:
# fill buffer with one message
data = self._socket.recv(1024)
except BlockingIOError:
continue
if not data:
# buffer is empty
break
recv_buffer += data
if b"\0" in recv_buffer:
# there is a full message in buffer, handle that first
break
if b"\0" not in recv_buffer:
# no new messages
break
# get one message from recv_buffer
message, buffer = recv_buffer.split(b"\0", 1)
recv_buffer = buffer
# parse the message
message = message.decode("utf-8")
response = messages.parse(message)
if response.seq in self._requests:
# if this response belongs to a request, we return it and resolve the lock
self._requests[response.seq]["response"] = response
self._requests[response.seq]["event"].set()
# else the message will be ignored
def _generate_seq(self):
"""
Returns new sequence number
This generates a number that tries to be unique during a session
"""
seq = self._seq
self._seq += 1
return seq
def request(self, request):
"""
Sends a request, waits for response and return response
:param request: Request object
Usage::
>>> r = c.request(mitel_ommclient2.messages.Ping())
>>> r.name
'PingResp'
"""
# generate new sequence number and attach to request
seq = self._generate_seq()
request.seq = seq
# add request to waiting area
self._requests[seq] = {
"event": threading.Event(),
}
# send request
message = messages.construct(request)
self._socket.send(message.encode("utf-8") + b"\0") self._socket.send(message.encode("utf-8") + b"\0")
def recv(self): # wait for response
""" self._requests[seq]["event"].wait()
Returns one message
Use multiple times to receive multiple messages # return reponse and remove from waiting area
""" return self._requests.pop(seq, {"response": None})["response"]
data = b""
while True:
new_data = self._socket.recv(65536)
if new_data is not None:
data += new_data
else:
break
self._recv_buffer += data.decode("utf-8")
if "\0" not in self._recv_buffer:
# no new messages
return None
message, buffer = self._recv_buffer.split("\0", 1)
self._recv_buffer = buffer
return message
def close(self): def close(self):
""" """
Shout down connection Shut down connection
""" """
self._close = True
return self._socket.close() return self._socket.close()
def __del__(self): def __del__(self):
@ -80,6 +150,10 @@ class SSLConnection(Connection):
""" """
Establishes a secure connection to the OM Application XML Interface Establishes a secure connection to the OM Application XML Interface
Please not that this class might be useless on your system since new
versions of OpenSSL don't ship with TLVv1.2 or lower anymore which are
the protocols supported by OMM.
:param host: Hostname or IP address of OMM :param host: Hostname or IP address of OMM
:param port: Port of the OM Application XML ssl TCP port :param port: Port of the OM Application XML ssl TCP port

View File

@ -0,0 +1,156 @@
#!/usr/bin/env python3
exception_classes = {}
def _collect_exception_class(c):
"""
Decorator that collects exception classes for parsing error codes.
"""
exception_classes[c.__name__] = c
return c
class OMResponseException(Exception):
def __init__(self, response, msg=None):
self.response = response
if msg is None:
msg = self.response.info
super().__init__(msg)
@_collect_exception_class
class EAreaFull(OMResponseException):
pass
@_collect_exception_class
class EAuth(OMResponseException):
pass
@_collect_exception_class
class EDectRegDomainInvalid(OMResponseException):
pass
@_collect_exception_class
class EEncryptNotAllowed(OMResponseException):
pass
@_collect_exception_class
class EExist(OMResponseException):
pass
@_collect_exception_class
class EFailed(OMResponseException):
pass
@_collect_exception_class
class EForbidden(OMResponseException):
pass
@_collect_exception_class
class EInProgress(OMResponseException):
pass
@_collect_exception_class
class EInval(OMResponseException):
def __init__(self, response):
super().__init__(response, response.bad)
@_collect_exception_class
class EInvalidChars(OMResponseException):
pass
@_collect_exception_class
class ELicense(OMResponseException):
pass
@_collect_exception_class
class ELicenseFile(OMResponseException):
pass
@_collect_exception_class
class ELicenseWrongInstallId(OMResponseException):
pass
@_collect_exception_class
class EMissing(OMResponseException):
def __init__(self, response):
super().__init__(response, response.bad)
@_collect_exception_class
class ENoEnt(OMResponseException):
pass
@_collect_exception_class
class ENoMem(OMResponseException):
pass
@_collect_exception_class
class EPerm(OMResponseException):
pass
@_collect_exception_class
class EPwEmpty(OMResponseException):
pass
@_collect_exception_class
class EPwSimilarToHost(OMResponseException):
pass
@_collect_exception_class
class EPwSimilarToName(OMResponseException):
pass
@_collect_exception_class
class EPwTooManySimilarChars(OMResponseException):
pass
@_collect_exception_class
class EPwTooShort(OMResponseException):
pass
@_collect_exception_class
class EPwTooSimilar(OMResponseException):
pass
@_collect_exception_class
class EPwTooWeak(OMResponseException):
pass
@_collect_exception_class
class EPwUnchanged(OMResponseException):
pass
@_collect_exception_class
class ETooLong(OMResponseException):
def __init__(self, response):
super().__init__(response, response.bad + ", maximum of " + str(response.maxLen))
@_collect_exception_class
class EWlanRegDomainInvalid(OMResponseException):
pass

View File

@ -2,85 +2,148 @@
from xml.dom.minidom import getDOMImplementation, parseString from xml.dom.minidom import getDOMImplementation, parseString
from ..exceptions import exception_classes, OMResponseException
from ..types import cast_dict_to_childtype
class Request:
class Message:
""" """
Request message class Base message class
:param name: Name of the message
:param seq: Unique sequence number to associate responses
Usage::
>>> req = Request("Ping")
"""
def __init__(self, name, seq=None):
self.name = name
self.attrs = {}
self.childs = {}
if seq is not None:
self.attrs["seq"] = seq
@property
def seq(self):
return self.attrs.get("seq")
class DictRequest(Request):
"""
Create a message by dict attributes
:param name: Name of the message
:param attrs: Message attributes
:param childs: Message children
Usage::
>>> req = DictRequest("Ping", {"timeStamp": 2342})
"""
def __init__(self, name, attrs={}, childs={}):
self.name = name
self.attrs = attrs
self.childs = attrs
class Response:
"""
Response message class
:param name: Name of the message :param name: Name of the message
:param attrs: Message attributes :param attrs: Message attributes
:param childs: Message children :param childs: Message children
""" """
def __init__(self, name, attrs={}, childs={}):
# Fields defined by the base type class
BASE_FIELDS = {}
# Fields defined by subclasses
FIELDS = {}
# Child types
CHILDS = {}
# Fields dicts consist of the field name as name and the field type as value
# Use None if the field type is unknown, any type is allowed then
class Childs:
"""
Contains message childs
"""
CHILDS = {}
def __init__(self, child_types, child_dict):
self.CHILDS = child_types
self._child_dict = child_dict
def __getattr__(self, name):
if name in self.CHILDS.keys():
return self._child_dict.get(name)
else:
raise AttributeError()
def __setattr__(self, name, value):
if name in self.CHILDS.keys():
if not isinstance(value, list):
raise TypeError()
for v in value:
if self.CHILDS[name] is not None and type(v) != self.CHILDS[name]:
raise TypeError()
self._child_dict[name] = value
else:
object.__setattr__(self, name, value)
def __init__(self, name=None, attrs={}, childs={}):
self.name = name self.name = name
self.attrs = attrs if not self.name:
self.childs = childs self.name = self.__class__.__name__
self._attrs = {} | attrs
self._childs = {} | childs
self.childs = self.Childs(self.CHILDS, self._childs)
@property def __getattr__(self, name):
def seq(self): fields = self.FIELDS | self.BASE_FIELDS
return self.attrs.get("seq") if name in fields.keys():
return self._attrs.get(name)
else:
raise AttributeError()
@property def __setattr__(self, name, value):
def errCode(self): fields = self.FIELDS | self.BASE_FIELDS
return self.attrs.get("errCode") if name in fields.keys():
if fields[name] is not None and type(value) != fields[name]:
raise TypeError()
self._attrs[name] = value
else:
object.__setattr__(self, name, value)
@property def __repr__(self):
def info(self): return "{}({}, {}, {})".format(self.__class__.__name__, repr(self.name), repr(self._attrs), repr(self._childs))
return self.attrs.get("info")
@property
def bad(self):
return self.attrs.get("bad")
@property
def maxLen(self):
return self.attrs.get("maxLen")
class Request(Message):
"""
Request message type class
"""
BASE_FIELDS = {
"seq": int,
}
class Response(Message):
"""
Response message type class
"""
BASE_FIELDS = {
"seq": int,
"errCode": None,
"info": None,
"bad": None,
"maxLen": None,
}
def raise_on_error(self):
"""
Raises an exception if the response contains an error.
Usage::
>>> try:
>>> r.raise_on_error()
>>> except mitel_ommclient2.exceptions.EAuth as e:
>>> print("We don't care about authentication!")
See children of :class:`mitel_ommclient2.exceptions.OMResponseException` for all possible exceptions.
"""
if self.errCode is not None:
raise exception_classes.get(self.errCode, OMResponseException)(response=self)
REQUEST_TYPES = {}
RESPONSE_TYPES = {}
def request_type(c):
REQUEST_TYPES[c.__name__] = c
return c
def response_type(c):
RESPONSE_TYPES[c.__name__] = c
return c
from .createppuser import CreatePPUser, CreatePPUserResp
from .getaccount import GetAccount, GetAccountResp
from .getppdev import GetPPDev, GetPPDevResp
from .getppuser import GetPPUser, GetPPUserResp
from .getpublickey import GetPublicKey, GetPublicKeyResp
from .open import Open, OpenResp
from .ping import Ping, PingResp from .ping import Ping, PingResp
from .setpp import SetPP, SetPPResp
from .setppuser import SetPPUser, SetPPUserResp
from .setppuserdevrelation import SetPPUserDevRelation, SetPPUserDevRelationResp
def construct(request): def construct(request):
""" """
@ -90,27 +153,19 @@ def construct(request):
message = impl.createDocument(None, request.name, None) message = impl.createDocument(None, request.name, None)
root = message.documentElement root = message.documentElement
for k, v in request.attrs.items(): for k, v in request._attrs.items():
root.setAttribute(str(k), str(v)) root.setAttribute(str(k), str(v))
for k, v in request.childs.items(): for child_name, child_list in request._childs.items():
child = message.createElement(k) if child_list is not None:
if v is not None: for child_list_item in child_list:
for c_k, c_v in v.items(): child = message.createElement(child_name)
child.setAttribute(str(c_k), str(c_v)) for child_item_key, child_item_value in child_list_item._attrs.items():
root.appendChild(child) child.setAttribute(str(child_item_key), str(child_item_value))
root.appendChild(child)
return root.toxml() return root.toxml()
def _response_type_by_name(name):
response_types = [
PingResp,
]
response_types_dict = {r.__name__: r for r in response_types}
return response_types_dict.get(name, Response)
def parse(message): def parse(message):
message = parseString(message) message = parseString(message)
root = message.documentElement root = message.documentElement
@ -119,9 +174,15 @@ def parse(message):
attrs = {} attrs = {}
childs = {} childs = {}
response_type = RESPONSE_TYPES.get(name)
fields = response_type.FIELDS | response_type.BASE_FIELDS
for i in range(0, root.attributes.length): for i in range(0, root.attributes.length):
item = root.attributes.item(i) item = root.attributes.item(i)
attrs[item.name] = item.value if fields.get(item.name) is not None:
attrs[item.name] = fields[item.name](item.value)
else:
attrs[item.name] = item.value
child = root.firstChild child = root.firstChild
while child is not None: while child is not None:
@ -131,6 +192,11 @@ def parse(message):
new_child[item.name] = item.value new_child[item.name] = item.value
childname = child.tagName childname = child.tagName
# cast dict into child type
if response_type.CHILDS.get(childname) is not None:
new_child = cast_dict_to_childtype(response_type.CHILDS[childname], new_child)
if childname in childs: if childname in childs:
childs[childname].append(new_child) childs[childname].append(new_child)
else: else:
@ -139,4 +205,4 @@ def parse(message):
child = child.nextSibling child = child.nextSibling
return _response_type_by_name(name)(name, attrs, childs) return response_type(name, attrs, childs)

View File

@ -0,0 +1,18 @@
#!/usr/bin/env python3
from . import Request, Response, request_type, response_type
from ..types import PPUserType
@request_type
class CreatePPUser(Request):
CHILDS = {
"user": PPUserType,
}
@response_type
class CreatePPUserResp(Response):
CHILDS = {
"user": PPUserType,
}

View File

@ -0,0 +1,19 @@
#!/usr/bin/env python3
from . import Request, Response, request_type, response_type
from ..types import AccountType
@request_type
class GetAccount(Request):
FIELDS = {
"id": int,
"maxRecords": int,
}
@response_type
class GetAccountResp(Response):
CHILDS = {
"account": AccountType,
}

View File

@ -0,0 +1,19 @@
#!/usr/bin/env python3
from . import Request, Response, request_type, response_type
from ..types import PPDevType
@request_type
class GetPPDev(Request):
FIELDS = {
"ppn": int,
"maxRecords": int,
}
@response_type
class GetPPDevResp(Response):
CHILDS = {
"pp": PPDevType,
}

View File

@ -0,0 +1,19 @@
#!/usr/bin/env python3
from . import Request, Response, request_type, response_type
from ..types import PPUserType
@request_type
class GetPPUser(Request):
FIELDS = {
"uid": int,
"maxRecords": int,
}
@response_type
class GetPPUserResp(Response):
CHILDS = {
"user": PPUserType,
}

View File

@ -0,0 +1,16 @@
#!/usr/bin/env python3
from . import Request, Response, request_type, response_type
@request_type
class GetPublicKey(Request):
pass
@response_type
class GetPublicKeyResp(Response):
FIELDS = {
"modulus": str,
"exponent": str,
}

View File

@ -1,40 +1,23 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from . import Request, Response from . import Request, Response, request_type, response_type
@request_type
class Open(Request): class Open(Request):
def __init__(self, username, password, **kwargs): FIELDS = {
super().__init__("Open", **kwargs) "username": None,
"password": None,
"UserDeviceSyncClient": None,
}
self.attrs["username"] = username
self.attrs["password"] = password
@property
def username(self):
return self.attrs.get("username")
@property
def password(self):
return self.attrs.get("password")
@response_type
class OpenResp(Response): class OpenResp(Response):
@property FIELDS = {
def protocolVersion(self): "protocolVersion": None,
return self.attrs.get("protocolVersion") "minPPSwVersion1": None,
"minPPSwVersion2": None,
@property "ommStbState": None,
def minPPSwVersion1(self): "publicKey": None,
return self.attrs.get("minPPSwVersion1") }
@property
def minPPSwVersion2(self):
return self.attrs.get("minPPSwVersion2")
@property
def ommStbState(self):
return self.attrs.get("ommStbState")
@property
def publicKey(self):
return self.attrs.get("publicKey")

View File

@ -1,20 +1,17 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from . import Request, Response from . import Request, Response, request_type, response_type
@request_type
class Ping(Request): class Ping(Request):
def __init__(self, timeStamp=None, **kwargs): FIELDS = {
super().__init__("Ping", **kwargs) "timeStamp": int,
}
if timeStamp is not None:
self.attrs["timeStamp"] = timeStamp
@property
def timeStamp(self):
return self.attrs.get("timeStamp")
@response_type
class PingResp(Response): class PingResp(Response):
@property FIELDS = {
def timeStamp(self): "timeStamp": int,
return self.attrs.get("timeStamp") }

View File

@ -0,0 +1,20 @@
#!/usr/bin/env python3
from . import Request, Response, request_type, response_type
from ..types import PPDevType, PPUserType
@request_type
class SetPP(Request):
CHILDS = {
"pp": PPDevType,
"user": PPUserType,
}
@response_type
class SetPPResp(Response):
CHILDS = {
"pp": PPDevType,
"user": PPUserType,
}

View File

@ -0,0 +1,18 @@
#!/usr/bin/env python3
from . import Request, Response, request_type, response_type
from ..types import PPUserType
@request_type
class SetPPUser(Request):
CHILDS = {
"user": PPUserType,
}
@response_type
class SetPPUserResp(Response):
CHILDS = {
"user": PPUserType,
}

View File

@ -0,0 +1,20 @@
#!/usr/bin/env python3
from . import Request, Response, request_type, response_type
from ..types import PPRelTypeType
@request_type
class SetPPUserDevRelation(Request):
FIELDS = {
"uid": int,
"relType": PPRelTypeType,
}
@response_type
class SetPPUserDevRelationResp(Response):
FIELDS = {
"uid": int,
"relType": PPRelTypeType,
}

View File

@ -1,80 +0,0 @@
#!/usr/bin/env python3
from time import sleep
from . import connection
from . import messages
class Session:
"""
Synchronous API session handler
:param host: Hostname or IP address of OMM
:param username: Username
:param password: Password
:param port: Port
:param connection_class: One of :class:`mitel_ommclient2.connection.Connection` or :class:`mitel_ommclient2.connection.SSLConnection`
Usage::
>>> s = Session("omm.local", "admin", "admin")
>>> s.request(mitel_ommclient2.messages.Ping())
"""
def __init__(self, host, username, password, port=None, connection_class=None):
self.host = host
self.username = username
self.password = password
self.port = port
self.connection_class = connection_class
if self.connection_class is None:
self.connection_class = connection.SSLConnection
self._connection = None
self._ensure_connection()
def _wait_for_respose(self):
"""
Wait until data got received and return message string
"""
while True:
r = self.connection.recv()
if r is not None:
return r
sleep(0.1)
def _ensure_connection(self):
"""
Make sure we are connected and logged in
"""
if self._connection is None:
kwargs = {}
if self.port is not None:
kwargs["port"] = self.port
self._connection = self.connection_class(self.host, **kwargs)
self._connection.connect()
self._connection.send(messages.Open(self.username, self.password))
res = self._wait_for_respose()
def request(self, request):
"""
Sends a request and waits for response
:param request: Request object
Usage::
>>> r = s.request(mitel_ommclient2.messages.Ping())
>>> r.name
'PingResp'
"""
message = messages.construct(request)
self.connection.send(message)
res = self._wait_for_respose()
return messages.parse(res)

View File

@ -0,0 +1,238 @@
#!/usr/bin/env python3
class ChildType:
"""
Base type class
:param name: Name of the message
:param attrs: Message attributes
:param childs: Message children
"""
FIELDS = {}
def __init__(self, attrs={}):
self._attrs = {}
if self.FIELDS is not None:
for k, v in attrs.items():
setattr(self, k, v)
else:
# don't check attrs for types we do have any information
self._attrs = attrs
def __getattr__(self, name):
if name in self.FIELDS.keys():
return self._attrs.get(name)
else:
raise AttributeError()
def __setattr__(self, name, value):
if name in self.FIELDS.keys():
if self.FIELDS[name] is not None and type(value) != self.FIELDS[name]:
raise TypeError()
self._attrs[name] = value
else:
object.__setattr__(self, name, value)
def __repr__(self):
return "{}({})".format(self.__class__.__name__, repr(self._attrs))
def cast_dict_to_childtype(t, d):
errors = {} # collect unknown keys
for k, v in d.items():
if k in t.FIELDS.keys():
if t.FIELDS[k] is not None and type(v) != t.FIELDS[k]:
d[k] = t.FIELDS[k](v)
else:
errors[k] = v
if errors != {}:
raise KeyError("The following keys are unknown for '{}': {}".format(t.__name__, errors))
return t(d)
class EnumType:
VALUES = [] # Allowed values
def __init__(self, s):
if self.VALUES is not None:
if s in self.VALUES:
self.value = s
else:
raise ValueError()
else:
self.value = s
def __str__(self):
return str(self.value)
def __repr__(self):
return "{}({})".format(self.__class__.__name__, repr(self.value))
def __eq__(self, other):
return isinstance(other, type(self)) and self.value == other.value
class CallForwardStateType(EnumType):
VALUES = [
"Off",
"Busy",
"NoAnswer",
"BusyNoAnswer",
"All",
]
class DECTSubscriptionStateType(EnumType):
VALUES = None
class LanguageType(EnumType):
VALUES = None
class MonitoringStateType(EnumType):
VALUES = None
class PPRelTypeType(EnumType):
VALUES = [
"Fixed",
"Dynamic",
"Unbound",
]
class AccountType(ChildType):
FIELDS = {
"id": int,
"username": str,
"password": str,
"oldPassword": str,
"permission": None,
"active": bool,
"aging": None,
"expire": int,
"state": str,
}
class PPDevType(ChildType):
FIELDS = {
"ppn": int,
"timeStamp": int,
"relType": PPRelTypeType,
"uid": int,
"ipei": str,
"ac": str,
"s": DECTSubscriptionStateType,
"uak": str,
"encrypt": bool,
"capMessaging": bool,
"capMessagingForInternalUse": bool,
"capEnhLocating": bool,
"capBluetooth": bool,
"ethAddr": str,
"hwType": str,
"ppProfileCapability": bool,
"ppDefaultProfileLoaded": bool,
"subscribeToPARIOnly": bool,
# undocumented
"ommId": str,
"ommIdAck": str,
"timeStampAdmin": int,
"timeStampRelation": int,
"timeStampRoaming": int,
"timeStampSubscription": int,
"autoCreate": bool,
"roaming": None, # value: 'RoamingComplete'
"modicType": str, # value: '01'
"locationData": str, # value: '000001000000'
"dectIeFixedId": str,
"subscriptionId": str,
"ppnSec": int,
}
class PPUserType(ChildType):
FIELDS = {
"uid": int,
"timeStamp": int,
"relType": PPRelTypeType,
"ppn": int,
"name": str,
"num": str,
"hierarchy1": str,
"hierarchy2": str,
"addId": str,
"pin": str,
"sipAuthId": str,
"sipPw": str,
"sosNum": str,
"voiceboxNum": str,
"manDownNum": str,
"forwardState": CallForwardStateType,
"forwardTime": int,
"forwardDest": str,
"langPP": LanguageType,
"holdRingBackTime": int,
"autoAnswer": str,
"microphoneMute": str,
"warningTone": str,
"allowBargeIn": str,
"callWaitingDisabled": bool,
"external": bool,
"trackingActive": bool,
"locatable": bool,
"BTlocatable": bool,
"BTsensitivity": str,
"locRight": bool,
"msgRight": bool,
"sendVcardRight": bool,
"recvVcardRight": bool,
"keepLocalPB": bool,
"vip": bool,
"sipRegisterCheck": bool,
"allowVideoStream": bool,
"conferenceServerType": str,
"conferenceServerURI": str,
"monitoringMode": str,
"CUS": MonitoringStateType,
"HAS": MonitoringStateType,
"HSS": MonitoringStateType,
"HRS": MonitoringStateType,
"HCS": MonitoringStateType,
"SRS": MonitoringStateType,
"SCS": MonitoringStateType,
"CDS": MonitoringStateType,
"HBS": MonitoringStateType,
"BTS": MonitoringStateType,
"SWS": MonitoringStateType,
"credentialPw": str,
"configurationDataLoaded": bool,
"ppData": str,
"ppProfileId": int,
"fixedSipPort": int,
"calculatedSipPort": int,
# undocumented
"uidSec": int,
"permanent": bool,
"lang": None,
"autoLogoutOnCharge": bool,
"hotDeskingSupport": bool,
"authenticateLogout": bool,
"useSIPUserName": None,
"useSIPUserAuthentication": None,
"serviceUserName": None,
"serviceAuthName": None,
"serviceAuthPassword": None,
"keyLockEnable": None,
"keyLockPin": None,
"keyLockTime": None,
"ppnOld": int,
"timeStampAdmin": int,
"timeStampRelation": int,
}

30
pyproject.toml Normal file
View File

@ -0,0 +1,30 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "mitel_ommclient2"
version = "0.0.1"
authors = [
{ name="clerie", email="hallo@clerie.de" },
]
description = "Another attempt for a modern client library to the Mitel OM Application XML Interface."
readme = "README.md"
license = { file="LICENSE" }
requires-python = ">=3.7"
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
]
[project.optional-dependencies]
crypt = [
"rsa",
]
[project.scripts]
ommclient2 = "mitel_ommclient2.cli:main"
[project.urls]
"Source" = "https://git.clerie.de/clerie/mitel_ommclient2"