2
2
import datetime
3
3
import functools
4
4
import json
5
+ from pathlib import Path
5
6
from typing import Any , Final
6
7
7
8
from aws_library .ec2 .models import EC2InstanceBootSpecific , EC2InstanceData , EC2Tags
10
11
ClusterState ,
11
12
OnDemandCluster ,
12
13
)
13
- from models_library .clusters import NoAuthentication
14
+ from models_library .clusters import NoAuthentication , TLSAuthentication
14
15
from models_library .users import UserID
15
16
from models_library .wallets import WalletID
16
17
from types_aiobotocore_ec2 .literals import InstanceStateNameType
20
21
from .dask import get_scheduler_url
21
22
22
23
_DOCKER_COMPOSE_FILE_NAME : Final [str ] = "docker-compose.yml"
24
+ _HOST_DOCKER_COMPOSE_PATH : Final [Path ] = Path (f"/{ _DOCKER_COMPOSE_FILE_NAME } " )
25
+ _HOST_CERTIFICATES_BASE_PATH : Final [Path ] = Path ("/.dask-sidecar-certificates" )
26
+ _HOST_TLS_CA_FILE_PATH : Final [Path ] = _HOST_CERTIFICATES_BASE_PATH / "tls_dask_ca.pem"
27
+ _HOST_TLS_CERT_FILE_PATH : Final [Path ] = (
28
+ _HOST_CERTIFICATES_BASE_PATH / "tls_dask_cert.pem"
29
+ )
30
+ _HOST_TLS_KEY_FILE_PATH : Final [Path ] = _HOST_CERTIFICATES_BASE_PATH / "tls_dask_key.pem"
31
+
32
+
33
+ def _base_64_encode (file : Path ) -> str :
34
+ assert file .exists () # nosec
35
+ with file .open ("rb" ) as f :
36
+ return base64 .b64encode (f .read ()).decode ("utf-8" )
23
37
24
38
25
39
@functools.lru_cache
def _docker_compose_yml_base64_encoded() -> str:
    """Base64-encode the packaged docker-compose.yml.

    Cached: the packaged file never changes at runtime, so the encoding is
    computed at most once per process.
    """
    return _base_64_encode(PACKAGE_DATA_FOLDER / _DOCKER_COMPOSE_FILE_NAME)
43
+
44
+
45
def _write_tls_certificates_commands(auth: TLSAuthentication) -> list[str]:
    """Return the shell commands that install the dask TLS certificates on the host.

    Each certificate/key file referenced by *auth* is base64-encoded so it can be
    embedded safely in the startup script, then decoded back on the host
    (``base64 -d``) into the expected PEM file. Without the decode step the
    ``.pem`` files would contain base64 text instead of the actual certificates
    (mirrors how the compose file is transferred elsewhere in this module).

    NOTE: intentionally NOT ``lru_cache``-d — caching would hand every caller
    the same mutable list object and freeze the certificate file contents as
    read on the first call.
    """
    return [
        f"mkdir --parents {_HOST_CERTIFICATES_BASE_PATH}",
        f"echo '{_base_64_encode(auth.tls_ca_file)}' | base64 -d > {_HOST_TLS_CA_FILE_PATH}",
        f"echo '{_base_64_encode(auth.tls_client_cert)}' | base64 -d > {_HOST_TLS_CERT_FILE_PATH}",
        f"echo '{_base_64_encode(auth.tls_client_key)}' | base64 -d > {_HOST_TLS_KEY_FILE_PATH}",
    ]
