|
9 | 9 |
|
10 | 10 |
|
11 | 11 | import logging |
| 12 | +from collections import Counter |
| 13 | +from contextlib import suppress |
12 | 14 | from io import StringIO |
13 | 15 | from traceback import format_exc as traceback_format_exc |
14 | 16 |
|
15 | 17 | import django_rq |
| 18 | +from redis.exceptions import ConnectionError |
| 19 | +from rq import Worker |
16 | 20 |
|
17 | 21 | from vulnerabilities import models |
18 | 22 | from vulnerabilities.importer import Importer |
19 | 23 | from vulnerabilities.improver import Improver |
| 24 | +from vulnerablecode.settings import RQ_QUEUES |
20 | 25 |
|
21 | 26 | logger = logging.getLogger(__name__) |
22 | 27 |
|
23 | | -default_queue = django_rq.get_queue("default") |
24 | | -high_queue = django_rq.get_queue("high") |
25 | | - |
26 | | -queues = { |
27 | | - "default": django_rq.get_queue("default"), |
28 | | - "high": django_rq.get_queue("high"), |
29 | | -} |
# One RQ Queue object per configured queue, keyed by queue name.
# Iterate the mapping directly; dict iteration already yields keys,
# so the explicit .keys() call was redundant.
queues = {name: django_rq.get_queue(name) for name in RQ_QUEUES}
30 | 29 |
|
31 | 30 |
|
32 | 31 | def execute_pipeline(pipeline_id, run_id): |
@@ -151,3 +150,61 @@ def dequeue_job(job_id): |
151 | 150 | for queue in queues.values(): |
152 | 151 | if job_id in queue.jobs: |
153 | 152 | queue.remove(job_id) |
| 153 | + |
| 154 | + |
def compute_queue_load_factor():
    """
    Compute worker load per queue.

    Load factor is the ratio of the total compute required to run all active pipelines
    in a queue to the available worker capacity for that queue over a 24-hour period.
    A value greater than 1 indicates that the number of workers is insufficient to
    run all pipelines within the schedule.

    Also compute the additional workers needed to balance each queue.

    Return a dict mapping each queue name to
    ``{"load_factor": <float or "no_worker">, "additional_worker": <int>}``,
    sorted by queue name in reverse lexicographic order. Queues with no
    scheduled compute are omitted from the result.
    """
    import math  # local import: ceil() for worker sizing

    field = models.PipelineSchedule._meta.get_field("run_priority")
    # Map human-readable choice labels (the queue names) back to their
    # stored values so we can filter schedules by queue.
    # NOTE(review): a queue name absent from the choices raises KeyError
    # below — presumably every RQ_QUEUES key is a valid run_priority label;
    # confirm against settings.
    label_to_value = {label: value for value, label in field.choices}
    total_compute_seconds_per_queue = {}
    worker_per_queue = {}
    load_per_queue = {}
    seconds_in_24_hr = 86400

    # Best-effort worker discovery: if Redis is unreachable, proceed with
    # zero known workers instead of failing.
    with suppress(ConnectionError):
        redis_conn = django_rq.get_connection()
        queue_names = [
            w.queue_names()[0] for w in Worker.all(connection=redis_conn) if w.queue_names()
        ]
        worker_per_queue = dict(Counter(queue_names))

    for queue in RQ_QUEUES:
        # runtime / (run_interval / 24) = compute seconds this pipeline
        # consumes per 24-hour window.
        # NOTE(review): assumes run_interval is expressed in hours and
        # runtime in seconds — confirm against PipelineSchedule.
        total_compute_seconds_per_queue[queue] = sum(
            (p.latest_successful_run.runtime / (p.run_interval / 24))
            for p in models.PipelineSchedule.objects.filter(
                is_active=True, run_priority=label_to_value[queue]
            )
            if p.latest_successful_run
        )
        # Queues with no live worker still need an entry so they are
        # reported (as "no_worker") in the loop below.
        worker_per_queue.setdefault(queue, 0)

    for queue_name, worker_count in worker_per_queue.items():
        net_load_on_queue = "no_worker"
        total_compute = total_compute_seconds_per_queue.get(queue_name, 0)
        if total_compute == 0:
            # No scheduled work on this queue; nothing to report.
            continue

        unit_load_on_queue = total_compute / seconds_in_24_hr

        # A queue is balanced only when load per worker <= 1, so the
        # required worker count is the ceiling of the load. round() would
        # undersize queues whose fractional load is below .5 (e.g. load
        # 1.4 -> 1 worker, leaving a per-worker load of 1.4 > 1).
        num_of_worker_for_balanced_queue = math.ceil(unit_load_on_queue)
        addition_worker_needed = max(num_of_worker_for_balanced_queue - worker_count, 0)

        if worker_count > 0:
            net_load_on_queue = unit_load_on_queue / worker_count

        load_per_queue[queue_name] = {
            "load_factor": net_load_on_queue,
            "additional_worker": addition_worker_needed,
        }

    return dict(sorted(load_per_queue.items(), key=lambda x: x[0], reverse=True))
0 commit comments