Skip to content

Commit 3a5f306

Browse files
KFilippopolitis, ThanKarab
authored and committed
Added controller ip/port at the worker's config.
Added a mount on the data_path for the worker pod too.
1 parent b75e75f commit 3a5f306

32 files changed

+194
-53
lines changed

.github/workflows/standalone_tests.yml

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ jobs:
1515
- name: Check out repository
1616
uses: actions/checkout@v3
1717

18-
- name: Set up python
18+
- name: Set up Python
1919
uses: actions/setup-python@v4
2020
with:
2121
python-version: 3.8
@@ -96,6 +96,7 @@ jobs:
9696
id: non_smpc_tests
9797
run: poetry run pytest -s -m "not smpc" --cov=exareme2 --cov-report=xml:non_smpc_cov.xml tests/standalone_tests --verbosity=4
9898
env:
99+
PYTHONPATH: ${{ github.workspace }}/exareme2:${{ github.workspace }}
99100
PULL_DOCKER_IMAGES: false
100101

101102
- name: Run SMPC specific standalone tests after releasing previous test resources

README.md

+3
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@
5858
celery_cleanup_task_timeout=2
5959
celery_run_udf_task_timeout = 120
6060
61+
[controller]
62+
port = 5000
63+
6164
[privacy]
6265
minimum_row_count = 10
6366
protect_local_data = false

exareme2/algorithms/flower/flower_data_processing.py

+6
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,12 @@
1212
from sklearn.impute import SimpleImputer
1313

1414
# Constants for project directories and environment configurations
15+
CONTROLLER_IP = os.getenv("CONTROLLER_IP", "127.0.0.1")
16+
CONTROLLER_PORT = os.getenv("CONTROLLER_PORT", 5000)
17+
RESULT_URL = f"http://{CONTROLLER_IP}:{CONTROLLER_PORT}/flower/result"
18+
INPUT_URL = f"http://{CONTROLLER_IP}:{CONTROLLER_PORT}/flower/input"
19+
CDES_URL = f"http://{CONTROLLER_IP}:{CONTROLLER_PORT}/cdes_metadata"
20+
HEADERS = {"Content-type": "application/json", "Accept": "text/plain"}
1521
PROJECT_ROOT = Path(__file__).resolve().parents[3]
1622

1723

exareme2/algorithms/flower/process_manager.py

+31
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,31 @@ def terminate_process(proc, logger, max_attempts=3, wait_time=10):
7070
logger.error(f"Failed to terminate PID {proc.pid} after {max_attempts} attempts.")
7171

7272

73+
def should_terminate_process(cmdline):
    """Return True when a command line belongs to a Flower client or server script.

    Args:
        cmdline: The process's argument list (e.g. the value returned by
            ``psutil.Process.cmdline()``); may be empty or ``None``.

    Returns:
        bool: ``True`` if the last argument ends with ``client.py`` or
        ``server.py``; ``False`` otherwise, including for an empty or
        missing command line.
    """
    # Guard first: an empty/None command line has no last argument to inspect,
    # and returning a real bool avoids leaking the falsy operand to callers.
    if not cmdline:
        return False
    # str.endswith accepts a tuple, so a single call covers both script names.
    return cmdline[-1].endswith(("client.py", "server.py"))
78+
79+
80+
def process_garbage_collect(proc, logger):
    """Terminate ``proc`` if it is a Flower client/server process, logging any failure.

    Args:
        proc: Process handle whose ``pid``, ``cmdline()`` and ``name()`` are
            read here (the caller passes items from ``psutil.process_iter``).
        logger: Logger used to report the match and any error.

    Errors are logged rather than propagated, so a caller looping over many
    processes is not interrupted by a single failing one.
    """
    try:
        pid = proc.pid
        cmdline = proc.cmdline()
        if should_terminate_process(cmdline):
            logger.info(f"PID: {pid} - Name: {proc.name()} - Cmdline: {cmdline}")
            terminate_process(proc, logger)
    except psutil.NoSuchProcess:
        # Use warning(): Logger.warn is a deprecated alias and no longer
        # exists on stdlib loggers in recent Python versions.
        logger.warning(
            f"No process found with PID {proc.pid}. It may have already exited."
        )
    except psutil.AccessDenied:
        logger.error(f"Access denied when attempting to terminate PID {proc.pid}.")
    except Exception as e:
        # Deliberate catch-all: this is best-effort cleanup and must not raise.
        logger.error(f"An error occurred while managing PID {proc.pid}: {e}")
