From f9f14a5e108ef2e90990a7df668feb40a8ba9b5c Mon Sep 17 00:00:00 2001 From: pedohorse <13556996+pedohorse@users.noreply.github.com> Date: Sun, 31 Mar 2024 21:47:15 +0200 Subject: [PATCH 01/23] catch errors in attrib serialization --- src/lifeblood/scheduler/task_processor.py | 25 ++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/src/lifeblood/scheduler/task_processor.py b/src/lifeblood/scheduler/task_processor.py index 1447a5ca..ab046095 100644 --- a/src/lifeblood/scheduler/task_processor.py +++ b/src/lifeblood/scheduler/task_processor.py @@ -1,3 +1,4 @@ +import functools import sys import traceback import json @@ -60,6 +61,12 @@ def _my_sleep(self): self.__logger.info('entering DORMANT mode') self.__processing_interval_mult = self.__dormant_mode_processing_interval_multiplier + async def _serialize_attributes(self, attributes: dict) -> str: + return await asyncio.get_event_loop().run_in_executor(None, functools.partial(json.dumps, default=lambda x: repr(x)), attributes) + + async def _deserialize_attributes(self, attributes_serialized: str) -> dict: + return await asyncio.get_event_loop().run_in_executor(None, json.loads, attributes_serialized) + @atimeit() async def _awaiter(self, processor_to_run, task_row, abort_state: TaskState, skip_state: TaskState): # TODO: process task generation errors _bench_point_0 = time.perf_counter() @@ -198,21 +205,25 @@ async def _awaiter(self, processor_to_run, task_row, abort_state: TaskState, ski # and update its attributes if provided if len(process_result.split_attributes_to_set) > 0: async with con.execute('SELECT attributes FROM tasks WHERE "id" = ?', (task_row['split_origin_task_id'],)) as attcur: - attributes = await asyncio.get_event_loop().run_in_executor(None, json.loads, (await attcur.fetchone())['attributes']) + attributes = await self._deserialize_attributes((await attcur.fetchone())['attributes']) attributes.update(process_result.split_attributes_to_set) - result_serialized = await asyncio.get_event_loop().run_in_executor(None, json.dumps, attributes) + result_serialized = await self._serialize_attributes(attributes) await con.execute('UPDATE tasks SET "attributes" = ? WHERE "id" = ?', (result_serialized, task_row['split_origin_task_id'])) _bench_point_6 = time.perf_counter() _bench_point_7 = _bench_point_6 if process_result.attributes_to_set: # not None or {} - attributes = await asyncio.get_event_loop().run_in_executor(None, json.loads, task_row['attributes'] or '{}') + attributes = await self._deserialize_attributes(task_row['attributes'] or '{}') attributes.update(process_result.attributes_to_set) for k, v in process_result.attributes_to_set.items(): # TODO: hmmm, None is a valid value... if v is None: del attributes[k] - result_serialized = await asyncio.get_event_loop().run_in_executor(None, json.dumps, attributes) + try: + result_serialized = await self._serialize_attributes(attributes) + except Exception as e: + self.__logger.exception('failed to serialize attributes, setting empty attributes') + result_serialized = '{}' _bench_point_7 = time.perf_counter() await con.execute('UPDATE tasks SET "attributes" = ? 
WHERE "id" = ?', (result_serialized, task_id)) @@ -253,9 +264,9 @@ async def _awaiter(self, processor_to_run, task_row, abort_state: TaskState, ski async with con.execute('SELECT attributes FROM "tasks" WHERE "id" = ?', (split_task_id,)) as cur: split_task_dict = await cur.fetchone() assert split_task_dict is not None - split_task_attrs = await asyncio.get_event_loop().run_in_executor(None, json.loads, split_task_dict['attributes']) + split_task_attrs = await self._deserialize_attributes(split_task_dict['attributes']) split_task_attrs.update(attr_dict) - await con.execute('UPDATE "tasks" SET attributes = ? WHERE "id" = ?', (json.dumps(split_task_attrs), split_task_id)) # TODO: run dumps in executor + await con.execute('UPDATE "tasks" SET attributes = ? WHERE "id" = ?', (await self._serialize_attributes(split_task_attrs), split_task_id)) # TODO: run dumps in executor _bench_point_11 = time.perf_counter() con.add_after_commit_callback(self.scheduler.ui_state_access.scheduler_reports_tasks_updated, [ui_task_delta] if ui_task_delta_split is None else [ui_task_delta, ui_task_delta_split]) # ui event @@ -300,7 +311,7 @@ async def _submitter(self, task_row, worker_row): # get task attributes before starting a transaction async with submit_transaction.execute('SELECT attributes FROM tasks WHERE "id" == ?', (task_id,)) as attcur: task_attributes_raw = ((await attcur.fetchone()) or ['{}'])[0] - task_attributes = await asyncio.get_event_loop().run_in_executor(None, json.loads, task_attributes_raw) + task_attributes = await self._deserialize_attributes(task_attributes_raw) assert not submit_transaction.in_transaction, 'logic failed, something is wrong with submission logic' # From 93b178b94aa6d086c6d0eb825f0c603617987cdd Mon Sep 17 00:00:00 2001 From: pedohorse <13556996+pedohorse@users.noreply.github.com> Date: Mon, 8 Apr 2024 23:13:14 +0200 Subject: [PATCH 02/23] validate attributes on set --- src/lifeblood/environment_resolver.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/lifeblood/environment_resolver.py b/src/lifeblood/environment_resolver.py index e075d7f5..23031d58 100644 --- a/src/lifeblood/environment_resolver.py +++ b/src/lifeblood/environment_resolver.py @@ -70,6 +70,9 @@ def __init__(self, resolver_name=None, arguments: Optional[Mapping] = None): if resolver_name is None and len(arguments) > 0: raise ValueError('if name is None - no arguments are allowed') self.__resolver_name = resolver_name + if arguments is not None: + # validate args + json.dumps(arguments) self.__args = arguments def name(self): @@ -82,6 +85,8 @@ def arguments(self): return MappingProxyType(self.__args) def add_argument(self, name: str, value): + # validate value + json.dumps(value) self.__args[name] = value def remove_argument(self, name: str): From 3143bef181065fa8b814af2c55f79048363c3870 Mon Sep 17 00:00:00 2001 From: pedohorse <13556996+pedohorse@users.noreply.github.com> Date: Mon, 8 Apr 2024 23:15:52 +0200 Subject: [PATCH 03/23] move attrib serialization here, validate attribs on set --- src/lifeblood/nodethings.py | 30 ++++++++++++++++++++--- src/lifeblood/scheduler/task_processor.py | 30 +++++++++-------------- 2 files changed, 39 insertions(+), 21 deletions(-) diff --git a/src/lifeblood/nodethings.py b/src/lifeblood/nodethings.py index fb62ebf3..0de44963 100644 --- a/src/lifeblood/nodethings.py +++ b/src/lifeblood/nodethings.py @@ -1,3 +1,5 @@ +import asyncio +import functools import json from .invocationjob import InvocationJob @@ -12,6 +14,22 @@ class ProcessingError(RuntimeError): pass 
+async def serialize_attributes(attributes: dict) -> str: + return await asyncio.get_event_loop().run_in_executor(None, serialize_attributes_core, attributes) + + +async def deserialize_attributes(attributes_serialized: str) -> dict: + return await asyncio.get_event_loop().run_in_executor(None, deserialize_attributes_core, attributes_serialized) + + +def serialize_attributes_core(attributes: dict) -> str: + return json.dumps(attributes, default=lambda x: repr(x)) + + +def deserialize_attributes_core(attributes_serialized: str) -> dict: + return json.loads(attributes_serialized) + + class ProcessingResult: def __init__(self, job: Optional[InvocationJob] = None, spawn: List[TaskSpawn] = None, node_output_name: Optional[str] = None): self.invocation_job: Optional[InvocationJob] = job @@ -39,9 +57,13 @@ def remove_split(self, attributes_to_set=None): """ self.do_split_remove = True if attributes_to_set is not None: + # validate attributes_to_set + serialize_attributes_core(attributes_to_set) # will raise in case of errors self.split_attributes_to_set.update(attributes_to_set) def set_attribute(self, key: str, value): + # validate value first + serialize_attributes_core({key: value}) # will raise in case of errors self.attributes_to_set[key] = value def remove_attribute(self, key: str): @@ -65,16 +87,18 @@ def split_task(self, into: int): self._split_attribs = [{} for _ in range(into)] def set_split_task_attrib(self, split: int, attr_name: str, attr_value): + # validate attrs try: - json.dumps(attr_value) + serialize_attributes({attr_name: attr_value}) except: - raise ValueError('attribs must be json-serializable dict') + raise ValueError('attr_value must be json-serializable') self._split_attribs[split][attr_name] = attr_value def set_split_task_attribs(self, split: int, attribs: dict): + # validate attrs try: assert isinstance(attribs, dict) - json.dumps(attribs) + serialize_attributes(attribs) except: raise ValueError('attribs must be json-serializable dict') self._split_attribs[split] = attribs diff --git a/src/lifeblood/scheduler/task_processor.py b/src/lifeblood/scheduler/task_processor.py index ab046095..d5009d4d 100644 --- a/src/lifeblood/scheduler/task_processor.py +++ b/src/lifeblood/scheduler/task_processor.py @@ -15,7 +15,7 @@ from ..worker_messsage_processor import WorkerControlClient from ..invocationjob import InvocationJob from ..environment_resolver import EnvironmentResolverArguments -from ..nodethings import ProcessingResult +from ..nodethings import ProcessingResult, serialize_attributes, deserialize_attributes from ..exceptions import * from .. 
import aiosqlite_overlay from ..config import get_config @@ -61,12 +61,6 @@ def _my_sleep(self): self.__logger.info('entering DORMANT mode') self.__processing_interval_mult = self.__dormant_mode_processing_interval_multiplier - async def _serialize_attributes(self, attributes: dict) -> str: - return await asyncio.get_event_loop().run_in_executor(None, functools.partial(json.dumps, default=lambda x: repr(x)), attributes) - - async def _deserialize_attributes(self, attributes_serialized: str) -> dict: - return await asyncio.get_event_loop().run_in_executor(None, json.loads, attributes_serialized) - @atimeit() async def _awaiter(self, processor_to_run, task_row, abort_state: TaskState, skip_state: TaskState): # TODO: process task generation errors _bench_point_0 = time.perf_counter() @@ -122,6 +116,7 @@ async def _awaiter(self, processor_to_run, task_row, abort_state: TaskState, ski _bench_point_1 = time.perf_counter() # why is there lock? it looks locking manually is waaaay more efficient than relying on transaction locking + # IMPORTANT: note that here we trade the ability to rollback transaction for speed by using SHARED transaction async with self.awaiter_lock, self.scheduler.data_access.lazy_data_transaction('awaiter_con') as con: # con.row_factory = aiosqlite.Row # This implicitly starts transaction @@ -205,25 +200,24 @@ async def _awaiter(self, processor_to_run, task_row, abort_state: TaskState, ski # and update its attributes if provided if len(process_result.split_attributes_to_set) > 0: async with con.execute('SELECT attributes FROM tasks WHERE "id" = ?', (task_row['split_origin_task_id'],)) as attcur: - attributes = await self._deserialize_attributes((await attcur.fetchone())['attributes']) + attributes = await deserialize_attributes((await attcur.fetchone())['attributes']) attributes.update(process_result.split_attributes_to_set) - result_serialized = await self._serialize_attributes(attributes) + result_serialized = await serialize_attributes(attributes) await con.execute('UPDATE tasks SET "attributes" = ? WHERE "id" = ?', (result_serialized, task_row['split_origin_task_id'])) _bench_point_6 = time.perf_counter() _bench_point_7 = _bench_point_6 if process_result.attributes_to_set: # not None or {} - attributes = await self._deserialize_attributes(task_row['attributes'] or '{}') + attributes = await deserialize_attributes(task_row['attributes'] or '{}') attributes.update(process_result.attributes_to_set) for k, v in process_result.attributes_to_set.items(): # TODO: hmmm, None is a valid value... if v is None: del attributes[k] - try: - result_serialized = await self._serialize_attributes(attributes) - except Exception as e: - self.__logger.exception('failed to serialize attributes, setting empty attributes') - result_serialized = '{}' + + # note, we DON'T expect serialization to fail here, as we expect ProcessingResult to validate its own data + result_serialized = await serialize_attributes(attributes) + _bench_point_7 = time.perf_counter() await con.execute('UPDATE tasks SET "attributes" = ? 
WHERE "id" = ?', (result_serialized, task_id)) @@ -264,9 +258,9 @@ async def _awaiter(self, processor_to_run, task_row, abort_state: TaskState, ski async with con.execute('SELECT attributes FROM "tasks" WHERE "id" = ?', (split_task_id,)) as cur: split_task_dict = await cur.fetchone() assert split_task_dict is not None - split_task_attrs = await self._deserialize_attributes(split_task_dict['attributes']) + split_task_attrs = await deserialize_attributes(split_task_dict['attributes']) split_task_attrs.update(attr_dict) - await con.execute('UPDATE "tasks" SET attributes = ? WHERE "id" = ?', (await self._serialize_attributes(split_task_attrs), split_task_id)) # TODO: run dumps in executor + await con.execute('UPDATE "tasks" SET attributes = ? WHERE "id" = ?', (await serialize_attributes(split_task_attrs), split_task_id)) # TODO: run dumps in executor _bench_point_11 = time.perf_counter() con.add_after_commit_callback(self.scheduler.ui_state_access.scheduler_reports_tasks_updated, [ui_task_delta] if ui_task_delta_split is None else [ui_task_delta, ui_task_delta_split]) # ui event @@ -311,7 +305,7 @@ async def _submitter(self, task_row, worker_row): # get task attributes before starting a transaction async with submit_transaction.execute('SELECT attributes FROM tasks WHERE "id" == ?', (task_id,)) as attcur: task_attributes_raw = ((await attcur.fetchone()) or ['{}'])[0] - task_attributes = await self._deserialize_attributes(task_attributes_raw) + task_attributes = await deserialize_attributes(task_attributes_raw) assert not submit_transaction.in_transaction, 'logic failed, something is wrong with submission logic' # From 7589387c9e55a4dee7e1a16dd3b78f5b99725530 Mon Sep 17 00:00:00 2001 From: pedohorse <13556996+pedohorse@users.noreply.github.com> Date: Tue, 9 Apr 2024 23:14:25 +0200 Subject: [PATCH 04/23] attrib serialization tests --- tests/data/test_attribserialization.db | Bin 0 -> 159744 bytes ...est_attribute_serialization_integration.py | 33 ++++++++++++++++++ 2 files changed, 33 insertions(+) create mode 100644 tests/data/test_attribserialization.db create mode 100644 tests/test_attribute_serialization_integration.py diff --git a/tests/data/test_attribserialization.db b/tests/data/test_attribserialization.db new file mode 100644 index 0000000000000000000000000000000000000000..7ae1056eaecaef51863ee3e5546dd1b5be1a5819 GIT binary patch literal 159744 zcmeI5U2GdycE?G5QKVzjPU0|$cGr{fCK4N2Yf7?ZdDpvDEZR$~LOHU8Bv={K5oaWg zP0lbgL)ogMUC7SvwgviBv=4m@nzm1U=mLv&U)n_r6vYN;f$l>e+M<1HpW26_D6lU* z_s%ytlt}Btq53B*nY{PhbMN_``*kjPsk`qjXtpY>8)i+mg{j2ZL|h7!r-`SY1nP1bbwb9UHC8A$snDJLa8?5Ct% znT($@=%l3j{geS$X2?(JcT$G>{FFXdX3$H?CMhL-{=A>SKL4N2{#qjYFWH}F&Ft@G z|10}>_K&llP!b*>00JNY0w4eaAOHd&00JNY0w4ea9TGS<#KQk%5dUY~lpy{ec2k1* zKjq2z@qgM)3F7}jPsWe`2i%k({_po>{BXZ7JH*2Ow4cD<|IZ{B`?K$#`I9q)r&rV4 zr>X-h!+$^YugS#@O@vrL;Iqe>RN}SUKc7g{x9v?scYX_2#nkGyu%zB^*h0mqZ}VS| z#dX81$+ombe~5wpNP~D?m>d^Gd%Lbmx?EG~wdg1iDLIZ8GJRYm-;6OkbUxl4qx`D${G%rYEl{6=hbLF05Udp1sDj)n${K zVp9vL#AC5;YKE!V+k&jzZ&-Fs)oq^cw=O2%Z2_0tlB*477&BCoth!p!$>-KW1( zA#uwk$`#A>=PT}bH7^qoafb>tboEhP))iHeJUy2un1PgRs+P-8Z&a&XyCU0ir7ByN z#C7`m<|&<-wnzPbQC_d}w=#rEy+MAnq6RnTkzQuTkr-Qe4#o;Zc3m}1mG!2_elKh? 
zJCLHWAbyre(I~j_x9{vT3J!MwwDkQM*B27HzBiQqUv+)ziWfgUlcuhC@vk05yW*Ox zOdY8AF$FChZWbNxdCs-u@3(f}8crq7zx3oA3APNB${l%`i85z;ix?Ga@$I!eeLFj& zu20WS7ba)dvj#BV|lZ4*Ep^Q-jp>3c)W<$wLX7MD*R)a8>cF24$wKj*pV&E2;KLUYmL%5o=O zZk~&%Sc`9`_w;S|bJ6Zw{h@}I%D11Tp;WA;p;L$6(7s4RI~q65L`&t?(c;Hlv0KGw zA)1Udw4*t_wV|yudEtnV0U;+UUI1~a;chafcrAx^lU9#o1s;bi^2dQG{I^BJsYZ+Ga>&(ri;@omkSe|=l zp-7)jZ4wuSu}oT|TOR3BdAWGIxFnPpmxc1m!h&#Tsr2sL(ucx#iXRGdE6aA0_<{6i`yA;F5WKFDXLager>0vJkfJJc$sN(geyjaZuqo_a_^ej z23grbjg~4`>HbKImPmyWPthHtuU2d+qCRJM`vNp9E%e=WIwkP7aT;im(GsoaGN$sfGWo#@x@KN*TT z(oc6>>6jUv3Ios3oiUo?+m0dIu<82SK0++Jrq)_{GhYQ7+KOy+(uRG6Sa#bAoHpzw z5?YWvpJwvH`=#Y?)90IorN#G4^LGY@a&Nz#{E$1r^_r!A)KGQm362xJmxB?9G-Zyl zTO8vgn*ZVb{!}iPOaAthKi%?))*6VE=X0j@tmBgIb=f4W16UtsjW3^nj+e5LLTq% zEr_#JPMO?E+RHT8QI)3xFS(4I|F}Dyr@3f_p&Wl!_d1@Rz)miBY3dKVsh3i@IjU=> zt$9wc5;F3YXq`b$V5A%1@IDjdyFDi`l})VPK=|Orp`?CBOuN7|)<&rioBSfJ!aUafWa~6PQu@s)4mi}&Lr>2{VTN)XgmH2D-e-us zQE|l>*I;m^%Pa2|=>*2#OtWln)@}j6hfi3RsoD*bMN~@QPnKRaHYD{CeM5~d@4Lsh zY=s>w`hf(!rtdZhUe`@ygZ76MR$-1pyG~Wdgh3d4VrHPwq9X?6eP{w>9{hXPu5NJfWQE7B_Y& zh+EicMvhFt2b?9{caCUNb>$R4DtuDn*6{g||9T;6fiqD);~Q`Md%>a`6}H0gZ=La1 zBvym_eQ3eSSt!vvw`YfHnf$jhL%F+`+U+QKvCtcgQlUBJ6h8mo%YzVY2LTWO0T2KI z5C8!X009sH0T2Lz&Iqu-|Cc!1nJlD000ck)1V8`;KmY_l00ck)1V8`;dXoU+|K9W) zS`7jq00JNY0w4eaAOHd&00JNY0^JZm{ND{N$btX}fB*=900@8p2!H?xfB*=9KyMO2 z{NI~?L#sgm1V8`;KmY_l00ck)1V8`;K%g4}i2u8x1z8XP0T2KI5C8!X009sH0T2KI z5a>+;i2r-jZ)i0LfB*=900@8p2!H?xfB*=900?wL0P%k}v>*!tAOHd&00JNY0w4ea zAOHd&00O;90P%ls`VFlH0T2KI5C8!X009sH0T2KI5CDO02q6CNh8AQ&00ck)1V8`; zKmY_l00ck)1VErS2_XLOO~0YlAOHd&00JNY0w4eaAOHd&00JP;4FSae-Oz$82!H?x zfB*=900@8p2!H?xfB*>eCIQ6%z3Df!8U#Q91V8`;KmY_l00ck)1V8`;x*>r0zZ+VR z1pyEM0T2KI5C8!X009sH0T2Lz-Xwtdzc>AcR)YWtfB*=900@8p2!H?xfB*=9KsN*s z|93+RvLFBgAOHd&00JNY0w4eaAOHd&(3=EKQ?pME{w$IG_{{&F`JK~0e&G*Jtz|}r zKTkaw`Y|Qr0RkWZ0wC~3C9rF}nHssB8-9AqHnoin)m*POtWC+5tp}2(*BiEEG;H>t zF4t6xC0jR_igU|FVR@-^`*v|j*t0<9o#L&mkQvS3t|2{%*t#y16BedxU;Ew@YMjH|>ebho(dY6~?{6mAIRm4yYSlIj&%MK^A6 zWt#H!OtCz_yYSW2$fexy6RFi=GSF7{F_O!~*u~T1t>RgpOixu!{Ch@LP3}-&vS=3N#vcv9u8JinYmC9ohKg)lVR7E%EDMCeYdz! zyh)vbx1D+X*3#m;LSTuNNxR*FH_-19qVqD7=N;q8f=EMwNWR&gWJ5n7%Rs>)&}8VE zXfnW^(|k+#-qGQleD?F)-Wia&|?{c5%q&g>p= z)MdEaZ3^ETmcooDJY6}D-L`+Dq*)zqQM1#wE)#O3tLZwaZ9 z0%4qs#n>4{CiM&msWUYrq%L=;2rnH1!jEHh1O#8WkQ%u`OWdMMu-TiMZmFj2Opnc- zv7M|-vsAuYTv~2f%GlcFPj~@Bzh?-gyFz(!nXgpG+9BEccke5yk+0GE_k9^r1Mg?HAA-qcOO$K z&lf+SjY3q=S;%>!BQCOqM&ydbOnx`{rBrS%m)xDpx3s~juc;BdVVOMJ){5h_ z@^05v@w)I>+>)ye_H2?8CCRF*6-};6#@c1a~|nELSwHSyRib1T~|$0RrWGFBz3d^j?deC?`Ops?dHT8 zIz@?lbgPcLwaEJC@$_oHw>I@?o*ri-bHQQQc#N*BH`)%|4KFXw)LxvWJcB|*l1;o# z-i?tuNdWems+sA2{z)`?f6sg7ha`I*#D-xg6p|^bMVl;}whe4A5cK<)>^PGTZ34f1 z3^TZ?+UE8iHpk^3snN$^bKNv5wAG9F)=N85+b#;UbLWRf!j^1m@>-Q%8N!BYf6Y>D z!8z2j#szg^V?tmLwypl|C`%u`M+a1e$*EP{QDRj#Ri#l?$MQ^rLVI9E6|{9>+h_<6 zW!+|}6;q|NMnTqvAjn|pYpS3ax+<(~3(nS)DYZ8>i&i+&@=&W*nT+~KtL~57beC!Zl|*PU@@1 z!&RLX@>G*00@jwK-6`Ld!!)ArQJ4t(kDVi;&r|&gMsMU2Q@px4%tWq~!71yf9+A0l=RndM( z3R#j;HI!9xr>W$+VYnsNsxlS1E{qk%C&#DqtSn`+6jDp=XU_@~lRM2CXqMv|3ak?9 zCal^?_MF$;EWK%{jx-7J=53lP6=@dJDqgc5sN11dHOt-i)Lj?KRCmbWTXNOnucuh? 
zWwW8~#pVa6?^`u{lV%-nHlX0x zsL}yH4P6@3>L$g|Y_uG;3wCx`{Qs=scE#P9hc+o;Pr8A3j8vR+dvPc4?b0G080QV_ z2qx4gPKvj2-aw~(an8hf!x#EgH0C5BW}1$Xit}|ZPScU|gf^!8Q~60WIy4#YV}3}o z=Rw3AD1@z#$&ND_=M4vV^XE8D$-T3Muy4JzBXuaw8;<$BA#`fs9oINX{JelC(SgCL z?j-V)0!j=Y6?l34jDYgOhXh_8KOLaFPL2n>GW=Y?Dbw-6fLE5E2(Yqkjs!e8eip#x znhpaz0e%YLG@|Jkz?0zdzgyM*2LMMMeA3`V{O_IyKG*pFP~Yz_9OL`_F-HsGRR9MN z{KF7&i{RmHM1;2!^{J3_%_%ruPy2|~~uh+d(mHj;* zBiZvHVon&s{XQl;&SV@w-~b}_leHoaAfC$sgct0ujp~Mei9YBsib$D#=%dZ&X-?8R z(@^=&*KBN-PA z_tf!MOn@J@J|;WPWW@i7{}KOxAuc@dyUM~X!*H)TunQ0peant!OVXG{9~|)RKd@++ zy`GTi-FAMB!MPW~(!Dq1y$>yrx-op#ey str: + return 'data/test_attribserialization.db' + + async def _create_test_tasks(self): + return [ + *await self._create_task(node_name='IN1', attributes={}), + *await self._create_task(node_name='IN2', attributes={}), + *await self._create_task(node_name='IN3', attributes={}), + *await self._create_task(node_name='ING', attributes={}), + ] + + def _expected_asks_state(self) -> Union[Tuple[TaskState, bool, str], Dict[int, Tuple[TaskState, bool, str]]]: + return { + 0: (TaskState.ERROR, False, 'python bad1'), + 1: (TaskState.ERROR, False, 'python bad2'), + 2: (TaskState.ERROR, False, 'python bad3'), + 3: (TaskState.DONE, True, 'OUT'), + } From 6b603d6a08a91c10049b946c3384122193354fe1 Mon Sep 17 00:00:00 2001 From: pedohorse <13556996+pedohorse@users.noreply.github.com> Date: Tue, 9 Apr 2024 23:16:46 +0200 Subject: [PATCH 05/23] common serialization modules --- src/lifeblood/attribute_serialization.py | 20 ++++++++++ src/lifeblood/common_serialization.py | 48 ++++++++++++++++++++++++ 2 files changed, 68 insertions(+) create mode 100644 src/lifeblood/attribute_serialization.py create mode 100644 src/lifeblood/common_serialization.py diff --git a/src/lifeblood/attribute_serialization.py b/src/lifeblood/attribute_serialization.py new file mode 100644 index 00000000..aa4ecba4 --- /dev/null +++ b/src/lifeblood/attribute_serialization.py @@ -0,0 +1,20 @@ +import asyncio +import json + +from .common_serialization import AttribSerializer, AttribDeserializer + + +async def serialize_attributes(attributes: dict) -> str: + return await asyncio.get_event_loop().run_in_executor(None, serialize_attributes_core, attributes) + + +async def deserialize_attributes(attributes_serialized: str) -> dict: + return await asyncio.get_event_loop().run_in_executor(None, deserialize_attributes_core, attributes_serialized) + + +def serialize_attributes_core(attributes: dict) -> str: + return json.dumps(attributes, cls=AttribSerializer) # TODO: allow SOME custom object serialization + + +def deserialize_attributes_core(attributes_serialized: str) -> dict: + return json.loads(attributes_serialized, cls=AttribDeserializer) diff --git a/src/lifeblood/common_serialization.py b/src/lifeblood/common_serialization.py new file mode 100644 index 00000000..8e05c361 --- /dev/null +++ b/src/lifeblood/common_serialization.py @@ -0,0 +1,48 @@ +import json + + +class AttribSerializer(json.JSONEncoder): + def _reform(self, obj): + if type(obj) is set: + return { + '__special_object_type__': 'set', + 'items': self._reform(list(obj)) + } + elif type(obj) is tuple: + return { + '__special_object_type__': 'tuple', + 'items': self._reform(list(obj)) + } + elif type(obj) is dict: # int keys case + if any(isinstance(x, (int, float, tuple)) for x in obj.keys()): + return { + '__special_object_type__': 'kvp', + 'items': self._reform([[k, v] for k, v in obj.items()]) + } + return {k: self._reform(v) for k, v in obj.items()} + elif isinstance(obj, list): + return 
[self._reform(x) for x in obj] + elif isinstance(obj, (int, float, str, bool)) or obj is None: + return obj + raise NotImplementedError(f'serialization not implemented for type "{type(obj)}"') + + def encode(self, o): + return super().encode(self._reform(o)) + + def default(self, obj): + return super().default(obj) + + +class AttribDeserializer(json.JSONDecoder): + def _dedata(self, obj): + special_type = obj.get('__special_object_type__') + if special_type == 'set': + return set(obj.get('items')) + elif special_type == 'tuple': + return tuple(obj.get('items')) + elif special_type == 'kvp': + return {k: v for k, v in obj.get('items')} + return obj + + def __init__(self): + super().__init__(object_hook=self._dedata) From 5495c7907174a2e3efddc0aa11bd4a5b949fdb04 Mon Sep 17 00:00:00 2001 From: pedohorse <13556996+pedohorse@users.noreply.github.com> Date: Tue, 9 Apr 2024 23:18:35 +0200 Subject: [PATCH 06/23] use base serializers --- src/lifeblood/basenode_serializer_v2.py | 55 +++++-------------------- src/lifeblood/nodethings.py | 25 ++--------- 2 files changed, 13 insertions(+), 67 deletions(-) diff --git a/src/lifeblood/basenode_serializer_v2.py b/src/lifeblood/basenode_serializer_v2.py index a0088f77..de382a83 100644 --- a/src/lifeblood/basenode_serializer_v2.py +++ b/src/lifeblood/basenode_serializer_v2.py @@ -1,5 +1,6 @@ from dataclasses import dataclass, is_dataclass import json +from .common_serialization import AttribSerializer, AttribDeserializer from .basenode_serialization import NodeSerializerBase, IncompatibleDeserializationMethod, FailedToApplyNodeState, FailedToApplyParameters from .basenode import BaseNode, NodeParameterType from .uidata import ParameterFullValue @@ -31,27 +32,10 @@ class NodeSerializerV2(NodeSerializerBase): the final string though is json-compliant """ - class Serializer(json.JSONEncoder): - def __reform(self, obj): - if type(obj) is set: - return { - '__special_object_type__': 'set', - 'items': self.__reform(list(obj)) - } - elif type(obj) is tuple: - return { - '__special_object_type__': 'tuple', - 'items': self.__reform(list(obj)) - } - elif type(obj) is dict: # int keys case - if any(isinstance(x, (int, float, tuple)) for x in obj.keys()): - return { - '__special_object_type__': 'kvp', - 'items': self.__reform([[k, v] for k, v in obj.items()]) - } - return {k: self.__reform(v) for k, v in obj.items()} - elif is_dataclass(obj): - dcs = self.__reform(obj.__dict__) # dataclasses.asdict is recursive, kills inner dataclasses + class Serializer(AttribSerializer): + def _reform(self, obj): + if is_dataclass(obj): + dcs = self._reform(obj.__dict__) # dataclasses.asdict is recursive, kills inner dataclasses dcs['__dataclass__'] = obj.__class__.__name__ dcs['__special_object_type__'] = 'dataclass' return dcs @@ -59,38 +43,19 @@ def __reform(self, obj): return {'value': obj.value, '__special_object_type__': 'NodeParameterType' } - elif isinstance(obj, list): - return [self.__reform(x) for x in obj] - elif isinstance(obj, (int, float, str, bool)) or obj is None: - return obj - raise NotImplementedError(f'serialization not implemented for type "{type(obj)}"') + return super()._reform(obj) - def encode(self, o): - return super().encode(self.__reform(o)) - - def default(self, obj): - return super(NodeSerializerV2.Serializer, self).default(obj) - - class Deserializer(json.JSONDecoder): - def dedata(self, obj): + class Deserializer(AttribDeserializer): + def _dedata(self, obj): special_type = obj.get('__special_object_type__') - if special_type == 'set': - return 
set(obj.get('items')) - elif special_type == 'tuple': - return tuple(obj.get('items')) - elif special_type == 'kvp': - return {k: v for k, v in obj.get('items')} - elif special_type == 'dataclass': + if special_type == 'dataclass': data = globals()[obj['__dataclass__']](**{k: v for k, v in obj.items() if k not in ('__dataclass__', '__special_object_type__')}) if obj['__dataclass__'] == 'NodeData': data.pos = tuple(data.pos) return data elif special_type == 'NodeParameterType': return NodeParameterType(obj['value']) - return obj - - def __init__(self): - super(NodeSerializerV2.Deserializer, self).__init__(object_hook=self.dedata) + return super()._dedata(obj) def serialize(self, node: BaseNode) -> Tuple[bytes, Optional[bytes]]: param_values = {} diff --git a/src/lifeblood/nodethings.py b/src/lifeblood/nodethings.py index 0de44963..d4cda2e6 100644 --- a/src/lifeblood/nodethings.py +++ b/src/lifeblood/nodethings.py @@ -1,7 +1,4 @@ -import asyncio -import functools -import json - +from .attribute_serialization import serialize_attributes_core from .invocationjob import InvocationJob from .taskspawn import TaskSpawn from .environment_resolver import EnvironmentResolverArguments @@ -14,22 +11,6 @@ class ProcessingError(RuntimeError): pass -async def serialize_attributes(attributes: dict) -> str: - return await asyncio.get_event_loop().run_in_executor(None, serialize_attributes_core, attributes) - - -async def deserialize_attributes(attributes_serialized: str) -> dict: - return await asyncio.get_event_loop().run_in_executor(None, deserialize_attributes_core, attributes_serialized) - - -def serialize_attributes_core(attributes: dict) -> str: - return json.dumps(attributes, default=lambda x: repr(x)) - - -def deserialize_attributes_core(attributes_serialized: str) -> dict: - return json.loads(attributes_serialized) - - class ProcessingResult: def __init__(self, job: Optional[InvocationJob] = None, spawn: List[TaskSpawn] = None, node_output_name: Optional[str] = None): self.invocation_job: Optional[InvocationJob] = job @@ -89,7 +70,7 @@ def split_task(self, into: int): def set_split_task_attrib(self, split: int, attr_name: str, attr_value): # validate attrs try: - serialize_attributes({attr_name: attr_value}) + serialize_attributes_core({attr_name: attr_value}) except: raise ValueError('attr_value must be json-serializable') self._split_attribs[split][attr_name] = attr_value @@ -98,7 +79,7 @@ def set_split_task_attribs(self, split: int, attribs: dict): # validate attrs try: assert isinstance(attribs, dict) - serialize_attributes(attribs) + serialize_attributes_core(attribs) except: raise ValueError('attribs must be json-serializable dict') self._split_attribs[split] = attribs From 1e481373f77cc3880b290b547fcbc8d2d3f37093 Mon Sep 17 00:00:00 2001 From: pedohorse <13556996+pedohorse@users.noreply.github.com> Date: Tue, 9 Apr 2024 23:19:05 +0200 Subject: [PATCH 07/23] use dedicated attribute deserializer --- src/lifeblood/processingcontext.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/lifeblood/processingcontext.py b/src/lifeblood/processingcontext.py index eb758ff6..6a392528 100644 --- a/src/lifeblood/processingcontext.py +++ b/src/lifeblood/processingcontext.py @@ -2,6 +2,7 @@ from types import MappingProxyType import re +from .attribute_serialization import deserialize_attributes_core from .config import get_config from .environment_resolver import EnvironmentResolverArguments @@ -59,7 +60,7 @@ def __getitem__(self, item): def __init__(self, node: "BaseNode", task_dict: 
dict): task_dict = dict(task_dict) - self.__task_attributes = json.loads(task_dict.get('attributes', '{}')) + self.__task_attributes = deserialize_attributes_core(task_dict.get('attributes', '{}')) self.__task_dict = task_dict self.__task_wrapper = ProcessingContext.TaskWrapper(task_dict) self.__node_wrapper = ProcessingContext.NodeWrapper(node, self) From 9fde4691105f75dc1d5557f25bf5fa42e0b91d0e Mon Sep 17 00:00:00 2001 From: pedohorse <13556996+pedohorse@users.noreply.github.com> Date: Tue, 9 Apr 2024 23:19:34 +0200 Subject: [PATCH 08/23] fix attrib serializers import --- src/lifeblood/scheduler/task_processor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/lifeblood/scheduler/task_processor.py b/src/lifeblood/scheduler/task_processor.py index d5009d4d..28605a8b 100644 --- a/src/lifeblood/scheduler/task_processor.py +++ b/src/lifeblood/scheduler/task_processor.py @@ -15,7 +15,8 @@ from ..worker_messsage_processor import WorkerControlClient from ..invocationjob import InvocationJob from ..environment_resolver import EnvironmentResolverArguments -from ..nodethings import ProcessingResult, serialize_attributes, deserialize_attributes +from ..nodethings import ProcessingResult +from ..attribute_serialization import serialize_attributes, deserialize_attributes from ..exceptions import * from .. import aiosqlite_overlay from ..config import get_config From 2fad3f4a23c4d6ca3468772af85a2199f3523dc9 Mon Sep 17 00:00:00 2001 From: pedohorse <13556996+pedohorse@users.noreply.github.com> Date: Wed, 10 Apr 2024 19:42:52 +0200 Subject: [PATCH 09/23] use attribute_serialization instead of json directly --- src/lifeblood/environment_resolver.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/lifeblood/environment_resolver.py b/src/lifeblood/environment_resolver.py index 23031d58..c4bb9727 100644 --- a/src/lifeblood/environment_resolver.py +++ b/src/lifeblood/environment_resolver.py @@ -14,7 +14,6 @@ import asyncio import os import sys -import json import inspect import pathlib import re @@ -24,6 +23,7 @@ from types import MappingProxyType from . 
import invocationjob, paths, logging from .config import get_config +from .attribute_serialization import serialize_attributes_core, deserialize_attributes_core from .toml_coders import TomlFlatConfigEncoder from .process_utils import create_process, oh_no_its_windows from .exceptions import ProcessInitializationError @@ -72,7 +72,7 @@ def __init__(self, resolver_name=None, arguments: Optional[Mapping] = None): self.__resolver_name = resolver_name if arguments is not None: # validate args - json.dumps(arguments) + serialize_attributes_core(arguments) self.__args = arguments def name(self): @@ -86,7 +86,7 @@ def arguments(self): def add_argument(self, name: str, value): # validate value - json.dumps(value) + serialize_attributes_core(value) self.__args[name] = value def remove_argument(self, name: str): @@ -99,7 +99,7 @@ async def get_environment(self) -> "invocationjob.Environment": return await get_resolver(self.name()).get_environment(self.arguments()) def serialize(self) -> bytes: - return json.dumps({ + return serialize_attributes_core({ '_EnvironmentResolverArguments__resolver_name': self.__resolver_name, '_EnvironmentResolverArguments__args': self.__args, }).encode('utf-8') @@ -110,7 +110,7 @@ async def serialize_async(self): @classmethod def deserialize(cls, data: bytes): wrp = EnvironmentResolverArguments(None) - data_dict = json.loads(data.decode('utf-8')) + data_dict = deserialize_attributes_core(data.decode('utf-8')) wrp.__resolver_name = data_dict['_EnvironmentResolverArguments__resolver_name'] wrp.__args = data_dict['_EnvironmentResolverArguments__args'] return wrp From d71e4917b0a8664a493eb32804e6c64738588fd3 Mon Sep 17 00:00:00 2001 From: pedohorse <13556996+pedohorse@users.noreply.github.com> Date: Wed, 10 Apr 2024 19:45:18 +0200 Subject: [PATCH 10/23] fixed remaining json attrib deserializations --- src/lifeblood/scheduler/scheduler.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/lifeblood/scheduler/scheduler.py b/src/lifeblood/scheduler/scheduler.py index dd692f84..58ee9f29 100644 --- a/src/lifeblood/scheduler/scheduler.py +++ b/src/lifeblood/scheduler/scheduler.py @@ -14,6 +14,7 @@ from .. import logging from .. import paths from ..nodegraph_holder_base import NodeGraphHolderBase +from ..attribute_serialization import serialize_attributes, deserialize_attributes #from ..worker_task_protocol import WorkerTaskClient from ..worker_messsage_processor import WorkerControlClient from ..scheduler_task_protocol import SchedulerTaskProtocol, SpawnStatus @@ -334,7 +335,7 @@ async def get_task_attributes(self, task_id: int) -> Tuple[Dict[str, Any], Optio env_res_args = None if res['environment_resolver_data'] is not None: env_res_args = await EnvironmentResolverArguments.deserialize_async(res['environment_resolver_data']) - return await asyncio.get_event_loop().run_in_executor(None, json.loads, res['attributes']), env_res_args + return await deserialize_attributes(res['attributes']), env_res_args async def get_task_fields(self, task_id: int) -> Dict[str, Any]: """ @@ -1268,12 +1269,12 @@ async def update_task_attributes(self, task_id: int, attributes_to_update: dict, self.__logger.warning(f'update task attributes for {task_id} failed. 
task id not found.') await con.commit() return - attributes = await asyncio.get_event_loop().run_in_executor(None, json.loads, row['attributes']) + attributes = await deserialize_attributes(row['attributes']) attributes.update(attributes_to_update) for name in attributes_to_delete: if name in attributes: del attributes[name] - await con.execute('UPDATE tasks SET "attributes" = ? WHERE "id" = ?', (await asyncio.get_event_loop().run_in_executor(None, json.dumps, attributes), + await con.execute('UPDATE tasks SET "attributes" = ? WHERE "id" = ?', (await serialize_attributes(attributes), task_id)) await con.commit() @@ -1544,7 +1545,7 @@ async def _inner_shit() -> Tuple[Tuple[SpawnStatus, Optional[int]], ...]: continue async with con.execute('INSERT INTO tasks ("name", "attributes", "parent_id", "state", "node_id", "node_output_name", "environment_resolver_data") VALUES (?, ?, ?, ?, ?, ?, ?)', - (newtask.name(), json.dumps(newtask._attributes()), parent_task_id, # TODO: run dumps in executor + (newtask.name(), await serialize_attributes(newtask._attributes()), parent_task_id, # TODO: run dumps in executor TaskState.SPAWNED.value if newtask.create_as_spawned() else TaskState.WAITING.value, node_id, newtask.node_output_name(), newtask.environment_arguments().serialize() if newtask.environment_arguments() is not None else None)) as newcur: From b084d65dbc7737664db73f369b8b7038e1e91bf5 Mon Sep 17 00:00:00 2001 From: pedohorse <13556996+pedohorse@users.noreply.github.com> Date: Wed, 10 Apr 2024 19:45:58 +0200 Subject: [PATCH 11/23] use attribute_serialization for attribute serialization instead of json --- src/lifeblood/scheduler_ui_protocol.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/lifeblood/scheduler_ui_protocol.py b/src/lifeblood/scheduler_ui_protocol.py index 4e221caf..1ea19653 100644 --- a/src/lifeblood/scheduler_ui_protocol.py +++ b/src/lifeblood/scheduler_ui_protocol.py @@ -4,6 +4,7 @@ import asyncio from asyncio.exceptions import IncompleteReadError from . 
import logging +from .attribute_serialization import serialize_attributes_core, deserialize_attributes_core from .uidata import NodeUi, Parameter, ParameterLocked, ParameterReadonly, ParameterNotFound, ParameterCannotHaveExpressions from .ui_protocol_data import NodeGraphStructureData, TaskGroupBatchData, TaskBatchData, WorkerBatchData, UiData, InvocationLogData, IncompleteInvocationLogData from .ui_events import TaskEvent @@ -22,12 +23,12 @@ from .scheduler import Scheduler -def _serialize_json_dict(d: dict) -> bytes: - return json.dumps(d).encode('UTF-8') +def _serialize_attrib_dict(d: dict) -> bytes: + return serialize_attributes_core(d).encode('UTF-8') -def _deserialize_json_dict(data: bytes) -> dict: - return json.loads(data.decode('UTF-8')) +def _deserialize_attrib_dict(data: bytes) -> dict: + return deserialize_attributes_core(data.decode('UTF-8')) class SchedulerUiProtocol(asyncio.StreamReaderProtocol): @@ -146,7 +147,7 @@ async def comm_get_task_attribs(): # elif command == b'gettaskattribs': task_id = struct.unpack('>Q', await reader.readexactly(8))[0] attribs, env_attribs = await self.__scheduler.get_task_attributes(task_id) - data_attirbs: bytes = await asyncio.get_event_loop().run_in_executor(None, _serialize_json_dict, attribs) + data_attirbs: bytes = await asyncio.get_event_loop().run_in_executor(None, _serialize_attrib_dict, attribs) data_env: bytes = b'' if env_attribs is not None: data_env: bytes = await EnvironmentResolverArguments.serialize_async(env_attribs) @@ -899,7 +900,7 @@ def get_task_attribs(self, task_id) -> Tuple[Dict[str, Any], Optional[Environmen w.write(struct.pack('>Q', task_id)) w.flush() rcvsize = struct.unpack('>Q', r.readexactly(8))[0] - attribs = _deserialize_json_dict(r.readexactly(rcvsize)) + attribs = _deserialize_attrib_dict(r.readexactly(rcvsize)) rcvsize = struct.unpack('>Q', r.readexactly(8))[0] env_attrs = None if rcvsize > 0: From ac4a5f4014a9cf1b2c2f60bc7b20e1a854c5f40d Mon Sep 17 00:00:00 2001 From: pedohorse <13556996+pedohorse@users.noreply.github.com> Date: Wed, 10 Apr 2024 19:46:39 +0200 Subject: [PATCH 12/23] use py expressions instead of wobbly half-json syntax --- src/lifeblood_viewer/create_task_dialog.py | 37 +++++++++++++--------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/src/lifeblood_viewer/create_task_dialog.py b/src/lifeblood_viewer/create_task_dialog.py index b2052749..154a4073 100644 --- a/src/lifeblood_viewer/create_task_dialog.py +++ b/src/lifeblood_viewer/create_task_dialog.py @@ -1,7 +1,5 @@ -import re -import json import shlex -from PySide2.QtWidgets import QWidget, QDialog, QVBoxLayout, QHBoxLayout, QLineEdit, QLabel, QSpinBox, QPushButton +from PySide2.QtWidgets import QWidget, QDialog, QVBoxLayout, QHBoxLayout, QLineEdit, QLabel, QMessageBox, QSpinBox, QPushButton from PySide2.QtCore import Slot, QSize from typing import TYPE_CHECKING, Optional, Tuple, List, Set @@ -58,7 +56,7 @@ def __init__(self, parent=None, init_attributes: Optional[dict] = None): for i, (name, val) in enumerate(init_attributes.items()): attr_layout = self.__attrs_layout.itemAt(i).layout() attr_layout.itemAt(0).widget().setText(name) - attr_layout.itemAt(1).widget().setText(val if isinstance(val, str) else json.dumps(val)) + attr_layout.itemAt(1).widget().setText(repr(val)) attr_layout.itemAt(0).widget().set_current_text_as_default() attr_layout.itemAt(1).widget().set_current_text_as_default() self.__initial_attrib_names.add(name) @@ -76,9 +74,9 @@ def attribute_count_changed(self, val): for _ in range(old_attr_count, val): 
attr_layout = QHBoxLayout() name = LineEditWithDefaults() - name.setPlaceholderText('attribute name') + name.setPlaceholderText('attribute name (empty will be deleted)') val = LineEditWithDefaults() - val.setPlaceholderText('json-style value') + val.setPlaceholderText('py expression value') attr_layout.addWidget(name, 1) attr_layout.addWidget(val, 3) self.__attrs_layout.addLayout(attr_layout) @@ -92,10 +90,7 @@ def get_attributes(self) -> dict: continue val = attr_layout.itemAt(1).widget().text() - try: - val = json.loads(val) - except json.JSONDecodeError: - val = json.loads(f'"{val}"') + val = eval(val) # TODO: no context? attrs[name] = val @@ -120,10 +115,7 @@ def get_changed_attributes(self) -> Tuple[dict, set]: if attr_layout.itemAt(0).widget().is_at_default() and attr_layout.itemAt(1).widget().is_at_default(): continue - try: - val = json.loads(val) - except json.JSONDecodeError: - val = json.loads(f'"{val}"') + val = eval(val) # TODO: no context? attrs[name] = val @@ -166,7 +158,7 @@ def __init__(self, parent=None, task: Optional["Task"] = None): self.__main_layout.addLayout(accept_layout) # connec - self.__ok_btn.clicked.connect(self.accept) + self.__ok_btn.clicked.connect(self.validate_and_accept) self.__cancel_btn.clicked.connect(self.reject) # init @@ -186,6 +178,21 @@ def __init__(self, parent=None, task: Optional["Task"] = None): def sizeHint(self) -> QSize: return QSize(384, 128) + @Slot() + def validate_and_accept(self): + try: + task_attribs = self.get_task_attributes() + env_attribs = self.get_task_environment_resolver_and_arguments() + except Exception as e: + if isinstance(e, SyntaxError): + QMessageBox.warning(self, 'attribute value syntax error', f'attribute value has to be a valid python expression: {e}') + elif isinstance(e, NameError): + QMessageBox.warning(self, 'attribute value name error', f'attribute value has to be a valid python expression: {e}') + else: + QMessageBox.warning(self, 'attribute value error', f'attribute value has to be a valid python expression: {e}') + return + self.accept() + def get_task_name(self): name = self.__name_edit.text().strip() if name == '': From f543ce67b114641434a43170f7d150c7354a6698 Mon Sep 17 00:00:00 2001 From: pedohorse <13556996+pedohorse@users.noreply.github.com> Date: Thu, 11 Apr 2024 00:46:42 +0200 Subject: [PATCH 13/23] add env resolver serde special cases tests --- tests/test_envisonment_resolver.py | 42 ++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/tests/test_envisonment_resolver.py b/tests/test_envisonment_resolver.py index e7b72b4e..93d34c8c 100644 --- a/tests/test_envisonment_resolver.py +++ b/tests/test_envisonment_resolver.py @@ -170,3 +170,45 @@ def test_serde1inv(self): self.assertEqual(client_envarg.name(), envarg.name()) self.assertEqual(client_envarg.arguments(), envarg.arguments()) self.assertEqual(client_envarg.serialize(), envarg.serialize()) + + def test_serde2(self): + """ + supported non-json cases + """ + client_envarg = client_environment_resolver.EnvironmentResolverArguments( + 'foobar', + { + 'key': (1, 4.3, (6, 5)), + 42: [11, 22, {2, 5, 7}], + 1: False, + -999: (None, True, 5), + 'kek': {'q': 'we', 'a': 'sd'}, + } + ) + data = client_envarg.serialize() + envarg = environment_resolver.EnvironmentResolverArguments.deserialize(data) + + self.assertEqual(client_envarg.name(), envarg.name()) + self.assertEqual(client_envarg.arguments(), envarg.arguments()) + self.assertEqual(client_envarg.serialize(), envarg.serialize()) + + def test_serde2inv(self): + """ + supported 
non-json cases + """ + envarg = environment_resolver.EnvironmentResolverArguments( + 'foobar', + { + 'key': (1, 4.3, (6, 5)), + 42: [11, 22, {2, 5, 7}], + 1: False, + -999: (None, True, 5), + 'kek': {'q': 'we', 'a': 'sd'}, + } + ) + data = envarg.serialize() + client_envarg = client_environment_resolver.EnvironmentResolverArguments.deserialize(data) + + self.assertEqual(client_envarg.name(), envarg.name()) + self.assertEqual(client_envarg.arguments(), envarg.arguments()) + self.assertEqual(client_envarg.serialize(), envarg.serialize()) \ No newline at end of file From 622a7bce0d1eee596e107c722871e062c3716c6f Mon Sep 17 00:00:00 2001 From: pedohorse <13556996+pedohorse@users.noreply.github.com> Date: Thu, 11 Apr 2024 00:47:25 +0200 Subject: [PATCH 14/23] duplicate attrib serialization from main module --- .../attribute_serialization.py | 15 ++++++ src/lifeblood_client/common_serialization.py | 52 +++++++++++++++++++ src/lifeblood_client/environment_resolver.py | 7 ++- 3 files changed, 70 insertions(+), 4 deletions(-) create mode 100644 src/lifeblood_client/attribute_serialization.py create mode 100644 src/lifeblood_client/common_serialization.py diff --git a/src/lifeblood_client/attribute_serialization.py b/src/lifeblood_client/attribute_serialization.py new file mode 100644 index 00000000..05db152a --- /dev/null +++ b/src/lifeblood_client/attribute_serialization.py @@ -0,0 +1,15 @@ +# NOTE: this file is a simplified standalone version of lifeblood.attribute_serialization +# And therefore this file should be kept up to date with the original +# +# note as well: this is still kept py2-3 compatible +import json + +from .common_serialization import AttribSerializer, AttribDeserializer + + +def serialize_attributes_core(attributes: dict) -> str: + return json.dumps(attributes, cls=AttribSerializer) # TODO: allow SOME custom object serialization + + +def deserialize_attributes_core(attributes_serialized: str) -> dict: + return json.loads(attributes_serialized, cls=AttribDeserializer) diff --git a/src/lifeblood_client/common_serialization.py b/src/lifeblood_client/common_serialization.py new file mode 100644 index 00000000..2c6272aa --- /dev/null +++ b/src/lifeblood_client/common_serialization.py @@ -0,0 +1,52 @@ +# NOTE: this file is a simplified standalone version of lifeblood.common_serialization +# And therefore this file should be kept up to date with the original +# +# note as well: this is still kept py2-3 compatible +import json + + +class AttribSerializer(json.JSONEncoder): + def _reform(self, obj): + if type(obj) is set: + return { + '__special_object_type__': 'set', + 'items': self._reform(list(obj)) + } + elif type(obj) is tuple: + return { + '__special_object_type__': 'tuple', + 'items': self._reform(list(obj)) + } + elif type(obj) is dict: # int keys case + if any(isinstance(x, (int, float, tuple)) for x in obj.keys()): + return { + '__special_object_type__': 'kvp', + 'items': self._reform([[k, v] for k, v in obj.items()]) + } + return {k: self._reform(v) for k, v in obj.items()} + elif isinstance(obj, list): + return [self._reform(x) for x in obj] + elif isinstance(obj, (int, float, str, bool)) or obj is None: + return obj + raise NotImplementedError('serialization not implemented for type "{}"'.format(type(obj))) + + def encode(self, o): + return super(AttribSerializer, self).encode(self._reform(o)) + + def default(self, obj): + return super(AttribSerializer, self).default(obj) + + +class AttribDeserializer(json.JSONDecoder): + def _dedata(self, obj): + special_type = 
obj.get('__special_object_type__') + if special_type == 'set': + return set(obj.get('items')) + elif special_type == 'tuple': + return tuple(obj.get('items')) + elif special_type == 'kvp': + return {k: v for k, v in obj.get('items')} + return obj + + def __init__(self): + super(AttribDeserializer, self).__init__(object_hook=self._dedata) diff --git a/src/lifeblood_client/environment_resolver.py b/src/lifeblood_client/environment_resolver.py index 251663b0..2f97b34e 100644 --- a/src/lifeblood_client/environment_resolver.py +++ b/src/lifeblood_client/environment_resolver.py @@ -2,8 +2,7 @@ # And therefore this file should be kept up to date with the original # # note as well: this is still kept py2-3 compatible - -import json +from .attribute_serialization import serialize_attributes_core, deserialize_attributes_core class EnvironmentResolverArguments: @@ -30,7 +29,7 @@ def arguments(self): return self.__args def serialize(self): # type: () -> bytes - return json.dumps({ + return serialize_attributes_core({ '_EnvironmentResolverArguments__resolver_name': self.__resolver_name, '_EnvironmentResolverArguments__args': self.__args, }).encode('utf-8') @@ -38,7 +37,7 @@ def serialize(self): # type: () -> bytes @classmethod def deserialize(cls, data): # type: (bytes) -> EnvironmentResolverArguments wrp = EnvironmentResolverArguments(None) - data_dict = json.loads(data.decode('utf-8')) + data_dict = deserialize_attributes_core(data.decode('utf-8')) wrp.__resolver_name = data_dict['_EnvironmentResolverArguments__resolver_name'] wrp.__args = data_dict['_EnvironmentResolverArguments__args'] return wrp From cae3447dadd51681df342cc2ec03952c1da8d00f Mon Sep 17 00:00:00 2001 From: pedohorse <13556996+pedohorse@users.noreply.github.com> Date: Fri, 12 Apr 2024 15:59:51 +0200 Subject: [PATCH 15/23] removed done TODO --- src/lifeblood/attribute_serialization.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lifeblood/attribute_serialization.py b/src/lifeblood/attribute_serialization.py index aa4ecba4..cefe8767 100644 --- a/src/lifeblood/attribute_serialization.py +++ b/src/lifeblood/attribute_serialization.py @@ -13,7 +13,7 @@ async def deserialize_attributes(attributes_serialized: str) -> dict: def serialize_attributes_core(attributes: dict) -> str: - return json.dumps(attributes, cls=AttribSerializer) # TODO: allow SOME custom object serialization + return json.dumps(attributes, cls=AttribSerializer) def deserialize_attributes_core(attributes_serialized: str) -> dict: From 31664137b04cb111d81914f83b2eb6cd37c5cde7 Mon Sep 17 00:00:00 2001 From: pedohorse <13556996+pedohorse@users.noreply.github.com> Date: Fri, 12 Apr 2024 16:00:20 +0200 Subject: [PATCH 16/23] use context.task_attributes() --- src/lifeblood/core_nodes/parent_children_waiter.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/lifeblood/core_nodes/parent_children_waiter.py b/src/lifeblood/core_nodes/parent_children_waiter.py index 1de6e5ad..b9e12eff 100644 --- a/src/lifeblood/core_nodes/parent_children_waiter.py +++ b/src/lifeblood/core_nodes/parent_children_waiter.py @@ -1,6 +1,7 @@ import dataclasses from dataclasses import dataclass import json +from lifeblood.attribute_serialization import deserialize_attributes_core from lifeblood.basenode import BaseNode, ProcessingError from lifeblood.nodethings import ProcessingResult from lifeblood.taskspawn import TaskSpawn @@ -177,7 +178,7 @@ def process_task(self, context) -> ProcessingResult: self.__cache_children[parent_id] = 
ParentChildrenWaiterNode.Entry() if task_id not in self.__cache_children[parent_id].children: self.__cache_children[parent_id].children.add(task_id) - self.__cache_children[parent_id].all_children_dicts[task_id] = json.loads(context.task_field('attributes')) + self.__cache_children[parent_id].all_children_dicts[task_id] = dict(context.task_attributes()) self.__cache_children[parent_id].all_children_dicts[task_id]['_builtin_id'] = task_id # promote children attribs up if recursive and children_count > 0: From f7261a5c8808a5b6969924e198ab747207663cea Mon Sep 17 00:00:00 2001 From: pedohorse <13556996+pedohorse@users.noreply.github.com> Date: Fri, 12 Apr 2024 16:00:51 +0200 Subject: [PATCH 17/23] use deserialize_attributes instead of json --- src/lifeblood/processingcontext.py | 2 +- src/lifeblood/scheduler/data_access.py | 3 ++- .../houdini_distributed_tracker_runner.py | 3 ++- src/lifeblood_testing_common/nodes_common.py | 18 +++++++++--------- 4 files changed, 14 insertions(+), 12 deletions(-) diff --git a/src/lifeblood/processingcontext.py b/src/lifeblood/processingcontext.py index 6a392528..d5f5dac3 100644 --- a/src/lifeblood/processingcontext.py +++ b/src/lifeblood/processingcontext.py @@ -16,7 +16,7 @@ class ProcessingContext: class TaskWrapper: def __init__(self, task_dict: dict): - self.__attributes = json.loads(task_dict.get('attributes', '{}')) + self.__attributes = deserialize_attributes_core(task_dict.get('attributes', '{}')) self.__stuff = task_dict def __getitem__(self, item): diff --git a/src/lifeblood/scheduler/data_access.py b/src/lifeblood/scheduler/data_access.py index 9d9342b3..6ad6d471 100644 --- a/src/lifeblood/scheduler/data_access.py +++ b/src/lifeblood/scheduler/data_access.py @@ -5,6 +5,7 @@ import struct import json from dataclasses import dataclass +from ..attribute_serialization import serialize_attributes from ..db_misc import sql_init_script from ..expiring_collections import ExpiringValuesSetMap from ..config import get_config @@ -114,7 +115,7 @@ async def create_task(self, newtask: TaskSpawnData, *, con: Optional[aiosqlite.C return ret async with con.execute('INSERT INTO tasks ("name", "attributes", "parent_id", "state", "node_id", "node_output_name", "environment_resolver_data") VALUES (?, ?, ?, ?, ?, ?, ?)', - (newtask.name, json.dumps(newtask.attributes), newtask.parent_id, # TODO: run dumps in executor + (newtask.name, await serialize_attributes(newtask.attributes), newtask.parent_id, newtask.state.value, newtask.node_id, newtask.node_output_name, newtask.environment_resolver_arguments.serialize() if newtask.environment_resolver_arguments is not None else None)) as newcur: diff --git a/src/lifeblood/stock_nodes/houdini_distributed_sim/nodes/houdini_distributed_tracker_runner.py b/src/lifeblood/stock_nodes/houdini_distributed_sim/nodes/houdini_distributed_tracker_runner.py index 977115c9..4ef84a82 100644 --- a/src/lifeblood/stock_nodes/houdini_distributed_sim/nodes/houdini_distributed_tracker_runner.py +++ b/src/lifeblood/stock_nodes/houdini_distributed_sim/nodes/houdini_distributed_tracker_runner.py @@ -4,6 +4,7 @@ from lifeblood.nodethings import ProcessingResult, InvocationJob from lifeblood.invocationjob import InvocationRequirements from lifeblood.enums import NodeParameterType, WorkerType +from lifeblood.attribute_serialization import serialize_attributes_core from typing import Iterable @@ -51,7 +52,7 @@ def process_task(self, context) -> ProcessingResult: addressee_name = str(uuid.uuid4()) invoc = InvocationJob(['python', ':/work_to_do.py', 
context.param_value('port'), context.param_value('wport'), addressee_name, ':/task_base_attrs.json']) invoc.set_extra_file('work_to_do.py', code) - invoc.set_extra_file('task_base_attrs.json', json.dumps(dict(context.task_attributes()))) + invoc.set_extra_file('task_base_attrs.json', serialize_attributes_core(dict(context.task_attributes()))) if context.param_value('use helper'): invoc.set_requirements(InvocationRequirements(worker_type=WorkerType.SCHEDULER_HELPER)) diff --git a/src/lifeblood_testing_common/nodes_common.py b/src/lifeblood_testing_common/nodes_common.py index 3e0ff40c..644af94d 100644 --- a/src/lifeblood_testing_common/nodes_common.py +++ b/src/lifeblood_testing_common/nodes_common.py @@ -6,8 +6,8 @@ import tempfile from pathlib import Path import sqlite3 -import json from unittest import mock, IsolatedAsyncioTestCase +from lifeblood.attribute_serialization import serialize_attributes_core, deserialize_attributes_core from lifeblood.enums import TaskState from lifeblood.db_misc import sql_init_script from lifeblood.basenode import BaseNode, ProcessingResult @@ -71,7 +71,7 @@ def __init__(self, pool: PseudoTaskPool, task_id: int, attrs: dict, parent_id: O 'node_input_name': self.__input_name, 'state': self.__state.value, 'parent_id': parent_id, - 'attributes': json.dumps(attrs), + 'attributes': serialize_attributes_core(attrs), **(extra_fields or {}) } @@ -98,15 +98,15 @@ def task_dict(self) -> dict: }} def attributes(self) -> dict: - return json.loads(self.__task_dict['attributes']) + return deserialize_attributes_core(self.__task_dict['attributes']) def update_attribs(self, attribs_to_set: dict, attribs_to_delete: Optional[Set[str]] = None): - attrs: dict = json.loads(self.__task_dict['attributes']) + attrs: dict = deserialize_attributes_core(self.__task_dict['attributes']) attrs.update(attribs_to_set) if attribs_to_delete: for attr in attribs_to_delete: attrs.pop(attr) - self.__task_dict['attributes'] = json.dumps(attrs) + self.__task_dict['attributes'] = serialize_attributes_core(attrs) def set_state(self, state: TaskState): self.__state = state @@ -327,7 +327,7 @@ async def _logic(scheduler, workers, tmp_script_path, done_waiter): 'outimage': out_exr_path, 'frames': [1, 2, 3] } - res = node.process_task(ProcessingContext(node, {'attributes': json.dumps(start_attrs)})) + res = node.process_task(ProcessingContext(node, {'attributes': serialize_attributes_core(start_attrs)})) ij = res.invocation_job self.assertTrue(ij is not None) @@ -354,7 +354,7 @@ async def _logic(scheduler, workers, tmp_script_path, done_waiter): await asyncio.wait([done_waiter], timeout=30) # now postprocess task - res = node.postprocess_task(ProcessingContext(node, {'attributes': json.dumps({ + res = node.postprocess_task(ProcessingContext(node, {'attributes': serialize_attributes_core({ **start_attrs, **updated_attrs })})) @@ -427,7 +427,7 @@ async def _logic(scheduler, workers, script_path, done_waiter): for param, val in params.items(): node.set_param_value(param, val) - res = node.process_task(ProcessingContext(node, {'attributes': json.dumps(task_attrs)})) + res = node.process_task(ProcessingContext(node, {'attributes': serialize_attributes_core(task_attrs)})) if res.attributes_to_set: updated_attrs.update(res.attributes_to_set) @@ -459,7 +459,7 @@ async def _logic(scheduler, workers, script_path, done_waiter): await asyncio.wait([done_waiter], timeout=30) # now postprocess task - res = node.postprocess_task(ProcessingContext(node, {'attributes': json.dumps({ + res = 
            **task_attrs,
             **updated_attrs
         })}))

From c48ed33d839fc841f5ef6323462b2341c8cec842 Mon Sep 17 00:00:00 2001
From: pedohorse <13556996+pedohorse@users.noreply.github.com>
Date: Fri, 12 Apr 2024 17:33:49 +0200
Subject: [PATCH 18/23] change attrib-related comments

---
 src/lifeblood/worker.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/lifeblood/worker.py b/src/lifeblood/worker.py
index b65ded1a..c20978ae 100644
--- a/src/lifeblood/worker.py
+++ b/src/lifeblood/worker.py
@@ -365,6 +365,7 @@ async def run_task(self, task: InvocationJob, report_to: AddressChain):
             env['LIFEBLOOD_RUNTIME_IID'] = task.invocation_id()
             env['LIFEBLOOD_RUNTIME_TID'] = task.task_id()
             env['LIFEBLOOD_RUNTIME_SCHEDULER_ADDR'] = self.__local_invocation_server_address_string
+            # we do NOT set all attribs to env - a frame list alone can easily hit the process env size limit
             for aname, aval in task.attributes().items():
                 if aname.startswith('_'):  # skip attributes starting with _
                     continue
@@ -372,7 +373,6 @@
                 if isinstance(aval, (str, int, float)):
                     env[f'LBATTR_{aname}'] = str(aval)
 
-            # env['LBATTRS_JSON'] = json.dumps(dict(task.attributes()))
             if self.__extra_files_base_dir is not None:
                 env['LB_EF_ROOT'] = self.__extra_files_base_dir
             try:

From 77792d379e13b7b77c77f594eff43d196c074b24 Mon Sep 17 00:00:00 2001
From: pedohorse <13556996+pedohorse@users.noreply.github.com>
Date: Sat, 13 Apr 2024 11:14:33 +0200
Subject: [PATCH 19/23] add py2-3 compatible version of attribute serializer

---
 .../lifeblood_connection.py | 62 ++++++++++++++++++-
 .../attribute_serialization.py | 6 +-
 2 files changed, 64 insertions(+), 4 deletions(-)

diff --git a/src/lifeblood/worker_runtime_pythonpath/lifeblood_connection.py b/src/lifeblood/worker_runtime_pythonpath/lifeblood_connection.py
index 2a29fcee..429e4811 100644
--- a/src/lifeblood/worker_runtime_pythonpath/lifeblood_connection.py
+++ b/src/lifeblood/worker_runtime_pythonpath/lifeblood_connection.py
@@ -17,6 +17,66 @@
 except ImportError:
     pass
 
+# duplicated from lifeblood_client
+
+
+class AttribSerializer(json.JSONEncoder):
+    def _reform(self, obj):
+        if type(obj) is set:
+            return {
+                '__special_object_type__': 'set',
+                'items': self._reform(list(obj))
+            }
+        elif type(obj) is tuple:
+            return {
+                '__special_object_type__': 'tuple',
+                'items': self._reform(list(obj))
+            }
+        elif type(obj) is dict:  # dicts with non-string (int/float/tuple) keys need a special representation
+            if any(isinstance(x, (int, float, tuple)) for x in obj.keys()):
+                return {
+                    '__special_object_type__': 'kvp',
+                    'items': self._reform([[k, v] for k, v in obj.items()])
+                }
+            return {k: self._reform(v) for k, v in obj.items()}
+        elif isinstance(obj, list):
+            return [self._reform(x) for x in obj]
+        elif isinstance(obj, (int, float, str, bool)) or obj is None:
+            return obj
+        raise NotImplementedError('serialization not implemented for type "{}"'.format(type(obj)))
+
+    def encode(self, o):
+        return super(AttribSerializer, self).encode(self._reform(o))
+
+    def default(self, obj):
+        return super(AttribSerializer, self).default(obj)
+
+
+class AttribDeserializer(json.JSONDecoder):
+    def _dedata(self, obj):
+        special_type = obj.get('__special_object_type__')
+        if special_type == 'set':
+            return set(obj.get('items'))
+        elif special_type == 'tuple':
+            return tuple(obj.get('items'))
+        elif special_type == 'kvp':
+            return {k: v for k, v in obj.get('items')}
+        return obj
+
+    def __init__(self):
+        super(AttribDeserializer, self).__init__(object_hook=self._dedata)
+
+
+def serialize_attributes_core(attributes):  # type: (dict) -> str
+    return json.dumps(attributes, cls=AttribSerializer)
+
+
+def deserialize_attributes_core(attributes_serialized):  # type: (str) -> dict
+    return json.loads(attributes_serialized, cls=AttribDeserializer)
+
+
+#
+
 class MessageSendError(RuntimeError):
     pass
 
@@ -220,7 +280,7 @@ def _send():
         sock = _connect_to_worker(timeout=30)
         send_string(sock, 'tupdateattribs')
-        updata = json.dumps(attribs).encode('UTF-8')
+        updata = serialize_attributes_core(attribs).encode('UTF-8')  # need to use attrib serialization
         sock.sendall(struct.pack('>QQQ', task_id, len(updata), 0))
         sock.sendall(updata)
         sock.recv(1)  # recv confirmation
diff --git a/src/lifeblood_client/attribute_serialization.py b/src/lifeblood_client/attribute_serialization.py
index 05db152a..f5d47de9 100644
--- a/src/lifeblood_client/attribute_serialization.py
+++ b/src/lifeblood_client/attribute_serialization.py
@@ -7,9 +7,9 @@
 from .common_serialization import AttribSerializer, AttribDeserializer
 
 
-def serialize_attributes_core(attributes: dict) -> str:
-    return json.dumps(attributes, cls=AttribSerializer)  # TODO: allow SOME custom object serialization
+def serialize_attributes_core(attributes):  # type: (dict) -> str
+    return json.dumps(attributes, cls=AttribSerializer)
 
 
-def deserialize_attributes_core(attributes_serialized: str) -> dict:
+def deserialize_attributes_core(attributes_serialized):  # type: (str) -> dict
     return json.loads(attributes_serialized, cls=AttribDeserializer)

From ada24fcb09dd06f71250fa8ccb07225686e694ec Mon Sep 17 00:00:00 2001
From: pedohorse <13556996+pedohorse@users.noreply.github.com>
Date: Sat, 13 Apr 2024 11:16:05 +0200
Subject: [PATCH 20/23] remove unused json, use attribute_serialization

---
 src/lifeblood/core_nodes/parent_children_waiter.py      | 1 -
 src/lifeblood/core_nodes/split_waiter.py                | 4 +---
 src/lifeblood/processingcontext.py                      | 1 -
 src/lifeblood/scheduler/data_access.py                  | 1 -
 .../stock_nodes/houdini_distributed_sim/data/server.py  | 4 ++--
 .../nodes/houdini_distributed_tracker_runner.py         | 1 -
 6 files changed, 3 insertions(+), 9 deletions(-)

diff --git a/src/lifeblood/core_nodes/parent_children_waiter.py b/src/lifeblood/core_nodes/parent_children_waiter.py
index b9e12eff..54dc9577 100644
--- a/src/lifeblood/core_nodes/parent_children_waiter.py
+++ b/src/lifeblood/core_nodes/parent_children_waiter.py
@@ -1,6 +1,5 @@
 import dataclasses
 from dataclasses import dataclass
-import json
 from lifeblood.attribute_serialization import deserialize_attributes_core
 from lifeblood.basenode import BaseNode, ProcessingError
 from lifeblood.nodethings import ProcessingResult
diff --git a/src/lifeblood/core_nodes/split_waiter.py b/src/lifeblood/core_nodes/split_waiter.py
index c397d507..f8f0620d 100644
--- a/src/lifeblood/core_nodes/split_waiter.py
+++ b/src/lifeblood/core_nodes/split_waiter.py
@@ -1,6 +1,4 @@
 from dataclasses import dataclass
-import time
-import json
 from lifeblood.basenode import BaseNode
 from lifeblood.nodethings import ProcessingResult
 from lifeblood.taskspawn import TaskSpawn
@@ -134,7 +132,7 @@ def process_task(self, context) -> ProcessingResult:  #TODO: not finished, attrib
         if self.__cache[split_id].first_to_arrive is None and len(self.__cache[split_id].arrived) == 0:
             self.__cache[split_id].first_to_arrive = task_id
         if context.task_field('split_element') not in self.__cache[split_id].arrived:
-            self.__cache[split_id].arrived[context.task_field('split_element')] = json.loads(context.task_field('attributes'))
+            self.__cache[split_id].arrived[context.task_field('split_element')] = dict(context.task_attributes())
             self.__cache[split_id].arrived[context.task_field('split_element')]['_builtin_id'] = task_id
 
         # we will not wait in loop or we risk deadlocking threadpool
diff --git a/src/lifeblood/processingcontext.py b/src/lifeblood/processingcontext.py
index d5f5dac3..c24b9016 100644
--- a/src/lifeblood/processingcontext.py
+++ b/src/lifeblood/processingcontext.py
@@ -1,4 +1,3 @@
-import json
 from types import MappingProxyType
 import re
 
diff --git a/src/lifeblood/scheduler/data_access.py b/src/lifeblood/scheduler/data_access.py
index 6ad6d471..32e79295 100644
--- a/src/lifeblood/scheduler/data_access.py
+++ b/src/lifeblood/scheduler/data_access.py
@@ -3,7 +3,6 @@
 import sqlite3
 import random
 import struct
-import json
 from dataclasses import dataclass
 from ..attribute_serialization import serialize_attributes
 from ..db_misc import sql_init_script
diff --git a/src/lifeblood/stock_nodes/houdini_distributed_sim/data/server.py b/src/lifeblood/stock_nodes/houdini_distributed_sim/data/server.py
index 50c7b389..6e8592b2 100644
--- a/src/lifeblood/stock_nodes/houdini_distributed_sim/data/server.py
+++ b/src/lifeblood/stock_nodes/houdini_distributed_sim/data/server.py
@@ -4,10 +4,10 @@
 import re
 import asyncio
 import lifeblood_connection
-import json
 import shutil
 import tempfile
 
+
 class ControlSignalProcessor:
     def __init__(self, my_addressee: str):
         self.__my_addressee = my_addressee
@@ -88,7 +88,7 @@ async def main(port: int, webport: int, my_addressee: str, attr_file_path: str):
     print('simtracker started')
 
     with open(attr_file_path, 'r') as f:
-        attrs = json.load(f)
+        attrs = lifeblood_connection.deserialize_attributes_core(f.read())
     attrs['simtracker_host'] = ip
     attrs['simtracker_port'] = port
     attrs['tracker_control_iid'] = lifeblood_connection.get_my_invocation_id()
diff --git a/src/lifeblood/stock_nodes/houdini_distributed_sim/nodes/houdini_distributed_tracker_runner.py b/src/lifeblood/stock_nodes/houdini_distributed_sim/nodes/houdini_distributed_tracker_runner.py
index 4ef84a82..5daf3b18 100644
--- a/src/lifeblood/stock_nodes/houdini_distributed_sim/nodes/houdini_distributed_tracker_runner.py
+++ b/src/lifeblood/stock_nodes/houdini_distributed_sim/nodes/houdini_distributed_tracker_runner.py
@@ -1,4 +1,3 @@
-import json
 import uuid
 from lifeblood.basenode import BaseNode
 from lifeblood.nodethings import ProcessingResult, InvocationJob

From 2fbe2b5afcb8635b4ebf2152e0797f00df59e01c Mon Sep 17 00:00:00 2001
From: pedohorse <13556996+pedohorse@users.noreply.github.com>
Date: Sat, 13 Apr 2024 11:16:49 +0200
Subject: [PATCH 21/23] add attribute_serialization tests to ensure independent duplications behave the same

---
 tests/test_attribute_serialization.py | 48 +++++++++++++++++++++++++++
 1 file changed, 48 insertions(+)
 create mode 100644 tests/test_attribute_serialization.py

diff --git a/tests/test_attribute_serialization.py b/tests/test_attribute_serialization.py
new file mode 100644
index 00000000..3c29ede9
--- /dev/null
+++ b/tests/test_attribute_serialization.py
@@ -0,0 +1,48 @@
+from unittest import TestCase
+import json
+from lifeblood.attribute_serialization import serialize_attributes_core, deserialize_attributes_core
+from lifeblood_client.attribute_serialization import serialize_attributes_core as serialize_attributes_core_client, deserialize_attributes_core as deserialize_attributes_core_client
+from lifeblood.worker_runtime_pythonpath.lifeblood_connection import serialize_attributes_core as serialize_attributes_core_lbcon, deserialize_attributes_core as deserialize_attributes_core_lbcon
+
+
+class TestAttributeSerialization(TestCase):
+    def test_basic(self):
+        exp_attrs = {
+            'ass': 'bar',
+            'bar': 123,
+            'car': 4.56,
+            'doc': [2, 5, 7],
+            'ecc': (3, 6, 8),
+            'foo': {
+                'blob': {'q': 'wer', 'a': 'sdf'},
+                'alala': [1, 'q', ['e']]
+            },
+            'geo': {12, 23, 34},
+            'haa': {
+                12: 'bebe',
+                "three": 3.3,
+            },
+            'iso': [(1, ' a ',), 'blb', {1: 1}, {6.6, 5.5}],
+        }
+
+        self.assertDictEqual(exp_attrs, deserialize_attributes_core(serialize_attributes_core(exp_attrs)))
+
+        self.assertDictEqual(exp_attrs, deserialize_attributes_core(serialize_attributes_core_client(exp_attrs)))
+        self.assertDictEqual(exp_attrs, deserialize_attributes_core_client(serialize_attributes_core(exp_attrs)))
+
+        self.assertDictEqual(exp_attrs, deserialize_attributes_core(serialize_attributes_core_lbcon(exp_attrs)))
+        self.assertDictEqual(exp_attrs, deserialize_attributes_core_lbcon(serialize_attributes_core(exp_attrs)))
+
+    def test_json_compatibility(self):
+        exp_attrs = {
+            'ass': 'bar',
+            'bar': 123,
+            'car': 4.56,
+            'doc': [2, 5, 7],
+            'foo': {
+                'blob': {'q': 'wer', 'a': 'sdf'},
+                'alala': [1, 'q', ['e']]
+            },
+        }
+
+        self.assertDictEqual(exp_attrs, deserialize_attributes_core(json.dumps(exp_attrs)))

From b08b99f1d42da5043ce9425854e8971e1ac75d99 Mon Sep 17 00:00:00 2001
From: pedohorse <13556996+pedohorse@users.noreply.github.com>
Date: Sat, 13 Apr 2024 13:30:46 +0200
Subject: [PATCH 22/23] use attribute_serialization instead of json, clean up unused imports

---
 src/lifeblood/basenode.py                   |  4 ----
 src/lifeblood/worker_invocation_protocol.py | 17 ++++------------
 2 files changed, 4 insertions(+), 17 deletions(-)

diff --git a/src/lifeblood/basenode.py b/src/lifeblood/basenode.py
index a79a9bbd..0fede7f8 100644
--- a/src/lifeblood/basenode.py
+++ b/src/lifeblood/basenode.py
@@ -221,10 +221,6 @@ def apply_settings(self, settings: Dict[str, Dict[str, Any]]) -> None:
                     self.logger().warning(f'applying settings: skipping parameter "{param_name}": bad value type: {str(e)}')
                     continue
 
-    # # some helpers
-    # def _get_task_attributes(self, task_row):
-    #     return json.loads(task_row.get('attributes', '{}'))
-
     #
     # Plugin info
     #
diff --git a/src/lifeblood/worker_invocation_protocol.py b/src/lifeblood/worker_invocation_protocol.py
index c5e49761..91b6a229 100644
--- a/src/lifeblood/worker_invocation_protocol.py
+++ b/src/lifeblood/worker_invocation_protocol.py
@@ -1,22 +1,14 @@
 import asyncio
-import time
-import aiofiles
 import struct
-import json
 
-from .enums import TaskScheduleStatus, TaskExecutionStatus, TaskExecutionStatus, WorkerPingReply, SpawnStatus
-from .exceptions import NotEnoughResources, ProcessInitializationError, WorkerNotAvailable, AlreadyRunning, CouldNotNegotiateProtocolVersion, InvocationCancelled
+from .attribute_serialization import deserialize_attributes
+from .exceptions import CouldNotNegotiateProtocolVersion, InvocationCancelled
 from .scheduler_message_processor import SchedulerExtraControlClient, SchedulerInvocationMessageClient
-from .net_messages.exceptions import MessageReceivingError
-from .environment_resolver import ResolutionImpossibleError
 from .taskspawn import TaskSpawn
 from . import logging
-from . import invocationjob
-from . import nethelpers
-import os
 
-from typing import Dict, Optional, Set, Sequence, Tuple, TYPE_CHECKING
+from typing import Dict, Set, Sequence, Tuple, TYPE_CHECKING
 
 if TYPE_CHECKING:
     from .worker import Worker
@@ -81,8 +73,7 @@ async def comm_spawn(self, reader: asyncio.StreamReader, writer: asyncio.StreamW
 
     async def comm_update_attributes(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
         task_id, update_data_size, strcount = struct.unpack('>QQQ', await reader.readexactly(24))
-        attribs_to_update = await asyncio.get_event_loop().run_in_executor(None, json.loads,
-                                                                           (await reader.readexactly(update_data_size)).decode('UTF-8'))
+        attribs_to_update = await deserialize_attributes((await reader.readexactly(update_data_size)).decode('UTF-8'))
         attribs_to_delete = set()
         for _ in range(strcount):
             attribs_to_delete.add(await read_string(reader))

From 8f061efb397a1a700da27ef9d4257bf471dd8e61 Mon Sep 17 00:00:00 2001
From: pedohorse <13556996+pedohorse@users.noreply.github.com>
Date: Mon, 15 Apr 2024 15:26:42 +0200
Subject: [PATCH 23/23] remove unused imports

---
 src/lifeblood/scheduler/task_processor.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/src/lifeblood/scheduler/task_processor.py b/src/lifeblood/scheduler/task_processor.py
index 28605a8b..56acc5c8 100644
--- a/src/lifeblood/scheduler/task_processor.py
+++ b/src/lifeblood/scheduler/task_processor.py
@@ -1,5 +1,3 @@
-import functools
-import sys
 import traceback
 import json
 import itertools
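
For reference, the round-trip behavior that patches 19-21 establish can be demonstrated with a short sketch. This snippet is illustrative only, not part of the patches; it assumes the patched lifeblood package is on the import path and uses only the serialize_attributes_core/deserialize_attributes_core functions introduced above:

# minimal round-trip sketch for the attribute serializers added in this series
# (assumes the patched lifeblood package is importable)
import json
from lifeblood.attribute_serialization import serialize_attributes_core, deserialize_attributes_core

attrs = {
    'frames': [1, 2, 3],         # plain JSON types pass through unchanged
    'resolution': (1920, 1080),  # tuple -> tagged as __special_object_type__ 'tuple'
    'tags': {'sim', 'render'},   # set -> tagged as 'set'
    42: 'int key',               # any non-string key switches the dict to the 'kvp' form
}

serialized = serialize_attributes_core(attrs)     # result is an ordinary JSON string
restored = deserialize_attributes_core(serialized)
assert restored == attrs  # tuples, sets and int keys survive the round trip

# payloads written by bare json.dumps stay readable,
# which is what test_json_compatibility above verifies
assert deserialize_attributes_core(json.dumps({'a': 1})) == {'a': 1}

Because the special containers are encoded as tagged JSON objects rather than a custom wire format, every serialized attribute payload remains a valid JSON document, so older records written with plain json.dumps deserialize unchanged.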