Description
We are seeing multiple dropped connections when updating our Flask app, which runs on gunicorn with sync workers in Kubernetes.
As far as I understand, this happens because Kubernetes sends the Pod a TERM signal, which the master catches and re-sends to its (sync) workers. These then use the grace period to complete one last request and then exit. When the last worker has exited, the master exits as well, closing the listening socket.
This is already nice, as no half-finished requests are killed. However, it means that every request in the socket backlog that has not yet been accepted by a worker (the workers stop calling accept once they receive the TERM signal) is dropped.
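To make the failure mode concrete, here is a minimal, self-contained sketch (plain Python sockets, not gunicorn code) showing that connections already queued in a listening socket's backlog are reset when the listener is closed without accepting them, which is what happens when the master closes its listeners while requests are still queued:

# Sketch of the failure mode: queued-but-unaccepted connections are reset
# when the listening socket closes.
import socket

# Listener with a backlog but no accept() calls, standing in for workers
# that have stopped accepting after SIGTERM.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("127.0.0.1", 0))
server.listen(8)
port = server.getsockname()[1]

# Clients connect successfully: the kernel completes the handshake and
# parks the connections in the accept queue.
clients = []
for _ in range(3):
    c = socket.create_connection(("127.0.0.1", port))
    c.sendall(b"GET / HTTP/1.0\r\n\r\n")
    clients.append(c)

# The "master" closes the listening socket without anyone accepting.
server.close()

for c in clients:
    c.settimeout(5)
    try:
        print("client got:", c.recv(1024))   # typically b"" (EOF) or a reset
    except OSError as exc:                   # e.g. ConnectionResetError
        print("client got:", exc)
    finally:
        c.close()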
I guess this behavior was introduced for a workload where the master forks a new version of itself, which then forks new workers, and the old workers and old master kill themselves after handling all in-flight requests.
However, in a deployment scenario (with or without Kubernetes) where servers are deployed on different machines and the service switches machines on upgrade, this will almost surely lead to dropped connections.
So I'm proposing an option that redefines how the graceful timeout period is used: instead of only finishing the current in-flight request, the workers would also drain the request queue of the listening socket before killing themselves.
There are some workarounds; for example, this bug advises delaying the TERM signal for graceful shutdown until the socket backlog has been drained. This probably works (I still need to test it), and it could probably be improved by using something like ss to inspect the socket backlog and waiting until it is empty. However, that is still a lot of effort for a workaround to something that would be much better handled by gunicorn itself.
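For reference, a rough sketch of what such a wait-for-drain helper could look like; the port, timeout, and helper names are made up for illustration. It relies on ss -ltn, whose Recv-Q column reports the current accept-queue length for listening TCP sockets:

# Hypothetical helper (not part of gunicorn): wait until the listening
# socket's accept queue is empty before the TERM signal is delivered.
import subprocess
import time

def backlog_len(port: int) -> int:
    # Parse `ss -ltn`; for listening sockets Recv-Q is the number of
    # connections waiting to be accepted.
    out = subprocess.run(["ss", "-ltn"], capture_output=True,
                         text=True, check=True).stdout
    for line in out.splitlines()[1:]:
        fields = line.split()
        # fields: State Recv-Q Send-Q Local:Port Peer:Port ...
        if len(fields) >= 4 and fields[3].endswith(f":{port}"):
            return int(fields[1])
    return 0

def wait_for_drain(port: int, timeout: float = 30.0, interval: float = 0.5) -> None:
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline and backlog_len(port) > 0:
        time.sleep(interval)

if __name__ == "__main__":
    wait_for_drain(8000)   # only afterwards let SIGTERM reach gunicorn

Something like this would have to run in a preStop hook (or equivalent) before the TERM signal reaches gunicorn, and it only helps once new traffic has stopped being routed to the Pod, which is exactly the coordination effort that would be nicer to have inside gunicorn.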
Here's a proof of concept:
diff --git a/gunicorn/arbiter.py b/gunicorn/arbiter.py
index 646d684e..9b2856f8 100644
--- a/gunicorn/arbiter.py
+++ b/gunicorn/arbiter.py
@@ -255,6 +255,8 @@ class Arbiter:
 
     def handle_term(self):
         "SIGTERM handling"
+        if self.cfg.drain_on_term:
+            self.stop(graceful=True, drain=True)
         raise StopIteration
 
     def handle_int(self):
@@ -371,7 +373,7 @@ class Arbiter:
         except KeyboardInterrupt:
             sys.exit()
 
-    def stop(self, graceful=True):
+    def stop(self, graceful=True, drain=False):
         """\
         Stop workers
 
@@ -383,6 +385,13 @@ class Arbiter:
             and not self.systemd
             and not self.cfg.reuse_port
         )
+        if drain:
+            sig = signal.SIGTERM
+            self.kill_workers(sig)
+            limit = time.time() + self.cfg.graceful_timeout
+            while self.WORKERS and time.time() < limit:
+                time.sleep(0.1)
+
         sock.close_sockets(self.LISTENERS, unlink)
 
         self.LISTENERS = []
diff --git a/gunicorn/config.py b/gunicorn/config.py
index 07c5aab3..83205047 100644
--- a/gunicorn/config.py
+++ b/gunicorn/config.py
@@ -814,6 +814,16 @@ class GracefulTimeout(Setting):
         the receipt of the restart signal) are force killed.
         """
 
+class DrainOnTerm(Setting):
+    name = "drain_on_term"
+    section = "Worker Processes"
+    cli = ["--drain-on-term"]
+    validator = validate_bool
+    action = "store_true"
+    default = False
+    desc = """\
+        Drain the socket backlog before exiting the worker on SIGTERM
+        """
 
 class Keepalive(Setting):
     name = "keepalive"
diff --git a/gunicorn/workers/base.py b/gunicorn/workers/base.py
index 93c465c9..dccc98ca 100644
--- a/gunicorn/workers/base.py
+++ b/gunicorn/workers/base.py
@@ -51,6 +51,7 @@ class Worker:
         self.booted = False
         self.aborted = False
         self.reloader = None
+        self.is_draining = False
 
         self.nr = 0
@@ -189,7 +190,11 @@ class Worker:
         self.log.reopen_files()
 
     def handle_exit(self, sig, frame):
-        self.alive = False
+        if self.cfg.drain_on_term:
+            self.is_draining = True
+            os.write(self.PIPE[1], b"1")
+        else:
+            self.alive = False
 
     def handle_quit(self, sig, frame):
         self.alive = False
diff --git a/gunicorn/workers/sync.py b/gunicorn/workers/sync.py
index 4c029f91..1c33ee36 100644
--- a/gunicorn/workers/sync.py
+++ b/gunicorn/workers/sync.py
@@ -38,6 +38,10 @@ class SyncWorker(base.Worker):
                 if self.PIPE[0] in ret[0]:
                     os.read(self.PIPE[0], 1)
                 return ret[0]
+            elif self.is_draining:
+                self.alive = False
+                # timeout happened, if draining the worker should exit
+                raise StopWaiting
 
         except OSError as e:
             if e.args[0] == errno.EINTR:
@@ -80,6 +84,8 @@ class SyncWorker(base.Worker):
             if not self.is_parent_alive():
                 return
 
+            if self.is_draining:
+                return
             try:
                 self.wait(timeout)
             except StopWaiting:
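With the patch applied, draining would be opt-in via the new setting, e.g. (the app module here is just a placeholder):

gunicorn --drain-on-term --graceful-timeout 30 myapp:app

On SIGTERM the workers would then keep accepting queued connections until the accept queue goes idle or the graceful timeout expires, and only afterwards would the master close the listening sockets.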