嗯 - 没有答案,所以我想没有预先推出的方法来做到这一点。为了后代,我推出了类似于这个人为版本的自己的解决方案。
所以我们开始吧。脚本 1) 启动一个休息服务器,2) 将生成执行工作并向 (1) 报告的外部客户端,3) 在收到所有数据后关闭服务器并继续。
基本上,由于此脚本将在命令行上运行,它应该使用临时端口(否则多个人同时运行它会端口冲突。)服务器只是一个线程中的 http.server,它有一个很少有回调来抓取事件,并使用线程安全 queue.Queue 将源自服务器的事件传递给主线程。这让我们知道何时关闭服务器。
import json
import threading
import http.server
import logging
from queue import Queue
q = Queue()
EPHEMERAL_PORT = 0
num_events = 0
MAX_EVENTS = 3
class TinyRESTHandler(http.server.BaseHTTPRequestHandler):
def __init__(self, service_map, *args):
self.service_map = service_map
http.server.BaseHTTPRequestHandler.__init__(self, *args)
def respond(self, code, message):
self.send_response(code)
self.send_header("Content-Type", "text/ascii")
self.send_header("Content-Length", str(len(message.encode())))
self.end_headers()
self.wfile.write(message.encode())
def handle_POST(self, json_handler):
"""
Route POST requests to the appropriate handler.
"""
payload = self.rfile.read(int(self.headers['Content-Length']))
try:
json_payload = json.loads(payload)
json_handler(self.path, json_payload)
except json.decoder.JSONDecodeError as e:
self.respond(400, "Bad Request: Invalid JSON")
self.respond(200, "OK")
def do_POST(self):
if (self.path in self.service_map):
self.handle_POST(self.service_map[self.path])
else:
self.respond(404, "Not Found")
class EphemeralHTTPServer(http.server.HTTPServer):
"""
We cannot know the port used by an Ephemeral HTTP server until
it has tried to bind a port (at which point the OS gives it a
free port.) This adds a callback to the bind function that allows
us to be notified as soon as a port has been obtained.
"""
def __init__(self, hostname, port_notify_cb, *args, **kwargs):
self.port_notify_cb = port_notify_cb
super().__init__((hostname, EPHEMERAL_PORT), *args, **kwargs)
def server_bind(self):
"""
The server will notify port_notify_cb of its address
once it has bound a port.
"""
super().server_bind()
if (self.port_notify_cb):
self.port_notify_cb(self.server_address)
class TinyRESTServer():
def __init__(self, host, port):
self.host = host
self.port = port
self.service_map = dict()
def register_service(self, path, callback):
self.service_map[path] = callback
def get_handler(self, *args):
"""
HTTPServer creates a new handler for every request. This ensures
that the TinyRESTHandlers are supplied with the service map.
"""
return TinyRESTHandler(self.service_map, *args)
def getHTTPServer(self):
return http.server.HTTPServer((self.host, self.port), self.get_handler)
def run(self):
"""
The server_close call forces HTTPServer to relinquish its port.
"""
self.server = self.getHTTPServer()
try:
self.server.serve_forever()
finally:
self.server.server_close()
def shutdown(self):
self.server.shutdown()
class EphemeralRESTServer(TinyRESTServer):
def __init__(self, host, address_cb):
self.address_cb = address_cb
super().__init__(host, 0)
def getHTTPServer(self):
return EphemeralHTTPServer(self.host, self.address_cb, self.get_handler)
class ServerEvent:
def __init__(self, name):
self.name = name
class PortAcquiredEvent(ServerEvent):
def __init__(self, hostname, port):
super().__init__("port acquired")
self.hostname = hostname
self.port = port
def __str__(self):
return f"{self.name}: (host, port) = ({self.hostname}, {self.port})"
class JSONEvent(ServerEvent):
def __init__(self, json_content):
super().__init__("JSON results")
self.json_content = json_content
def __str__(self):
return f"{self.name}: {self.json_content}"
def get_server_address(server_address):
"""
When the server binds an ephemeral port, it will call this
function to tell us what port the OS provided. Using a queue
ensures that the main prog doesn't try to get the port before
the HTTP server in the thread has successfully obtained one.
"""
q.put(PortAcquiredEvent(server_address[0], server_address[1]))
def add_to_queue(req_type, json_content):
"""
Contrived REST service handler.
"""
q.put(JSONEvent(json_content))
def check_if_we_should_stop_the_server(event):
"""
Contrived function to test when we should stop the http server
and do something with the received data.
"""
global num_events
global MAX_EVENTS
print(event)
num_events += 1
return num_events < MAX_EVENTS
# Start an HTTP server, in a thread, on port 0.
server = EphemeralRESTServer("localhost", get_server_address)
server.register_service('/post_server_info', add_to_queue)
server_thread = threading.Thread(None, server.run)
server_thread.start()
"""
Do something here to cause rest clients to start hitting this
server (for example invoking the clients via subprocess or
whatevs.)
"""
# Block until the queue obtains a value.
cur_val = q.get()
while check_if_we_should_stop_the_server(cur_val):
cur_val = q.get()
# Stop the HTTP server.
server.shutdown()
server_thread.join()
# Do normal script stuff with the data...
开始时“很小”好吗?因此得名。