Skip to content

Commit ba0e8a8

Browse files
sbidoulsbejaoui
authored andcommitted
[IMP] queue_job: prevent commit during queue job execution
This would release the job lock, causing spurious restarts by the dead jobs requeuer.
1 parent c25e2d1 commit ba0e8a8

2 files changed

Lines changed: 52 additions & 10 deletions

File tree

queue_job/controllers/main.py

Lines changed: 49 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import random
77
import time
88
import traceback
9+
from contextlib import contextmanager
910
from io import StringIO
1011

1112
from psycopg2 import OperationalError, errorcodes
@@ -25,6 +26,29 @@
2526
DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE = 5
2627

2728

29+
@contextmanager
30+
def _prevent_commit(cr):
31+
"""Context manager to prevent commits on a cursor.
32+
33+
Commiting while the job is not finished would release the job lock, causing
34+
it to be started again by the dead jobs requeuer.
35+
"""
36+
37+
def forbidden_commit(*args, **kwargs):
38+
raise RuntimeError(
39+
"Commit is forbidden in queue jobs. "
40+
"If the current job is a cron running as queue job, "
41+
"modify it to run as a normal cron."
42+
)
43+
44+
original_commit = cr.commit
45+
cr.commit = forbidden_commit
46+
try:
47+
yield
48+
finally:
49+
cr.commit = original_commit
50+
51+
2852
class RunJobController(http.Controller):
2953
@classmethod
3054
def _acquire_job(cls, env: api.Environment, job_uuid: str) -> Job | None:
@@ -68,13 +92,16 @@ def _acquire_job(cls, env: api.Environment, job_uuid: str) -> Job | None:
6892
def _try_perform_job(cls, env, job):
6993
"""Try to perform the job, mark it done and commit if successful."""
7094
_logger.debug("%s started", job)
71-
job.perform()
72-
# Triggers any stored computed fields before calling 'set_done'
73-
# so that will be part of the 'exec_time'
74-
env.flush_all()
75-
job.set_done()
76-
job.store()
77-
env.flush_all()
95+
# TODO refactor, the relation between env and job.env is not clear
96+
assert env.cr is job.env.cr
97+
with _prevent_commit(env.cr):
98+
job.perform()
99+
# Triggers any stored computed fields before calling 'set_done'
100+
# so that will be part of the 'exec_time'
101+
env.flush_all()
102+
job.set_done()
103+
job.store()
104+
env.flush_all()
78105
env.cr.commit()
79106
_logger.debug("%s done", job)
80107

@@ -211,6 +238,7 @@ def create_test_job(
211238
size=1,
212239
failure_rate=0,
213240
job_duration=0,
241+
commit_within_job=False,
214242
):
215243
"""Create test jobs
216244
@@ -266,6 +294,7 @@ def create_test_job(
266294
description=description,
267295
failure_rate=failure_rate,
268296
job_duration=job_duration,
297+
commit_within_job=commit_within_job,
269298
)
270299

271300
if size > 1:
@@ -277,6 +306,7 @@ def create_test_job(
277306
description=description,
278307
failure_rate=failure_rate,
279308
job_duration=job_duration,
309+
commit_within_job=commit_within_job,
280310
)
281311
return ""
282312

@@ -289,6 +319,7 @@ def _create_single_test_job(
289319
size=1,
290320
failure_rate=0,
291321
job_duration=0,
322+
commit_within_job=False,
292323
):
293324
delayed = (
294325
http.request.env["queue.job"]
@@ -298,7 +329,11 @@ def _create_single_test_job(
298329
channel=channel,
299330
description=description,
300331
)
301-
._test_job(failure_rate=failure_rate, job_duration=job_duration)
332+
._test_job(
333+
failure_rate=failure_rate,
334+
job_duration=job_duration,
335+
commit_within_job=commit_within_job,
336+
)
302337
)
303338
return "job uuid: %s" % (delayed.db_record().uuid,)
304339

@@ -313,6 +348,7 @@ def _create_graph_test_jobs(
313348
description="Test job",
314349
failure_rate=0,
315350
job_duration=0,
351+
commit_within_job=False,
316352
):
317353
model = http.request.env["queue.job"]
318354
current_count = 0
@@ -335,7 +371,11 @@ def _create_graph_test_jobs(
335371
max_retries=max_retries,
336372
channel=channel,
337373
description="%s #%d" % (description, current_count),
338-
)._test_job(failure_rate=failure_rate, job_duration=job_duration)
374+
)._test_job(
375+
failure_rate=failure_rate,
376+
job_duration=job_duration,
377+
commit_within_job=commit_within_job,
378+
)
339379
)
340380

341381
grouping = random.choice(possible_grouping_methods)

queue_job/models/queue_job.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -459,9 +459,11 @@ def related_action_open_record(self):
459459
)
460460
return action
461461

462-
def _test_job(self, failure_rate=0, job_duration=0):
462+
def _test_job(self, failure_rate=0, job_duration=0, commit_within_job=False):
463463
_logger.info("Running test job.")
464464
if random.random() <= failure_rate:
465465
raise JobError("Job failed")
466466
if job_duration:
467467
time.sleep(job_duration)
468+
if commit_within_job:
469+
self.env.cr.commit() # pylint: disable=invalid-commit

0 commit comments

Comments
 (0)