From 7a6e94b9a0f8d9f6b0b3f2532cd80204307d442f Mon Sep 17 00:00:00 2001 From: Adam Heinz Date: Mon, 7 Jul 2025 09:22:18 -0400 Subject: [PATCH 01/29] [FIX] queue_job: prevent conflict w/ TestOverrides:test_creates This prevents TestOverrides.test_creates from failing in the Odoo `base` module due to sentinel protections taking effect even for local create invocations. --- queue_job/models/queue_job.py | 7 +---- .../tests/test_queue_job_protected_write.py | 26 ++++++++++++++++--- 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index a3fafff0ae..55aa14e739 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -241,13 +241,8 @@ def _compute_graph_jobs_count(self): record.graph_jobs_count = count_per_graph_uuid.get(record.graph_uuid) or 0 @api.model_create_multi + @api.private def create(self, vals_list): - if self.env.context.get("_job_edit_sentinel") is not self.EDIT_SENTINEL: - # Prevent to create a queue.job record "raw" from RPC. - # ``with_delay()`` must be used. - raise exceptions.AccessError( - _("Queue jobs must be created by calling 'with_delay()'.") - ) return super( QueueJob, self.with_context(mail_create_nolog=True, mail_create_nosubscribe=True), diff --git a/queue_job/tests/test_queue_job_protected_write.py b/queue_job/tests/test_queue_job_protected_write.py index 018b3f23f4..eadb16ab9c 100644 --- a/queue_job/tests/test_queue_job_protected_write.py +++ b/queue_job/tests/test_queue_job_protected_write.py @@ -3,15 +3,33 @@ from odoo import exceptions from odoo.tests import common +from odoo.tools import mute_logger -class TestJobWriteProtected(common.TransactionCase): +class TestJobCreatePrivate(common.HttpCase): def test_create_error(self): - with self.assertRaises(exceptions.AccessError): - self.env["queue.job"].create( - {"uuid": "test", "model_name": "res.partner", "method_name": "write"} + self.authenticate("admin", "admin") + with self.assertRaises(common.JsonRpcException) as cm, mute_logger("odoo.http"): + self.make_jsonrpc_request( + "/web/dataset/call_kw", + params={ + "model": "queue.job", + "method": "create", + "args": [], + "kwargs": { + "method_name": "write", + "model_name": "res.partner", + "uuid": "test", + }, + }, + headers={ + "Cookie": f"session_id={self.session.sid};", + }, ) + self.assertEqual("odoo.exceptions.AccessError", str(cm.exception)) + +class TestJobWriteProtected(common.TransactionCase): def test_write_protected_field_error(self): job_ = self.env["res.partner"].with_delay().create({"name": "test"}) db_job = job_.db_record() From f16afac384be4a3473c9fc568ba87b21b4e79078 Mon Sep 17 00:00:00 2001 From: Zina Rasoamanana Date: Tue, 3 Jun 2025 16:11:21 +0200 Subject: [PATCH 02/29] queue_job: remove DB commits within test of requeue --- .../pre-migration.py | 0 test_queue_job/__manifest__.py | 1 + test_queue_job/data/queue_job_test_job.xml | 18 ++++ test_queue_job/models/test_models.py | 31 ++++++ test_queue_job/tests/__init__.py | 1 + test_queue_job/tests/test_autovacuum.py | 20 +++- test_queue_job/tests/test_requeue_dead_job.py | 101 ++++++++++++++++++ 7 files changed, 167 insertions(+), 5 deletions(-) rename queue_job/migrations/{17.0.1.3.1 => 17.0.1.6.0}/pre-migration.py (100%) create mode 100644 test_queue_job/data/queue_job_test_job.xml create mode 100644 test_queue_job/tests/test_requeue_dead_job.py diff --git a/queue_job/migrations/17.0.1.3.1/pre-migration.py b/queue_job/migrations/17.0.1.6.0/pre-migration.py similarity index 100% rename from queue_job/migrations/17.0.1.3.1/pre-migration.py rename to queue_job/migrations/17.0.1.6.0/pre-migration.py diff --git a/test_queue_job/__manifest__.py b/test_queue_job/__manifest__.py index 2e5fa8d44e..9784bd69fe 100644 --- a/test_queue_job/__manifest__.py +++ b/test_queue_job/__manifest__.py @@ -13,6 +13,7 @@ "data/queue_job_channel_data.xml", "data/queue_job_function_data.xml", "security/ir.model.access.csv", + "data/queue_job_test_job.xml", ], "installable": True, } diff --git a/test_queue_job/data/queue_job_test_job.xml b/test_queue_job/data/queue_job_test_job.xml new file mode 100644 index 0000000000..8a28ab70a0 --- /dev/null +++ b/test_queue_job/data/queue_job_test_job.xml @@ -0,0 +1,18 @@ + + + + + + diff --git a/test_queue_job/models/test_models.py b/test_queue_job/models/test_models.py index 4c0dd6b2d3..03e8e8a8f9 100644 --- a/test_queue_job/models/test_models.py +++ b/test_queue_job/models/test_models.py @@ -1,6 +1,8 @@ # Copyright 2016 Camptocamp SA # License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) +from datetime import datetime, timedelta + from odoo import api, fields, models from odoo.addons.queue_job.delay import chain @@ -28,6 +30,35 @@ def testing_related__url(self, **kwargs): "url": kwargs["url"].format(subject=subject), } + @api.model + def _create_test_started_job(self, uuid): + """Create started jobs to be used within tests""" + self.env["queue.job"].with_context( + _job_edit_sentinel=self.env["queue.job"].EDIT_SENTINEL, + ).create( + { + "uuid": uuid, + "state": "started", + "model_name": "queue.job", + "method_name": "write", + } + ) + + @api.model + def _create_test_enqueued_job(self, uuid): + """Create enqueued jobs to be used within tests""" + self.env["queue.job"].with_context( + _job_edit_sentinel=self.env["queue.job"].EDIT_SENTINEL, + ).create( + { + "uuid": uuid, + "state": "enqueued", + "model_name": "queue.job", + "method_name": "write", + "date_enqueued": datetime.now() - timedelta(minutes=1), + } + ) + class ModelTestQueueJob(models.Model): _name = "test.queue.job" diff --git a/test_queue_job/tests/__init__.py b/test_queue_job/tests/__init__.py index 0405022ce0..62347148e5 100644 --- a/test_queue_job/tests/__init__.py +++ b/test_queue_job/tests/__init__.py @@ -7,3 +7,4 @@ from . import test_job_function from . import test_related_actions from . import test_delay_mocks +from . import test_requeue_dead_job diff --git a/test_queue_job/tests/test_autovacuum.py b/test_queue_job/tests/test_autovacuum.py index 09730a4fea..97aebcba1e 100644 --- a/test_queue_job/tests/test_autovacuum.py +++ b/test_queue_job/tests/test_autovacuum.py @@ -28,12 +28,16 @@ def test_autovacuum(self): date_done = datetime.now() - timedelta(days=29) stored.write({"date_done": date_done}) self.env["queue.job"].autovacuum() - self.assertEqual(len(self.env["queue.job"].search([])), 1) + self.assertEqual( + len(self.env["queue.job"].search([("channel", "!=", False)])), 1 + ) date_done = datetime.now() - timedelta(days=31) stored.write({"date_done": date_done}) self.env["queue.job"].autovacuum() - self.assertEqual(len(self.env["queue.job"].search([])), 0) + self.assertEqual( + len(self.env["queue.job"].search([("channel", "!=", False)])), 0 + ) def test_autovacuum_multi_channel(self): root_channel = self.env.ref("queue_job.channel_root") @@ -48,11 +52,17 @@ def test_autovacuum_multi_channel(self): {"channel": channel_60days.complete_name, "date_done": date_done} ) - self.assertEqual(len(self.env["queue.job"].search([])), 2) + self.assertEqual( + len(self.env["queue.job"].search([("channel", "!=", False)])), 2 + ) self.env["queue.job"].autovacuum() - self.assertEqual(len(self.env["queue.job"].search([])), 1) + self.assertEqual( + len(self.env["queue.job"].search([("channel", "!=", False)])), 1 + ) date_done = datetime.now() - timedelta(days=61) job_60days.write({"date_done": date_done}) self.env["queue.job"].autovacuum() - self.assertEqual(len(self.env["queue.job"].search([])), 0) + self.assertEqual( + len(self.env["queue.job"].search([("channel", "!=", False)])), 0 + ) diff --git a/test_queue_job/tests/test_requeue_dead_job.py b/test_queue_job/tests/test_requeue_dead_job.py new file mode 100644 index 0000000000..a6328fed76 --- /dev/null +++ b/test_queue_job/tests/test_requeue_dead_job.py @@ -0,0 +1,101 @@ +# Copyright 2025 ACSONE SA/NV +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). +from contextlib import closing +from datetime import datetime, timedelta + +from odoo.tests import tagged + +from odoo.addons.queue_job.job import Job +from odoo.addons.queue_job.jobrunner.runner import Database + +from .common import JobCommonCase + + +@tagged("post_install", "-at_install") +class TestRequeueDeadJob(JobCommonCase): + def _get_demo_job(self, uuid): + # job created during load of demo data + job = self.env["queue.job"].search( + [ + ("uuid", "=", uuid), + ], + limit=1, + ) + + self.assertTrue( + job, + f"Demo data queue job {uuid} should be loaded in order" + " to make this tests work", + ) + + return job + + def get_locks(self, uuid, cr=None): + """ + Retrieve lock rows + """ + if cr is None: + cr = self.env.cr + + cr.execute( + """ + SELECT + queue_job_id + FROM + queue_job_lock + WHERE + queue_job_id IN ( + SELECT + id + FROM + queue_job + WHERE + uuid = %s + ) + FOR UPDATE SKIP LOCKED + """, + [uuid], + ) + + return cr.fetchall() + + def test_add_lock_record(self): + queue_job = self._get_demo_job("test_started_job") + self.assertEqual(len(queue_job), 1) + job_obj = Job.load(self.env, queue_job.uuid) + + job_obj.set_started() + self.assertEqual(job_obj.state, "started") + + locks = self.get_locks(job_obj.uuid) + + self.assertEqual(1, len(locks)) + + def test_lock(self): + queue_job = self._get_demo_job("test_started_job") + job_obj = Job.load(self.env, queue_job.uuid) + + job_obj.set_started() + job_obj.lock() + + with closing(self.env.registry.cursor()) as new_cr: + locks = self.get_locks(job_obj.uuid, new_cr) + + # Row should be locked + self.assertEqual(0, len(locks)) + + def test_requeue_dead_jobs(self): + queue_job = self._get_demo_job("test_enqueued_job") + job_obj = Job.load(self.env, queue_job.uuid) + + job_obj.set_enqueued() + job_obj.set_started() + job_obj.date_enqueued = datetime.now() - timedelta(minutes=1) + job_obj.store() + + # requeue dead jobs using current cursor + query = Database(self.env.cr.dbname)._query_requeue_dead_jobs() + self.env.cr.execute(query) + + uuids_requeued = self.env.cr.fetchall() + self.assertTrue(queue_job.uuid in j[0] for j in uuids_requeued) From 86356d4029ebf261f8908ec8a32ee3a6d28af3b9 Mon Sep 17 00:00:00 2001 From: Zina Rasoamanana Date: Mon, 15 Sep 2025 10:45:39 +0200 Subject: [PATCH 03/29] [RMV] queue_job: remove test_requeue_dead_job --- queue_job/tests/test_requeue_dead_job.py | 158 ----------------------- 1 file changed, 158 deletions(-) delete mode 100644 queue_job/tests/test_requeue_dead_job.py diff --git a/queue_job/tests/test_requeue_dead_job.py b/queue_job/tests/test_requeue_dead_job.py deleted file mode 100644 index 180e1294eb..0000000000 --- a/queue_job/tests/test_requeue_dead_job.py +++ /dev/null @@ -1,158 +0,0 @@ -# Copyright 2025 ACSONE SA/NV -# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). -from contextlib import closing -from datetime import datetime, timedelta - -from odoo.tests.common import TransactionCase - -from odoo.addons.queue_job.job import Job -from odoo.addons.queue_job.jobrunner.runner import Database - - -class TestRequeueDeadJob(TransactionCase): - def create_dummy_job(self, uuid): - """ - Create dummy job for tests - """ - return ( - self.env["queue.job"] - .with_context( - _job_edit_sentinel=self.env["queue.job"].EDIT_SENTINEL, - ) - .create( - { - "uuid": uuid, - "user_id": self.env.user.id, - "state": "pending", - "model_name": "queue.job", - "method_name": "write", - } - ) - ) - - def get_locks(self, uuid, cr=None): - """ - Retrieve lock rows - """ - if cr is None: - cr = self.env.cr - - cr.execute( - """ - SELECT - queue_job_id - FROM - queue_job_lock - WHERE - queue_job_id IN ( - SELECT - id - FROM - queue_job - WHERE - uuid = %s - ) - FOR UPDATE SKIP LOCKED - """, - [uuid], - ) - - return cr.fetchall() - - def test_add_lock_record(self): - queue_job = self.create_dummy_job("test_add_lock") - job_obj = Job.load(self.env, queue_job.uuid) - - job_obj.set_started() - self.assertEqual(job_obj.state, "started") - - locks = self.get_locks(job_obj.uuid) - - self.assertEqual(1, len(locks)) - - def test_lock(self): - queue_job = self.create_dummy_job("test_lock") - job_obj = Job.load(self.env, queue_job.uuid) - - job_obj.set_started() - job_obj.store() - - locks = self.get_locks(job_obj.uuid) - - self.assertEqual(1, len(locks)) - - # commit to update queue_job records in DB - self.env.cr.commit() # pylint: disable=E8102 - - job_obj.lock() - - with closing(self.env.registry.cursor()) as new_cr: - locks = self.get_locks(job_obj.uuid, new_cr) - - # Row should be locked - self.assertEqual(0, len(locks)) - - # clean up - queue_job.unlink() - - self.env.cr.commit() # pylint: disable=E8102 - - # because we committed the cursor, the savepoint of the test method is - # gone, and this would break TransactionCase cleanups - self.cr.execute("SAVEPOINT test_%d" % self._savepoint_id) - - def test_requeue_dead_jobs(self): - uuid = "test_requeue_dead_jobs" - - queue_job = self.create_dummy_job(uuid) - job_obj = Job.load(self.env, queue_job.uuid) - - job_obj.set_enqueued() - # simulate enqueuing was in the past - job_obj.date_enqueued = datetime.now() - timedelta(minutes=1) - job_obj.set_started() - - job_obj.store() - self.env.cr.commit() # pylint: disable=E8102 - - # requeue dead jobs using current cursor - query = Database(self.env.cr.dbname)._query_requeue_dead_jobs() - self.env.cr.execute(query) - - uuids_requeued = self.env.cr.fetchall() - - self.assertEqual(len(uuids_requeued), 1) - self.assertEqual(uuids_requeued[0][0], uuid) - - # clean up - queue_job.unlink() - self.env.cr.commit() # pylint: disable=E8102 - - # because we committed the cursor, the savepoint of the test method is - # gone, and this would break TransactionCase cleanups - self.cr.execute("SAVEPOINT test_%d" % self._savepoint_id) - - def test_requeue_orphaned_jobs(self): - uuid = "test_enqueued_job" - queue_job = self.create_dummy_job(uuid) - job_obj = Job.load(self.env, queue_job.uuid) - - # Only enqueued job, don't set it to started to simulate the scenario - # that system shutdown before job is starting - job_obj.set_enqueued() - job_obj.date_enqueued = datetime.now() - timedelta(minutes=1) - job_obj.store() - - # job is now picked up by the requeue query (which includes orphaned jobs) - query = Database(self.env.cr.dbname)._query_requeue_dead_jobs() - self.env.cr.execute(query) - uuids_requeued = self.env.cr.fetchall() - self.assertTrue(queue_job.uuid in j[0] for j in uuids_requeued) - - # clean up - queue_job.unlink() - self.env.cr.commit() # pylint: disable=E8102 - - # because we committed the cursor, the savepoint of the test method is - # gone, and this would break TransactionCase cleanups - self.cr.execute("SAVEPOINT test_%d" % self._savepoint_id) From a4084a585b0b84c6ef5214d166aa97ff6377e8ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Honor=C3=A9?= Date: Tue, 12 Aug 2025 14:20:30 +0200 Subject: [PATCH 04/29] [IMP] queue_job: avoid deprecation warning about datetime utcnow() datetime.datetime.utcnow() is now deprecated and should be replaced by datetime.datetime.now() (optional TZ parameter). As the original _odoo_now() doesn't contain the Timezone, the parameter datetime.UTC is not added into this improvement --- queue_job/jobrunner/runner.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index 18a46222a7..2d4f802c66 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -123,7 +123,6 @@ of running Odoo is obviously not for production purposes. """ -import datetime import logging import os import selectors @@ -170,15 +169,10 @@ def _channels(): ) -def _datetime_to_epoch(dt): +def _odoo_now(): # important: this must return the same as postgresql # EXTRACT(EPOCH FROM TIMESTAMP dt) - return (dt - datetime.datetime(1970, 1, 1)).total_seconds() - - -def _odoo_now(): - dt = datetime.datetime.utcnow() - return _datetime_to_epoch(dt) + return time.time() def _connection_info_for(db_name): From db724d7e8d2727e7fbbc2eaac244360ef1f25e9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Sat, 22 Nov 2025 11:52:55 +0100 Subject: [PATCH 05/29] [DOC] queue_job: remove dead jobs caveat from roadmap The automatic dead jobs requeuer now works out of the box. --- queue_job/readme/ROADMAP.md | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/queue_job/readme/ROADMAP.md b/queue_job/readme/ROADMAP.md index a13be6beb3..df33142b88 100644 --- a/queue_job/readme/ROADMAP.md +++ b/queue_job/readme/ROADMAP.md @@ -1,17 +1,2 @@ - After creating a new database or installing `queue_job` on an existing database, Odoo must be restarted for the runner to detect it. -- When Odoo shuts down normally, it waits for running jobs to finish. - However, when the Odoo server crashes or is otherwise force-stopped, - running jobs are interrupted while the runner has no chance to know - they have been aborted. In such situations, jobs may remain in - `started` or `enqueued` state after the Odoo server is halted. Since - the runner has no way to know if they are actually running or not, and - does not know for sure if it is safe to restart the jobs, it does not - attempt to restart them automatically. Such stale jobs therefore fill - the running queue and prevent other jobs to start. You must therefore - requeue them manually, either from the Jobs view, or by running the - following SQL statement *before starting Odoo*: - -``` sql -update queue_job set state='pending' where state in ('started', 'enqueued') -``` From 32f2a06b71451cf5e788072ebb3ce6e0453598e4 Mon Sep 17 00:00:00 2001 From: Florent Xicluna Date: Mon, 2 Jun 2025 14:51:45 +0200 Subject: [PATCH 06/29] [IMP] queue_job: prevent invalid change of job status --- queue_job/models/queue_job.py | 17 +++++-- queue_job/tests/test_wizards.py | 52 ++++++++++++++++++++ queue_job/wizards/queue_jobs_to_cancelled.py | 6 +-- queue_job/wizards/queue_jobs_to_done.py | 2 + queue_job/wizards/queue_requeue_job.py | 1 + 5 files changed, 71 insertions(+), 7 deletions(-) diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 55aa14e739..eb025c76f6 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -16,6 +16,7 @@ from ..job import ( CANCELLED, DONE, + ENQUEUED, FAILED, PENDING, STARTED, @@ -331,18 +332,26 @@ def _change_job_state(self, state, result=None): raise ValueError("State not supported: %s" % state) def button_done(self): + # If job was set to STARTED or CANCELLED, do not set it to DONE + states_from = (WAIT_DEPENDENCIES, PENDING, ENQUEUED, FAILED) result = _("Manually set to done by %s") % self.env.user.name - self._change_job_state(DONE, result=result) + records = self.filtered(lambda job_: job_.state in states_from) + records._change_job_state(DONE, result=result) return True def button_cancelled(self): + # If job was set to DONE do not cancel it + states_from = (WAIT_DEPENDENCIES, PENDING, ENQUEUED, FAILED) result = _("Cancelled by %s") % self.env.user.name - self._change_job_state(CANCELLED, result=result) + records = self.filtered(lambda job_: job_.state in states_from) + records._change_job_state(CANCELLED, result=result) return True def requeue(self): - jobs_to_requeue = self.filtered(lambda job_: job_.state != WAIT_DEPENDENCIES) - jobs_to_requeue._change_job_state(PENDING) + # If job is already in queue or started, do not requeue it + states_from = (FAILED, DONE, CANCELLED) + records = self.filtered(lambda job_: job_.state in states_from) + records._change_job_state(PENDING) return True def _message_post_on_failure(self): diff --git a/queue_job/tests/test_wizards.py b/queue_job/tests/test_wizards.py index 2ac162d313..b8fa81c5ac 100644 --- a/queue_job/tests/test_wizards.py +++ b/queue_job/tests/test_wizards.py @@ -46,3 +46,55 @@ def test_03_done(self): wizard = self._wizard("queue.jobs.to.done") wizard.set_done() self.assertEqual(self.job.state, "done") + + def test_04_requeue_forbidden(self): + wizard = self._wizard("queue.requeue.job") + + # State WAIT_DEPENDENCIES is not requeued + self.job.state = "wait_dependencies" + wizard.requeue() + self.assertEqual(self.job.state, "wait_dependencies") + + # State PENDING, ENQUEUED or STARTED are ignored too + for test_state in ("pending", "enqueued", "started"): + self.job.state = test_state + wizard.requeue() + self.assertEqual(self.job.state, test_state) + + # States CANCELLED, DONE or FAILED will change status + self.job.state = "cancelled" + wizard.requeue() + self.assertEqual(self.job.state, "pending") + + def test_05_cancel_forbidden(self): + wizard = self._wizard("queue.jobs.to.cancelled") + + # State DONE is not cancelled + self.job.state = "done" + wizard.set_cancelled() + self.assertEqual(self.job.state, "done") + + # State PENDING, ENQUEUED or FAILED will be cancelled + for test_state in ("pending", "enqueued", "wait_dependencies", "failed"): + self.job.state = test_state + wizard.set_cancelled() + self.assertEqual(self.job.state, "cancelled") + + def test_06_done_forbidden(self): + wizard = self._wizard("queue.jobs.to.done") + + # State STARTED is not set DONE manually + self.job.state = "started" + wizard.set_done() + self.assertEqual(self.job.state, "started") + + # State CANCELLED is not cancelled + self.job.state = "cancelled" + wizard.set_done() + self.assertEqual(self.job.state, "cancelled") + + # State WAIT_DEPENDENCIES, PENDING, ENQUEUED or FAILED will be set to DONE + for test_state in ("wait_dependencies", "pending", "enqueued", "failed"): + self.job.state = test_state + wizard.set_done() + self.assertEqual(self.job.state, "done") diff --git a/queue_job/wizards/queue_jobs_to_cancelled.py b/queue_job/wizards/queue_jobs_to_cancelled.py index 6fa2a32400..955a0cb36b 100644 --- a/queue_job/wizards/queue_jobs_to_cancelled.py +++ b/queue_job/wizards/queue_jobs_to_cancelled.py @@ -10,8 +10,8 @@ class SetJobsToCancelled(models.TransientModel): _description = "Cancel all selected jobs" def set_cancelled(self): - jobs = self.job_ids.filtered( - lambda x: x.state in ("wait_dependencies", "pending", "failed", "enqueued") - ) + # Only jobs with state WAIT_DEPENDENCIES, PENDING, FAILED, ENQUEUED + # will change to CANCELLED + jobs = self.job_ids jobs.button_cancelled() return {"type": "ir.actions.act_window_close"} diff --git a/queue_job/wizards/queue_jobs_to_done.py b/queue_job/wizards/queue_jobs_to_done.py index ff1366ffed..caf8129213 100644 --- a/queue_job/wizards/queue_jobs_to_done.py +++ b/queue_job/wizards/queue_jobs_to_done.py @@ -10,6 +10,8 @@ class SetJobsToDone(models.TransientModel): _description = "Set all selected jobs to done" def set_done(self): + # Only jobs with state WAIT_DEPENDENCIES, PENDING, ENQUEUED or FAILED + # will change to DONE jobs = self.job_ids jobs.button_done() return {"type": "ir.actions.act_window_close"} diff --git a/queue_job/wizards/queue_requeue_job.py b/queue_job/wizards/queue_requeue_job.py index 67d2ffcbdc..a88256300f 100644 --- a/queue_job/wizards/queue_requeue_job.py +++ b/queue_job/wizards/queue_requeue_job.py @@ -20,6 +20,7 @@ def _default_job_ids(self): ) def requeue(self): + # Only jobs with state FAILED, DONE or CANCELLED will change to PENDING jobs = self.job_ids jobs.requeue() return {"type": "ir.actions.act_window_close"} From 1e0e7eaad6385e848ba1a1837e9658d8e95399ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Fri, 2 Jan 2026 13:54:38 +0100 Subject: [PATCH 07/29] [IMP] queue_job: add job_duration parameter to test job This allows creating test job with a long duration for stress testing. --- queue_job/controllers/main.py | 15 +++++++++++++-- queue_job/models/queue_job.py | 5 ++++- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index 5bebf823ca..ab84b5ec6d 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -187,6 +187,7 @@ def create_test_job( description="Test job", size=1, failure_rate=0, + job_duration=0, ): if not http.request.env.user.has_group("base.group_erp_manager"): raise Forbidden(_("Access Denied")) @@ -197,6 +198,12 @@ def create_test_job( except (ValueError, TypeError): failure_rate = 0 + if job_duration is not None: + try: + job_duration = float(job_duration) + except (ValueError, TypeError): + job_duration = 0 + if not (0 <= failure_rate <= 1): raise BadRequest("failure_rate must be between 0 and 1") @@ -225,6 +232,7 @@ def create_test_job( channel=channel, description=description, failure_rate=failure_rate, + job_duration=job_duration, ) if size > 1: @@ -235,6 +243,7 @@ def create_test_job( channel=channel, description=description, failure_rate=failure_rate, + job_duration=job_duration, ) return "" @@ -246,6 +255,7 @@ def _create_single_test_job( description="Test job", size=1, failure_rate=0, + job_duration=0, ): delayed = ( http.request.env["queue.job"] @@ -255,7 +265,7 @@ def _create_single_test_job( channel=channel, description=description, ) - ._test_job(failure_rate=failure_rate) + ._test_job(failure_rate=failure_rate, job_duration=job_duration) ) return f"job uuid: {delayed.db_record().uuid}" @@ -269,6 +279,7 @@ def _create_graph_test_jobs( channel=None, description="Test job", failure_rate=0, + job_duration=0, ): model = http.request.env["queue.job"] current_count = 0 @@ -291,7 +302,7 @@ def _create_graph_test_jobs( max_retries=max_retries, channel=channel, description="%s #%d" % (description, current_count), - )._test_job(failure_rate=failure_rate) + )._test_job(failure_rate=failure_rate, job_duration=job_duration) ) grouping = random.choice(possible_grouping_methods) diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index eb025c76f6..a6058663de 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -3,6 +3,7 @@ import logging import random +import time from datetime import datetime, timedelta from odoo import _, api, exceptions, fields, models @@ -469,7 +470,9 @@ def related_action_open_record(self): ) return action - def _test_job(self, failure_rate=0): + def _test_job(self, failure_rate=0, job_duration=0): _logger.info("Running test job.") if random.random() <= failure_rate: raise JobError("Job failed") + if job_duration: + time.sleep(job_duration) From 93c308e8abba0399546f61a691571cbfc29a266b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Fri, 2 Jan 2026 15:12:40 +0100 Subject: [PATCH 08/29] queue_job: declare sbidoul as maintainer --- queue_job/__manifest__.py | 2 +- test_queue_job/__manifest__.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/queue_job/__manifest__.py b/queue_job/__manifest__.py index 9d215d5069..3a0997d2ce 100644 --- a/queue_job/__manifest__.py +++ b/queue_job/__manifest__.py @@ -29,7 +29,7 @@ }, "installable": True, "development_status": "Mature", - "maintainers": ["guewen"], + "maintainers": ["guewen", "sbidoul"], "post_init_hook": "post_init_hook", "post_load": "post_load", } diff --git a/test_queue_job/__manifest__.py b/test_queue_job/__manifest__.py index 9784bd69fe..dc96f6619e 100644 --- a/test_queue_job/__manifest__.py +++ b/test_queue_job/__manifest__.py @@ -15,5 +15,6 @@ "security/ir.model.access.csv", "data/queue_job_test_job.xml", ], + "maintainers": ["guewen", "sbidoul"], "installable": True, } From a431427b0e26cb723b72498c2bcab8cd6cba3306 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Fri, 2 Jan 2026 15:00:27 +0100 Subject: [PATCH 09/29] [FIX] queue_job: fix retry mechanisme for job dependencies When a SerializationFailure occurs when updating the state of dependent jobs, the cursor is not usable anymore so the retry failed with `current transaction is aborted`. A savepoint fixes that. --- queue_job/controllers/main.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index ab84b5ec6d..d8685e38c9 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -49,14 +49,15 @@ def _enqueue_dependent_jobs(self, env, job): tries = 0 while True: try: - job.enqueue_waiting() + with job.env.cr.savepoint(): + job.enqueue_waiting() except OperationalError as err: # Automatically retry the typical transaction serialization # errors if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY: raise if tries >= DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE: - _logger.info( + _logger.error( "%s, maximum number of tries reached to update dependencies", errorcodes.lookup(err.pgcode), ) From a929fe206db14f786f5a5c84ecd8ede9170d0d2e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Fri, 2 Jan 2026 19:36:25 +0100 Subject: [PATCH 10/29] [FIX] queue_job: set exec_time readonly --- queue_job/models/queue_job.py | 1 + 1 file changed, 1 insertion(+) diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index a6058663de..d47f2acb3b 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -106,6 +106,7 @@ class QueueJob(models.Model): exec_time = fields.Float( string="Execution Time (avg)", group_operator="avg", + readonly=True, help="Time required to execute this job in seconds. Average when grouped.", ) date_cancelled = fields.Datetime(readonly=True) From e0f61a85a52965d83f1dc0583c8ab9d623efc9bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Sat, 3 Jan 2026 11:34:18 +0100 Subject: [PATCH 11/29] [IMP] queue_job: use state constant in lock function --- queue_job/job.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/queue_job/job.py b/queue_job/job.py index 594f1948ab..054b672a06 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -278,11 +278,11 @@ def lock(self): queue_job WHERE uuid = %s - AND state='started' + AND state = %s ) FOR UPDATE; """, - [self.uuid], + [self.uuid, STARTED], ) # 1 job should be locked From 3d0b8bd7be1b49d2c38f26d32279f26715a4d158 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Fri, 2 Jan 2026 12:18:38 +0100 Subject: [PATCH 12/29] queue_job: refactor job acquisition In this commit we cleanly separate the job acquisition (i.e. verifying the job is in the exepected state, marking it started and locking it) from job execution. We also avoid trying to start the job if it is already locked by using SKIP LOCKED and exiting early. Indeed in such situations the job is likely already being handled by another worker so there is no point trying to start it, so we exit early and let it be handled either by the other worker or the dead job requeuer. --- queue_job/controllers/main.py | 57 ++++++++++++------- queue_job/job.py | 19 +++---- queue_job/tests/__init__.py | 1 - test_queue_job/tests/__init__.py | 1 + test_queue_job/tests/common.py | 10 ++++ test_queue_job/tests/test_acquire_job.py | 51 +++++++++++++++++ test_queue_job/tests/test_requeue_dead_job.py | 17 ------ 7 files changed, 107 insertions(+), 49 deletions(-) create mode 100644 test_queue_job/tests/test_acquire_job.py diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index d8685e38c9..d079b51b02 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -26,15 +26,47 @@ class RunJobController(http.Controller): - def _try_perform_job(self, env, job): - """Try to perform the job.""" + @classmethod + def _acquire_job(cls, env: api.Environment, job_uuid: str) -> Job | None: + """Acquire a job for execution. + + - make sure it is in ENQUEUED state + - mark it as STARTED and commit the state change + - acquire the job lock + + If successful, return the Job instance, otherwise return None. This + function may fail to acquire the job is not in the expected state or is + already locked by another worker. + """ + env.cr.execute( + "SELECT uuid FROM queue_job WHERE uuid=%s AND state=%s " + "FOR UPDATE SKIP LOCKED", + (job_uuid, ENQUEUED), + ) + if not env.cr.fetchone(): + _logger.warning( + "was requested to run job %s, but it does not exist, " + "or is not in state %s, or is being handled by another worker", + job_uuid, + ENQUEUED, + ) + return None + job = Job.load(env, job_uuid) + assert job and job.state == ENQUEUED job.set_started() job.store() env.cr.commit() - job.lock() + if not job.lock(): + _logger.warning( + "was requested to run job %s, but it could not be locked", + job_uuid, + ) + return None + return job + def _try_perform_job(self, env, job): + """Try to perform the job, mark it done and commit if successful.""" _logger.debug("%s started", job) - job.perform() # Triggers any stored computed fields before calling 'set_done' # so that will be part of the 'exec_time' @@ -94,23 +126,10 @@ def retry_postpone(job, message, seconds=None): job.set_pending(reset_retry=False) job.store() - # ensure the job to run is in the correct state and lock the record - env.cr.execute( - "SELECT state FROM queue_job WHERE uuid=%s AND state=%s FOR UPDATE", - (job_uuid, ENQUEUED), - ) - if not env.cr.fetchone(): - _logger.warning( - "was requested to run job %s, but it does not exist, " - "or is not in state %s", - job_uuid, - ENQUEUED, - ) + job = self._acquire_job(env, job_uuid) + if not job: return "" - job = Job.load(env, job_uuid) - assert job and job.state == ENQUEUED - try: try: self._try_perform_job(env, job) diff --git a/queue_job/job.py b/queue_job/job.py index 054b672a06..875170ce28 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -236,7 +236,7 @@ def load_many(cls, env, job_uuids): recordset = cls.db_records_from_uuids(env, job_uuids) return {cls._load_from_db_record(record) for record in recordset} - def add_lock_record(self): + def add_lock_record(self) -> None: """ Create row in db to be locked while the job is being performed. """ @@ -256,13 +256,11 @@ def add_lock_record(self): [self.uuid], ) - def lock(self): - """ - Lock row of job that is being performed + def lock(self) -> bool: + """Lock row of job that is being performed. - If a job cannot be locked, - it means that the job wasn't started, - a RetryableJobError is thrown. + Return False if a job cannot be locked: it means that the job is not in + STARTED state or is already locked by another worker. """ self.env.cr.execute( """ @@ -280,16 +278,13 @@ def lock(self): uuid = %s AND state = %s ) - FOR UPDATE; + FOR UPDATE SKIP LOCKED; """, [self.uuid, STARTED], ) # 1 job should be locked - if 1 != len(self.env.cr.fetchall()): - raise RetryableJobError( - f"Trying to lock job that wasn't started, uuid: {self.uuid}" - ) + return bool(self.env.cr.fetchall()) @classmethod def _load_from_db_record(cls, job_db_record): diff --git a/queue_job/tests/__init__.py b/queue_job/tests/__init__.py index 1062acdc25..16bcdff96b 100644 --- a/queue_job/tests/__init__.py +++ b/queue_job/tests/__init__.py @@ -8,4 +8,3 @@ from . import test_model_job_function from . import test_queue_job_protected_write from . import test_wizards -from . import test_requeue_dead_job diff --git a/test_queue_job/tests/__init__.py b/test_queue_job/tests/__init__.py index 62347148e5..0cfacebdf3 100644 --- a/test_queue_job/tests/__init__.py +++ b/test_queue_job/tests/__init__.py @@ -1,3 +1,4 @@ +from . import test_acquire_job from . import test_autovacuum from . import test_delayable from . import test_dependencies diff --git a/test_queue_job/tests/common.py b/test_queue_job/tests/common.py index a32fcc380a..c1f7d88ca0 100644 --- a/test_queue_job/tests/common.py +++ b/test_queue_job/tests/common.py @@ -20,3 +20,13 @@ def _create_job(self): stored = Job.db_record_from_uuid(self.env, test_job.uuid) self.assertEqual(len(stored), 1) return stored + + def _get_demo_job(self, uuid): + # job created during load of demo data + job = self.env["queue.job"].search([("uuid", "=", uuid)], limit=1) + self.assertTrue( + job, + f"Demo data queue job {uuid!r} should be loaded in order " + "to make this test work", + ) + return job diff --git a/test_queue_job/tests/test_acquire_job.py b/test_queue_job/tests/test_acquire_job.py new file mode 100644 index 0000000000..3f0c92a2be --- /dev/null +++ b/test_queue_job/tests/test_acquire_job.py @@ -0,0 +1,51 @@ +# Copyright 2026 ACSONE SA/NV +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). +import logging +from unittest import mock + +from odoo.tests import tagged + +from odoo.addons.queue_job.controllers.main import RunJobController + +from .common import JobCommonCase + + +@tagged("post_install", "-at_install") +class TestRequeueDeadJob(JobCommonCase): + def test_acquire_enqueued_job(self): + job_record = self._get_demo_job(uuid="test_enqueued_job") + self.assertFalse( + self.env["queue.job.lock"].search( + [("queue_job_id", "=", job_record.id)], + ), + "A job lock record should not exist at this point", + ) + with mock.patch.object( + self.env.cr, "commit", mock.Mock(side_effect=self.env.flush_all) + ) as mock_commit: + job = RunJobController._acquire_job(self.env, job_uuid="test_enqueued_job") + mock_commit.assert_called_once() + self.assertIsNotNone(job) + self.assertEqual(job.uuid, "test_enqueued_job") + self.assertEqual(job.state, "started") + self.assertTrue( + self.env["queue.job.lock"].search( + [("queue_job_id", "=", job_record.id)] + ), + "A job lock record should exist at this point", + ) + + def test_acquire_started_job(self): + with ( + mock.patch.object( + self.env.cr, "commit", mock.Mock(side_effect=self.env.flush_all) + ) as mock_commit, + self.assertLogs(level=logging.WARNING) as logs, + ): + job = RunJobController._acquire_job(self.env, "test_started_job") + mock_commit.assert_not_called() + self.assertIsNone(job) + self.assertIn( + "was requested to run job test_started_job, but it does not exist", + logs.output[0], + ) diff --git a/test_queue_job/tests/test_requeue_dead_job.py b/test_queue_job/tests/test_requeue_dead_job.py index a6328fed76..3be5f6ffc6 100644 --- a/test_queue_job/tests/test_requeue_dead_job.py +++ b/test_queue_job/tests/test_requeue_dead_job.py @@ -13,23 +13,6 @@ @tagged("post_install", "-at_install") class TestRequeueDeadJob(JobCommonCase): - def _get_demo_job(self, uuid): - # job created during load of demo data - job = self.env["queue.job"].search( - [ - ("uuid", "=", uuid), - ], - limit=1, - ) - - self.assertTrue( - job, - f"Demo data queue job {uuid} should be loaded in order" - " to make this tests work", - ) - - return job - def get_locks(self, uuid, cr=None): """ Retrieve lock rows From 5408820a3982d0f941fce6ba697a310289b9e209 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Sat, 3 Jan 2026 11:26:02 +0100 Subject: [PATCH 13/29] [IMP] queue_job: refactor runjob Extract the logic to run one job out of the /queue_job/runjob route. Towards making this logic reusable in other job executors. --- queue_job/controllers/main.py | 35 +++++++++++++++++------------------ 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index d079b51b02..3a0feddebc 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -107,17 +107,7 @@ def _enqueue_dependent_jobs(self, env, job): else: break - @http.route( - "/queue_job/runjob", - type="http", - auth="none", - save_session=False, - readonly=False, - ) - def runjob(self, db, job_uuid, **kw): - http.request.session.db = db - env = http.request.env(user=SUPERUSER_ID) - + def _runjob(self, env: api.Environment, job: Job) -> None: def retry_postpone(job, message, seconds=None): job.env.clear() with registry(job.env.cr.dbname).cursor() as new_cr: @@ -126,10 +116,6 @@ def retry_postpone(job, message, seconds=None): job.set_pending(reset_retry=False) job.store() - job = self._acquire_job(env, job_uuid) - if not job: - return "" - try: try: self._try_perform_job(env, job) @@ -161,7 +147,6 @@ def retry_postpone(job, message, seconds=None): # traceback in the logs we should have the traceback when all # retries are exhausted env.cr.rollback() - return "" except (FailedJobError, Exception) as orig_exception: buff = StringIO() @@ -181,8 +166,6 @@ def retry_postpone(job, message, seconds=None): self._enqueue_dependent_jobs(env, job) _logger.debug("%s enqueue depends done", job) - return "" - def _get_failure_values(self, job, traceback_txt, orig_exception): """Collect relevant data from exception.""" exception_name = orig_exception.__class__.__name__ @@ -197,6 +180,22 @@ def _get_failure_values(self, job, traceback_txt, orig_exception): "exc_message": exc_message, } + @http.route( + "/queue_job/runjob", + type="http", + auth="none", + save_session=False, + readonly=False, + ) + def runjob(self, db, job_uuid, **kw): + http.request.session.db = db + env = http.request.env(user=SUPERUSER_ID) + job = self._acquire_job(env, job_uuid) + if not job: + return "" + self._runjob(env, job) + return "" + # flake8: noqa: C901 @http.route("/queue_job/create_test_job", type="http", auth="user") def create_test_job( From cfea6a5ee53b8e309295694702ef5fdc9c6d19bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Sat, 3 Jan 2026 11:39:13 +0100 Subject: [PATCH 14/29] [IMP] queue_job: convert job execution logic to class method Towards making this logic reusable. --- queue_job/controllers/main.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index 3a0feddebc..9eab3bc17a 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -64,7 +64,8 @@ def _acquire_job(cls, env: api.Environment, job_uuid: str) -> Job | None: return None return job - def _try_perform_job(self, env, job): + @classmethod + def _try_perform_job(cls, env, job): """Try to perform the job, mark it done and commit if successful.""" _logger.debug("%s started", job) job.perform() @@ -77,7 +78,8 @@ def _try_perform_job(self, env, job): env.cr.commit() _logger.debug("%s done", job) - def _enqueue_dependent_jobs(self, env, job): + @classmethod + def _enqueue_dependent_jobs(cls, env, job): tries = 0 while True: try: @@ -107,7 +109,8 @@ def _enqueue_dependent_jobs(self, env, job): else: break - def _runjob(self, env: api.Environment, job: Job) -> None: + @classmethod + def _runjob(cls, env: api.Environment, job: Job) -> None: def retry_postpone(job, message, seconds=None): job.env.clear() with registry(job.env.cr.dbname).cursor() as new_cr: @@ -118,7 +121,7 @@ def retry_postpone(job, message, seconds=None): try: try: - self._try_perform_job(env, job) + cls._try_perform_job(env, job) except OperationalError as err: # Automatically retry the typical transaction serialization # errors @@ -156,17 +159,18 @@ def retry_postpone(job, message, seconds=None): job.env.clear() with registry(job.env.cr.dbname).cursor() as new_cr: job.env = job.env(cr=new_cr) - vals = self._get_failure_values(job, traceback_txt, orig_exception) + vals = cls._get_failure_values(job, traceback_txt, orig_exception) job.set_failed(**vals) job.store() buff.close() raise _logger.debug("%s enqueue depends started", job) - self._enqueue_dependent_jobs(env, job) + cls._enqueue_dependent_jobs(env, job) _logger.debug("%s enqueue depends done", job) - def _get_failure_values(self, job, traceback_txt, orig_exception): + @classmethod + def _get_failure_values(cls, job, traceback_txt, orig_exception): """Collect relevant data from exception.""" exception_name = orig_exception.__class__.__name__ if hasattr(orig_exception, "__module__"): From 7fdadd626e71053b3396c297c66be53f9d263e5d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Sun, 4 Jan 2026 13:38:48 +0100 Subject: [PATCH 15/29] [IMP] queue_job: take weaker locks Since we are not going to delete records nor modify foreign keys, we can take a weaker lock. --- queue_job/controllers/main.py | 2 +- queue_job/job.py | 2 +- queue_job/jobrunner/runner.py | 2 +- test_queue_job/tests/test_requeue_dead_job.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index 9eab3bc17a..2d42497355 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -40,7 +40,7 @@ def _acquire_job(cls, env: api.Environment, job_uuid: str) -> Job | None: """ env.cr.execute( "SELECT uuid FROM queue_job WHERE uuid=%s AND state=%s " - "FOR UPDATE SKIP LOCKED", + "FOR NO KEY UPDATE SKIP LOCKED", (job_uuid, ENQUEUED), ) if not env.cr.fetchone(): diff --git a/queue_job/job.py b/queue_job/job.py index 875170ce28..7e3fe98e00 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -278,7 +278,7 @@ def lock(self) -> bool: uuid = %s AND state = %s ) - FOR UPDATE SKIP LOCKED; + FOR NO KEY UPDATE SKIP LOCKED; """, [self.uuid, STARTED], ) diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index 2d4f802c66..e499af08f7 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -369,7 +369,7 @@ def _query_requeue_dead_jobs(self): queue_job_lock WHERE queue_job_lock.queue_job_id = queue_job.id - FOR UPDATE SKIP LOCKED + FOR NO KEY UPDATE SKIP LOCKED ) OR NOT EXISTS ( SELECT diff --git a/test_queue_job/tests/test_requeue_dead_job.py b/test_queue_job/tests/test_requeue_dead_job.py index 3be5f6ffc6..cf469be97b 100644 --- a/test_queue_job/tests/test_requeue_dead_job.py +++ b/test_queue_job/tests/test_requeue_dead_job.py @@ -35,7 +35,7 @@ def get_locks(self, uuid, cr=None): WHERE uuid = %s ) - FOR UPDATE SKIP LOCKED + FOR NO KEY UPDATE SKIP LOCKED """, [uuid], ) From 319c0bcada5439d73c4192c89ce9452a0ed3937b Mon Sep 17 00:00:00 2001 From: RLeeOSI <51208020+RLeeOSI@users.noreply.github.com> Date: Mon, 26 Jan 2026 08:52:40 -0800 Subject: [PATCH 16/29] [REF] queue_job: documentation cleanup --- queue_job/README.rst | 45 +++++----- queue_job/jobrunner/runner.py | 106 +----------------------- queue_job/readme/CONFIGURE.md | 16 +++- queue_job/static/description/index.html | 88 +++++++++----------- 4 files changed, 72 insertions(+), 183 deletions(-) diff --git a/queue_job/README.rst b/queue_job/README.rst index f05207e79c..009951ba0f 100644 --- a/queue_job/README.rst +++ b/queue_job/README.rst @@ -1,7 +1,3 @@ -.. image:: https://odoo-community.org/readme-banner-image - :target: https://odoo-community.org/get-involved?utm_source=readme - :alt: Odoo Community Association - ========= Job Queue ========= @@ -17,7 +13,7 @@ Job Queue .. |badge1| image:: https://img.shields.io/badge/maturity-Mature-brightgreen.png :target: https://odoo-community.org/page/development-status :alt: Mature -.. |badge2| image:: https://img.shields.io/badge/license-LGPL--3-blue.png +.. |badge2| image:: https://img.shields.io/badge/licence-LGPL--3-blue.png :target: http://www.gnu.org/licenses/lgpl-3.0-standalone.html :alt: License: LGPL-3 .. |badge3| image:: https://img.shields.io/badge/github-OCA%2Fqueue-lightgray.png?logo=github @@ -129,10 +125,14 @@ Configuration - ``ODOO_QUEUE_JOB_CHANNELS=root:4`` or any other channels configuration. The default is ``root:1`` - - if ``xmlrpc_port`` is not set: ``ODOO_QUEUE_JOB_PORT=8069`` - - - Start Odoo with ``--load=web,queue_job`` and ``--workers`` greater - than 1. [1]_ + - ``ODOO_QUEUE_JOB_PORT=8069``, default ``--http-port`` + - ``ODOO_QUEUE_JOB_SCHEME=https``, default ``http`` + - ``ODOO_QUEUE_JOB_HOST=load-balancer``, default + ``--http-interface`` or ``localhost`` if unset + - ``ODOO_QUEUE_JOB_HTTP_AUTH_USER=jobrunner``, default empty + - ``ODOO_QUEUE_JOB_HTTP_AUTH_PASSWORD=s3cr3t``, default empty + - Start Odoo with ``--load=web,queue_job`` and ``--workers`` greater + than 1. [1]_ - Using the Odoo configuration file: @@ -146,6 +146,11 @@ Configuration (...) [queue_job] channels = root:2 + scheme = https + host = load-balancer + port = 443 + http_auth_user = jobrunner + http_auth_password = s3cr3t - Confirm the runner is starting correctly by checking the odoo log file: @@ -657,21 +662,6 @@ Known issues / Roadmap - After creating a new database or installing ``queue_job`` on an existing database, Odoo must be restarted for the runner to detect it. -- When Odoo shuts down normally, it waits for running jobs to finish. - However, when the Odoo server crashes or is otherwise force-stopped, - running jobs are interrupted while the runner has no chance to know - they have been aborted. In such situations, jobs may remain in - ``started`` or ``enqueued`` state after the Odoo server is halted. - Since the runner has no way to know if they are actually running or - not, and does not know for sure if it is safe to restart the jobs, it - does not attempt to restart them automatically. Such stale jobs - therefore fill the running queue and prevent other jobs to start. You - must therefore requeue them manually, either from the Jobs view, or by - running the following SQL statement *before starting Odoo*: - -.. code:: sql - - update queue_job set state='pending' where state in ('started', 'enqueued') Changelog ========= @@ -737,10 +727,13 @@ promote its widespread use. .. |maintainer-guewen| image:: https://github.com/guewen.png?size=40px :target: https://github.com/guewen :alt: guewen +.. |maintainer-sbidoul| image:: https://github.com/sbidoul.png?size=40px + :target: https://github.com/sbidoul + :alt: sbidoul -Current `maintainer `__: +Current `maintainers `__: -|maintainer-guewen| +|maintainer-guewen| |maintainer-sbidoul| This module is part of the `OCA/queue `_ project on GitHub. diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index e499af08f7..c4a98aa8fd 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -16,111 +16,7 @@ * It maintains an in-memory priority queue of jobs that is populated from the queue_job tables in all databases. * It does not run jobs itself, but asks Odoo to run them through an - anonymous ``/queue_job/runjob`` HTTP request. [1]_ - -How to use it? --------------- - -* Optionally adjust your configuration through environment variables: - - - ``ODOO_QUEUE_JOB_CHANNELS=root:4`` (or any other channels - configuration), default ``root:1``. - - ``ODOO_QUEUE_JOB_SCHEME=https``, default ``http``. - - ``ODOO_QUEUE_JOB_HOST=load-balancer``, default ``http_interface`` - or ``localhost`` if unset. - - ``ODOO_QUEUE_JOB_PORT=443``, default ``http_port`` or 8069 if unset. - - ``ODOO_QUEUE_JOB_HTTP_AUTH_USER=jobrunner``, default empty. - - ``ODOO_QUEUE_JOB_HTTP_AUTH_PASSWORD=s3cr3t``, default empty. - - ``ODOO_QUEUE_JOB_JOBRUNNER_DB_HOST=master-db``, default ``db_host`` - or ``False`` if unset. - - ``ODOO_QUEUE_JOB_JOBRUNNER_DB_PORT=5432``, default ``db_port`` - or ``False`` if unset. - - ``ODOO_QUEUE_JOB_JOBRUNNER_DB_USER=userdb``, default ``db_user`` - or ``False`` if unset. - - ``ODOO_QUEUE_JOB_JOBRUNNER_DB_PASSWORD=passdb``, default ``db_password`` - or ``False`` if unset. - -* Alternatively, configure the channels through the Odoo configuration - file, like: - -.. code-block:: ini - - [queue_job] - channels = root:4 - scheme = https - host = load-balancer - port = 443 - http_auth_user = jobrunner - http_auth_password = s3cr3t - jobrunner_db_host = master-db - jobrunner_db_port = 5432 - jobrunner_db_user = userdb - jobrunner_db_password = passdb - -* Or, if using ``anybox.recipe.odoo``, add this to your buildout configuration: - -.. code-block:: ini - - [odoo] - recipe = anybox.recipe.odoo - (...) - queue_job.channels = root:4 - queue_job.scheme = https - queue_job.host = load-balancer - queue_job.port = 443 - queue_job.http_auth_user = jobrunner - queue_job.http_auth_password = s3cr3t - -* Start Odoo with ``--load=web,web_kanban,queue_job`` - and ``--workers`` greater than 1 [2]_, or set the ``server_wide_modules`` - option in The Odoo configuration file: - -.. code-block:: ini - - [options] - (...) - workers = 4 - server_wide_modules = web,web_kanban,queue_job - (...) - -* Or, if using ``anybox.recipe.odoo``: - -.. code-block:: ini - - [odoo] - recipe = anybox.recipe.odoo - (...) - options.workers = 4 - options.server_wide_modules = web,web_kanban,queue_job - -* Confirm the runner is starting correctly by checking the odoo log file: - -.. code-block:: none - - ...INFO...queue_job.jobrunner.runner: starting - ...INFO...queue_job.jobrunner.runner: initializing database connections - ...INFO...queue_job.jobrunner.runner: queue job runner ready for db - ...INFO...queue_job.jobrunner.runner: database connections ready - -* Create jobs (eg using base_import_async) and observe they - start immediately and in parallel. - -* Tip: to enable debug logging for the queue job, use - ``--log-handler=odoo.addons.queue_job:DEBUG`` - -Caveat ------- - -* After creating a new database or installing queue_job on an - existing database, Odoo must be restarted for the runner to detect it. - -.. rubric:: Footnotes - -.. [1] From a security standpoint, it is safe to have an anonymous HTTP - request because this request only accepts to run jobs that are - enqueued. -.. [2] It works with the threaded Odoo server too, although this way - of running Odoo is obviously not for production purposes. + anonymous ``/queue_job/runjob`` HTTP request. """ import logging diff --git a/queue_job/readme/CONFIGURE.md b/queue_job/readme/CONFIGURE.md index 216b5358af..7239106218 100644 --- a/queue_job/readme/CONFIGURE.md +++ b/queue_job/readme/CONFIGURE.md @@ -2,9 +2,14 @@ - Adjust environment variables (optional): - `ODOO_QUEUE_JOB_CHANNELS=root:4` or any other channels configuration. The default is `root:1` - - if `xmlrpc_port` is not set: `ODOO_QUEUE_JOB_PORT=8069` - - Start Odoo with `--load=web,queue_job` and `--workers` greater than - 1.[^1] + - `ODOO_QUEUE_JOB_PORT=8069`, default `--http-port` + - `ODOO_QUEUE_JOB_SCHEME=https`, default `http` + - `ODOO_QUEUE_JOB_HOST=load-balancer`, default `--http-interface` + or `localhost` if unset + - `ODOO_QUEUE_JOB_HTTP_AUTH_USER=jobrunner`, default empty + - `ODOO_QUEUE_JOB_HTTP_AUTH_PASSWORD=s3cr3t`, default empty + - Start Odoo with `--load=web,queue_job` and `--workers` greater than + 1.[^1] - Using the Odoo configuration file: ``` ini @@ -16,6 +21,11 @@ server_wide_modules = web,queue_job (...) [queue_job] channels = root:2 +scheme = https +host = load-balancer +port = 443 +http_auth_user = jobrunner +http_auth_password = s3cr3t ``` - Confirm the runner is starting correctly by checking the odoo log diff --git a/queue_job/static/description/index.html b/queue_job/static/description/index.html index a6c54ab739..1f6653aaf4 100644 --- a/queue_job/static/description/index.html +++ b/queue_job/static/description/index.html @@ -3,7 +3,7 @@ -README.rst +Job Queue -
+
+

