Integrate call routing management

This commit is contained in:
clerie 2022-07-06 13:15:31 +02:00
parent f016f78902
commit 9394f6376b
3 changed files with 597 additions and 3 deletions

View File

@ -9,6 +9,7 @@ import threading
from . import config from . import config
from . import controller from . import controller
from . import dect from . import dect
from . import routing
from . import ywsd from . import ywsd
logger = logging.getLogger("fieldpoc.fieldpoc") logger = logging.getLogger("fieldpoc.fieldpoc")
@ -28,11 +29,13 @@ class FieldPOC:
self.queues = { self.queues = {
"controller": queue.Queue(), "controller": queue.Queue(),
"dect": queue.Queue(), "dect": queue.Queue(),
"routing": queue.Queue(),
} }
logger.info("initialising components") logger.info("initialising components")
self._controller = controller.Controller(self) self._controller = controller.Controller(self)
self._dect = dect.Dect(self) self._dect = dect.Dect(self)
self._routing = routing.Routing(self)
self._ywsd = ywsd.Ywsd(self) self._ywsd = ywsd.Ywsd(self)
def queue_all(self, msg): def queue_all(self, msg):
@ -64,6 +67,9 @@ class FieldPOC:
self._dect_thread = threading.Thread(target=self._dect.run) self._dect_thread = threading.Thread(target=self._dect.run)
self._dect_thread.start() self._dect_thread.start()
self._routing_thread = threading.Thread(target=self._routing.run)
self._routing_thread.start()
self._ywsd_thread = threading.Thread(target=self._ywsd.run, daemon=True) self._ywsd_thread = threading.Thread(target=self._ywsd.run, daemon=True)
self._ywsd_thread.start() self._ywsd_thread.start()

497
fieldpoc/routing.py Normal file
View File

