Skip to content

Commit 5962ce6

Browse files
committed
feat: add function to compute queue load factor
Signed-off-by: Keshav Priyadarshi <git@keshav.space>
1 parent 7479d40 commit 5962ce6

2 files changed

Lines changed: 65 additions & 7 deletions

File tree

vulnerabilities/models.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2387,6 +2387,11 @@ def all_runs(self):
23872387
def latest_run(self):
23882388
return self.pipelineruns.first() if self.pipelineruns.exists() else None
23892389

2390+
@property
2391+
def latest_successful_run(self):
2392+
successful_runs = self.pipelineruns.filter(run_end_date__isnull=False, run_exitcode=0)
2393+
return successful_runs.first() if successful_runs.exists() else None
2394+
23902395
@property
23912396
def earliest_run(self):
23922397
return self.pipelineruns.earliest("run_start_date") if self.pipelineruns.exists() else None

vulnerabilities/tasks.py

Lines changed: 60 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,24 +9,23 @@
99

1010

1111
import logging
12+
from collections import Counter
13+
from contextlib import suppress
1214
from io import StringIO
1315
from traceback import format_exc as traceback_format_exc
1416

1517
import django_rq
18+
from redis.exceptions import ConnectionError
19+
from rq import Worker
1620

1721
from vulnerabilities import models
1822
from vulnerabilities.importer import Importer
1923
from vulnerabilities.improver import Improver
24+
from vulnerablecode.settings import RQ_QUEUES
2025

2126
logger = logging.getLogger(__name__)
2227

23-
default_queue = django_rq.get_queue("default")
24-
high_queue = django_rq.get_queue("high")
25-
26-
queues = {
27-
"default": django_rq.get_queue("default"),
28-
"high": django_rq.get_queue("high"),
29-
}
28+
queues = {queue: django_rq.get_queue(queue) for queue in RQ_QUEUES.keys()}
3029

3130

3231
def execute_pipeline(pipeline_id, run_id):
@@ -151,3 +150,57 @@ def dequeue_job(job_id):
151150
for queue in queues.values():
152151
if job_id in queue.jobs:
153152
queue.remove(job_id)
153+
154+
155+
def compute_queue_load_factor():
156+
"""
157+
Compute worker load per queue.
158+
159+
Load factor is the ratio of the total compute required to run all active pipelines
160+
in a queue to the available worker capacity for that queue over a 24-hour period.
161+
A value greater than 1 indicates that the number of workers is insufficient to
162+
run all pipelines within the schedule.
163+
164+
Also compute the additional workers needed to balance each queue
165+
"""
166+
field = models.PipelineSchedule._meta.get_field("run_priority")
167+
label_to_value = {label: value for value, label in field.choices}
168+
total_compute_seconds_per_queue = {}
169+
worker_per_queue = {}
170+
load_per_queue = {}
171+
seconds_in_24_hr = 86400
172+
173+
for queue in RQ_QUEUES.keys():
174+
total_compute_seconds_per_queue[queue] = sum(
175+
(p.latest_successful_run.runtime / (p.run_interval / 24))
176+
for p in models.PipelineSchedule.objects.filter(
177+
is_active=True, run_priority=label_to_value[queue]
178+
)
179+
if p.latest_successful_run
180+
)
181+
182+
with suppress(ConnectionError):
183+
redis_conn = django_rq.get_connection()
184+
queue_names = [
185+
w.queue_names()[0] for w in Worker.all(connection=redis_conn) if w.queue_names()
186+
]
187+
worker_per_queue = dict(Counter(queue_names))
188+
189+
for queue_name, worker_count in worker_per_queue.items():
190+
total_compute = total_compute_seconds_per_queue.get(queue_name, 0)
191+
if worker_count == 0 or total_compute == 0:
192+
continue
193+
194+
unit_load_on_queue = total_compute / seconds_in_24_hr
195+
196+
num_of_worker_for_balanced_queue = round(unit_load_on_queue)
197+
addition_worker_needed = max(num_of_worker_for_balanced_queue - worker_count, 0)
198+
199+
net_load_on_queue = unit_load_on_queue / worker_count
200+
201+
load_per_queue[queue_name] = {
202+
"load_factor": net_load_on_queue,
203+
"additional_worker": addition_worker_needed,
204+
}
205+
206+
return dict(sorted(load_per_queue.items(), key=lambda x: x[0], reverse=True))

0 commit comments

Comments
 (0)