Skip to content

Commit d75b0a3

Browse files
🎨 release license seats on issues (ITISFoundation#6980)
1 parent 51b214b commit d75b0a3

File tree

4 files changed

+207
-5
lines changed

4 files changed

+207
-5
lines changed

‎services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/background_task_periodic_heartbeat_check.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import asyncio
22
import logging
3-
from datetime import datetime, timedelta, timezone
3+
from datetime import UTC, datetime, timedelta
44

55
from fastapi import FastAPI
66
from models_library.resource_tracker import (
@@ -15,7 +15,11 @@
1515
from ..core.settings import ApplicationSettings
1616
from ..models.credit_transactions import CreditTransactionCreditsAndStatusUpdate
1717
from ..models.service_runs import ServiceRunStoppedAtUpdate
18-
from .modules.db import credit_transactions_db, service_runs_db
18+
from .modules.db import (
19+
credit_transactions_db,
20+
licensed_items_checkouts_db,
21+
service_runs_db,
22+
)
1923
from .utils import compute_service_run_credit_costs, make_negative
2024

2125
_logger = logging.getLogger(__name__)
@@ -116,6 +120,11 @@ async def _close_unhealthy_service(
116120
db_engine, data=update_credit_transaction
117121
)
118122

123+
# 3. Release license seats in case some were checked out but not properly released.
124+
await licensed_items_checkouts_db.force_release_license_seats_by_run_id(
125+
db_engine, service_run_id=service_run_id
126+
)
127+
119128

120129
async def periodic_check_of_running_services_task(app: FastAPI) -> None:
121130
_logger.info("Periodic check started")
@@ -124,7 +133,7 @@ async def periodic_check_of_running_services_task(app: FastAPI) -> None:
124133
app_settings: ApplicationSettings = app.state.settings
125134
_db_engine = app.state.engine
126135

127-
base_start_timestamp = datetime.now(tz=timezone.utc)
136+
base_start_timestamp = datetime.now(tz=UTC)
128137

129138
# Get all current running services (across all products)
130139
total_count: PositiveInt = await service_runs_db.total_service_runs_with_running_status_across_all_products(

‎services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/modules/db/licensed_items_checkouts_db.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import logging
12
from datetime import datetime
23
from typing import cast
34

@@ -8,6 +9,7 @@
89
LicensedItemCheckoutID,
910
)
1011
from models_library.rest_ordering import OrderBy, OrderDirection
12+
from models_library.services_types import ServiceRunID
1113
from models_library.wallets import WalletID
1214
from pydantic import NonNegativeInt
1315
from servicelib.rabbitmq.rpc_interfaces.resource_usage_tracker.errors import (
@@ -27,6 +29,9 @@
2729
LicensedItemCheckoutDB,
2830
)
2931

32+
_logger = logging.getLogger(__name__)
33+
34+
3035
_SELECTION_ARGS = (
3136
resource_tracker_licensed_items_checkouts.c.licensed_item_checkout_id,
3237
resource_tracker_licensed_items_checkouts.c.licensed_item_id,
@@ -214,3 +219,41 @@ async def get_currently_used_seats_for_item_and_wallet(
214219
if total_sum is None:
215220
return 0
216221
return cast(int, total_sum)
222+
223+
224+
async def force_release_license_seats_by_run_id(
225+
engine: AsyncEngine,
226+
connection: AsyncConnection | None = None,
227+
*,
228+
service_run_id: ServiceRunID,
229+
) -> None:
230+
"""
231+
Purpose: This function is utilized by a periodic heartbeat check task that monitors whether running services are
232+
sending heartbeat signals. If heartbeat signals are not received within a specified timeframe and a service is
233+
deemed unhealthy, this function ensures the proper release of any licensed seats that were not correctly released by
234+
the unhealthy service.
235+
Currently, this functionality is primarily used to handle the release of a single seat allocated to the VIP model.
236+
"""
237+
update_stmt = (
238+
resource_tracker_licensed_items_checkouts.update()
239+
.values(
240+
modified=sa.func.now(),
241+
stopped_at=sa.func.now(),
242+
)
243+
.where(
244+
(
245+
resource_tracker_licensed_items_checkouts.c.service_run_id
246+
== service_run_id
247+
)
248+
& (resource_tracker_licensed_items_checkouts.c.stopped_at.is_(None))
249+
)
250+
.returning(sa.literal_column("*"))
251+
)
252+
253+
async with transaction_context(engine, connection) as conn:
254+
result = await conn.execute(update_stmt)
255+
released_seats = result.fetchall()
256+
if released_seats:
257+
_logger.error(
258+
"Force release of %s seats: %s", len(released_seats), released_seats
259+
)

‎services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/process_message_running_service.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,12 @@
3333
ServiceRunLastHeartbeatUpdate,
3434
ServiceRunStoppedAtUpdate,
3535
)
36-
from .modules.db import credit_transactions_db, pricing_plans_db, service_runs_db
36+
from .modules.db import (
37+
credit_transactions_db,
38+
licensed_items_checkouts_db,
39+
pricing_plans_db,
40+
service_runs_db,
41+
)
3742
from .modules.rabbitmq import RabbitMQClient, get_rabbitmq_client
3843
from .utils import (
3944
compute_service_run_credit_costs,
@@ -269,9 +274,15 @@ async def _process_stop_event(
269274
running_service = await service_runs_db.update_service_run_stopped_at(
270275
db_engine, data=update_service_run_stopped_at
271276
)
277+
await licensed_items_checkouts_db.force_release_license_seats_by_run_id(
278+
db_engine, service_run_id=msg.service_run_id
279+
)
272280

273281
if running_service is None:
274-
_logger.error("Nothing to update. This should not happen investigate.")
282+
_logger.error(
283+
"Nothing to update. This should not happen investigate. service_run_id: %s",
284+
msg.service_run_id,
285+
)
275286
return
276287

277288
if running_service.wallet_id and running_service.pricing_unit_cost is not None:
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
# pylint:disable=unused-variable
2+
# pylint:disable=unused-argument
3+
# pylint:disable=redefined-outer-name
4+
# pylint:disable=too-many-arguments
5+
6+
7+
from datetime import UTC, datetime
8+
from typing import Generator
9+
from unittest import mock
10+
11+
import pytest
12+
import sqlalchemy as sa
13+
from models_library.basic_types import IDStr
14+
from models_library.rest_ordering import OrderBy
15+
from simcore_postgres_database.models.resource_tracker_licensed_items_checkouts import (
16+
resource_tracker_licensed_items_checkouts,
17+
)
18+
from simcore_postgres_database.models.resource_tracker_service_runs import (
19+
resource_tracker_service_runs,
20+
)
21+
from simcore_service_resource_usage_tracker.models.licensed_items_checkouts import (
22+
CreateLicensedItemCheckoutDB,
23+
)
24+
from simcore_service_resource_usage_tracker.services.modules.db import (
25+
licensed_items_checkouts_db,
26+
)
27+
28+
pytest_simcore_core_services_selection = [
29+
"postgres",
30+
]
31+
pytest_simcore_ops_services_selection = [
32+
"adminer",
33+
]
34+
35+
36+
_USER_ID_1 = 1
37+
_WALLET_ID = 6
38+
39+
40+
@pytest.fixture()
41+
def resource_tracker_service_run_id(
42+
postgres_db: sa.engine.Engine, random_resource_tracker_service_run
43+
) -> Generator[str, None, None]:
44+
with postgres_db.connect() as con:
45+
result = con.execute(
46+
resource_tracker_service_runs.insert()
47+
.values(
48+
**random_resource_tracker_service_run(
49+
user_id=_USER_ID_1, wallet_id=_WALLET_ID
50+
)
51+
)
52+
.returning(resource_tracker_service_runs.c.service_run_id)
53+
)
54+
row = result.first()
55+
assert row
56+
57+
yield row[0]
58+
59+
con.execute(resource_tracker_licensed_items_checkouts.delete())
60+
con.execute(resource_tracker_service_runs.delete())
61+
62+
63+
async def test_licensed_items_checkouts_db__force_release_license_seats_by_run_id(
64+
mocked_redis_server: None,
65+
mocked_setup_rabbitmq: mock.Mock,
66+
resource_tracker_service_run_id,
67+
initialized_app,
68+
):
69+
engine = initialized_app.state.engine
70+
71+
# SETUP
72+
_create_license_item_checkout_db_1 = CreateLicensedItemCheckoutDB(
73+
licensed_item_id="beb16d18-d57d-44aa-a638-9727fa4a72ef",
74+
wallet_id=_WALLET_ID,
75+
user_id=_USER_ID_1,
76+
user_email="test@test.com",
77+
product_name="osparc",
78+
service_run_id=resource_tracker_service_run_id,
79+
started_at=datetime.now(tz=UTC),
80+
num_of_seats=1,
81+
)
82+
await licensed_items_checkouts_db.create(
83+
engine, data=_create_license_item_checkout_db_1
84+
)
85+
86+
_create_license_item_checkout_db_2 = _create_license_item_checkout_db_1.model_dump()
87+
_create_license_item_checkout_db_2[
88+
"licensed_item_id"
89+
] = "b1b96583-333f-44d6-b1e0-5c0a8af555bf"
90+
await licensed_items_checkouts_db.create(
91+
engine,
92+
data=CreateLicensedItemCheckoutDB.model_construct(
93+
**_create_license_item_checkout_db_2
94+
),
95+
)
96+
97+
_create_license_item_checkout_db_3 = _create_license_item_checkout_db_1.model_dump()
98+
_create_license_item_checkout_db_3[
99+
"licensed_item_id"
100+
] = "38a5ce59-876f-482a-ace1-d3b2636feac6"
101+
checkout = await licensed_items_checkouts_db.create(
102+
engine,
103+
data=CreateLicensedItemCheckoutDB.model_construct(
104+
**_create_license_item_checkout_db_3
105+
),
106+
)
107+
108+
_helper_time = datetime.now(UTC)
109+
await licensed_items_checkouts_db.update(
110+
engine,
111+
licensed_item_checkout_id=checkout.licensed_item_checkout_id,
112+
product_name="osparc",
113+
stopped_at=_helper_time,
114+
)
115+
116+
# TEST FORCE RELEASE LICENSE SEATS
117+
await licensed_items_checkouts_db.force_release_license_seats_by_run_id(
118+
engine, service_run_id=resource_tracker_service_run_id
119+
)
120+
121+
# ASSERT
122+
total, items = await licensed_items_checkouts_db.list_(
123+
engine,
124+
product_name="osparc",
125+
filter_wallet_id=_WALLET_ID,
126+
offset=0,
127+
limit=5,
128+
order_by=OrderBy(field=IDStr("started_at")),
129+
)
130+
assert total == 3
131+
assert len(items) == 3
132+
133+
_helper_count = 0
134+
for item in items:
135+
assert isinstance(item.stopped_at, datetime)
136+
if item.stopped_at > _helper_time:
137+
_helper_count += 1
138+
139+
assert _helper_count == 2

0 commit comments

Comments
 (0)