96+
97+
7398
class FlowerProcess:
7499
def __init__(self, file, parameters=None, env_vars=None, stdout=None, stderr=None):
75100
self.file = file
@@ -107,3 +132,9 @@ def kill_process(cls, pid, algorithm_name, logger):
107132
logger.error(f"Access denied when attempting to terminate PID {pid}.")
108133
except Exception as e:
109134
logger.error(f"An error occurred while managing PID {pid}: {e}")
135+
136+
@classmethod
def garbage_collect(cls, logger):
    """Walk every running process and pass each one to ``process_garbage_collect``."""
    # Attributes requested from psutil up front for each yielded process.
    requested_attrs = ["pid", "name", "cmdline"]
    for candidate in psutil.process_iter(requested_attrs):
        process_garbage_collect(candidate, logger)

exareme2/controller/celery/tasks_handler.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -297,24 +297,24 @@ def queue_healthcheck_task(
297297
)
298298

299299
def start_flower_client(
300-
self, request_id, algorithm_name, worker_id
300+
self, request_id, algorithm_name, server_address
301301
) -> WorkerTaskResult:
302302
return self._queue_task(
303303
task_signature=TASK_SIGNATURES["start_flower_client"],
304304
request_id=request_id,
305305
algorithm_name=algorithm_name,
306-
worker_id=worker_id,
306+
server_address=server_address,
307307
)
308308

309309
def start_flower_server(
310-
self, request_id, algorithm_name, number_of_clients, worker_id
310+
self, request_id, algorithm_name, number_of_clients, server_address
311311
) -> WorkerTaskResult:
312312
return self._queue_task(
313313
task_signature=TASK_SIGNATURES["start_flower_server"],
314314
request_id=request_id,
315315
algorithm_name=algorithm_name,
316316
number_of_clients=number_of_clients,
317-
worker_id=worker_id,
317+
server_address=server_address,
318318
)
319319

320320
def stop_flower_server(

exareme2/controller/services/flower/controller.py

+20-9
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@
77
from exareme2.controller.uid_generator import UIDGenerator
88
from exareme2.worker_communication import WorkerInfo
99

10+
FLOWER_SERVER_PORT = "8080"
11+
1012

11-
# Base Exception class for Worker-related exceptions
1213
class WorkerException(Exception):
1314
pass
1415

@@ -59,23 +60,30 @@ async def exec_algorithm(self, algorithm_name, algorithm_request_dto):
5960
self._create_worker_tasks_handler(request_id, worker)
6061
for worker in workers_info
6162
]
62-
server_task_handler = (
63-
task_handlers[0]
64-
if len(task_handlers) == 1
65-
else self._create_global_handler(request_id)
66-
)
63+
64+
server_task_handler, server_ip = task_handlers[0], workers_info[0].ip
65+
if len(task_handlers) > 1:
66+
global_worker = self.worker_landscape_aggregator.get_global_worker()
67+
server_task_handler = self._create_worker_tasks_handler(
68+
request_id, global_worker
69+
)
70+
server_ip = global_worker.ip
71+
6772
self.flower_execution_info.set_inputdata(
6873
inputdata=algorithm_request_dto.inputdata
6974
)
7075
server_pid = None
7176
clients_pids = {}
77+
server_address = f"{server_ip}:{FLOWER_SERVER_PORT}"
7278