31
53
32
54
33
55
def _prepare_environment_variables (
@@ -47,21 +69,24 @@ def _convert_to_env_dict(entries: dict[str, Any]) -> str:
47
69
return f"'{ json .dumps (jsonable_encoder (entries ))} '"
48
70
49
71
return [
50
- f"DOCKER_IMAGE_TAG={ app_settings .CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DOCKER_IMAGE_TAG } " ,
51
- f"DASK_NTHREADS={ app_settings .CLUSTERS_KEEPER_DASK_NTHREADS or '' } " ,
52
72
f"CLUSTERS_KEEPER_EC2_ACCESS_KEY_ID={ app_settings .CLUSTERS_KEEPER_EC2_ACCESS .EC2_ACCESS_KEY_ID } " ,
53
73
f"CLUSTERS_KEEPER_EC2_ENDPOINT={ app_settings .CLUSTERS_KEEPER_EC2_ACCESS .EC2_ENDPOINT } " ,
54
74
f"CLUSTERS_KEEPER_EC2_REGION_NAME={ app_settings .CLUSTERS_KEEPER_EC2_ACCESS .EC2_REGION_NAME } " ,
55
75
f"CLUSTERS_KEEPER_EC2_SECRET_ACCESS_KEY={ app_settings .CLUSTERS_KEEPER_EC2_ACCESS .EC2_SECRET_ACCESS_KEY } " ,
76
+ f"DASK_NTHREADS={ app_settings .CLUSTERS_KEEPER_DASK_NTHREADS or '' } " ,
77
+ f"DASK_TLS_CA_FILE={ _HOST_TLS_CA_FILE_PATH } " ,
78
+ f"DASK_TLS_CERT={ _HOST_TLS_CERT_FILE_PATH } " ,
79
+ f"DASK_TLS_KEY={ _HOST_TLS_KEY_FILE_PATH } " ,
80
+ f"DOCKER_IMAGE_TAG={ app_settings .CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DOCKER_IMAGE_TAG } " ,
81
+ f"EC2_INSTANCES_NAME_PREFIX={ cluster_machines_name_prefix } " ,
82
+ f"LOG_LEVEL={ app_settings .LOG_LEVEL } " ,
56
83
f"WORKERS_EC2_INSTANCES_ALLOWED_TYPES={ _convert_to_env_dict (app_settings .CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES .WORKERS_EC2_INSTANCES_ALLOWED_TYPES )} " ,
84
+ f"WORKERS_EC2_INSTANCES_CUSTOM_TAGS={ _convert_to_env_dict (app_settings .CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES .WORKERS_EC2_INSTANCES_CUSTOM_TAGS | additional_custom_tags )} " , # type: ignore
57
85
f"WORKERS_EC2_INSTANCES_KEY_NAME={ app_settings .CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES .WORKERS_EC2_INSTANCES_KEY_NAME } " ,
58
86
f"WORKERS_EC2_INSTANCES_MAX_INSTANCES={ app_settings .CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES .WORKERS_EC2_INSTANCES_MAX_INSTANCES } " ,
59
- f"EC2_INSTANCES_NAME_PREFIX={ cluster_machines_name_prefix } " ,
60
87
f"WORKERS_EC2_INSTANCES_SECURITY_GROUP_IDS={ _convert_to_env_list (app_settings .CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES .WORKERS_EC2_INSTANCES_SECURITY_GROUP_IDS )} " ,
61
88
f"WORKERS_EC2_INSTANCES_SUBNET_ID={ app_settings .CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES .WORKERS_EC2_INSTANCES_SUBNET_ID } " ,
62
89
f"WORKERS_EC2_INSTANCES_TIME_BEFORE_TERMINATION={ app_settings .CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES .WORKERS_EC2_INSTANCES_TIME_BEFORE_TERMINATION } " ,
63
- f"WORKERS_EC2_INSTANCES_CUSTOM_TAGS={ _convert_to_env_dict (app_settings .CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES .WORKERS_EC2_INSTANCES_CUSTOM_TAGS | additional_custom_tags )} " , # type: ignore
64
- f"LOG_LEVEL={ app_settings .LOG_LEVEL } " ,
65
90
]
66
91
67
92
@@ -82,13 +107,23 @@ def create_startup_script(
82
107
)
83
108
84
109
startup_commands = ec2_boot_specific .custom_boot_scripts .copy ()
110
+
111
+ if isinstance (
112
+ app_settings .CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_AUTH ,
113
+ TLSAuthentication ,
114
+ ):
115
+ write_certificates_commands = _write_tls_certificates_commands (
116
+ app_settings .CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_AUTH
117
+ )
118
+ startup_commands .extend (write_certificates_commands )
119
+
85
120
startup_commands .extend (
86
121
[
87
122
# NOTE: https://stackoverflow.com/questions/41203492/solving-redis-warnings-on-overcommit-memory-and-transparent-huge-pages-for-ubunt
88
123
"sysctl vm.overcommit_memory=1" ,
89
- f"echo '{ _docker_compose_yml_base64_encoded ()} ' | base64 -d > docker-compose.yml " ,
124
+ f"echo '{ _docker_compose_yml_base64_encoded ()} ' | base64 -d > { _HOST_DOCKER_COMPOSE_PATH } " ,
90
125
"docker swarm init" ,
91
- f"{ ' ' .join (environment_variables )} docker stack deploy --with-registry-auth --compose-file=docker-compose.yml dask_stack" ,
126
+ f"{ ' ' .join (environment_variables )} docker stack deploy --with-registry-auth --compose-file={ _HOST_DOCKER_COMPOSE_PATH } dask_stack" ,
92
127
]
93
128
)
94
129
return "\n " .join (startup_commands )
0 commit comments