Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix update of worker last_seen property #875

Merged
merged 3 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions dispatcher/backend/src/routes/requested_tasks/requested_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,20 +223,27 @@ def get(self, session: so.Session, token: AccessToken.Payload):
# record we've seen a worker, if applicable
if token and worker_name:
worker = dbm.Worker.get(session, worker_name, WorkerNotFound)

# Update worker properties only if called as the worker itself, not as an
# admin
if worker.user.username == token.username:
worker.last_seen = getnow()

# IP changed since last encounter
if str(worker.last_ip) != worker_ip:
ip_changed = str(worker.last_ip) != worker_ip
if ip_changed:
logger.info(
f"Worker IP changed detected for {worker_name}: "
f"IP changed from {worker.last_ip} to {worker_ip}"
)
worker.last_ip = worker_ip
# commit explicitely since we are not using an explicit transaction,
# and do it before calling Wasabi so that changes are propagated
# quickly and transaction is not blocking
session.commit()

# commit explicitely last_ip and last_seen changes, since we are not
# using an explicit transaction, and do it before calling Wasabi so
# that changes are propagated quickly and transaction is not blocking
session.commit()

if ip_changed:
if constants.USES_WORKERS_IPS_WHITELIST:
try:
record_ip_change(session=session, worker_name=worker_name)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
from typing import List

import pytest

from common import constants
from common.external import ExternalIpUpdater, build_workers_whitelist
from common.external import build_workers_whitelist