Job Queue

- - -Odoo Community Association - -
-

Job Queue

-

Mature License: LGPL-3 OCA/queue Translate me on Weblate Try me on Runboat

+

Mature License: LGPL-3 OCA/queue Translate me on Weblate Try me on Runboat

This addon adds an integrated Job Queue to Odoo.

It allows to postpone method calls executed asynchronously.

Jobs are executed in the background by a Jobrunner, in their own @@ -450,7 +445,7 @@

Job Queue

-

Use Cases / Context

+

Use Cases / Context

Odoo treats task synchronously, like when you import a list of products it will treat each line in one big task. “Queue job” gives you the ability to detail big tasks in many smaller ones.

@@ -478,23 +473,28 @@

Use Cases / Context

-

Installation

+

Installation

Be sure to have the requests library.

-

Configuration

+

Configuration

  • Using environment variables and command line:
    • Adjust environment variables (optional):
      • ODOO_QUEUE_JOB_CHANNELS=root:4 or any other channels configuration. The default is root:1
      • -
      • if xmlrpc_port is not set: ODOO_QUEUE_JOB_PORT=8069
      • -
      -
    • +
    • ODOO_QUEUE_JOB_PORT=8069, default --http-port
    • +
    • ODOO_QUEUE_JOB_SCHEME=https, default http
    • +
    • ODOO_QUEUE_JOB_HOST=load-balancer, default +--http-interface or localhost if unset
    • +
    • ODOO_QUEUE_JOB_HTTP_AUTH_USER=jobrunner, default empty
    • +
    • ODOO_QUEUE_JOB_HTTP_AUTH_PASSWORD=s3cr3t, default empty
    • Start Odoo with --load=web,queue_job and --workers greater than 1. [1]
  • +
