Compare commits

..

14 Commits

9 changed files with 215 additions and 32 deletions

1
.gitignore vendored
View File

@@ -1,2 +1,3 @@
__pycache__ __pycache__
dist/
docs/_build docs/_build

View File

@@ -2,6 +2,20 @@
Another attempt for a modern client library to the Mitel OM Application XML Interface. Another attempt for a modern client library to the Mitel OM Application XML Interface.
## Install
Without any additional dependencies:
```
pip install "mitel_ommclient2 @ git+https://git.clerie.de/clerie/mitel_ommclient2.git@main"
```
Add dependencies to enable secret handling, if you need it.
```
pip install "mitel_ommclient2[crypt] @ git+https://git.clerie.de/clerie/mitel_ommclient2.git@main"
```
## Quicksart ## Quicksart
Just some examples to give you an idea what this does. Just some examples to give you an idea what this does.
@@ -23,6 +37,15 @@ r = c.connection.request(m)
Consult class documentation for more in depth examples and options. Consult class documentation for more in depth examples and options.
## Interactive CLI
The package installs a script called `ommclient2`.
This allowes basic interactive testing of the library.
```
ommclient2 --help
```
## Attribution ## Attribution
This software is inspired by `python-mitel` by Thomas and n-st. This software is inspired by `python-mitel` by Thomas and n-st.

27
flake.lock generated Normal file
View File

@@ -0,0 +1,27 @@
{
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1665732960,
"narHash": "sha256-WBZ+uSHKFyjvd0w4inbm0cNExYTn8lpYFcHEes8tmec=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "4428e23312933a196724da2df7ab78eb5e67a88e",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"nixpkgs": "nixpkgs"
}
}
},
"root": "root",
"version": 7
}

49
flake.nix Normal file
View File

@@ -0,0 +1,49 @@
{
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
};
outputs = { self, nixpkgs, ... }: {
packages.x86_64-linux = let
pkgs = import nixpkgs {
system = "x86_64-linux";
};
in {
mitel-ommclient2 = pkgs.python3Packages.buildPythonPackage rec {
pname = "mitel-ommclient2";
version = "0.0.1";
src = ./.;
outputs = [
"out"
"doc"
];
nativeBuildInputs = [
pkgs.python3Packages.sphinxHook
];
format = "pyproject";
buildInputs = [ pkgs.python3Packages.hatchling ];
propagatedBuildInputs = [ pkgs.python3Packages.rsa ];
pythonImportsCheck = [ "mitel_ommclient2" ];
};
default = self.packages.x86_64-linux.mitel-ommclient2;
};
apps.x86_64-linux = {
ommclient2 = {
type = "app";
program = self.packages.x86_64-linux.mitel-ommclient2 + "/bin/ommclient2";
};
default = self.apps.x86_64-linux.ommclient2;
};
hydraJobs = {
inherit (self)
packages;
};
};
}

View File

@@ -1,20 +1,13 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from mitel_ommclient2 import OMMClient2
from mitel_ommclient2.exceptions import ENoEnt
from mitel_ommclient2.messages import GetAccount, Ping
import time
import argparse import argparse
import base64
import getpass import getpass
import time
import traceback import traceback
try: from . import OMMClient2
# This is is only dependency not from the modules inlcuded in python by default, so we make it optional from .exceptions import EAuth, ENoEnt
import rsa from .messages import GetAccount, Ping
except ImportError:
rsa = None
# exit handling with argparse is a bit broken even with exit_on_error=False, so we hack this # exit handling with argparse is a bit broken even with exit_on_error=False, so we hack this
def error_instead_exit(self, message): def error_instead_exit(self, message):
@@ -29,8 +22,7 @@ def format_list(v):
return fl return fl
if __name__ == "__main__": def main():
connect_parser = argparse.ArgumentParser(prog='ommclient2') connect_parser = argparse.ArgumentParser(prog='ommclient2')
connect_parser.add_argument("-n", dest="hostname", default="127.0.0.1") connect_parser.add_argument("-n", dest="hostname", default="127.0.0.1")
connect_parser.add_argument("-u", dest="username", default="omm") connect_parser.add_argument("-u", dest="username", default="omm")
@@ -48,17 +40,14 @@ if __name__ == "__main__":
if not password: if not password:
password = getpass.getpass(prompt="OMM password for {}@{}:".format(username, hostname)) password = getpass.getpass(prompt="OMM password for {}@{}:".format(username, hostname))
c = OMMClient2(hostname, username, password, ommsync=ommsync) try:
c = OMMClient2(hostname, username, password, ommsync=ommsync)
def encrypt(secret): except EAuth:
if rsa is None: print("Authentication failed")
raise Exception("rsa module is required for excryption") exit(1)
publickey = c.get_publickey() except TimeoutError:
pubkey = rsa.PublicKey(*publickey) print("OMM unreachable")
byte_secret = secret.encode('utf8') exit(1)
byte_encrypt = rsa.encrypt(byte_secret, pubkey)
encrypt = base64.b64encode(byte_encrypt).decode("utf8")
return encrypt
parser = argparse.ArgumentParser(prog="ommclient2", add_help=False, exit_on_error=False) parser = argparse.ArgumentParser(prog="ommclient2", add_help=False, exit_on_error=False)
subparsers = parser.add_subparsers() subparsers = parser.add_subparsers()
@@ -75,10 +64,6 @@ if __name__ == "__main__":
return subp return subp
parser_get_account = subparsers.add_parser("encrypt")
parser_get_account.add_argument("secret")
parser_get_account.set_defaults(func=encrypt)
parser_exit = subparsers.add_parser("exit") parser_exit = subparsers.add_parser("exit")
parser_exit.set_defaults(func=exit) parser_exit.set_defaults(func=exit)
@@ -104,6 +89,10 @@ if __name__ == "__main__":
"uid": int, "uid": int,
}) })
parser_get_account = add_parser("encrypt", func=c.encrypt, args={
"secret": str,
})
parser_get_account = add_parser("get_account", func=c.get_account, format=format_child_type, args={ parser_get_account = add_parser("get_account", func=c.get_account, format=format_child_type, args={
"id": int, "id": int,
}) })
@@ -195,3 +184,6 @@ if __name__ == "__main__":
print("".join(traceback.format_exception(type(e), e, e.__traceback__))) print("".join(traceback.format_exception(type(e), e, e.__traceback__)))
continue continue
print(format(r)) print(format(r))
if __name__ == "__main__":
main()

