Refactor controller to HTTP REST API
This commit is contained in:
		| @@ -1,58 +1,46 @@ | |||||||
| # FieldPOC controller | # FieldPOC controller | ||||||
|  |  | ||||||
| The FieldPOC controller is an interactive console to help managing the current state of the FieldPOC system. | The FieldPOC controller is a HTTP REST API to help managing the current state of the FieldPOC system. | ||||||
|  |  | ||||||
| Connect to it via the IP address von port set in the [FieldPOC configuration](configuration.md) with `telnet`. | Connect to it via the IP address von port set in the [FieldPOC configuration](configuration.md). | ||||||
|  |  | ||||||
| ## Commands | ## API Endpoints | ||||||
|  |  | ||||||
| ### Help | ### `/sync` | ||||||
|  |  | ||||||
| `help` | ```bash | ||||||
|  | curl --json '{}' http://127.0.0.1:9437/sync | ||||||
| Show help info. | ``` | ||||||
|  |  | ||||||
| ### Show handlers |  | ||||||
|  |  | ||||||
| `handlers` |  | ||||||
|  |  | ||||||
| Show currently running handlers |  | ||||||
|  |  | ||||||
| ### Reconfigure all components |  | ||||||
|  |  | ||||||
| `sync` |  | ||||||
|  |  | ||||||
| Notify all parts of FieldPOC to check configuration of connected components and update it. | Notify all parts of FieldPOC to check configuration of connected components and update it. | ||||||
|  |  | ||||||
| ### Show queue stats | ### `/queues` | ||||||
|  |  | ||||||
| `queues` | ```bash | ||||||
|  | curl http://127.0.0.1:9437/queues | ||||||
|  | ``` | ||||||
|  |  | ||||||
| ### Reload configuration | Show internal message queue stats. | ||||||
|  |  | ||||||
| `reload` | ### `/reload` | ||||||
|  |  | ||||||
| Read [FieldPOC configuration](configuration.md) file and apply it. | ```bash | ||||||
|  | curl --json '{}' http://127.0.0.1:9437/reload | ||||||
|  | ``` | ||||||
|  |  | ||||||
| ### Bind extension to DECT device | Read [FieldPOC extensions specification](extensions.md) file and apply it. | ||||||
|  |  | ||||||
| `claim <ext> <token>` | ### `/claim` | ||||||
|  |  | ||||||
| - `ext` is the current extension number of the DECT device. | ```bash | ||||||
|  | curl --json '{"extension": "1111", "token": "1234"}' http://127.0.0.1:9437/claim | ||||||
|  | ``` | ||||||
|  |  | ||||||
|  | Bind extension to DECT device | ||||||
|  |  | ||||||
|  | - `extension` is the current extension number of the DECT device. | ||||||
| - `token` is the `dect_claim_token` of the extension that should get applied. | - `token` is the `dect_claim_token` of the extension that should get applied. | ||||||
|  |  | ||||||
| This works because newly connected DECT phones get a temporary number assigned. | This works because newly connected DECT phones get a temporary number assigned. | ||||||
| This temporary number is usually the current number. | This temporary number is usually the current number. | ||||||
| But it is possible to use any extension, so the extension for a device can be changed any time. | But it is possible to use any extension, so the extension for a device can be changed any time. | ||||||
|  |  | ||||||
| ### Disconnect |  | ||||||
|  |  | ||||||
| `exit` |  | ||||||
|  |  | ||||||
| Disconnect telnet session. |  | ||||||
|  |  | ||||||
| ### Stop controller |  | ||||||
|  |  | ||||||
| `stop` |  | ||||||
|  |  | ||||||
| Shutdown the controller, but FieldPOC continues running. |  | ||||||
|   | |||||||
| @@ -1,98 +1,190 @@ | |||||||
| #!/usr/bin/env python3 | #!/usr/bin/env python3 | ||||||
|  |  | ||||||
| import logging | import logging | ||||||
| import selectors | from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler, HTTPStatus | ||||||
| import socket |  | ||||||
| import socketserver |  | ||||||
| import time |  | ||||||
| import threading | import threading | ||||||
|  | import json | ||||||
|  |  | ||||||
| logger = logging.getLogger("fieldpoc.controller") | logger = logging.getLogger("fieldpoc.controller") | ||||||
|  |  | ||||||
| class Controller: | class Controller: | ||||||
|     def __init__(self, fp): |     def __init__(self, fp): | ||||||
|         self.fp = fp |         self.fp = fp | ||||||
|         self.handlers = [] # collect control sockets for handlers, so we can shut them down on demand |  | ||||||
|  |  | ||||||
|     def get_handler(self): |     def get_handler(self): | ||||||
|         class HandleRequest(socketserver.BaseRequestHandler): |         """ | ||||||
|             fp = self.fp |         Create a handler passed to the HTTP server | ||||||
|             controller = self |  | ||||||
|  |  | ||||||
|             def setup(self): |         Return a subclass of BaseHTTPRequestHandler | ||||||
|                 self.handler_read, self.handler_write = socket.socketpair() |         """ | ||||||
|                 self.controller.handlers.append(self.handler_write) |  | ||||||
|  |  | ||||||
|             def handle(self): |         # Paths by HTTP request method | ||||||
|                 self.request.sendall("FieldPOC interactive controller\n".encode("utf-8")) |         PATHS = { | ||||||
|  |             "GET": {}, | ||||||
|  |             "POST": {}, | ||||||
|  |         } | ||||||
|  |  | ||||||
|                 sel = selectors.DefaultSelector() |         def index(): | ||||||
|                 sel.register(self.handler_read, selectors.EVENT_READ) |             """ | ||||||
|                 sel.register(self.request, selectors.EVENT_READ) |             Index page | ||||||
|                 while True: |             """ | ||||||
|                     self.request.sendall("> ".encode("utf-8")) |  | ||||||
|  |  | ||||||
|                     for key, mask in sel.select(): |             return "FieldPOC HTTP Controller" | ||||||
|                         if key.fileobj == self.handler_read: |  | ||||||
|                             self.request.sendall("\ndisconnecting\n".encode("utf-8")) |  | ||||||
|                             return |  | ||||||
|                         elif key.fileobj == self.request: |  | ||||||
|                             data = self.request.recv(1024).decode("utf-8").strip() |  | ||||||
|  |  | ||||||
|                             if data == "help": |         PATHS["GET"]["/"] = index | ||||||
|                                 self.request.sendall("""Availiable commands: |  | ||||||
| help                    Show this info |         def sync(payload): | ||||||
| handlers                Show currently running handlers |             """ | ||||||
| sync                    Start syncing |             Notify all parts of FieldPOC to check configuration of connected components and update it | ||||||
| queues                  Show queue stats |             """ | ||||||
| reload                  Reload extension config file |  | ||||||
| claim <ext> <token>     claim dect extension |             self.fp.queue_all({"type": "sync"}) | ||||||
| exit                    Disconnect |  | ||||||
| """.encode("utf-8")) |             return "ok" | ||||||
|                             elif data == "": |  | ||||||
|                                 continue |         PATHS["POST"]["/sync"] = sync | ||||||
|                             elif data == "quit" or data == "exit": |  | ||||||
|                                 self.request.sendall("disconnecting\n".encode("utf-8")) |         def queues(): | ||||||
|                                 return |             """ | ||||||
|                             elif data == "handlers": |             Show status of message queues | ||||||
|                                 self.request.sendall(("\n".join([str(h) for h in self.controller.handlers]) + "\n").encode("utf-8")) |             """ | ||||||
|                             elif data == "sync": |  | ||||||
|                                 self.fp.queue_all({"type": "sync"}) |             return {name: queue.qsize() for name, queue in self.fp.queues.items()} | ||||||
|                             elif data == "queues": |  | ||||||
|                                 self.request.sendall(("\n".join(["{} {}".format(name, queue.qsize()) for name, queue in self.fp.queues.items()]) + "\n").encode("utf-8")) |         PATHS["GET"]["/queues"] = queues | ||||||
|                             elif data == "reload": |  | ||||||
|                                 self.fp.reload_extensions() |         def reload(payload): | ||||||
|                             elif data.startswith("claim"): |             """ | ||||||
|                                 data = data.split(" ") |             Read extensions specification and apply it | ||||||
|                                 if len(data) == 3: |             """ | ||||||
|                                     self.fp.queue_all({"type": "claim", "extension": data[1], "token": data[2]}) |  | ||||||
|                                 else: |             self.fp.reload_extensions() | ||||||
|                                     self.request.sendall("error: You have to specify calling extension and token\n".encode("utf-8")) |  | ||||||
|                             else: |         PATHS["POST"]["/reload"] = reload | ||||||
|                                 self.request.sendall("Unknown command, type 'help'\n".encode("utf-8")) |  | ||||||
|  |         def claim(payload): | ||||||
|  |             """ | ||||||
|  |             Bind extension to DECT device | ||||||
|  |             """ | ||||||
|  |  | ||||||
|  |             if payload is None: | ||||||
|  |                 return {} | ||||||
|  |             try: | ||||||
|  |                 self.fp.queue_all({"type": "claim", "extension": payload["extension"], "token": payload["token"]}) | ||||||
|  |                 return "ok" | ||||||
|  |             except KeyError: | ||||||
|  |                 return { "error": "You have to specify calling extension and token" } | ||||||
|  |  | ||||||
|  |         PATHS["POST"]["/claim"] = claim | ||||||
|  |  | ||||||
|  |         class HandleRequest(BaseHTTPRequestHandler): | ||||||
|  |  | ||||||
|  |             def do_GET(self): | ||||||
|  |                 if self.path in PATHS["GET"]: | ||||||
|  |                     response = PATHS["GET"][self.path]() | ||||||
|  |                     return self.make_response(response) | ||||||
|  |                 else: | ||||||
|  |                     self.send_error(HTTPStatus.NOT_FOUND, "Not found") | ||||||
|  |                     return | ||||||
|  |  | ||||||
|  |             def do_HEAD(self): | ||||||
|  |                 if self.path in PATHS["GET"]: | ||||||
|  |                     response = PATHS["GET"][self.path]() | ||||||
|  |                     return self.make_response(response, head_only=True) | ||||||
|  |                 else: | ||||||
|  |                     self.send_error(HTTPStatus.NOT_FOUND, "Not found") | ||||||
|  |                     return | ||||||
|  |  | ||||||
|  |             def do_POST(self): | ||||||
|  |                 if self.path in PATHS["POST"]: | ||||||
|  |                     payload = self.prepare_payload() | ||||||
|  |                     response = PATHS["POST"][self.path](payload) | ||||||
|  |                     return self.make_response(response) | ||||||
|  |                 else: | ||||||
|  |                     self.send_error(HTTPStatus.NOT_FOUND, "Not found") | ||||||
|  |                     return | ||||||
|  |  | ||||||
|  |             def make_response(self, content, head_only=False): | ||||||
|  |                 """ | ||||||
|  |                 Take a dict and return as HTTP json response | ||||||
|  |                 """ | ||||||
|  |  | ||||||
|  |                 # Convert response to json | ||||||
|  |                 content = json.dumps(content) + "\n" | ||||||
|  |                 # Encode repose as binary | ||||||
|  |                 encoded = content.encode("utf-8") | ||||||
|  |                 # Send 200 status code | ||||||
|  |                 self.send_response(HTTPStatus.OK) | ||||||
|  |                 # Set json header | ||||||
|  |                 self.send_header("Content-Type", "application/json; charset=utf-8") | ||||||
|  |                 # Specify content length | ||||||
|  |                 self.send_header("Conten-Length", str(len(encoded))) | ||||||
|  |                 # Finish headers | ||||||
|  |                 self.end_headers() | ||||||
|  |  | ||||||
|  |                 # Return early on HEAD request | ||||||
|  |                 if head_only: | ||||||
|  |                     return | ||||||
|  |  | ||||||
|  |                 # Send response | ||||||
|  |                 self.wfile.write(encoded) | ||||||
|  |  | ||||||
|  |             def prepare_payload(self): | ||||||
|  |                 """ | ||||||
|  |                 Read payload data from HTTP POST request and parse as json. | ||||||
|  |                 """ | ||||||
|  |  | ||||||
|  |                 # Get payload length | ||||||
|  |                 length = int(self.headers["Content-Length"]) | ||||||
|  |                 # Read payload up to length | ||||||
|  |                 encoded = self.rfile.read(length) | ||||||
|  |                 # Decode payload to string | ||||||
|  |                 content = encoded.decode("utf-8") | ||||||
|  |  | ||||||
|  |                 # Return early if no payload | ||||||
|  |                 if not content: | ||||||
|  |                     return None | ||||||
|  |  | ||||||
|  |                 # Parse payload as json | ||||||
|  |                 return json.loads(content) | ||||||
|  |  | ||||||
|             def finish(self): |  | ||||||
|                 self.controller.handlers.remove(self.handler_write) |  | ||||||
|  |  | ||||||
|         return HandleRequest |         return HandleRequest | ||||||
|  |  | ||||||
|  |  | ||||||
|     def run(self): |     def run(self): | ||||||
|         logger.info("starting server") |         logger.info("starting server") | ||||||
|         class Server(socketserver.ThreadingTCPServer): |  | ||||||
|  |         class Server(ThreadingHTTPServer): | ||||||
|  |             """ | ||||||
|  |             Subclass ThreadingHTTPServer to set custom options | ||||||
|  |             """ | ||||||
|  |  | ||||||
|  |             # Sometimes sockets are still bound to addresses and ports | ||||||
|  |             # we just ignore that with this | ||||||
|             allow_reuse_address = True |             allow_reuse_address = True | ||||||
|  |  | ||||||
|         with Server((self.fp.config.controller.host, self.fp.config.controller.port), self.get_handler()) as server: |         # Start the server | ||||||
|  |         with Server( | ||||||
|  |             # Passing address and port from config | ||||||
|  |             (self.fp.config.controller.host, self.fp.config.controller.port), | ||||||
|  |             # With the handler class we generated | ||||||
|  |             self.get_handler() | ||||||
|  |         ) as server: | ||||||
|  |  | ||||||
|  |             # Starting server loop in another thread | ||||||
|             threading.Thread(target=server.serve_forever).start() |             threading.Thread(target=server.serve_forever).start() | ||||||
|  |  | ||||||
|  |             # Listen on the message queue | ||||||
|             while True: |             while True: | ||||||
|  |                 # Wait for a new message and get it | ||||||
|                 msg = self.fp.queues["controller"].get() |                 msg = self.fp.queues["controller"].get() | ||||||
|  |                 # Remove message from queue | ||||||
|                 self.fp.queues["controller"].task_done() |                 self.fp.queues["controller"].task_done() | ||||||
|  |  | ||||||
|                 if msg.get("type") == "stop": |                 if msg.get("type") == "stop": | ||||||
|                     logger.info("stopping server") |                     logger.info("stopping server") | ||||||
|  |                     # Stop http server loop | ||||||
|                     server.shutdown() |                     server.shutdown() | ||||||
|                     for h in self.handlers: |                     # Exist message queue listening loop | ||||||
|                         h.send(b'\0') |  | ||||||
|                     break |                     break | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user