Skip to content

Commit 701a149

Browse files
authored
Merge pull request #50 from pedohorse/environment-resolver-interface-refactor
Environment resolver interface refactor
2 parents ff4f301 + f11e606 commit 701a149

File tree

6 files changed

+148
-45
lines changed

6 files changed

+148
-45
lines changed

src/lifeblood/environment_resolver.py

Lines changed: 75 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,25 @@
1818
import inspect
1919
import pathlib
2020
import re
21+
import shutil
2122
from copy import deepcopy
2223
from semantic_version import Version, SimpleSpec
2324
from types import MappingProxyType
2425
from . import invocationjob, paths, logging
2526
from .config import get_config
2627
from .toml_coders import TomlFlatConfigEncoder
27-
from .process_utils import oh_no_its_windows
28+
from .process_utils import create_process, oh_no_its_windows
29+
from .exceptions import ProcessInitializationError
2830

29-
from typing import Dict, Mapping, Optional, Type, Iterable
31+
from typing import Dict, Iterable, List, Mapping, Optional, Type
3032

3133

3234
_resolvers: Dict[str, Type["BaseEnvironmentResolver"]] = {} # this should be loaded from plugins
3335

3436

3537
def _populate_resolvers():
38+
# TODO: This horrible logic needs refactoring. All plugins must be loaded with pluginloader
39+
# everything is tightly coupled here, interdependent... no planing
3640
for k, v in dict(globals()).items():
3741
if not inspect.isclass(v) \
3842
or not issubclass(v, BaseEnvironmentResolver) \
@@ -53,7 +57,7 @@ class ResolutionImpossibleError(RuntimeError):
5357

