I wrote an alternative in the Shutting Down the Server section of Werkzeug's docs. It uses multiprocessing.Process to start and wait for a child process running the server, then terminates that process once a value is passed back over a multiprocessing.Queue.
import multiprocessing

from werkzeug import Request, Response, run_simple


def run_token_server(q: multiprocessing.Queue) -> None:
    @Request.application
    def app(request: Request) -> Response:
        # Pass the received token back to the parent process.
        q.put(request.args["token"])
        return Response("", 204)

    run_simple("localhost", 5000, app)


def get_token():
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=run_token_server, args=(q,))
    p.start()
    # Block until the server puts a token on the queue, then stop the server.
    token = q.get(block=True)
    p.terminate()
    return token
You can see this work by adding the following to the bottom of the file, then running the file with python:

if __name__ == "__main__":
    print(get_token())

Navigating to http://localhost:5000/?token=test will print test and exit.
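Instead of a browser, any HTTP client can make the request. A minimal sketch using only the standard library, run from a second terminal while get_token() is waiting (the token value test is just a placeholder):

from urllib.request import urlopen

# Hit the local server once; the query string carries the token.
urlopen("http://localhost:5000/?token=test")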
Another, similar alternative I showed in the deprecation discussion is to use threading.Thread and make_server() instead. It can be demonstrated the same way as above.
import threading
from queue import Queue

from werkzeug import Request, Response
from werkzeug.serving import make_server


def get_token():
    q = Queue()

    @Request.application
    def app(request):
        # Pass the received token back to the waiting thread.
        q.put(request.args["token"])
        return Response("", 204)

    s = make_server("localhost", 5000, app)
    t = threading.Thread(target=s.serve_forever)
    t.start()
    # Block until the server puts a token on the queue, then shut it down.
    token = q.get(block=True)
    s.shutdown()
    t.join()
    return token
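For completeness, the same demo block works here too:

if __name__ == "__main__":
    print(get_token())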
For Waitress, a production WSGI server that works on Windows and Linux, the approach is almost identical. Replace run_simple() with waitress.serve():

waitress.serve(app, host="localhost", port=5000)
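Put together, the multiprocessing example would look roughly like this with Waitress; a sketch under the assumption that only the serving call changes and everything else stays as above:

import multiprocessing

import waitress
from werkzeug import Request, Response


def run_token_server(q: multiprocessing.Queue) -> None:
    @Request.application
    def app(request: Request) -> Response:
        q.put(request.args["token"])
        return Response("", 204)

    # Waitress takes over serving; the rest of the flow is unchanged.
    waitress.serve(app, host="localhost", port=5000)


def get_token():
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=run_token_server, args=(q,))
    p.start()
    token = q.get(block=True)
    p.terminate()
    return token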
Or, for the threading approach, use waitress.create_server() and s.close():
s = waitress.create_server(app, host="localhost", port=5000)
t = threading.Thread(target=s.run)
...
s.close()
...
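Assembled from that snippet, the threaded version would look roughly like this; a sketch that simply substitutes create_server()/run()/close() for make_server()/serve_forever()/shutdown() and keeps the rest unchanged:

import threading
from queue import Queue

import waitress
from werkzeug import Request, Response


def get_token():
    q = Queue()

    @Request.application
    def app(request):
        q.put(request.args["token"])
        return Response("", 204)

    # create_server/run/close stand in for make_server/serve_forever/shutdown.
    s = waitress.create_server(app, host="localhost", port=5000)
    t = threading.Thread(target=s.run)
    t.start()
    token = q.get(block=True)
    s.close()
    t.join()
    return token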