@ -0,0 +1,497 @@
#!/usr/bin/env python3
import diffsync
import logging
import sqlalchemy
from typing import Optional, List
from ywsd.objects import Extension, ForkRank, User, Yate
logger = logging.getLogger("fieldpoc.routing")
class YateModel(diffsync.DiffSyncModel):
_modelname = "yate"
_identifiers = ("guru3_identifier",)
_attributes = (
"hostname",
"voip_listener",
)
guru3_identifier: str
hostname: str
voip_listener: str
yate_id: Optional[int]
class YwsdYateModel(YateModel):
@classmethod
def create(cls, diffsync, ids, attrs):
with diffsync.engine.connect() as conn:
conn.execute(
Yate.table.insert().values(
guru3_identifier=ids["guru3_identifier"], **attrs
)
)
return super().create(diffsync, ids=ids, attrs=attrs)
def update(self, attrs):
with self.diffsync.engine.connect() as conn:
conn.execute(
Yate.table.update()
.where(Yate.table.c.id == self.yate_id)
.values(**attrs)
)
return super().update(attrs)
def delete(self):
with self.diffsync.engine.connect() as conn:
conn.execute(Yate.table.delete().where(Yate.table.c.id == self.yate_id))
return super().delete()
class ExtensionModel(diffsync.DiffSyncModel):
_modelname = "extension"
_identifiers = ("extension",)
_attributes = (
"yate",
"name",
"extension_type",
"outgoing_extension",
"dialout_allowed",
"forwarding_mode",
"lang",
)
_children = {"extension": "forwarding_extension"}
yate: Optional[str]
extension: str
name: Optional[str]
short_name: Optional[str]
extension_type: str
outgoing_extension: Optional[str]
outgoing_name: Optional[str]
dialout_allowed: bool
ringback: Optional[str]
forwarding_mode: str
forwarding_delay: Optional[int]
forwarding_extension: List = []
lang: str
extension_id: Optional[int]
class YwsdExtensionModel(ExtensionModel):
@classmethod
def create(cls, diffsync, ids, attrs):
with diffsync.engine.connect() as conn:
conn.execute(
Extension.table.insert().values(
extension=ids["extension"],
type=attrs["extension_type"],
yate_id=diffsync.get("yate", attrs["yate"]).yate_id
if attrs.get("yate")
else None,
**dict(
(k, v)
for k, v in attrs.items()
if k not in ["extension_type", "yate"]
),
)
)
return super().create(diffsync, ids=ids, attrs=attrs)
def update(self, attrs):
with self.diffsync.engine.connect() as conn:
addn_attrs = {}
if attrs.get("extension_type"):
addn_attrs["type"] = attrs["extension_typ"]
if attrs.get("yate"):
addn_attrs["yate_id"] = (
self.diffsync.get("yate", attrs["yate"]).yate_id,
)
conn.execute(
Extension.table.update()
.where(Extension.table.c.id == self.extension_id)
.values(
**dict(
(k, v)
for k, v in attrs.items()
if k not in ["extension_type", "yate"]
),
**addn_attrs,
)
)
return super().update(attrs)
def delete(self):
with self.diffsync.engine.connect() as conn:
conn.execute(
Extension.table.delete().where(
Extension.table.c.id == self.extension_id
)
)
return super().delete()
class UserModel(diffsync.DiffSyncModel):
_modelname = "user"
_identifiers = ("username",)
_attributes = (
"displayname",
"password",
"user_type",
"dect_displaymode",
"trunk",
"static_target",
)
username: str
displayname: Optional[str]
password: str
inuse: Optional[bool]
user_type: str
dect_displaymode: Optional[str]
trunk: Optional[bool]
static_target: Optional[str]
class YwsdUserModel(UserModel):
@classmethod
def create(cls, diffsync, ids, attrs):
with diffsync.engine.connect() as conn:
conn.execute(
User.table.insert().values(
username=ids["username"],
type=attrs["user_type"],
**dict((k, v) for k, v in attrs.items() if k not in ["user_type"]),
)
)
return super().create(diffsync, ids=ids, attrs=attrs)
def update(self, attrs):
with self.diffsync.engine.connect() as conn:
conn.execute(
User.table.update()
.where(User.table.c.username == self.username)
.values(
type=attrs.get("user_type", self.user_type),
**dict((k, v) for k, v in attrs.items() if k not in ["user_type"]),
)
)
return super().update(attrs)
def delete(self):
with self.diffsync.engine.connect() as conn:
conn.execute(
User.table.delete().where(User.table.c.username == self.username)
)
return super().delete()
class ForkRankModel(diffsync.DiffSyncModel):
_modelname = "forkrank"
_identifiers = (
"extension",
"index",
)
_attributes = (
"mode",
"delay",
)
forkrank_id: Optional[int]
extension: str
index: int
mode: str
delay: Optional[int]
class YwsdForkRankModel(ForkRankModel):
@classmethod
def create(cls, diffsync, ids, attrs):
with diffsync.engine.connect() as conn:
conn.execute(
ForkRank.table.insert().values(
extension_id=diffsync.get(
"extension", ids["extension"]
).extension_id,
index=ids["index"],
**attrs,
)
)
return super().create(diffsync, ids=ids, attrs=attrs)
def update(self, attrs):
with self.diffsync.engine.connect() as conn:
conn.execute(
ForkRank.table.update()
.where(ForkRank.table.c.id == self.forkrank_id)
.values(**attrs)
)
return super().update(attrs)
def delete(self):
with self.diffsync.engine.connect() as conn:
conn.execute(
ForkRank.table.delete().where(ForkRank.table.c.id == self.forkrank_id)
)
return super().delete()
class ForkRankMemberModel(diffsync.DiffSyncModel):
_modelname = "forkrankmember"
_identifiers = (
"forkrank",
"extension",
)
_attributes = (
"rankmember_type",
"active",
)
forkrank: str
extension: str
rankmember_type: str
active: bool
class YwsdForkRankMemberModel(ForkRankMemberModel):
@classmethod
def create(cls, diffsync, ids, attrs):
with diffsync.engine.connect() as conn:
conn.execute(
ForkRank.member_table.insert().values(
forkrank_id=diffsync.get("forkrank", ids["forkrank"]).forkrank_id,
extension_id=diffsync.get(
"extension", ids["extension"]
).extension_id,
**attrs,
)
)
return super().create(diffsync, ids=ids, attrs=attrs)
def update(self, attrs):
with self.diffsync.engine.connect() as conn:
conn.execute(
ForkRank.member_table.update()
.where(
_and(
ForkRank.member_table.c.forkrank_id
== self.diffsync.get("forkrank", self.forkrank).forkrank_id,
ForkRank.member_table.c.extension_id
== self.diffsync.get("extension", self.extension).extension_id,
)
)
.values(**attrs)
)
return super().update(attrs)
def delete(self):
with self.diffsync.engine.connect() as conn:
conn.execute(
ForkRank.member_table.delete().where(
_and(
ForkRank.member_table.c.forkrank_id
== self.diffsync.get("forkrank", self.forkrank).forkrank_id,
ForkRank.member_table.c.extension_id
== self.diffsync.get("extension", self.extension).extension_id,
)
)
)
return super().delete()
class BackendNerd(diffsync.DiffSync):
yate = YateModel
extension = ExtensionModel
user = UserModel
forkrank = ForkRankModel
forkrankmember = ForkRankMemberModel
top_level = ["yate", "extension", "user", "forkrank", "forkrankmember"]
def load(self, data):
yate_dect = self.yate(
guru3_identifier="dect", hostname="dect", voip_listener="local"
)
self.add(yate_dect)
yate_sip = self.yate(
guru3_identifier="sip", hostname="sip", voip_listener="local"
)
self.add(yate_sip)
for key, value in data["extensions"].items():
yate = None
if value["type"] in ["dect", "static"]:
# yate = yate_dect.guru3_identifier
yate = yate_sip.guru3_identifier
elif value["type"] in ["sip"]:
yate = yate_sip.guru3_identifier
elif value["type"] in ["callgroup"]:
forkrank = self.forkrank(extension=key, index=0, mode="DEFAULT")
self.add(forkrank)
for member in value["callgroup_members"]:
frm = self.forkrankmember(
forkrank=forkrank.get_unique_id(),
extension=member,
rankmember_type="DEFAULT",
active=True,
)
self.add(frm)
extension = self.extension(
extension=key,
**dict(
(k, v)
for k, v in value.items()
if k in ["name", "outgoing_extension", "dialout_allowed"]
),
extension_type=("SIMPLE" if not value["trunk"] else "TRUNK")
if not value["type"] == "callgroup"
else "GROUP",
forwarding_mode="DISABLED",
lang="en-GB",
yate=yate,
)
self.add(extension)
if value["type"] in ["sip", "static", "dect"]:
user_type = {"sip": "user", "dect": "user", "static": "static"}
user = self.user(
username=key,
displayname=value["name"],
password=value.get("sip_password", key),
user_type=user_type[value["type"]],
trunk=value["trunk"],
static_target=value.get("static_target", ""),
)
self.add(user)
class BackendYwsd(diffsync.DiffSync):
yate = YwsdYateModel
extension = YwsdExtensionModel
user = YwsdUserModel
forkrank = YwsdForkRankModel
forkrankmember = YwsdForkRankMemberModel
top_level = ["yate", "extension", "user", "forkrank", "forkrankmember"]
def load(self, database_url):
self.engine = sqlalchemy.create_engine(database_url)
extension_ids = {}
forkrank_ids = {}
with self.engine.connect() as conn:
for row in conn.execute(Yate.table.select()):
yate = self.yate(
**dict(
(k, v)
for k, v in row._mapping.items()
if k in ["guru3_identifier", "hostname", "voip_listener"]
),
yate_id=row.id,
)
self.add(yate)
for row in conn.execute(Extension.table.select()):
extension = self.extension(
**dict(
(k, v)
for k, v in row._mapping.items()
if k
in [
"extension",
"name",
"short_name",
"outgoing_extension",
"outgoing_name",
"dialout_allowed",
"ringback",
"forwarding_mode",
"forwarding_delay",
"lang",
]
),
extension_type=row.type,
extension_id=row.id,
)
extension_ids[row.id] = row.extension
# TODO: forwarding_extension_id
if row.yate_id:
for yate in self.get_all("yate"):
if yate.yate_id == row.yate_id:
extension.yate = yate.guru3_identifier
self.add(extension)
for row in conn.execute(User.table.select()):
user = self.user(
**dict(
(k, v)
for k, v in row._mapping.items()
if k
in [
"username",
"displayname",
"password",
"inuse",
"dect_displaymode",
"trunk",
"static_target",
]
),
user_type=row.type,
)
self.add(user)
for row in conn.execute(ForkRank.table.select()):
fr = self.forkrank(
forkrank_id=row.id,
extension=extension_ids[row.extension_id],
**dict(
(k, v)
for k, v in row._mapping.items()
if k in ["index", "mode", "delay"]
),
)
forkrank_ids[row.id] = fr.get_unique_id()
self.add(fr)
for row in conn.execute(ForkRank.member_table.select()):
frm = self.forkrankmember(
forkrank=forkrank_ids[row.forkrank_id],
extension=extension_ids[row.extension_id],
**dict(
(k, v)
for k, v in row._mapping.items()
if k in ["rankmember_type", "active"]
),
)
self.add(frm)
class Routing:
def __init__(self, fp):
self.fp = fp
def run(self):
while True:
msg = self.fp.queues["dect"].get()
self.fp.queues["dect"].task_done()
if msg.get("type") == "stop":
break
elif msg.get("type") == "sync":
logger.info("syncing")
state_fieldpoc = BackendNerd()
state_fieldpoc.load(self.fp.extensions._c)
state_yate = BackendYwsd()
state_yate.load("postgresql+psycopg2://{}:{}@{}/{}".format(
self.fp.config.database.username,
self.fp.config.database.password,
self.fp.config.database.hostname,
self.fp.config.database.database,
))
logger.info("\n" + state_yate.diff_from(state_fieldpoc).str())
state_yate.sync_from(state_fieldpoc)