5458
class EnvironmentResolverArguments:
5559
"""
56-
this class objects specity requirements a task/invocation have for int's worker environment wrapper.
60+
this class objects specify requirements a task/invocation have for it's worker environment wrapper.
5761
"""
5862
def __init__(self, resolver_name=None, arguments: Optional[Mapping] = None):
5963
"""
@@ -86,19 +90,24 @@ def remove_argument(self, name: str):
8690
def get_resolver(self):
8791
return get_resolver(self.__resolver_name)
8892

89-
def get_environment(self) -> "invocationjob.Environment":
90-
return get_resolver(self.name()).get_environment(self.arguments())
93+
async def get_environment(self) -> "invocationjob.Environment":
94+
return await get_resolver(self.name()).get_environment(self.arguments())
9195

9296
def serialize(self) -> bytes:
93-
return json.dumps(self.__dict__).encode('utf-8')
97+
return json.dumps({
98+
'_EnvironmentResolverArguments__resolver_name': self.__resolver_name,
99+
'_EnvironmentResolverArguments__args': self.__args,
100+
}).encode('utf-8')
94101

95102
async def serialize_async(self):
96103
return await asyncio.get_running_loop().run_in_executor(None, self.serialize)
97104

98105
@classmethod
99106
def deserialize(cls, data: bytes):
100107
wrp = EnvironmentResolverArguments(None)
101-
wrp.__dict__.update(json.loads(data.decode('utf-8')))
108+
data_dict = json.loads(data.decode('utf-8'))
109+
wrp.__resolver_name = data_dict['_EnvironmentResolverArguments__resolver_name']
110+
wrp.__args = data_dict['_EnvironmentResolverArguments__args']
102111
return wrp
103112

104113
@classmethod
@@ -107,7 +116,7 @@ async def deserialize_async(cls, data: bytes):
107116

108117

109118
class BaseEnvironmentResolver:
110-
def get_environment(self, arguments: Mapping) -> "invocationjob.Environment":
119+
async def get_environment(self, arguments: Mapping) -> "invocationjob.Environment":
111120
"""
112121
this is the main reason for environment wrapper's existance.
113122
give it your specific arguments
@@ -118,17 +127,65 @@ def get_environment(self, arguments: Mapping) -> "invocationjob.Environment":
118127
"""
119128
raise NotImplementedError()
120129

130+
async def create_process(self, arguments: Mapping, call_args: List[str], *, env: Optional[invocationjob.Environment] = None, cwd: Optional[str] = None) -> asyncio.subprocess.Process:
131+
"""
132+
this should create process, maybe in a special way
133+
134+
:param arguments: EnvironmentResolverArguments for the resolver
135+
:param call_args: what to call: process and arguments
136+
:param env: optional environment to launch process in. If None - get_environment should be called
137+
:param cwd: current working directory for the process
138+
"""
139+
raise NotImplementedError()
140+
141+
142+
class BaseSimpleProcessSpawnEnvironmentResolver(BaseEnvironmentResolver):
143+
async def create_process(self, arguments: Mapping, call_args: List[str], *, env: Optional[invocationjob.Environment] = None, cwd: Optional[str] = None) -> asyncio.subprocess.Process:
144+
if env is None:
145+
env = await self.get_environment(arguments)
146+
147+
if os.path.isabs(call_args[0]):
148+
bin_path = call_args[0]
149+
else:
150+
bin_path = shutil.which(call_args[0], path=env.get('PATH', ''))
151+
if bin_path is None:
152+
raise ProcessInitializationError(f'"{call_args[0]}" was not found. Check environment resolver arguments and system setup')
153+
154+
if cwd is None:
155+
cwd = os.path.dirname(bin_path)
121156

122-
class TrivialEnvironmentResolver(BaseEnvironmentResolver):
157+
return await create_process(call_args, env, cwd)
158+
159+
160+
class BaseSimpleProcessSpawnEnvironmentResolverWithPythonCheat(BaseSimpleProcessSpawnEnvironmentResolver):
161+
# even though lifeblood worker runs with python interpreter - it does NOT expect
162+
# python to be available in run environment. User might want to have multiple versions of packaged python.
163+
# However, simple one user artist might not care, and would just want python code to work without any packages setup.
164+
# So to simplify the life of smaller setup users, this hack was introduced.
165+
async def create_process(self, arguments: Mapping, call_args: List[str], *, env: Optional[invocationjob.Environment] = None, cwd: Optional[str] = None) -> asyncio.subprocess.Process:
166+
"""
167+
This introduces path to sys.executable if no python is found in PATH, yet `python` is first arg in call_args
168+
"""
169+
if env is None:
170+
env = await self.get_environment(arguments)
171+
if call_args[0] in ('python', 'python.exe') and shutil.which(call_args[0], path=env.get('PATH', '')) is None:
172+
env.append('PATH', os.path.dirname(sys.executable))
173+
174+
return await super().create_process(arguments, call_args, env=env, cwd=cwd)
175+
176+
177+
class TrivialEnvironmentResolver(BaseSimpleProcessSpawnEnvironmentResolverWithPythonCheat):
123178
"""
124179
trivial environment wrapper does nothing
125180
"""
126-
def get_environment(self, arguments: dict) -> "invocationjob.Environment":
181+
async def get_environment(self, arguments: Mapping) -> "invocationjob.Environment":
127182
env = invocationjob.Environment(os.environ)
183+
for key, value in arguments.items():
184+
env[key] = value
128185
return env
129186

130187

131-
class StandardEnvironmentResolver(BaseEnvironmentResolver):
188+
class StandardEnvironmentResolver(BaseSimpleProcessSpawnEnvironmentResolverWithPythonCheat):
132189
"""
133190
will initialize environment based on requested software versions and it's own config
134191
will raise ResolutionImpossibleError if he doesn't know how to resolve given configuration
@@ -162,7 +219,7 @@ def __init__(self):
162219
self.logger.error('environment resolver configs found, but all have errors! Aborting!')
163220
raise RuntimeError('all resolver configs are broken')
164221