View File

@@ -1,5 +1,12 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import base64
try:
# This is is only dependency not from the modules inlcuded in python by default, so we make it optional
import rsa
except ImportError:
rsa = None
from .connection import Connection from .connection import Connection
from . import exceptions from . import exceptions
from . import messages from . import messages
@@ -147,6 +154,53 @@ class OMMClient2:
d = self.get_device(ppn) d = self.get_device(ppn)
return self.detach_user_device(d.uid, ppn) return self.detach_user_device(d.uid, ppn)
def encrypt(self, secret):
"""
Encrypt secret for OMM
Required rsa module to be installed
:param secret: String to encrypt
"""
if rsa is None:
raise Exception("rsa module is required for excryption")
publickey = self.get_publickey()
pubkey = rsa.PublicKey(*publickey)
byte_secret = secret.encode('utf8')
byte_encrypt = rsa.encrypt(byte_secret, pubkey)
encrypt = base64.b64encode(byte_encrypt).decode("utf8")
return encrypt
def find_devices(self, filter):
"""
Get all devices matching a filter
:param filter: function taking one parameter which is a device, returns True to keep, False to discard
Usage::
>>> c.find_devices(lambda d: d.relType == mitel_ommclient2.types.PPRelTypeType("Unbound"))
"""
for d in self.get_devices():
if filter(d):
yield d
def find_users(self, filter):
"""
Get all users matching a filter
:param filter: function taking one parameter which is a user, returns True to keep, False to discard
Usage::
>>> c.find_users(lambda u: u.num.startswith("9998"))
"""
for u in self.get_users():
if filter(u):
yield u
def get_account(self, id): def get_account(self, id):
""" """
@@ -325,12 +379,12 @@ class OMMClient2:
:param uid: User id :param uid: User id
:param sipAuthId: SIP user name :param sipAuthId: SIP user name
:param sipPw: Encrypted sip password :param sipPw: Plain text password
""" """
t = types.PPUserType() t = types.PPUserType()
t.uid = uid t.uid = uid
t.sipAuthId = sipAuthId t.sipAuthId = sipAuthId
t.sipPw = sipPw t.sipPw = self.encrypt(sipPw)
m = messages.SetPPUser() m = messages.SetPPUser()
m.childs.user = [t] m.childs.user = [t]
r = self.connection.request(m) r = self.connection.request(m)

View File

@@ -26,6 +26,7 @@ class Connection:
self._host = host self._host = host
self._port = port self._port = port
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.settimeout(2)
self._seq = 0 # state of the sequence number generator self._seq = 0 # state of the sequence number generator
self._requests = {} # waiting area for pending responses self._requests = {} # waiting area for pending responses
@@ -55,8 +56,11 @@ class Connection:
if select.select([self._socket], [], []) != ([], [], []): if select.select([self._socket], [], []) != ([], [], []):
# wait for data availiable # wait for data availiable
while True: while True:
# fill buffer with one message try:
data = self._socket.recv(1024) # fill buffer with one message
data = self._socket.recv(1024)
except BlockingIOError:
continue
if not data: if not data:
# buffer is empty # buffer is empty

View File

@@ -72,6 +72,9 @@ class EnumType:
def __repr__(self): def __repr__(self):
return "{}({})".format(self.__class__.__name__, repr(self.value)) return "{}({})".format(self.__class__.__name__, repr(self.value))
def __eq__(self, other):
return isinstance(other, type(self)) and self.value == other.value
class CallForwardStateType(EnumType): class CallForwardStateType(EnumType):
VALUES = [ VALUES = [

30
pyproject.toml Normal file
View File

@@ -0,0 +1,30 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "mitel_ommclient2"
version = "0.0.1"
authors = [
{ name="clerie", email="hallo@clerie.de" },
]
description = "Another attempt for a modern client library to the Mitel OM Application XML Interface."
readme = "README.md"
license = { file="LICENSE" }
requires-python = ">=3.7"
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
]
[project.optional-dependencies]
crypt = [
"rsa",
]
[project.scripts]
ommclient2 = "mitel_ommclient2.cli:main"
[project.urls]
"Source" = "https://git.clerie.de/clerie/mitel_ommclient2"