View File

@ -1,17 +1,108 @@
{ {
"extensions": { "extensions": {
"1000": {
"name": "strontium",
"type": "sip",
"dialout_allowed": true,
"trunk": false,
"sip_password": "AeShahS2"
},
"2020": {
"name": "Hotline",
"type": "static",
"dialout_allowed": false,
"trunk": false,
"static_target": "external/nodata//tmp/yate-tcl/hotline.tcl"
},
"2342": {
"name": "hydrox",
"type": "sip",
"dialout_allowed": true,
"trunk": false,
"sip_password": "Xua2liej"
},
"2574": { "2574": {
"name": "clerie", "name": "clerie",
"type": "dect", "type": "dect",
"trunk": false,
"dialout_allowed": true,
"dect_claim_token": "2574" "dect_claim_token": "2574"
}, },
"2789": {
"name": "bruttl",
"type": "dect",
"dialout_allowed": true,
"trunk": false,
"dect_ipei": "03586 0682136 3"
},
"5375": {
"name": "n0emis",
"type": "callgroup",
"dialout_allowed": true,
"trunk": false,
"callgroup_members": [
"5376",
"5377",
"5379"
]
},
"5376": {
"name": "n0emis SIP",
"type": "sip",
"dialout_allowed": true,
"trunk": false,
"outgoing_extension": "5375",
"sip_password": "wieK5xal"
},
"5377": {
"name": "n0emis DECT",
"type": "dect",
"dialout_allowed": true,
"trunk": false,
"outgoing_extension": "5375",
"dect_ipei": "10345 0136625 3"
},
"5379": {
"name": "n0emis iPhone",
"type": "sip",
"dialout_allowed": true,
"trunk": false,
"outgoing_extension": "5375",
"sip_password": "uuchahD9"
},
"7486": {
"name": "floppy",
"type": "dect",
"dialout_allowed": true,
"trunk": false,
"dect_ipei": "12877 0914855 9"
},
"8463": {
"name": "Time",
"type": "static",
"dialout_allowed": false,
"trunk": false,
"static_target": "external/nodata//tmp/yate-tcl/time.tcl"
},
"8888": {
"name": "test",
"type": "static",
"dialout_allowed": true,
"trunk": false,
"static_target": "line/01754566012;line=dialout"
},
"9998": { "9998": {
"name": "Temporary Numbers", "name": "Temporary Numbers",
"type": "temp" "trunk": false,
"dialout_allowed": true,
"type": "static"
}, },
"9999": { "9999": {
"name": "DECT Claim Token Pool", "name": "DECT Claim Extensions",
"type": "claim_token" "type": "static",
"dialout_allowed": false,
"trunk": true,
"static_target": "external/nodata//opt/nerdsync/claim.py"
} }
} }
} }