165-
def get_environment(self, arguments: Mapping) -> "invocationjob.Environment":
222+
async def get_environment(self, arguments: Mapping) -> "invocationjob.Environment":
166223
"""
167224
168225
:param arguments: are expected to be in format of package_name: version_specification
@@ -377,21 +434,21 @@ def main(args):
377434

378435
opts = parser.parse_args(args)
379436

437+
logger = logging.get_logger('environment resolver')
438+
logger.info('auto detecting packages...')
380439
packages = StandardEnvironmentResolver.autodetect_software()
381440
if opts.basepath:
382441
for basepath in opts.basepath.split(','):
383442
packages.update(StandardEnvironmentResolver.autodetect_software(basepath))
443+
for pkgname, v in packages.items():
444+
for verstr in v.keys():
445+
logger.info(f'found {pkgname} : {verstr}')
384446

385447
if opts.command == 'generate':
386448
config = get_config('standard_environment_resolver')
387449
if opts.output:
388450
config.override_config_save_location(opts.output)
389451
config.set_toml_encoder_generator(TomlFlatConfigEncoder)
390-
logger = logging.get_logger('environment resolver')
391-
logger.info('standard environment resolver is used, but no configuration found. auto generating configuration...')
392-
for pkgname, v in packages.items():
393-
for verstr in v.keys():
394-
logger.info(f'found {pkgname} : {verstr}')
395452

396453
if opts.override: # do full config override
397454
config.set_option_noasync('packages', packages)
@@ -413,6 +470,7 @@ def main(args):
413470
config.set_option_noasync('packages', conf_packages)
414471

415472
elif opts.command == 'scan':
473+
print('\n')
416474
for pkgname, stuff in packages.items():
417475
print(f'{pkgname}:')
418476
for ver, meta in stuff.items():

src/lifeblood/worker.py

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from .nethelpers import get_addr_to, get_localhost, get_hostname
1515
from .net_classes import WorkerResources
1616
from .worker_metadata import WorkerMetadata
17-
from .exceptions import ProcessInitializationError, WorkerNotAvailable, AlreadyRunning,\
17+
from .exceptions import WorkerNotAvailable, AlreadyRunning, \
1818
InvocationMessageWrongInvocationId, InvocationMessageAddresseeTimeout, InvocationCancelled
1919
from .worker_messsage_processor import WorkerMessageProcessor
2020
from .scheduler_message_processor import SchedulerWorkerControlClient
@@ -25,7 +25,7 @@
2525
from . import environment_resolver
2626
from .enums import WorkerType, WorkerState, ProcessPriorityAdjustment
2727
from .paths import config_path
28-
from .process_utils import create_process, kill_process_tree
28+
from .process_utils import kill_process_tree
2929
from .misc import event_set_context
3030
from .net_messages.address import AddressChain, DirectAddress
3131
from .net_messages.exceptions import MessageTransferError
@@ -344,10 +344,14 @@ async def run_task(self, task: InvocationJob, report_to: AddressChain):
344344
try:
345345
if task.environment_resolver_arguments() is None:
346346
config = get_config('worker')
347-
env = environment_resolver.get_resolver(config.get_option_noasync('default_env_wrapper.name', 'TrivialEnvironmentResolver'))\
348-
.get_environment(config.get_option_noasync('default_env_wrapper.arguments', {}))
347+
resolver = environment_resolver.get_resolver(config.get_option_noasync('default_env_wrapper.name', 'TrivialEnvironmentResolver'))
348+
resolver_arguments = config.get_option_noasync('default_env_wrapper.arguments', {})
349349
else:
350-
env = task.environment_resolver_arguments().get_environment()
350+
env_res_args = task.environment_resolver_arguments()
351+
resolver = env_res_args.get_resolver()
352+
resolver_arguments = env_res_args.arguments()
353+
354+
env = await resolver.get_environment(resolver_arguments)
351355
except environment_resolver.ResolutionImpossibleError as e:
352356
self.__logger.error(f'cannot run the task: Unable to resolve environment: {str(e)}')
353357
raise
@@ -377,14 +381,11 @@ async def run_task(self, task: InvocationJob, report_to: AddressChain):
377381
# TODO: proper child process priority adjustment should be done, for now it's implemented in constructor.
378382
self.__running_process_start_time = time.time()
379383

380-
if os.path.isabs(args[0]):
381-
bin_path = args[0]
382-
else:
383-
bin_path = shutil.which(args[0], path=env.get('PATH'))
384-
if bin_path is None:
385-
raise ProcessInitializationError(f'"{args[0]}" was not found. Check environment resolver arguments and system setup')
386-
387-
self.__running_process: asyncio.subprocess.Process = await create_process(args, env, os.path.dirname(bin_path))
384+
self.__running_process: asyncio.subprocess.Process = await resolver.create_process(
385+
resolver_arguments,
386+
args,
387+
env=env
388+
)
388389
except Exception as e:
389390
self.__logger.exception('task creation failed with error: %s' % (repr(e),))
390391
raise

src/lifeblood_client/environment_resolver.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
class EnvironmentResolverArguments:
1010
"""
11-
this class objects specity requirements a task/invocation have for int's worker environment wrapper.
11+
this class objects specify requirements a task/invocation have for it's worker environment wrapper.
1212
"""
1313
def __init__(self, resolver_name=None, arguments=None):
1414
"""
@@ -30,10 +30,15 @@ def arguments(self):
3030
return self.__args
3131

