diff --git a/fieldpoc/controller.py b/fieldpoc/controller.py index 50b61d1..cb90ccf 100644 --- a/fieldpoc/controller.py +++ b/fieldpoc/controller.py @@ -1,5 +1,7 @@ #!/usr/bin/env python3 +import selectors +import socket import socketserver import time import threading @@ -7,40 +9,73 @@ import threading class Controller: def __init__(self, fp): self.fp = fp + self.handlers = [] # collect control sockets for handlers, so we can shut them down on demand def get_handler(self): class HandleRequest(socketserver.BaseRequestHandler): fp = self.fp + controller = self + + def setup(self): + self.handler_read, self.handler_write = socket.socketpair() + self.controller.handlers.append(self.handler_write) def handle(self): self.request.sendall("FieldPOC interactive controller\n".encode("utf-8")) + + sel = selectors.DefaultSelector() + sel.register(self.handler_read, selectors.EVENT_READ) + sel.register(self.request, selectors.EVENT_READ) while True: self.request.sendall("> ".encode("utf-8")) - data = self.request.recv(1024).decode("utf-8").strip() - if data == "help": - self.request.sendall("""Availiable commands: -help Show this info -exit Disconnect + for key, mask in sel.select(): + if key.fileobj == self.handler_read: + self.request.sendall("\ndisconnecting\n".encode("utf-8")) + return + elif key.fileobj == self.request: + data = self.request.recv(1024).decode("utf-8").strip() + + if data == "help": + self.request.sendall("""Availiable commands: +help Show this info +handlers Show currently running handlers +sync Start syncing +queues Show queue stats +exit Disconnect """.encode("utf-8")) - elif data == "quit" or data == "exit": - break - else: - self.request.sendall("Unknown command, type 'help'\n".encode("utf-8")) + elif data == "quit" or data == "exit": + self.request.sendall("disconnecting\n".encode("utf-8")) + return + elif data == "handlers": + self.request.sendall(("\n".join([str(h) for h in self.controller.handlers]) + "\n").encode("utf-8")) + elif data == "sync": + self.fp.queue_all({"type": "sync"}) + elif data == "queues": + self.request.sendall(("\n".join(["{} {}".format(name, queue.qsize()) for name, queue in self.fp.queues.items()]) + "\n").encode("utf-8")) + else: + self.request.sendall("Unknown command, type 'help'\n".encode("utf-8")) - self.request.sendall("disconnecting\n".encode("utf-8")) + def finish(self): + self.controller.handlers.remove(self.handler_write) return HandleRequest def run(self): - with socketserver.ThreadingTCPServer((self.fp.config.controller.host, self.fp.config.controller.port), self.get_handler()) as server: + class Server(socketserver.ThreadingTCPServer): + allow_reuse_address = True + + with Server((self.fp.config.controller.host, self.fp.config.controller.port), self.get_handler()) as server: threading.Thread(target=server.serve_forever).start() while True: msg = self.fp.queues["controller"].get() + self.fp.queues["controller"].task_done() if msg.get("type") == "stop": server.shutdown() + for h in self.handlers: + h.send(b'\0') break diff --git a/fieldpoc/dect.py b/fieldpoc/dect.py index b9aa2eb..26a6178 100644 --- a/fieldpoc/dect.py +++ b/fieldpoc/dect.py @@ -34,6 +34,7 @@ class Dect: while True: msg = self.fp.queues["dect"].get() + self.fp.queues["dect"].task_done() if msg.get("type") == "stop": break