class TestWorkersCommon:
Expand Down Expand Up @@ -209,130 +206,3 @@ def test_checkin_another_user(
# response.get_json()["error"]
# == "worker with same name already exists for another user"
# )


class TestWorkerRequestedTasks:
def test_requested_task_worker_as_admin(self, client, access_token, worker):
response = client.get(
"/requested-tasks/worker",
query_string={
"worker": worker["name"],
"avail_cpu": 4,
"avail_memory": 2048,
"avail_disk": 4096,
},
headers={"Authorization": access_token},
)
assert response.status_code == 200

def test_requested_task_worker_as_worker(self, client, make_access_token, worker):
response = client.get(
"/requested-tasks/worker",
query_string={
"worker": worker["name"],
"avail_cpu": 4,
"avail_memory": 2048,
"avail_disk": 4096,
},
headers={"Authorization": make_access_token(worker["username"], "worker")},
)
assert response.status_code == 200

@pytest.mark.parametrize(
"prev_ip, new_ip, external_update_enabled, external_update_fails,"
" external_update_called",
[
("77.77.77.77", "88.88.88.88", False, False, False), # ip update disabled
("77.77.77.77", "77.77.77.77", True, False, False), # ip did not changed
("77.77.77.77", "88.88.88.88", True, False, True), # ip should be updated
("77.77.77.77", "88.88.88.88", True, True, False), # ip update fails
],
)
def test_requested_task_worker_update_ip_whitelist(
self,
client,
make_access_token,
worker,
prev_ip,
new_ip,
external_update_enabled,
external_update_fails,
external_update_called,
):
# call it once to set prev_ip
response = client.get(
"/requested-tasks/worker",
query_string={
"worker": worker["name"],
"avail_cpu": 4,
"avail_memory": 2048,
"avail_disk": 4096,
},
headers={
"Authorization": make_access_token(worker["username"], "worker"),
"X-Forwarded-For": prev_ip,
},
)
assert response.status_code == 200

# check prev_ip has been set
response = client.get("/workers/")
assert response.status_code == 200
response_data = response.get_json()
for item in response_data["items"]:
if item["name"] != worker["name"]:
continue
assert item["last_ip"] == prev_ip

# setup custom ip updater to intercept Wasabi operations
updater = IpUpdaterAndChecker(should_fail=external_update_fails)
assert new_ip not in updater.ip_addresses
ExternalIpUpdater.update = updater.ip_update
constants.USES_WORKERS_IPS_WHITELIST = external_update_enabled

# call it once to set next_ip
response = client.get(
"/requested-tasks/worker",
query_string={
"worker": worker["name"],
"avail_cpu": 4,
"avail_memory": 2048,
"avail_disk": 4096,
},
headers={
"Authorization": make_access_token(worker["username"], "worker"),
"X-Forwarded-For": new_ip,
},
)
if external_update_fails:
assert response.status_code == 503
else:
assert response.status_code == 200
assert updater.ips_updated == external_update_called
if external_update_called:
assert new_ip in updater.ip_addresses

# check new_ip has been set (even if ip update is disabled or has failed)
response = client.get("/workers/")
assert response.status_code == 200
response_data = response.get_json()
for item in response_data["items"]:
if item["name"] != worker["name"]:
continue
assert item["last_ip"] == new_ip


class IpUpdaterAndChecker:
"""Helper class to intercept Wasabi operations and perform assertions"""

def __init__(self, should_fail: bool) -> None:
self.ips_updated = False
self.should_fail = should_fail
self.ip_addresses = []

def ip_update(self, ip_addresses: List):
if self.should_fail:
raise Exception()
else:
self.ips_updated = True
self.ip_addresses = ip_addresses
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
from typing import Any, Dict, List

import pytest

import db.models as dbm
from common import constants
from common.external import ExternalIpUpdater


class IpUpdaterAndChecker:
"""Helper class to intercept Wasabi operations and perform assertions"""

def __init__(self, should_fail: bool) -> None:
self.ips_updated = False
self.should_fail = should_fail
self.ip_addresses = []

def ip_update(self, ip_addresses: List):
if self.should_fail:
raise Exception()
else:
self.ips_updated = True
self.ip_addresses = ip_addresses


@pytest.fixture()
def req_task_query_string(worker):
return {
"worker": worker["name"],
"avail_cpu": 4,
"avail_memory": 2048,
"avail_disk": 4096,
}


@pytest.fixture
def make_headers():
def _make_headers(access_token: str, client_ip: str) -> Dict[str, Any]:
return {
"Authorization": access_token,
"X-Forwarded-For": client_ip,
}

return _make_headers


@pytest.fixture
def admin_headers(make_headers, access_token, default_ip):
def _admin_headers(
access_token: str = access_token, client_ip: str = default_ip
) -> Dict[str, Any]:
return make_headers(access_token=access_token, client_ip=client_ip)

return _admin_headers


@pytest.fixture
def worker_headers(make_headers, make_access_token, worker, default_ip):
def _worker_headers(
access_token: str = make_access_token(worker["username"], "worker"),
client_ip: str = default_ip,
) -> Dict[str, Any]:
return make_headers(access_token=access_token, client_ip=client_ip)

return _worker_headers


@pytest.fixture
def default_ip():
return "192.168.1.1"


@pytest.fixture
def increase_ip():
def _increase_ip(prev_ip):
return f"{str(prev_ip)[:-1]}{int(str(prev_ip)[-1])+1}"

return _increase_ip


def test_requested_task_worker_as_admin(
client,
worker,
req_task_query_string,
admin_headers,
dbsession,
increase_ip,
):
# Retrieve current object from DB
db_worker = dbm.Worker.get(dbsession, worker["name"])
last_seen = db_worker.last_seen
last_ip = db_worker.last_ip

response = client.get(
"/requested-tasks/worker",
query_string=req_task_query_string,
headers=admin_headers(client_ip=increase_ip(last_ip)),
)
assert response.status_code == 200

# Refresh current object from DB
dbsession.expire(db_worker)
db_worker = dbm.Worker.get(dbsession, worker["name"])
# last_seen and last_ip are not updated when endpoint is called as admin
assert last_seen == db_worker.last_seen
assert last_ip == db_worker.last_ip


def test_requested_task_worker_as_worker(
client,
worker,
worker_headers,
req_task_query_string,
increase_ip,
dbsession,
):
# Retrieve current object from DB
db_worker = dbm.Worker.get(dbsession, worker["name"])
last_seen = db_worker.last_seen
last_ip = db_worker.last_ip
new_ip = increase_ip(last_ip)
# Worker checks for requested tasks
response = client.get(
"/requested-tasks/worker",
query_string=req_task_query_string,
headers=worker_headers(client_ip=new_ip),
)
assert response.status_code == 200

# Refresh current object from DB
dbsession.expire(db_worker)
db_worker = dbm.Worker.get(dbsession, worker["name"])
# last_seen and last_ip are updated in DB when endpoint is called as worker
assert last_seen != db_worker.last_seen
assert last_ip != db_worker.last_ip
assert str(db_worker.last_ip) == new_ip

# second call will update only the last_seen attribute
last_seen = db_worker.last_seen
last_ip = db_worker.last_ip
response = client.get(
"/requested-tasks/worker",
query_string=req_task_query_string,
headers=worker_headers(client_ip=new_ip),
)
assert response.status_code == 200

# Refresh current object from DB again
dbsession.expire(db_worker)
db_worker = dbm.Worker.get(dbsession, worker["name"])
# last_seen has been updated again but not last_ip which did not changed
assert last_seen != db_worker.last_seen
assert str(db_worker.last_ip) == new_ip


@pytest.mark.parametrize(
"prev_ip, new_ip, external_update_enabled, external_update_fails,"
" external_update_called",
[
("77.77.77.77", "88.88.88.88", False, False, False), # ip update disabled
("77.77.77.77", "77.77.77.77", True, False, False), # ip did not changed
("77.77.77.77", "88.88.88.88", True, False, True), # ip should be updated
("77.77.77.77", "88.88.88.88", True, True, False), # ip update fails
],
)
def test_requested_task_worker_update_ip_whitelist(
client,
worker,
req_task_query_string,
prev_ip,
new_ip,
worker_headers,
external_update_enabled,
external_update_fails,
external_update_called,
):
# call it once to set prev_ip
response = client.get(
"/requested-tasks/worker",
query_string=req_task_query_string,
headers=worker_headers(client_ip=prev_ip),
)
assert response.status_code == 200

# check prev_ip has been set
response = client.get("/workers/")
assert response.status_code == 200
response_data = response.get_json()
for item in response_data["items"]:
if item["name"] != worker["name"]:
continue
assert item["last_ip"] == prev_ip

# setup custom ip updater to intercept Wasabi operations
updater = IpUpdaterAndChecker(should_fail=external_update_fails)
assert new_ip not in updater.ip_addresses
ExternalIpUpdater.update = updater.ip_update
constants.USES_WORKERS_IPS_WHITELIST = external_update_enabled

# call it once to set next_ip
response = client.get(
"/requested-tasks/worker",
query_string=req_task_query_string,
headers=worker_headers(client_ip=new_ip),
)
if external_update_fails:
assert response.status_code == 503
else:
assert response.status_code == 200
assert updater.ips_updated == external_update_called
if external_update_called:
assert new_ip in updater.ip_addresses

# check new_ip has been set (even if ip update is disabled or has failed)
response = client.get("/workers/")
assert response.status_code == 200
response_data = response.get_json()
for item in response_data["items"]:
if item["name"] != worker["name"]:
continue
assert item["last_ip"] == new_ip