Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions integration_tests/test_graceful_shutdown.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import os
import socket
import subprocess
import sys
import textwrap
import time

import pytest


pytestmark = pytest.mark.skipif(sys.platform.startswith("win32"), reason="SIGTERM graceful shutdown test is POSIX-only")


def _get_free_port() -> int:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.bind(("127.0.0.1", 0))
return sock.getsockname()[1]


def _wait_for_server(port: int, process: subprocess.Popen[bytes]) -> None:
deadline = time.monotonic() + 15
while time.monotonic() < deadline:
if process.poll() is not None:
stdout, stderr = process.communicate()
raise AssertionError(f"Robyn server exited early with {process.returncode}\nstdout: {stdout!r}\nstderr: {stderr!r}")

try:
with socket.create_connection(("127.0.0.1", port), timeout=1):
return
except OSError:
time.sleep(0.1)

process.kill()
stdout, stderr = process.communicate()
raise AssertionError(f"Robyn server did not start on port {port}\nstdout: {stdout!r}\nstderr: {stderr!r}")


def test_sigterm_runs_shutdown_handler(tmp_path):
port = _get_free_port()
sentinel_file = tmp_path / "shutdown.txt"
app_file = tmp_path / "graceful_shutdown_app.py"
app_file.write_text(
textwrap.dedent(
"""
import os
from pathlib import Path

from robyn import Robyn

app = Robyn(__file__)

@app.get("/")
def index():
return "Hello World!"

@app.shutdown_handler
def shutdown_handler():
Path(os.environ["ROBYN_SHUTDOWN_SENTINEL"]).write_text("shutdown")

if __name__ == "__main__":
app.start(host="127.0.0.1", port=int(os.environ["ROBYN_PORT"]))
"""
)
)

env = os.environ.copy()
env["ROBYN_PORT"] = str(port)
env["ROBYN_SHUTDOWN_SENTINEL"] = str(sentinel_file)

process = subprocess.Popen([sys.executable, str(app_file)], env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
try:
_wait_for_server(port, process)
process.terminate()
process.wait(timeout=15)

assert process.returncode == 0
assert sentinel_file.read_text() == "shutdown"
finally:
if process.poll() is None:
process.kill()
process.communicate()
38 changes: 36 additions & 2 deletions robyn/processpool.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import signal
import sys
import time
import webbrowser

from multiprocess import Process # type: ignore
Expand All @@ -11,6 +12,39 @@
from robyn.router import GlobalMiddleware, Route, RouteMiddleware
from robyn.types import Directory

GRACEFUL_SHUTDOWN_TIMEOUT = 10


def _raise_keyboard_interrupt(_sig, _frame):
raise KeyboardInterrupt


def _register_graceful_shutdown_handler() -> None:
if sys.platform.startswith("win32"):
return

signal.signal(signal.SIGTERM, _raise_keyboard_interrupt)


def _terminate_process_pool(process_pool: list[Process]) -> None:
for process in process_pool:
try:
process.terminate()
except ProcessLookupError:
pass

deadline = time.monotonic() + GRACEFUL_SHUTDOWN_TIMEOUT
for process in process_pool:
remaining_timeout = max(deadline - time.monotonic(), 0)
process.join(timeout=remaining_timeout)
if process.is_alive():
logger.warn("Worker process %s did not exit gracefully, force killing it.", process.pid)
try:
process.kill()
except ProcessLookupError:
pass
process.join()


def run_processes(
url: str,
Expand Down Expand Up @@ -51,8 +85,7 @@ def run_processes(

def terminating_signal_handler(_sig, _frame):
logger.info("Terminating server!!", bold=True)
for process in process_pool:
process.kill()
_terminate_process_pool(process_pool)

signal.signal(signal.SIGINT, terminating_signal_handler)
signal.signal(signal.SIGTERM, terminating_signal_handler)
Expand Down Expand Up @@ -177,6 +210,7 @@ def spawn_process(
"""

loop = initialize_event_loop()
_register_graceful_shutdown_handler()

server = Server()

Expand Down
26 changes: 23 additions & 3 deletions robyn/reloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

from robyn.logger import Colors, logger

GRACEFUL_SHUTDOWN_TIMEOUT = 10


def compile_rust_files(directory_path: str) -> list[str]:
rust_files = glob.glob(os.path.join(directory_path, "**/*.rs"), recursive=True)
Expand Down Expand Up @@ -92,6 +94,7 @@ def setup_reloader(directory_path: str, file_path: str) -> None:

def terminating_signal_handler(_sig, _frame):
event_handler.stop_server()
event_handler.wait_for_server_shutdown()
logger.info("Terminating reloader", bold=True)
observer.stop()
observer.join()
Expand All @@ -109,7 +112,7 @@ def terminating_signal_handler(_sig, _frame):
finally:
observer.stop()
observer.join()
event_handler.process.wait()
event_handler.wait_for_server_shutdown()


class EventHandler(FileSystemEventHandler):
Expand All @@ -123,7 +126,24 @@ def __init__(self, file_path: str, directory_path: str) -> None:

def stop_server(self) -> None:
if self.process:
os.kill(self.process.pid, signal.SIGTERM) # Stop the subprocess using os.kill()
try:
self.process.terminate()
except ProcessLookupError:
pass

def wait_for_server_shutdown(self) -> None:
if not self.process:
return

try:
self.process.wait(timeout=GRACEFUL_SHUTDOWN_TIMEOUT)
except subprocess.TimeoutExpired:
logger.warn("Server process %s did not exit gracefully, force killing it.", self.process.pid)
try:
self.process.kill()
except ProcessLookupError:
pass
self.process.wait()

def reload(self) -> None:
self.stop_server()
Expand All @@ -140,7 +160,7 @@ def reload(self) -> None:

prev_process = self.process
if prev_process:
prev_process.kill()
self.wait_for_server_shutdown()

self.process = subprocess.Popen(
[sys.executable, *arguments],
Expand Down