3232
def serialize(self): # type: () -> bytes
33-
return json.dumps(self.__dict__).encode('utf-8')
33+
return json.dumps({
34+
'_EnvironmentResolverArguments__resolver_name': self.__resolver_name,
35+
'_EnvironmentResolverArguments__args': self.__args,
36+
}).encode('utf-8')
3437

3538
@classmethod
3639
def deserialize(cls, data): # type: (bytes) -> EnvironmentResolverArguments
3740
wrp = EnvironmentResolverArguments(None)
38-
wrp.__dict__.update(json.loads(data.decode('utf-8')))
41+
data_dict = json.loads(data.decode('utf-8'))
42+
wrp.__resolver_name = data_dict['_EnvironmentResolverArguments__resolver_name']
43+
wrp.__args = data_dict['_EnvironmentResolverArguments__args']
3944
return wrp

src/lifeblood_testing_common/nodes_common.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020
from lifeblood.pluginloader import PluginNodeDataProvider
2121
from lifeblood.processingcontext import ProcessingContext
2222
from lifeblood.process_utils import oh_no_its_windows
23-
from lifeblood.environment_resolver import EnvironmentResolverArguments
23+
from lifeblood.environment_resolver import EnvironmentResolverArguments, BaseSimpleProcessSpawnEnvironmentResolver
2424

25-
from typing import Any, Callable, Dict, List, Optional, Set, Union
25+
from typing import Any, Callable, Dict, List, Mapping, Optional, Set, Union
2626

2727

2828
plugin_data_provider = PluginNodeDataProvider()
@@ -34,17 +34,26 @@ def create_node(node_type: str, node_name: str, scheduler, node_id):
3434
return node
3535

3636

37-
class FakeEnvArgs(EnvironmentResolverArguments):
37+
class FakeResolver(BaseSimpleProcessSpawnEnvironmentResolver):
3838
def __init__(self, path_to_bin: str):
3939
super().__init__()
4040
self.__bin_path = Path(path_to_bin)
4141

42-
def get_environment(self):
42+
async def get_environment(self, arguments: Mapping) -> "invocationjob.Environment":
4343
return Environment({**os.environ,
4444
'PATH': os.pathsep.join((str(self.__bin_path), os.environ.get('PATH', ''))),
4545
'PYTHONUNBUFFERED': '1'})
4646

4747

48+
class FakeEnvArgs(EnvironmentResolverArguments):
49+
def __init__(self, path_to_bin: str):
50+
super().__init__()
51+
self.__bin_path = Path(path_to_bin)
52+
53+
def get_resolver(self):
54+
return FakeResolver(self.__bin_path)
55+
56+
4857
class PseudoTaskPool:
4958
def children_ids_for(self, task_id, active_only=False) -> List[int]:
5059
raise NotImplementedError()

