Shut down controller connections when shutting down service
This commit is contained in:
parent
2323f934a4
commit
51ce2e6ff0
@ -1,5 +1,7 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import selectors
|
||||||
|
import socket
|
||||||
import socketserver
|
import socketserver
|
||||||
import time
|
import time
|
||||||
import threading
|
import threading
|
||||||
@ -7,40 +9,73 @@ import threading
|
|||||||
class Controller:
|
class Controller:
|
||||||
def __init__(self, fp):
|
def __init__(self, fp):
|
||||||
self.fp = fp
|
self.fp = fp
|
||||||
|
self.handlers = [] # collect control sockets for handlers, so we can shut them down on demand
|
||||||
|
|
||||||
def get_handler(self):
|
def get_handler(self):
|
||||||
class HandleRequest(socketserver.BaseRequestHandler):
|
class HandleRequest(socketserver.BaseRequestHandler):
|
||||||
fp = self.fp
|
fp = self.fp
|
||||||
|
controller = self
|
||||||
|
|
||||||
|
def setup(self):
|
||||||
|
self.handler_read, self.handler_write = socket.socketpair()
|
||||||
|
self.controller.handlers.append(self.handler_write)
|
||||||
|
|
||||||
def handle(self):
|
def handle(self):
|
||||||
self.request.sendall("FieldPOC interactive controller\n".encode("utf-8"))
|
self.request.sendall("FieldPOC interactive controller\n".encode("utf-8"))
|
||||||
|
|
||||||
|
sel = selectors.DefaultSelector()
|
||||||
|
sel.register(self.handler_read, selectors.EVENT_READ)
|
||||||
|
sel.register(self.request, selectors.EVENT_READ)
|
||||||
while True:
|
while True:
|
||||||
self.request.sendall("> ".encode("utf-8"))
|
self.request.sendall("> ".encode("utf-8"))
|
||||||
data = self.request.recv(1024).decode("utf-8").strip()
|
|
||||||
|
|
||||||
if data == "help":
|
for key, mask in sel.select():
|
||||||
self.request.sendall("""Availiable commands:
|
if key.fileobj == self.handler_read:
|
||||||
help Show this info
|
self.request.sendall("\ndisconnecting\n".encode("utf-8"))
|
||||||
exit Disconnect
|
return
|
||||||
|
elif key.fileobj == self.request:
|
||||||
|
data = self.request.recv(1024).decode("utf-8").strip()
|
||||||
|
|
||||||
|
if data == "help":
|
||||||
|
self.request.sendall("""Availiable commands:
|
||||||
|
help Show this info
|
||||||
|
handlers Show currently running handlers
|
||||||
|
sync Start syncing
|
||||||
|
queues Show queue stats
|
||||||
|
exit Disconnect
|
||||||
""".encode("utf-8"))
|
""".encode("utf-8"))
|
||||||
elif data == "quit" or data == "exit":
|
elif data == "quit" or data == "exit":
|
||||||
break
|
self.request.sendall("disconnecting\n".encode("utf-8"))
|
||||||
else:
|
return
|
||||||
self.request.sendall("Unknown command, type 'help'\n".encode("utf-8"))
|
elif data == "handlers":
|
||||||
|
self.request.sendall(("\n".join([str(h) for h in self.controller.handlers]) + "\n").encode("utf-8"))
|
||||||
|
elif data == "sync":
|
||||||
|
self.fp.queue_all({"type": "sync"})
|
||||||
|
elif data == "queues":
|
||||||
|
self.request.sendall(("\n".join(["{} {}".format(name, queue.qsize()) for name, queue in self.fp.queues.items()]) + "\n").encode("utf-8"))
|
||||||
|
else:
|
||||||
|
self.request.sendall("Unknown command, type 'help'\n".encode("utf-8"))
|
||||||
|
|
||||||
self.request.sendall("disconnecting\n".encode("utf-8"))
|
def finish(self):
|
||||||
|
self.controller.handlers.remove(self.handler_write)
|
||||||
|
|
||||||
return HandleRequest
|
return HandleRequest
|
||||||
|
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
with socketserver.ThreadingTCPServer((self.fp.config.controller.host, self.fp.config.controller.port), self.get_handler()) as server:
|
class Server(socketserver.ThreadingTCPServer):
|
||||||
|
allow_reuse_address = True
|
||||||
|
|
||||||
|
with Server((self.fp.config.controller.host, self.fp.config.controller.port), self.get_handler()) as server:
|
||||||
|
|
||||||
threading.Thread(target=server.serve_forever).start()
|
threading.Thread(target=server.serve_forever).start()
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
msg = self.fp.queues["controller"].get()
|
msg = self.fp.queues["controller"].get()
|
||||||
|
self.fp.queues["controller"].task_done()
|
||||||
|
|
||||||
if msg.get("type") == "stop":
|
if msg.get("type") == "stop":
|
||||||
server.shutdown()
|
server.shutdown()
|
||||||
|
for h in self.handlers:
|
||||||
|
h.send(b'\0')
|
||||||
break
|
break
|
||||||
|
@ -34,6 +34,7 @@ class Dect:
|
|||||||
|
|
||||||
while True:
|
while True:
|
||||||
msg = self.fp.queues["dect"].get()
|
msg = self.fp.queues["dect"].get()
|
||||||
|
self.fp.queues["dect"].task_done()
|
||||||
|
|
||||||
if msg.get("type") == "stop":
|
if msg.get("type") == "stop":
|
||||||
break
|
break
|
||||||
|
Loading…
Reference in New Issue
Block a user