7379
try:
7480
server_pid = server_task_handler.start_flower_server(
75-
algorithm_name, len(task_handlers)
81+
algorithm_name, len(task_handlers), str(server_address)
7682
)
7783
clients_pids = {
78-
handler.start_flower_client(algorithm_name): handler
84+
handler.start_flower_client(
85+
algorithm_name, str(server_address)
86+
): handler
7987
for handler in task_handlers
8088
}
8189

@@ -102,7 +110,10 @@ async def exec_algorithm(self, algorithm_name, algorithm_request_dto):
102110

103111
def _create_global_handler(self, request_id):
104112
global_worker = self.worker_landscape_aggregator.get_global_worker()
105-
return self._create_worker_tasks_handler(request_id, global_worker)
113+
return (
114+
self._create_worker_tasks_handler(request_id, global_worker),
115+
global_worker.ip,
116+
)
106117

107118
async def _cleanup(
108119
self, algorithm_name, server_task_handler, server_pid, clients_pids

exareme2/controller/services/flower/flower_io_registry.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,13 @@ async def get_result(self) -> Dict[str, Any]:
6262
async def get_result_with_timeout(self) -> Dict[str, Any]:
6363
try:
6464
await asyncio.wait_for(self.get_result(), self._timeout)
65+
self._logger.debug(f"Result with timeout: {self._result}")
66+
return self._result.content
6567
except asyncio.TimeoutError:
6668
error = f"Failed to get result: operation timed out after {self._timeout} seconds"
6769
self._logger.error(error)
6870
self._result = Result(content={"error": error}, status=Status.FAILURE)
69-
self._logger.debug(f"Result with timeout: {self._result}")
70-
return self._result.content
71+
raise TimeoutError(error)
7172

7273
def get_status(self) -> Status:
7374
"""Returns the current status of the execution."""

exareme2/controller/services/flower/tasks_handler.py

+6-4
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,16 @@ def worker_id(self) -> str:
2929
def worker_data_address(self) -> str:
3030
return self._db_address
3131

32-
def start_flower_client(self, algorithm_name) -> int:
32+
def start_flower_client(self, algorithm_name, server_address) -> int:
3333
return self._worker_tasks_handler.start_flower_client(
34-
self._request_id, algorithm_name, self._worker_id
34+
self._request_id, algorithm_name, server_address
3535
).get(timeout=self._tasks_timeout)
3636

37-
def start_flower_server(self, algorithm_name: str, number_of_clients: int) -> int:
37+
def start_flower_server(
38+
self, algorithm_name: str, number_of_clients: int, server_address
39+
) -> int:
3840
return self._worker_tasks_handler.start_flower_server(
39-
self._request_id, algorithm_name, number_of_clients, self._worker_id
41+
self._request_id, algorithm_name, number_of_clients, server_address
4042
).get(timeout=self._tasks_timeout)
4143

4244
def stop_flower_server(self, pid: int, algorithm_name: str):

exareme2/worker/Dockerfile

+6
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,16 @@ ENV PYTHONUNBUFFERED=1 \
2323
MONETDB_LOCAL_PASSWORD="executor" \
2424
MONETDB_PUBLIC_USERNAME="guest" \
2525
MONETDB_PUBLIC_PASSWORD="guest" \
26+
DATA_PATH="/opt/data" \
2627
CODE_PATH="/opt/code"
2728
ENV PATH="$POETRY_HOME/bin:$CODE_PATH:$PATH"
2829
WORKDIR $CODE_PATH
2930

31+
ENV PYTHONPATH=$CODE_PATH
32+
RUN mkdir $DATA_PATH
33+
VOLUME $DATA_PATH
34+
35+
3036
#######################################################
3137
# Installing poetry and dependencies
3238
#######################################################

exareme2/worker/README.md

+3
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ WORKER_IDENTIFIER=globalworker
1717
WORKER_ROLE=GLOBALWORKER
1818
LOG_LEVEL=INFO
1919
FRAMEWORK_LOG_LEVEL=INFO
20+
DATA_PATH=/opt/data/
21+
CONTROLLER_IP=172.17.0.1
22+
CONTROLLER_PORT=5000
2023
PROTECT_LOCAL_DATA=false
2124
RABBITMQ_IP=172.17.0.1
2225
RABBITMQ_PORT=5670

exareme2/worker/__init__.py

+5
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,19 @@
11
import os
22
from importlib.resources import open_text
3+
from pathlib import Path
34

45
import envtoml
56

67
from exareme2 import AttrDict
78
from exareme2 import worker
89

10+
PROJECT_ROOT = Path(__file__).parent.parent.parent
11+
TEST_DATA_FOLDER = PROJECT_ROOT / "tests" / "test_data"
12+
913
if config_file := os.getenv("EXAREME2_WORKER_CONFIG_FILE"):
1014
with open(config_file) as fp:
1115
config = AttrDict(envtoml.load(fp))
16+
config.data_path = TEST_DATA_FOLDER
1217
else:
1318
with open_text(worker, "config.toml") as fp:
1419
config = AttrDict(envtoml.load(fp))

exareme2/worker/config.toml

+5
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
identifier = "$WORKER_IDENTIFIER"
22
role = "$WORKER_ROLE"
3+
data_path = "$DATA_PATH"
34

45
log_level = "$LOG_LEVEL"
56
framework_log_level = "$FRAMEWORK_LOG_LEVEL"
67

8+
[controller]
9+
ip = "$CONTROLLER_IP"
10+
port = "$CONTROLLER_PORT"
11+
712
[privacy]
813
minimum_row_count = 10
914
protect_local_data = "$PROTECT_LOCAL_DATA"

exareme2/worker/flower/starter/flower_api.py

+6-4
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,16 @@
44

55

66
@shared_task
7-
def start_flower_client(request_id: str, algorithm_name, worker_id) -> int:
8-
return flower_service.start_flower_client(request_id, algorithm_name, worker_id)
7+
def start_flower_client(request_id: str, algorithm_name, server_address) -> int:
8+
return flower_service.start_flower_client(
9+
request_id, algorithm_name, server_address
10+
)
911

1012

1113
@shared_task
1214
def start_flower_server(
13-
request_id: str, algorithm_name: str, number_of_clients: int, worker_id
15+
request_id: str, algorithm_name: str, number_of_clients: int, server_address
1416
) -> int:
1517
return flower_service.start_flower_server(
16-
request_id, algorithm_name, number_of_clients, worker_id
18+
request_id, algorithm_name, number_of_clients, server_address
1719
)
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,46 @@
1-
from exareme2 import ALGORITHM_FOLDERS
21
from exareme2.algorithms.flower.process_manager import FlowerProcess
32
from exareme2.worker import config as worker_config
43
from exareme2.worker.utils.logger import get_logger
54
from exareme2.worker.utils.logger import initialise_logger
65

7-
# Dictionary to keep track of running processes
8-
running_processes = {}
9-
SERVER_ADDRESS = "0.0.0.0:8080"
10-
116

127
@initialise_logger
13-
def start_flower_client(request_id: str, algorithm_name, worker_id) -> int:
8+
def start_flower_client(request_id: str, algorithm_name, server_address) -> int:
149
env_vars = {
1510
"MONETDB_IP": worker_config.monetdb.ip,
1611
"MONETDB_PORT": worker_config.monetdb.port,
1712
"MONETDB_USERNAME": worker_config.monetdb.local_username,
1813
"MONETDB_PASSWORD": worker_config.monetdb.local_password,
1914
"MONETDB_DB": worker_config.monetdb.database,
20-
"SERVER_ADDRESS": SERVER_ADDRESS,
15+
"SERVER_ADDRESS": server_address,
2116
"NUMBER_OF_CLIENTS": worker_config.monetdb.database,
17+
"CONTROLLER_IP": worker_config.controller.ip,
18+
"CONTROLLER_PORT": worker_config.controller.port,
19+
"DATA_PATH": worker_config.data_path,
2220
}
23-
with open(f"/tmp/exareme2/{worker_id}.out", "a") as f:
24-
process = FlowerProcess(
25-
f"{algorithm_name}/client.py", env_vars=env_vars, stderr=f, stdout=f
26-
)
27-
running_processes[request_id] = process
28-
logger = get_logger()
21+
process = FlowerProcess(f"{algorithm_name}/client.py", env_vars=env_vars)
22+
logger = get_logger()
2923

30-
logger.info("Starting client.py")
31-
pid = process.start(logger)
24+
logger.info("Starting client.py")
25+
pid = process.start(logger)
3226
logger.info(f"Started client.py process id: {pid}")
3327
return pid
3428

3529

3630
@initialise_logger
3731
def start_flower_server(
38-
request_id: str, algorithm_name: str, number_of_clients: int, worker_id
32+
request_id: str, algorithm_name: str, number_of_clients: int, server_address
3933
) -> int:
4034
env_vars = {
41-
"SERVER_ADDRESS": SERVER_ADDRESS,
35+
"SERVER_ADDRESS": server_address,
4236
"NUMBER_OF_CLIENTS": number_of_clients,
37+
"CONTROLLER_IP": worker_config.controller.ip,
38+
"CONTROLLER_PORT": worker_config.controller.port,
39+
"DATA_PATH": worker_config.data_path,
4340
}
44-
with open(f"/tmp/exareme2/{worker_id}.out", "a") as f:
45-
process = FlowerProcess(
46-
f"{algorithm_name}/server.py", env_vars=env_vars, stderr=f, stdout=f
47-
)
48-
running_processes[request_id] = process
49-
logger = get_logger()
50-
logger.info("Starting server.py")
51-
pid = process.start(logger)
41+
process = FlowerProcess(f"{algorithm_name}/server.py", env_vars=env_vars)
42+
logger = get_logger()
43+
logger.info("Starting server.py")
44+
pid = process.start(logger)
5245
logger.info(f"Started server.py process id: {pid}")
5346
return pid

kubernetes/templates/exareme2-globalnode.yaml

+13
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ spec:
2323
- name: db-data
2424
hostPath:
2525
path: {{ .Values.db.storage_location }}
26+
- name: csv-data
27+
hostPath:
28+
path: {{ .Values.db.csvs_location }}
2629
containers:
2730
- name: monetdb
2831
image: {{ .Values.exareme2_images.repository }}/exareme2_db:{{ .Values.exareme2_images.version }}
@@ -97,6 +100,9 @@ spec:
97100

98101
- name: worker
99102
image: {{ .Values.exareme2_images.repository }}/exareme2_worker:{{ .Values.exareme2_images.version }}
103+
volumeMounts:
104+
- mountPath: /opt/data
105+
name: csv-data
100106
env:
101107
- name: WORKER_IDENTIFIER
102108
value: "globalworker"
@@ -106,6 +112,13 @@ spec:
106112
value: {{ .Values.log_level }}
107113
- name: FRAMEWORK_LOG_LEVEL
108114
value: {{ .Values.framework_log_level }}
115+
- name: CONTROLLER_IP
116+
value: exareme2-controller-service
117+
- name: CONTROLLER_PORT
118+
valueFrom:
119+
configMapKeyRef:
120+
name: exareme2-controller-service
121+
key: EXAREME2_CONTROLLER_SERVICE_SERVICE_PORT
109122
- name: PROTECT_LOCAL_DATA
110123
value: "false" # The GLOBALWORKER does not need to secure its data, since they are not private.
111124
- name: CELERY_TASKS_TIMEOUT

0 commit comments

Comments
 (0)