Skip to content

Commit 848242b

Browse files
authored
Add liveness probe to services (#150)
Add a --liveness option to zocalo.service which starts a liveness endpoint on the port defined by --liveness-port. When queried, the endpoint sends a message to the service, which, if alive, returns a message to the frontend confirming it is alive. If that confirmation is received, the endpoint returns a 200 status code. This is intended for use as a livenessProbe when running services on Kubernetes: it should force a container restart in the event that a pod is blocked indefinitely.
1 parent 63dd683 commit 848242b

5 files changed

Lines changed: 71 additions & 3 deletions

File tree

src/workflows/contrib/start_service.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,31 @@ def run(
8282
help="Name of the service to start. Known services: "
8383
+ ", ".join(known_services),
8484
)
85+
parser.add_option(
86+
"--liveness",
87+
dest="liveness",
88+
action="store_true",
89+
default=False,
90+
help=(
91+
"Expose a liveness check endpoint for this service."
92+
"A return code of 200 indicates success."
93+
"A return code of 408 indicates failure."
94+
),
95+
)
96+
parser.add_option(
97+
"--liveness-port",
98+
dest="liveness_port",
99+
default=8000,
100+
type="int",
101+
help="Expose liveness check endpoint on this port.",
102+
)
103+
parser.add_option(
104+
"--liveness-timeout",
105+
dest="liveness_timeout",
106+
default=30,
107+
type="float",
108+
help="Timeout for the liveness check (in seconds).",
109+
)
85110
if add_metrics_option:
86111
parser.add_option(
87112
"-m",
@@ -152,6 +177,12 @@ def on_transport_preparation_hook():
152177
if add_metrics_option and options.metrics:
153178
kwargs["environment"]["metrics"] = {"port": options.metrics_port}
154179

180+
if options.liveness:
181+
kwargs["environment"]["liveness"] = {
182+
"port": options.liveness_port,
183+
"timeout": options.liveness_timeout,
184+
}
185+
155186
# Call before_frontend_construction hook
156187
kwargs = self.before_frontend_construction(kwargs) or kwargs
157188

src/workflows/frontend/__init__.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ def __getitem__(self, key):
128128
else:
129129
self.update_status()
130130

131+
if environment and "liveness" in environment:
132+
self._start_liveness_endpoint(environment["liveness"]["port"])
133+
131134
def update_status(self, status_code=None):
132135
"""Update the service status kept inside the frontend (_service_status).
133136
The status is broadcast over the network immediately. If the status
@@ -339,6 +342,11 @@ def parse_band_status_update(self, message):
339342
self.log.debug("Status update: " + str(message))
340343
self.update_status(status_code=message["statuscode"])
341344

345+
def parse_band_liveness_check(self, message):
346+
"""Respond by sending message to backend to let it know we are alive."""
347+
self.log.debug("Service liveness check: alive!")
348+
self.__alive = True
349+
342350
def get_host_id(self):
343351
"""Get a cached copy of the host id."""
344352
return self.__hostid
@@ -451,3 +459,27 @@ def _terminate_service(self):
451459
if self._service:
452460
self._service.join() # must wait for process to be actually destroyed
453461
self._service = None
462+
463+
def _start_liveness_endpoint(self, port: int):
464+
from wsgiref.simple_server import make_server
465+
466+
def alive(environ, start_response):
467+
self.__alive = False
468+
self.send_command({"band": "command", "payload": "liveness_check"})
469+
470+
while True:
471+
if self.__alive:
472+
status = "200 OK"
473+
headers = [
474+
("Content-type", "text/plain; charset=utf-8")
475+
] # HTTP Headers
476+
start_response(status, headers)
477+
return [b"ok"]
478+
479+
httpd = make_server("", port, alive)
480+
self.log.debug(f"Serving liveness check on port {port}...")
481+
482+
# Serve until process is killed
483+
t = threading.Thread(target=httpd.serve_forever)
484+
t.daemon = True
485+
t.start()

src/workflows/services/common_service.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,6 @@ def start_transport(self):
200200
port = metrics["port"]
201201
self.log.debug(f"Starting metrics endpoint on port {port}")
202202
prometheus_client.start_http_server(port=port)
203-
204203
else:
205204
self.log.debug("No transport layer defined for service. Skipping.")
206205

@@ -500,9 +499,12 @@ def __process_command(self, command):
500499
"""Process an incoming command message from the frontend."""
501500
if command == Commands.SHUTDOWN:
502501
self.__shutdown = True
502+
elif command == Commands.LIVENESS_CHECK:
503+
self.__send_to_frontend({"band": "liveness_check", "payload": "alive"})
503504

504505

505506
class Commands:
506507
"""A list of command strings used for communicating with the frontend."""
507508

508509
SHUTDOWN = "shutdown"
510+
LIVENESS_CHECK = "liveness_check"

tests/contrib/test_start_service.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ def test_script_initialises_transport_and_starts_frontend(
2929
mock_options = mock.Mock()
3030
mock_options.service = "someservice"
3131
mock_options.transport = mock.sentinel.transport
32+
mock_options.liveness = False
3233
mock_parser.return_value.parse_args.return_value = (mock_options, mock.Mock())
3334
mock_services.get_known_services.return_value = {"SomeService": None}
3435

@@ -58,6 +59,7 @@ def test_add_metrics_option(mock_services, mock_frontend, mock_tlookup, mock_par
5859
mock_options = mock.Mock()
5960
mock_options.service = "someservice"
6061
mock_options.transport = mock.sentinel.transport
62+
mock_options.liveness = False
6163
mock_options.metrics = False
6264
mock_options.metrics_port = 4242
6365
mock_parser.return_value.parse_args.return_value = (mock_options, mock.Mock())

tests/frontend/test_frontend.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,13 +141,14 @@ def test_start_service_in_frontend(mock_transport, mock_mp):
141141
mock_mp.Pipe.side_effect = [(pipes[0], pipes[1]), (pipes[2], pipes[3]), None]
142142

143143
# initialize frontend
144-
fe = workflows.frontend.Frontend(environment=mock.sentinel.environment)
144+
environment = {mock.sentinel.foo: mock.sentinel.bar}
145+
fe = workflows.frontend.Frontend(environment=environment)
145146

146147
# start service
147148
fe.switch_service(mock_service)
148149

149150
# check service was started properly
150-
mock_service.assert_called_once_with(environment=mock.sentinel.environment)
151+
mock_service.assert_called_once_with(environment=environment)
151152
mock_service.return_value.connect.assert_called_once_with(
152153
commands=pipes[0], frontend=pipes[3]
153154
)

0 commit comments

Comments
 (0)