tests/test_envisonment_resolver.py

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import unittest
33
from unittest import mock
44
from lifeblood import environment_resolver
5+
from lifeblood_client import environment_resolver as client_environment_resolver
56
from lifeblood.invocationjob import Environment
67
from lifeblood.toml_coders import TomlFlatConfigEncoder
78
import toml
@@ -21,7 +22,7 @@ def __set__(self, obj, val):
2122
self(obj, val)
2223

2324

24-
class StandardEnvResTest(unittest.TestCase):
25+
class StandardEnvResTest(unittest.IsolatedAsyncioTestCase):
2526
_stash = None
2627
@classmethod
2728
def setUpClass(cls) -> None:
@@ -33,10 +34,10 @@ def tearDownClass(cls) -> None:
3334
if cls._stash is not None:
3435
os.environ['LIFEBLOOD_CONFIG_LOCATION'] = cls._stash
3536

36-
def test_one(self):
37+
async def test_one(self):
3738
ser = environment_resolver.StandardEnvironmentResolver()
3839
origenv = Environment(os.environ)
39-
env = ser.get_environment({'package.houdini': '>18.0.0,<19.0.0'})
40+
env = await ser.get_environment({'package.houdini': '>18.0.0,<19.0.0'})
4041

4142
self.assertEqual(
4243
os.pathsep.join(["/path/to/hfs/bin", "/some/other/path/dunno", origenv.get('PATH', ''), "/whatever/you/want/to/append"]),
@@ -48,11 +49,11 @@ def test_one(self):
4849
env['PYTHONPATH']
4950
)
5051

51-
def test_two(self):
52+
async def test_two(self):
5253
ser = environment_resolver.StandardEnvironmentResolver()
5354
origenv = Environment(os.environ)
54-
env = ser.get_environment({'package.houdini': '>18.0.0,<19.0.0',
55-
'package.assmouth': '~=2.3.2'})
55+
env = await ser.get_environment({'package.houdini': '>18.0.0,<19.0.0',
56+
'package.assmouth': '~=2.3.2'})
5657

5758
self.assertEqual(
5859
os.pathsep.join(["/path/to/hfs/bin", "/some/other/path/dunno", "bananus", origenv.get('PATH', ''), "who/are/you", "/whatever/you/want/to/append"]),
@@ -149,3 +150,23 @@ def test_autodetect_win(self):
149150
print(result)
150151
self.assertIn('houdini.py3_10', result)
151152
self.assertEqual('C:\\Program Files\\Side Effects Software\\Houdini 19.5.640\\bin', result['houdini.py3_10']['19.5.640']['env']['PATH']['prepend'])
153+
154+
155+
class TestMainVsClientCompatibility(unittest.TestCase):
156+
def test_serde1(self):
157+
client_envarg = client_environment_resolver.EnvironmentResolverArguments('foobar', {'abc': 'qwe', 'def': 2.3, 'ghi': ['q', 2, 3.3, {'4': []}]})
158+
data = client_envarg.serialize()
159+
envarg = environment_resolver.EnvironmentResolverArguments.deserialize(data)
160+
161+
self.assertEqual(client_envarg.name(), envarg.name())
162+
self.assertEqual(client_envarg.arguments(), envarg.arguments())
163+
self.assertEqual(client_envarg.serialize(), envarg.serialize())
164+
165+
def test_serde1inv(self):
166+
envarg = environment_resolver.EnvironmentResolverArguments('foobar', {'abc': 'qwe', 'def': 2.3, 'ghi': ['q', 2, 3.3, {'4': []}]})
167+
data = envarg.serialize()
168+
client_envarg = client_environment_resolver.EnvironmentResolverArguments.deserialize(data)
169+
170+
self.assertEqual(client_envarg.name(), envarg.name())
171+
self.assertEqual(client_envarg.arguments(), envarg.arguments())
172+
self.assertEqual(client_envarg.serialize(), envarg.serialize())

0 commit comments

Comments
 (0)