Skip to content

Commit 0b64d37

Browse files
committed
Use session-level advisory lock for job locking
1 parent 0c95fd2 commit 0b64d37

3 files changed

Lines changed: 43 additions & 68 deletions

File tree

queue_job/controllers/main.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,15 +78,15 @@ def _acquire_job(cls, env: api.Environment, job_uuid: str) -> Job | None:
7878
return None
7979
job = Job.load(env, job_uuid)
8080
assert job and job.state == ENQUEUED
81-
job.set_started()
82-
job.store()
83-
env.cr.commit()
8481
if not job.lock():
8582
_logger.warning(
8683
"was requested to run job %s, but it could not be locked",
8784
job_uuid,
8885
)
8986
return None
87+
job.set_started()
88+
job.store()
89+
env.cr.commit()
9090
return job
9191

9292
@classmethod
@@ -218,7 +218,10 @@ def runjob(self, db, job_uuid, **kw):
218218
job = self._acquire_job(env, job_uuid)
219219
if not job:
220220
return ""
221-
self._runjob(env, job)
221+
try:
222+
self._runjob(env, job)
223+
finally:
224+
job.unlock()
222225
return ""
223226

224227
# flake8: noqa: C901

queue_job/job.py

Lines changed: 21 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
DEFAULT_MAX_RETRIES = 5
3838
RETRY_INTERVAL = 10 * 60 # seconds
3939

40+
PG_ADVISORY_EXECUTION_LOCK_ID = 965655780
41+
4042
_logger = logging.getLogger(__name__)
4143

4244

@@ -221,56 +223,31 @@ def load_many(cls, env, job_uuids):
221223
recordset = cls.db_records_from_uuids(env, job_uuids)
222224
return {cls._load_from_db_record(record) for record in recordset}
223225

224-
def add_lock_record(self) -> None:
225-
"""
226-
Create row in db to be locked while the job is being performed.
227-
"""
228-
self.env.cr.execute(
229-
"""
230-
INSERT INTO
231-
queue_job_lock (id, queue_job_id)
232-
SELECT
233-
id, id
234-
FROM
235-
queue_job
236-
WHERE
237-
uuid = %s
238-
ON CONFLICT(id)
239-
DO NOTHING;
240-
""",
241-
[self.uuid],
242-
)
243-
244226
def lock(self) -> bool:
245-
"""Lock row of job that is being performed.
227+
"""Lock job that is being performed using a session-level advisory lock.
246228
247229
Return False if a job cannot be locked: it means that the job is not in
248230
STARTED state or is already locked by another worker.
249231
"""
232+
# 2147483647 is the max value for int, which is the max value accepted
233+
# by pg_try_advisory_lock when using two arguments, as the job id might
234+
# be higher than that, we use a modulo to be sure to never exceed the limit.
235+
# A collision is highly unlikely because ids are sequential so a modulo
236+
# should not cause two different jobs to have the same lock id at the
237+
# same time. Even then, they would run one after the other.
250238
self.env.cr.execute(
251-
"""
252-
SELECT
253-
*
254-
FROM
255-
queue_job_lock
256-
WHERE
257-
queue_job_id in (
258-
SELECT
259-
id
260-
FROM
261-
queue_job
262-
WHERE
263-
uuid = %s
264-
AND state = %s
265-
)
266-
FOR NO KEY UPDATE SKIP LOCKED;
267-
""",
268-
[self.uuid, STARTED],
239+
"SELECT pg_try_advisory_lock(%s, (%s %% 2147483647)::integer)",
240+
(PG_ADVISORY_EXECUTION_LOCK_ID, self._record_id),
269241
)
270-
271-
# 1 job should be locked
272242
return bool(self.env.cr.fetchall())
273243

244+
def unlock(self) -> None:
245+
"""Release the session-level advisory lock."""
246+
self.env.cr.execute(
247+
"SELECT pg_advisory_unlock(%s, (%s %% 2147483647)::integer)",
248+
(PG_ADVISORY_EXECUTION_LOCK_ID, self._record_id),
249+
)
250+
274251
@classmethod
275252
def _load_from_db_record(cls, job_db_record):
276253
stored = job_db_record
@@ -298,6 +275,8 @@ def _load_from_db_record(cls, job_db_record):
298275
identity_key=stored.identity_key,
299276
)
300277

278+
job_._record_id = stored.id
279+
301280
if stored.date_created:
302281
job_.date_created = stored.date_created
303282

@@ -424,6 +403,7 @@ def __init__(
424403

425404
self._uuid = job_uuid
426405
self.graph_uuid = None
406+
self._record_id = None
427407

428408
self.args = args
429409
self.kwargs = kwargs
@@ -788,7 +768,6 @@ def set_started(self):
788768
self.state = STARTED
789769
self.date_started = datetime.now()
790770
self.worker_pid = os.getpid()
791-
self.add_lock_record()
792771

793772
def set_done(self, result=None):
794773
self.state = DONE

queue_job/jobrunner/runner.py

Lines changed: 15 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import odoo
3434
from odoo.tools import config
3535

36+
from ..job import PG_ADVISORY_EXECUTION_LOCK_ID
3637
from . import queue_job_config
3738
from .channels import ENQUEUED, NOT_DONE, ChannelManager
3839

@@ -213,7 +214,8 @@ def set_job_enqueued(self, uuid):
213214
)
214215

215216
def _query_requeue_dead_jobs(self):
216-
return """
217+
return [
218+
"""
217219
UPDATE
218220
queue_job
219221
SET
@@ -255,27 +257,18 @@ def _query_requeue_dead_jobs(self):
255257
WHERE
256258
state IN ('enqueued','started')
257259
AND date_enqueued < (now() AT TIME ZONE 'utc' - INTERVAL '10 sec')
258-
AND (
259-
id in (
260-
SELECT
261-
queue_job_id
262-
FROM
263-
queue_job_lock
264-
WHERE
265-
queue_job_lock.queue_job_id = queue_job.id
266-
FOR NO KEY UPDATE SKIP LOCKED
267-
)
268-
OR NOT EXISTS (
269-
SELECT
270-
1
271-
FROM
272-
queue_job_lock
273-
WHERE
274-
queue_job_lock.queue_job_id = queue_job.id
275-
)
260+
AND NOT EXISTS (
261+
SELECT 1
262+
FROM pg_locks
263+
WHERE locktype = 'advisory'
264+
AND classid = %s
265+
AND objid = (queue_job.id %% 2147483647)::integer
266+
AND granted = true
276267
)
277268
RETURNING uuid
278-
"""
269+
""",
270+
(PG_ADVISORY_EXECUTION_LOCK_ID,),
271+
]
279272

280273
def requeue_dead_jobs(self):
281274
"""
@@ -304,9 +297,9 @@ def requeue_dead_jobs(self):
304297
"""
305298

306299
with closing(self.conn.cursor()) as cr:
307-
query = self._query_requeue_dead_jobs()
300+
query, query_args = self._query_requeue_dead_jobs()
308301

309-
cr.execute(query)
302+
cr.execute(query, query_args)
310303

311304
for (uuid,) in cr.fetchall():
312305
_logger.warning("Re-queued dead job with uuid: %s", uuid)

0 commit comments

Comments
 (0)