Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[14.0][FIX] shopfloor_reception: Fix qty todo computation #726

Merged
merged 13 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions shopfloor_base/actions/lock.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Copyright 2022 Michael Tietz (MT Software) <mtietz@mt-software.de>
# Copyright 2023 Camptocamp SA (http://www.camptocamp.com)
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html).
import hashlib
import struct
Expand Down Expand Up @@ -26,7 +27,23 @@ def advisory(self, name):
self.env.cr.execute("SELECT pg_advisory_xact_lock(%s);", (int_lock,))
self.env.cr.fetchone()[0]

def for_update(self, records, log_exceptions=False):
"""Lock a table FOR UPDATE"""
sql = "SELECT id FROM %s WHERE ID IN %%s FOR UPDATE" % records._table
self.env.cr.execute(sql, (tuple(records.ids),), log_exceptions=False)
def for_update(self, records, log_exceptions=False, skip_locked=False):
"""Lock rows for update on a specific table.

This function will try to obtain a lock on the rows (records parameter) and
wait until they are available for update.

Using the SKIP LOCKED parameter (better used with only one record), it will
not wait for the row to be available but return False if the lock could not
be obtained.

"""
query = "SELECT id FROM %s WHERE ID IN %%s FOR UPDATE"
if skip_locked:
query += " SKIP LOCKED"
sql = query % records._table
self.env.cr.execute(sql, (tuple(records.ids),), log_exceptions=log_exceptions)
if skip_locked:
rows = self.env.cr.fetchall()
return len(rows) == len(records)
return True
1 change: 1 addition & 0 deletions shopfloor_base/tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from . import test_actions_data
from . import test_actions_lock
from . import test_menu_service
from . import test_profile_service
from . import test_scan_anything_service
Expand Down
29 changes: 29 additions & 0 deletions shopfloor_base/tests/test_actions_lock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright 2023 Camptocamp SA (http://www.camptocamp.com)
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html).
from contextlib import closing

from .common import CommonCase


class ActionsLockCase(CommonCase):
@classmethod
def setUpClassBaseData(cls):
super().setUpClassBaseData()
cls.partner = cls.env.ref("base.res_partner_12")
with cls.work_on_actions(cls) as work:
cls.lock = work.component(usage="lock")

def test_select_for_update_skip_locked_ok(self):
"""Check the lock is obtained and True is returned."""
result = self.lock.for_update(self.partner, skip_locked=True)
self.assertTrue(result)

def test_select_for_update_skip_locked_not_ok(self):
"""Check the lock is NOT obtained and False is returned."""
with closing(self.registry.cursor()) as cr:
# Simulate another user locked a row
cr.execute(
"SELECT id FROM res_partner WHERE id=%s FOR UPDATE", (self.partner.id,)
)
result = self.lock.for_update(self.partner, skip_locked=True)
self.assertFalse(result)
3 changes: 2 additions & 1 deletion shopfloor_reception/data/shopfloor_scenario_data.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
<field name="options_edit">
{
"auto_post_line": true,
"allow_return": true
"allow_return": true,
"scan_location_or_pack_first": true
mmequignon marked this conversation as resolved.
Show resolved Hide resolved
}
</field>
</record>
Expand Down
Loading
Loading