+
  • Using the Odoo configuration file:
  • @@ -505,7 +505,12 @@ 

    Configuration

    (...) [queue_job] -channels = root:2 +channels = root:2 +scheme = https +host = load-balancer +port = 443 +http_auth_user = jobrunner +http_auth_password = s3cr3t
    • Confirm the runner is starting correctly by checking the odoo log @@ -535,15 +540,15 @@

      Configuration

    -

    Usage

    +

    Usage

    To use this module, you need to:

    1. Go to Job Queue menu
    -

    Developers

    +

    Developers

    -

    Delaying jobs

    +

    Delaying jobs

    The fast way to enqueue a job for a method is to use with_delay() on a record or model:

    @@ -663,7 +668,7 @@ 

    Delaying jobs

    -

    Enqueing Job Options

    +

    Enqueing Job Options

    • priority: default is 10, the closest it is to 0, the faster it will be executed
    • @@ -682,7 +687,7 @@

      Enqueing Job Options

    -

    Configure default options for jobs

    +

    Configure default options for jobs

    In earlier versions, jobs could be configured using the @job decorator. This is now obsolete, they can be configured using optional queue.job.function and queue.job.channel XML records.

    @@ -810,7 +815,7 @@

    Configure default options for job delaying any jobs.

    -

    Testing

    +

    Testing

    Asserting enqueued jobs

    The recommended way to test jobs, rather than running them directly and synchronously is to split the tests in two parts:

    @@ -925,7 +930,7 @@

    Testing

    synchronously

    -

    Patterns

    +

    Patterns

    Through the time, two main patterns emerged:

    1. For data exposed to users, a model should store the data and the @@ -952,30 +957,16 @@

      Patterns

    -

    Known issues / Roadmap

    +

    Known issues / Roadmap

    • After creating a new database or installing queue_job on an existing database, Odoo must be restarted for the runner to detect it.
    • -
    • When Odoo shuts down normally, it waits for running jobs to finish. -However, when the Odoo server crashes or is otherwise force-stopped, -running jobs are interrupted while the runner has no chance to know -they have been aborted. In such situations, jobs may remain in -started or enqueued state after the Odoo server is halted. -Since the runner has no way to know if they are actually running or -not, and does not know for sure if it is safe to restart the jobs, it -does not attempt to restart them automatically. Such stale jobs -therefore fill the running queue and prevent other jobs to start. You -must therefore requeue them manually, either from the Jobs view, or by -running the following SQL statement before starting Odoo:
    -
    -update queue_job set state='pending' where state in ('started', 'enqueued')
    -
    -

    Changelog

    +

    Changelog

    -

    Bug Tracker

    +

    Bug Tracker

    Bugs are tracked on GitHub Issues. In case of trouble, please check there if your issue has already been reported. If you spotted it first, help us to smash it by providing a detailed and welcomed @@ -994,16 +985,16 @@

    Bug Tracker

    Do not contact contributors directly about support or help with technical issues.

    -

    Credits

    +

    Credits

    -

    Authors

    +

    Authors

    • Camptocamp
    • ACSONE SA/NV
    -

    Contributors

    +

    Contributors

    -

    Maintainers

    +

    Maintainers

    This module is maintained by the OCA.

    Odoo Community Association @@ -1029,13 +1020,12 @@

    Maintainers

    OCA, or the Odoo Community Association, is a nonprofit organization whose mission is to support the collaborative development of Odoo features and promote its widespread use.

    -

    Current maintainer:

    -

    guewen

    +

    Current maintainers:

    +

    guewen sbidoul

    This module is part of the OCA/queue project on GitHub.

    You are welcome to contribute. To learn how please visit https://odoo-community.org/page/Contribute.

    -
    From 62f6f3aee7a8bb10aed2c0feacd87d9beb187786 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Bidoul?= Date: Wed, 21 Jan 2026 13:19:50 +0100 Subject: [PATCH 17/29] [IMP] queue_job: prevent commit during queue job execution This would release the job lock, causing spurious restarts by the dead jobs requeuer. --- queue_job/controllers/main.py | 58 +++++++++++++++++++++++++++++------ queue_job/models/queue_job.py | 4 ++- 2 files changed, 52 insertions(+), 10 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index 2d42497355..ed97f01e6c 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -6,6 +6,7 @@ import random import time import traceback +from contextlib import contextmanager from io import StringIO from psycopg2 import OperationalError, errorcodes @@ -25,6 +26,29 @@ DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE = 5 +@contextmanager +def _prevent_commit(cr): + """Context manager to prevent commits on a cursor. + + Commiting while the job is not finished would release the job lock, causing + it to be started again by the dead jobs requeuer. + """ + + def forbidden_commit(*args, **kwargs): + raise RuntimeError( + "Commit is forbidden in queue jobs. " + "If the current job is a cron running as queue job, " + "modify it to run as a normal cron." + ) + + original_commit = cr.commit + cr.commit = forbidden_commit + try: + yield + finally: + cr.commit = original_commit + + class RunJobController(http.Controller): @classmethod def _acquire_job(cls, env: api.Environment, job_uuid: str) -> Job | None: @@ -68,13 +92,16 @@ def _acquire_job(cls, env: api.Environment, job_uuid: str) -> Job | None: def _try_perform_job(cls, env, job): """Try to perform the job, mark it done and commit if successful.""" _logger.debug("%s started", job) - job.perform() - # Triggers any stored computed fields before calling 'set_done' - # so that will be part of the 'exec_time' - env.flush_all() - job.set_done() - job.store() - env.flush_all() + # TODO refactor, the relation between env and job.env is not clear + assert env.cr is job.env.cr + with _prevent_commit(env.cr): + job.perform() + # Triggers any stored computed fields before calling 'set_done' + # so that will be part of the 'exec_time' + env.flush_all() + job.set_done() + job.store() + env.flush_all() env.cr.commit() _logger.debug("%s done", job) @@ -211,6 +238,7 @@ def create_test_job( size=1, failure_rate=0, job_duration=0, + commit_within_job=False, ): if not http.request.env.user.has_group("base.group_erp_manager"): raise Forbidden(_("Access Denied")) @@ -256,6 +284,7 @@ def create_test_job( description=description, failure_rate=failure_rate, job_duration=job_duration, + commit_within_job=commit_within_job, ) if size > 1: @@ -267,6 +296,7 @@ def create_test_job( description=description, failure_rate=failure_rate, job_duration=job_duration, + commit_within_job=commit_within_job, ) return "" @@ -279,6 +309,7 @@ def _create_single_test_job( size=1, failure_rate=0, job_duration=0, + commit_within_job=False, ): delayed = ( http.request.env["queue.job"] @@ -288,7 +319,11 @@ def _create_single_test_job( channel=channel, description=description, ) - ._test_job(failure_rate=failure_rate, job_duration=job_duration) + ._test_job( + failure_rate=failure_rate, + job_duration=job_duration, + commit_within_job=commit_within_job, + ) ) return f"job uuid: {delayed.db_record().uuid}" @@ -303,6 +338,7 @@ def _create_graph_test_jobs( description="Test job", failure_rate=0, job_duration=0, + commit_within_job=False, ): model = http.request.env["queue.job"] current_count = 0 @@ -325,7 +361,11 @@ def _create_graph_test_jobs( max_retries=max_retries, channel=channel, description="%s #%d" % (description, current_count), - )._test_job(failure_rate=failure_rate, job_duration=job_duration) + )._test_job( + failure_rate=failure_rate, + job_duration=job_duration, + commit_within_job=commit_within_job, + ) ) grouping = random.choice(possible_grouping_methods) diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index d47f2acb3b..1bde9faf7e 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -471,9 +471,11 @@ def related_action_open_record(self): ) return action - def _test_job(self, failure_rate=0, job_duration=0): + def _test_job(self, failure_rate=0, job_duration=0, commit_within_job=False): _logger.info("Running test job.") if random.random() <= failure_rate: raise JobError("Job failed") if job_duration: time.sleep(job_duration) + if commit_within_job: + self.env.cr.commit() # pylint: disable=invalid-commit From d5d29dfe36088e345bc4f5198361f43ef1441acf Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Thu, 19 Feb 2026 16:31:17 +0100 Subject: [PATCH 18/29] Skip check of dependencies when a done job has no dependents Every time a job is done, even if it is not part of a graph, it runs a query to look for dependents to enqueue. Storing the dependent uuids in the "dependencies" field was on purpose to know that we have no further jobs in the graph and that we can skip the check entirely and have no overhead in this case. It looks like an oversight, we can add the missing condition. --- queue_job/controllers/main.py | 7 +++++-- queue_job/job.py | 3 +++ test_queue_job/tests/test_dependencies.py | 13 +++++++++++++ 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index ed97f01e6c..1c8fbbe56a 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -107,6 +107,10 @@ def _try_perform_job(cls, env, job): @classmethod def _enqueue_dependent_jobs(cls, env, job): + if not job.should_check_dependents(): + return + + _logger.debug("%s enqueue depends started", job) tries = 0 while True: try: @@ -135,6 +139,7 @@ def _enqueue_dependent_jobs(cls, env, job): time.sleep(wait_time) else: break + _logger.debug("%s enqueue depends done", job) @classmethod def _runjob(cls, env: api.Environment, job: Job) -> None: @@ -192,9 +197,7 @@ def retry_postpone(job, message, seconds=None): buff.close() raise - _logger.debug("%s enqueue depends started", job) cls._enqueue_dependent_jobs(env, job) - _logger.debug("%s enqueue depends done", job) @classmethod def _get_failure_values(cls, job, traceback_txt, orig_exception): diff --git a/queue_job/job.py b/queue_job/job.py index 7e3fe98e00..464670fb2b 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -558,6 +558,9 @@ def _get_common_dependent_jobs_query(self): AND state = %s; """ + def should_check_dependents(self): + return any(self.__reverse_depends_on_uuids) + def enqueue_waiting(self): sql = self._get_common_dependent_jobs_query() self.env.cr.execute(sql, (PENDING, self.uuid, DONE, WAIT_DEPENDENCIES)) diff --git a/test_queue_job/tests/test_dependencies.py b/test_queue_job/tests/test_dependencies.py index 4246fdbeba..d8a9253f00 100644 --- a/test_queue_job/tests/test_dependencies.py +++ b/test_queue_job/tests/test_dependencies.py @@ -287,3 +287,16 @@ def test_depends_graph_uuid_group(self): self.assertTrue(jobs[0].graph_uuid) self.assertTrue(jobs[1].graph_uuid) self.assertEqual(jobs[0].graph_uuid, jobs[1].graph_uuid) + + def test_should_check_dependents(self): + job_root = Job(self.method) + job_a = Job(self.method) + job_a.add_depends({job_root}) + + DelayableGraph._ensure_same_graph_uuid([job_root, job_a]) + + job_root.store() + job_a.store() + + self.assertTrue(job_root.should_check_dependents()) + self.assertFalse(job_a.should_check_dependents()) From fd84b86926dde1b79f5b4bb13bd94a36d87f3125 Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Thu, 19 Feb 2026 17:02:42 +0100 Subject: [PATCH 19/29] Fix dependents error after retryable job error --- queue_job/controllers/main.py | 1 + 1 file changed, 1 insertion(+) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index 1c8fbbe56a..c7c5cccdeb 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -182,6 +182,7 @@ def retry_postpone(job, message, seconds=None): # traceback in the logs we should have the traceback when all # retries are exhausted env.cr.rollback() + return except (FailedJobError, Exception) as orig_exception: buff = StringIO() From 96782ebbaf01b7bdc1614f82b73961f576490c7f Mon Sep 17 00:00:00 2001 From: Andrii9090-tecnativa Date: Wed, 4 Mar 2026 11:45:50 +0100 Subject: [PATCH 20/29] [FIX] queue_job: Fix TestJson In this case, when a module adds a value in context, the tests fail --- queue_job/tests/test_json_field.py | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/queue_job/tests/test_json_field.py b/queue_job/tests/test_json_field.py index f5bf760ffe..fe220f305c 100644 --- a/queue_job/tests/test_json_field.py +++ b/queue_job/tests/test_json_field.py @@ -28,9 +28,13 @@ def test_encoder_recordset(self): "model": "res.partner", "ids": [partner.id], "su": False, - "context": expected_context, } - self.assertEqual(json.loads(value_json), expected) + result_dict = json.loads(value_json) + result_context = result_dict.pop("context") + self.assertEqual(result_dict, expected) + # context is tested separately as the order/amount of keys is not guaranteed + for key in result_context: + self.assertEqual(result_context[key], expected_context[key]) def test_encoder_recordset_list(self): demo_user = self.env.ref("base.user_demo") @@ -52,7 +56,20 @@ def test_encoder_recordset_list(self): "context": expected_context, }, ] - self.assertEqual(json.loads(value_json), expected) + result_dict = json.loads(value_json) + for result_value, expected_value in zip(result_dict, expected, strict=False): + if isinstance(expected_value, dict): + for key in result_value: + if key == "context": + for context_key in result_value["context"]: + self.assertEqual( + result_value["context"][context_key], + expected_value["context"][context_key], + ) + else: + self.assertEqual(result_value[key], expected_value[key]) + else: + self.assertEqual(result_value, expected_value) def test_decoder_recordset(self): demo_user = self.env.ref("base.user_demo") From 12a762d7a6443e06a4ad96a545225bf35d3938f4 Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Thu, 19 Feb 2026 15:55:48 +0100 Subject: [PATCH 21/29] Add 'Allow Commit' option on job functions It is forbidden to commit inside a job, because it releases the job lock and can cause it to start again, while still being run, by the dead jobs requeuer. For some use cases, it may actually be legitimate, or at least be needed in the short term before actual updates in the code. A new option on the job function, false by default, allow to run the job in a new transaction, at the cost of an additional connection + transaction overhead. Related to #889 --- queue_job/controllers/main.py | 5 ++-- queue_job/job.py | 26 ++++++++++++++++---- queue_job/models/queue_job_function.py | 10 +++++++- queue_job/tests/common.py | 2 +- queue_job/tests/test_model_job_function.py | 2 ++ queue_job/views/queue_job_function_views.xml | 2 ++ 6 files changed, 38 insertions(+), 9 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index c7c5cccdeb..658ef0fad8 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -37,8 +37,9 @@ def _prevent_commit(cr): def forbidden_commit(*args, **kwargs): raise RuntimeError( "Commit is forbidden in queue jobs. " - "If the current job is a cron running as queue job, " - "modify it to run as a normal cron." + 'You may want to enable the "Allow Commit" option on the Job ' + "Function. Alternatively, if the current job is a cron running as " + "queue job, you can modify it to run as a normal cron." ) original_commit = cr.commit diff --git a/queue_job/job.py b/queue_job/job.py index 464670fb2b..3d70d66b3f 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -8,6 +8,7 @@ import sys import uuid import weakref +from contextlib import contextmanager, nullcontext from datetime import datetime, timedelta from random import randint @@ -427,10 +428,6 @@ def __init__( self.method_name = func.__name__ self.recordset = recordset - self.env = env - self.job_model = self.env["queue.job"] - self.job_model_name = "queue.job" - self.job_config = ( self.env["queue.job.function"].sudo().job_config(self.job_function_name) ) @@ -508,7 +505,12 @@ def perform(self): """ self.retry += 1 try: - self.result = self.func(*tuple(self.args), **self.kwargs) + if self.job_config.allow_commit: + env_context_manager = self._with_temporary_env() + else: + env_context_manager = nullcontext() + with env_context_manager: + self.result = self.func(*tuple(self.args), **self.kwargs) except RetryableJobError as err: if err.ignore_retry: self.retry -= 1 @@ -528,6 +530,16 @@ def perform(self): return self.result + @contextmanager + def _with_temporary_env(self): + with self.env.registry.cursor() as new_cr: + env = self.recordset.env + self.recordset = self.recordset.with_env(env(cr=new_cr)) + try: + yield + finally: + self.recordset = self.recordset.with_env(env) + def _get_common_dependent_jobs_query(self): return """ UPDATE queue_job @@ -689,6 +701,10 @@ def __hash__(self): def db_record(self): return self.db_records_from_uuids(self.env, [self.uuid]) + @property + def env(self): + return self.recordset.env + @property def func(self): recordset = self.recordset.with_context(job_uuid=self.uuid) diff --git a/queue_job/models/queue_job_function.py b/queue_job/models/queue_job_function.py index 7cf73ea370..f9c21c9801 100644 --- a/queue_job/models/queue_job_function.py +++ b/queue_job/models/queue_job_function.py @@ -28,7 +28,8 @@ class QueueJobFunction(models.Model): "related_action_enable " "related_action_func_name " "related_action_kwargs " - "job_function_id ", + "job_function_id " + "allow_commit", ) def _default_channel(self): @@ -79,6 +80,11 @@ def _default_channel(self): "enable, func_name, kwargs.\n" "See the module description for details.", ) + allow_commit = fields.Boolean( + help="Allows the job to commit transactions during execution. " + "Under the hood, this executes the job in a new database cursor, " + "which incurs a slight overhead.", + ) @api.depends("model_id.model", "method") def _compute_name(self): @@ -149,6 +155,7 @@ def job_default_config(self): related_action_func_name=None, related_action_kwargs={}, job_function_id=None, + allow_commit=False, ) def _parse_retry_pattern(self): @@ -184,6 +191,7 @@ def job_config(self, name): related_action_func_name=config.related_action.get("func_name"), related_action_kwargs=config.related_action.get("kwargs", {}), job_function_id=config.id, + allow_commit=config.allow_commit, ) def _retry_pattern_format_error_message(self): diff --git a/queue_job/tests/common.py b/queue_job/tests/common.py index 4699c99504..5811473c63 100644 --- a/queue_job/tests/common.py +++ b/queue_job/tests/common.py @@ -275,7 +275,7 @@ def _add_job(self, *args, **kwargs): def _prepare_context(self, job): # pylint: disable=context-overridden - job_model = job.job_model.with_context({}) + job_model = job.env["queue.job"].with_context({}) field_records = job_model._fields["records"] # Filter the context to simulate store/load of the job job.recordset = field_records.convert_to_write(job.recordset, job_model) diff --git a/queue_job/tests/test_model_job_function.py b/queue_job/tests/test_model_job_function.py index 84676fdb65..9095f2a55e 100644 --- a/queue_job/tests/test_model_job_function.py +++ b/queue_job/tests/test_model_job_function.py @@ -42,6 +42,7 @@ def test_function_job_config(self): ' "func_name": "related_action_foo",' ' "kwargs": {"b": 1}}' ), + "allow_commit": True, } ) self.assertEqual( @@ -53,5 +54,6 @@ def test_function_job_config(self): related_action_func_name="related_action_foo", related_action_kwargs={"b": 1}, job_function_id=job_function.id, + allow_commit=True, ), ) diff --git a/queue_job/views/queue_job_function_views.xml b/queue_job/views/queue_job_function_views.xml index a6e2ce402c..6c208b2b67 100644 --- a/queue_job/views/queue_job_function_views.xml +++ b/queue_job/views/queue_job_function_views.xml @@ -11,6 +11,7 @@ + @@ -25,6 +26,7 @@ + From a9bde1471e9e09256d37d0b0bef561d675e7e627 Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Thu, 19 Feb 2026 17:31:37 +0100 Subject: [PATCH 22/29] Add parameter to allow commit by default in jobs False on new databases, True on existing databases. Should always be False by default on future versions. --- queue_job/__manifest__.py | 1 + queue_job/data/ir_config_parameter_data.xml | 7 +++++++ .../migrations/18.0.2.2.0/post-migration.py | 13 +++++++++++++ queue_job/models/queue_job_function.py | 16 +++++++++++++++- 4 files changed, 36 insertions(+), 1 deletion(-) create mode 100644 queue_job/data/ir_config_parameter_data.xml create mode 100644 queue_job/migrations/18.0.2.2.0/post-migration.py diff --git a/queue_job/__manifest__.py b/queue_job/__manifest__.py index 3a0997d2ce..c631729338 100644 --- a/queue_job/__manifest__.py +++ b/queue_job/__manifest__.py @@ -21,6 +21,7 @@ "views/queue_job_menus.xml", "data/queue_data.xml", "data/queue_job_function_data.xml", + "data/ir_config_parameter_data.xml", ], "assets": { "web.assets_backend": [ diff --git a/queue_job/data/ir_config_parameter_data.xml b/queue_job/data/ir_config_parameter_data.xml new file mode 100644 index 0000000000..1cfdcd19bc --- /dev/null +++ b/queue_job/data/ir_config_parameter_data.xml @@ -0,0 +1,7 @@ + + + + queue_job.allow_commit_by_default + False + + diff --git a/queue_job/migrations/18.0.2.2.0/post-migration.py b/queue_job/migrations/18.0.2.2.0/post-migration.py new file mode 100644 index 0000000000..216fe49a8d --- /dev/null +++ b/queue_job/migrations/18.0.2.2.0/post-migration.py @@ -0,0 +1,13 @@ +# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) +from openupgradelib import openupgrade + + +@openupgrade.migrate() +def migrate(env, version): + if not version: + return + + env["ir.config_parameter"].sudo().set_param( + "queue_job.allow_commit_by_default", True + ) + env["queue.job.function"].search([]).write({"allow_commit": True}) diff --git a/queue_job/models/queue_job_function.py b/queue_job/models/queue_job_function.py index f9c21c9801..102bcb14e7 100644 --- a/queue_job/models/queue_job_function.py +++ b/queue_job/models/queue_job_function.py @@ -7,6 +7,7 @@ from collections import namedtuple from odoo import _, api, exceptions, fields, models, tools +from odoo.tools import str2bool from ..fields import JobSerialized @@ -81,6 +82,7 @@ def _default_channel(self): "See the module description for details.", ) allow_commit = fields.Boolean( + default=lambda self: self._default_allow_commit_by_default(), help="Allows the job to commit transactions during execution. " "Under the hood, this executes the job in a new database cursor, " "which incurs a slight overhead.", @@ -155,7 +157,19 @@ def job_default_config(self): related_action_func_name=None, related_action_kwargs={}, job_function_id=None, - allow_commit=False, + allow_commit=self._default_allow_commit_by_default(), + ) + + @api.model + def _default_allow_commit_by_default(self): + # We shoud not allow commit by default on job functions, this parameter + # is here for backward compatibility, a migration sets it by default on + # existing databases, but new databases will have it set to False by + # default. + return str2bool( + self.env["ir.config_parameter"] + .sudo() + .get_param("queue_job.allow_commit_by_default") ) def _parse_retry_pattern(self): From b2949eb681ffa1a8b3034e4763875c05df7f9013 Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Fri, 20 Feb 2026 15:15:18 +0100 Subject: [PATCH 23/29] Revert "Add parameter to allow commit by default in jobs" This reverts commit b4f3bec9ba7516635c3e8992da724499f708285e. --- queue_job/__manifest__.py | 1 - queue_job/data/ir_config_parameter_data.xml | 7 ------- .../migrations/18.0.2.2.0/post-migration.py | 13 ------------- queue_job/models/queue_job_function.py | 16 +--------------- 4 files changed, 1 insertion(+), 36 deletions(-) delete mode 100644 queue_job/data/ir_config_parameter_data.xml delete mode 100644 queue_job/migrations/18.0.2.2.0/post-migration.py diff --git a/queue_job/__manifest__.py b/queue_job/__manifest__.py index c631729338..3a0997d2ce 100644 --- a/queue_job/__manifest__.py +++ b/queue_job/__manifest__.py @@ -21,7 +21,6 @@ "views/queue_job_menus.xml", "data/queue_data.xml", "data/queue_job_function_data.xml", - "data/ir_config_parameter_data.xml", ], "assets": { "web.assets_backend": [ diff --git a/queue_job/data/ir_config_parameter_data.xml b/queue_job/data/ir_config_parameter_data.xml deleted file mode 100644 index 1cfdcd19bc..0000000000 --- a/queue_job/data/ir_config_parameter_data.xml +++ /dev/null @@ -1,7 +0,0 @@ - - - - queue_job.allow_commit_by_default - False - - diff --git a/queue_job/migrations/18.0.2.2.0/post-migration.py b/queue_job/migrations/18.0.2.2.0/post-migration.py deleted file mode 100644 index 216fe49a8d..0000000000 --- a/queue_job/migrations/18.0.2.2.0/post-migration.py +++ /dev/null @@ -1,13 +0,0 @@ -# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) -from openupgradelib import openupgrade - - -@openupgrade.migrate() -def migrate(env, version): - if not version: - return - - env["ir.config_parameter"].sudo().set_param( - "queue_job.allow_commit_by_default", True - ) - env["queue.job.function"].search([]).write({"allow_commit": True}) diff --git a/queue_job/models/queue_job_function.py b/queue_job/models/queue_job_function.py index 102bcb14e7..f9c21c9801 100644 --- a/queue_job/models/queue_job_function.py +++ b/queue_job/models/queue_job_function.py @@ -7,7 +7,6 @@ from collections import namedtuple from odoo import _, api, exceptions, fields, models, tools -from odoo.tools import str2bool from ..fields import JobSerialized @@ -82,7 +81,6 @@ def _default_channel(self): "See the module description for details.", ) allow_commit = fields.Boolean( - default=lambda self: self._default_allow_commit_by_default(), help="Allows the job to commit transactions during execution. " "Under the hood, this executes the job in a new database cursor, " "which incurs a slight overhead.", @@ -157,19 +155,7 @@ def job_default_config(self): related_action_func_name=None, related_action_kwargs={}, job_function_id=None, - allow_commit=self._default_allow_commit_by_default(), - ) - - @api.model - def _default_allow_commit_by_default(self): - # We shoud not allow commit by default on job functions, this parameter - # is here for backward compatibility, a migration sets it by default on - # existing databases, but new databases will have it set to False by - # default. - return str2bool( - self.env["ir.config_parameter"] - .sudo() - .get_param("queue_job.allow_commit_by_default") + allow_commit=False, ) def _parse_retry_pattern(self): From 1e3b5253d021b41615e143183b2b9499358b5932 Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Fri, 20 Feb 2026 15:16:52 +0100 Subject: [PATCH 24/29] Improve documentation on allow commit --- queue_job/controllers/main.py | 3 ++- queue_job/models/queue_job_function.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index 658ef0fad8..b34d36b5d2 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -39,7 +39,8 @@ def forbidden_commit(*args, **kwargs): "Commit is forbidden in queue jobs. " 'You may want to enable the "Allow Commit" option on the Job ' "Function. Alternatively, if the current job is a cron running as " - "queue job, you can modify it to run as a normal cron." + "queue job, you can modify it to run as a normal cron. More details on: " + "https://github.com/OCA/queue/wiki/%5BDRAFT%5D-Upgrade-warning:-commits-inside-jobs" ) original_commit = cr.commit diff --git a/queue_job/models/queue_job_function.py b/queue_job/models/queue_job_function.py index f9c21c9801..edf90c9ab7 100644 --- a/queue_job/models/queue_job_function.py +++ b/queue_job/models/queue_job_function.py @@ -83,7 +83,8 @@ def _default_channel(self): allow_commit = fields.Boolean( help="Allows the job to commit transactions during execution. " "Under the hood, this executes the job in a new database cursor, " - "which incurs a slight overhead.", + "which incurs an overhead as it requires an extra connection to " + "the database. " ) @api.depends("model_id.model", "method") From 43836d79434c568b6e862bd0623e02e67bd94cd8 Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Tue, 24 Feb 2026 11:35:38 +0100 Subject: [PATCH 25/29] Fix missing job.env setter As the controller changes env on Job instances. --- queue_job/controllers/main.py | 4 +++- queue_job/job.py | 11 +++++++---- queue_job/tests/test_run_rob_controller.py | 6 ++++++ 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index b34d36b5d2..368856db9b 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -14,6 +14,7 @@ from odoo import SUPERUSER_ID, _, api, http, registry, tools from odoo.service.model import PG_CONCURRENCY_ERRORS_TO_RETRY +from odoo.tools import config from ..delay import chain, group from ..exception import FailedJobError, NothingToDoJob, RetryableJobError @@ -104,7 +105,8 @@ def _try_perform_job(cls, env, job): job.set_done() job.store() env.flush_all() - env.cr.commit() + if not config["test_enable"]: + env.cr.commit() _logger.debug("%s done", job) @classmethod diff --git a/queue_job/job.py b/queue_job/job.py index 3d70d66b3f..275d7a9dc2 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -424,7 +424,6 @@ def __init__( raise TypeError("Job accepts only methods of Models") recordset = func.__self__ - env = recordset.env self.method_name = func.__name__ self.recordset = recordset @@ -477,10 +476,10 @@ def __init__( self.exc_message = None self.exc_info = None - if "company_id" in env.context: - company_id = env.context["company_id"] + if "company_id" in self.env.context: + company_id = self.env.context["company_id"] else: - company_id = env.company.id + company_id = self.env.company.id self.company_id = company_id self._eta = None self.eta = eta @@ -705,6 +704,10 @@ def db_record(self): def env(self): return self.recordset.env + @env.setter + def env(self, env): + self.recordset = self.recordset.with_env(env) + @property def func(self): recordset = self.recordset.with_context(job_uuid=self.uuid) diff --git a/queue_job/tests/test_run_rob_controller.py b/queue_job/tests/test_run_rob_controller.py index bb63bc82ec..1a15f4363a 100644 --- a/queue_job/tests/test_run_rob_controller.py +++ b/queue_job/tests/test_run_rob_controller.py @@ -15,3 +15,9 @@ def test_get_failure_values(self): self.assertEqual( rslt, {"exc_info": "info", "exc_name": "Exception", "exc_message": "zero"} ) + + def test_runjob_success(self): + job = self.env["queue.job"].with_delay()._test_job() + RunJobController._runjob(self.env, job) + self.assertEqual(job.state, "done") + self.assertEqual(job.db_record().state, "done") From 6f552872bae746d30f98e0302a34070f0d7e7e66 Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Tue, 24 Feb 2026 16:00:20 +0100 Subject: [PATCH 26/29] Add failure retry in queue job test job --- queue_job/controllers/main.py | 13 +++++++++++++ queue_job/models/queue_job.py | 19 ++++++++++++++++--- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index 368856db9b..5d2da27fd0 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -247,6 +247,7 @@ def create_test_job( failure_rate=0, job_duration=0, commit_within_job=False, + failure_retry_seconds=0, ): if not http.request.env.user.has_group("base.group_erp_manager"): raise Forbidden(_("Access Denied")) @@ -284,6 +285,12 @@ def create_test_job( except ValueError: max_retries = None + if failure_retry_seconds is not None: + try: + failure_retry_seconds = int(failure_retry_seconds) + except ValueError: + failure_retry_seconds = 0 + if size == 1: return self._create_single_test_job( priority=priority, @@ -293,6 +300,7 @@ def create_test_job( failure_rate=failure_rate, job_duration=job_duration, commit_within_job=commit_within_job, + failure_retry_seconds=failure_retry_seconds, ) if size > 1: @@ -305,6 +313,7 @@ def create_test_job( failure_rate=failure_rate, job_duration=job_duration, commit_within_job=commit_within_job, + failure_retry_seconds=failure_retry_seconds, ) return "" @@ -318,6 +327,7 @@ def _create_single_test_job( failure_rate=0, job_duration=0, commit_within_job=False, + failure_retry_seconds=0, ): delayed = ( http.request.env["queue.job"] @@ -331,6 +341,7 @@ def _create_single_test_job( failure_rate=failure_rate, job_duration=job_duration, commit_within_job=commit_within_job, + failure_retry_seconds=failure_retry_seconds, ) ) return f"job uuid: {delayed.db_record().uuid}" @@ -347,6 +358,7 @@ def _create_graph_test_jobs( failure_rate=0, job_duration=0, commit_within_job=False, + failure_retry_seconds=0, ): model = http.request.env["queue.job"] current_count = 0 @@ -373,6 +385,7 @@ def _create_graph_test_jobs( failure_rate=failure_rate, job_duration=job_duration, commit_within_job=commit_within_job, + failure_retry_seconds=failure_retry_seconds, ) ) diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 1bde9faf7e..da6ce212d5 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -12,7 +12,7 @@ from odoo.addons.base_sparse_field.models.fields import Serialized from ..delay import Graph -from ..exception import JobError +from ..exception import JobError, RetryableJobError from ..fields import JobSerialized from ..job import ( CANCELLED, @@ -471,10 +471,23 @@ def related_action_open_record(self): ) return action - def _test_job(self, failure_rate=0, job_duration=0, commit_within_job=False): + def _test_job( + self, + failure_rate=0, + job_duration=0, + commit_within_job=False, + failure_retry_seconds=0, + ): _logger.info("Running test job.") if random.random() <= failure_rate: - raise JobError("Job failed") + if failure_retry_seconds: + raise RetryableJobError( + f"Retryable job failed, will be retried in " + f"{failure_retry_seconds} seconds", + seconds=failure_retry_seconds, + ) + else: + raise JobError("Job failed") if job_duration: time.sleep(job_duration) if commit_within_job: From 9ca6bcc5fdc7dcf00f8eaaf0e5d250a3ceaec1e7 Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Tue, 24 Feb 2026 16:00:50 +0100 Subject: [PATCH 27/29] Simplify job env management --- queue_job/controllers/main.py | 8 +++----- queue_job/job.py | 14 +++++++------- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index 5d2da27fd0..cedeb55133 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -12,7 +12,7 @@ from psycopg2 import OperationalError, errorcodes from werkzeug.exceptions import BadRequest, Forbidden -from odoo import SUPERUSER_ID, _, api, http, registry, tools +from odoo import SUPERUSER_ID, _, api, http, tools from odoo.service.model import PG_CONCURRENCY_ERRORS_TO_RETRY from odoo.tools import config @@ -149,8 +149,7 @@ def _enqueue_dependent_jobs(cls, env, job): def _runjob(cls, env: api.Environment, job: Job) -> None: def retry_postpone(job, message, seconds=None): job.env.clear() - with registry(job.env.cr.dbname).cursor() as new_cr: - job.env = api.Environment(new_cr, SUPERUSER_ID, {}) + with job.in_temporary_env(): job.postpone(result=message, seconds=seconds) job.set_pending(reset_retry=False) job.store() @@ -194,8 +193,7 @@ def retry_postpone(job, message, seconds=None): traceback_txt = buff.getvalue() _logger.error(traceback_txt) job.env.clear() - with registry(job.env.cr.dbname).cursor() as new_cr: - job.env = job.env(cr=new_cr) + with job.in_temporary_env(): vals = cls._get_failure_values(job, traceback_txt, orig_exception) job.set_failed(**vals) job.store() diff --git a/queue_job/job.py b/queue_job/job.py index 275d7a9dc2..10d1256ef0 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -505,7 +505,7 @@ def perform(self): self.retry += 1 try: if self.job_config.allow_commit: - env_context_manager = self._with_temporary_env() + env_context_manager = self.in_temporary_env() else: env_context_manager = nullcontext() with env_context_manager: @@ -530,14 +530,14 @@ def perform(self): return self.result @contextmanager - def _with_temporary_env(self): + def in_temporary_env(self): with self.env.registry.cursor() as new_cr: - env = self.recordset.env - self.recordset = self.recordset.with_env(env(cr=new_cr)) + env = self.env + self._env = env(cr=new_cr) try: yield finally: - self.recordset = self.recordset.with_env(env) + self._env = env def _get_common_dependent_jobs_query(self): return """ @@ -705,7 +705,7 @@ def env(self): return self.recordset.env @env.setter - def env(self, env): + def _env(self, env): self.recordset = self.recordset.with_env(env) @property @@ -772,7 +772,7 @@ def model_name(self): @property def user_id(self): - return self.recordset.env.uid + return self.env.uid @property def eta(self): From 89810885fc8a924e6c20c85bbed30646a66b9253 Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Fri, 27 Feb 2026 08:56:23 +0100 Subject: [PATCH 28/29] Update upgrade warning link --- queue_job/controllers/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index cedeb55133..ac66690438 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -41,7 +41,7 @@ def forbidden_commit(*args, **kwargs): 'You may want to enable the "Allow Commit" option on the Job ' "Function. Alternatively, if the current job is a cron running as " "queue job, you can modify it to run as a normal cron. More details on: " - "https://github.com/OCA/queue/wiki/%5BDRAFT%5D-Upgrade-warning:-commits-inside-jobs" + "https://github.com/OCA/queue/wiki/Upgrade-warning:-commits-inside-jobs" ) original_commit = cr.commit From cf7d7c82c52841b65e31f482a8b9b17ffae61f62 Mon Sep 17 00:00:00 2001 From: Guewen Baconnier Date: Mon, 23 Mar 2026 15:57:54 +0100 Subject: [PATCH 29/29] oca-port: blacklist PR(s) 701, 715, 726, 804 for queue_job --- .oca/oca-port/blacklist/queue_job.json | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 .oca/oca-port/blacklist/queue_job.json diff --git a/.oca/oca-port/blacklist/queue_job.json b/.oca/oca-port/blacklist/queue_job.json new file mode 100644 index 0000000000..8820ff0bb1 --- /dev/null +++ b/.oca/oca-port/blacklist/queue_job.json @@ -0,0 +1,8 @@ +{ + "pull_requests": { + "OCA/queue#701": "only 18.0", + "OCA/queue#715": "already in 17.0", + "OCA/queue#726": "since 18.0", + "OCA/queue#804": "since 18.0" + } +}