From 3b38ecefe4c816360db13d9965aebe2657e4170d Mon Sep 17 00:00:00 2001 From: Zvonimir Date: Fri, 7 Feb 2025 11:12:52 +0100 Subject: [PATCH 1/5] Add local store --- dkg/modules/asset/asset.py | 11 +++++++++++ dkg/modules/asset/async_asset.py | 11 +++++++++++ dkg/services/input_service.py | 4 ++++ dkg/services/node_services/async_node_service.py | 8 ++++++++ dkg/services/node_services/node_service.py | 8 ++++++++ dkg/utils/node_request.py | 11 +++++++++++ examples/async_demo.py | 2 +- examples/demo.py | 2 +- 8 files changed, 55 insertions(+), 2 deletions(-) diff --git a/dkg/modules/asset/asset.py b/dkg/modules/asset/asset.py index 90d5ccd..ad7ab0a 100644 --- a/dkg/modules/asset/asset.py +++ b/dkg/modules/asset/asset.py @@ -206,6 +206,7 @@ def create( minimum_number_of_node_replications = arguments.get( "minimum_number_of_node_replications" ) + local_store = arguments.get("local_store") blockchain_id = self.manager.blockchain_provider.blockchain_id dataset = {} @@ -448,6 +449,15 @@ def create( frequency, ) + local_store_result = {} + if local_store: + retry = 0 + while (not local_store_result.get("status")) and retry < 6: + local_store_result = self.node_service.local_store( + dataset_root, dataset, blockchain_id, ual + ) + retry += 1 + return json.loads( Web3.to_json( { @@ -471,6 +481,7 @@ def create( }, "numberOfConfirmations": finality_status_result, "requiredConfirmations": minimum_number_of_finalization_confirmations, + **({"localStore": local_store_result} if local_store else {}), }, } ) diff --git a/dkg/modules/asset/async_asset.py b/dkg/modules/asset/async_asset.py index 939b539..5680c18 100644 --- a/dkg/modules/asset/async_asset.py +++ b/dkg/modules/asset/async_asset.py @@ -237,6 +237,7 @@ async def create( minimum_number_of_node_replications = arguments.get( "minimum_number_of_node_replications" ) + local_store = arguments.get("local_store") blockchain_id = self.manager.blockchain_provider.blockchain_id dataset = {} @@ -462,6 +463,15 @@ async def create( frequency, ) + local_store_result = {} + if local_store: + retry = 0 + while (not local_store_result.get("status")) and retry < 6: + local_store_result = await self.node_service.local_store( + dataset_root, dataset, blockchain_id, ual + ) + retry += 1 + return json.loads( Web3.to_json( { @@ -483,6 +493,7 @@ async def create( }, "numberOfConfirmations": finality_status_result, "requiredConfirmations": minimum_number_of_finalization_confirmations, + **({"localStore": local_store_result} if local_store else {}), }, } ) diff --git a/dkg/services/input_service.py b/dkg/services/input_service.py index e8147ed..433d3a1 100644 --- a/dkg/services/input_service.py +++ b/dkg/services/input_service.py @@ -40,6 +40,7 @@ def get_asset_create_arguments(self, options): "minimum_number_of_node_replications": self.get_minimum_number_of_node_replications( options ), + "local_store": self.get_local_store(options), } def get_query_arguments(self, options): @@ -181,3 +182,6 @@ def get_repository(self, options): or self.config.get("repository") or DefaultParameters.REPOSITORY.value ) + + def get_local_store(self, options): + return options.get("local_store") or False diff --git a/dkg/services/node_services/async_node_service.py b/dkg/services/node_services/async_node_service.py index 1641988..71004db 100644 --- a/dkg/services/node_services/async_node_service.py +++ b/dkg/services/node_services/async_node_service.py @@ -21,6 +21,7 @@ def __init__(self, manager: AsyncRequestManager): _publish = Method(NodeRequest.publish) _get = Method(NodeRequest.get) _query = Method(NodeRequest.query) + _local_store = Method(NodeRequest.local_store) async def info(self) -> NodeResponseDict: return await self._info() @@ -182,3 +183,10 @@ async def query( paranet_ual, ): return await self._query(query, query_type, repository, paranet_ual) + + async def local_store(self, dataset_root, dataset, blockchain_id, ual): + try: + result = await self._local_store(dataset_root, dataset, blockchain_id, ual) + return result.get("data") + except Exception as e: + raise Exception(f"Unable to local store: {e}") diff --git a/dkg/services/node_services/node_service.py b/dkg/services/node_services/node_service.py index e447055..b99771f 100644 --- a/dkg/services/node_services/node_service.py +++ b/dkg/services/node_services/node_service.py @@ -22,6 +22,7 @@ def __init__(self, manager: DefaultRequestManager): _publish = Method(NodeRequest.publish) _get = Method(NodeRequest.get) _query = Method(NodeRequest.query) + _local_store = Method(NodeRequest.local_store) def get_operation_result( self, operation_id: str, operation: str, max_retries: int, frequency: int @@ -165,3 +166,10 @@ def query( paranet_ual, ): return self._query(query, query_type, repository, paranet_ual) + + def local_store(self, dataset_root, dataset, blockchain_id, ual): + try: + result = self._local_store(dataset_root, dataset, blockchain_id, ual) + return result.get("data") + except Exception as e: + raise Exception(f"Unable to local store: {e}") diff --git a/dkg/utils/node_request.py b/dkg/utils/node_request.py index 28f7972..dd8ec8c 100644 --- a/dkg/utils/node_request.py +++ b/dkg/utils/node_request.py @@ -103,6 +103,17 @@ class NodeRequest: }, ) + local_store = NodeCall( + method=HTTPRequestMethod.POST, + path="local-store", + data={ + "datasetRoot": str, + "dataset": dict[str, list[str]], + "blockchain": str, + "ual": UAL, + }, + ) + class LocalStoreOperationStatus(AutoStrEnumUpperCase): LOCAL_STORE_INIT_START = auto() diff --git a/examples/async_demo.py b/examples/async_demo.py index dffa96f..5fc5d7b 100644 --- a/examples/async_demo.py +++ b/examples/async_demo.py @@ -72,7 +72,7 @@ def print_json(json_dict: dict): "epochs_num": 2, "minimum_number_of_finalization_confirmations": 3, "minimum_number_of_node_replications": 1, - "token_amount": 100, + "local_store": True, }, ) print( diff --git a/examples/demo.py b/examples/demo.py index 00060d8..9f716c7 100644 --- a/examples/demo.py +++ b/examples/demo.py @@ -90,7 +90,7 @@ def print_json(json_dict: dict): "epochs_num": 2, "minimum_number_of_finalization_confirmations": 3, "minimum_number_of_node_replications": 1, - "token_amount": 100, + "local_store": True, }, ) print( From d556d9bf02fdbba210b7fcee9a690e1f81ad99b6 Mon Sep 17 00:00:00 2001 From: Zvonimir Date: Fri, 7 Feb 2025 11:35:32 +0100 Subject: [PATCH 2/5] Update local store param name --- dkg/utils/node_request.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dkg/utils/node_request.py b/dkg/utils/node_request.py index dd8ec8c..ea67311 100644 --- a/dkg/utils/node_request.py +++ b/dkg/utils/node_request.py @@ -110,7 +110,7 @@ class NodeRequest: "datasetRoot": str, "dataset": dict[str, list[str]], "blockchain": str, - "ual": UAL, + "UAL": UAL, }, ) From 7a3ac11f2e822545b53af70b1e3e4b457143d01c Mon Sep 17 00:00:00 2001 From: Zvonimir Date: Fri, 7 Feb 2025 11:56:54 +0100 Subject: [PATCH 3/5] Fix local_store_result --- dkg/modules/asset/asset.py | 6 +++--- dkg/modules/asset/async_asset.py | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/dkg/modules/asset/asset.py b/dkg/modules/asset/asset.py index ad7ab0a..6b7dc3b 100644 --- a/dkg/modules/asset/asset.py +++ b/dkg/modules/asset/asset.py @@ -449,10 +449,10 @@ def create( frequency, ) - local_store_result = {} + local_store_result = None if local_store: retry = 0 - while (not local_store_result.get("status")) and retry < 6: + while (not (local_store_result or {}).get("status")) and retry < 6: local_store_result = self.node_service.local_store( dataset_root, dataset, blockchain_id, ual ) @@ -487,7 +487,7 @@ def create( ) ) - _submit_knowledge_asset = Method(BlockchainRequest.submit_knowledge_asset) + _submit_knowledge_asset = Method(BlockchainRequest.submit_knowledge_collection) def submit_to_paranet( self, ual: UAL, paranet_ual: UAL diff --git a/dkg/modules/asset/async_asset.py b/dkg/modules/asset/async_asset.py index 5680c18..79c15aa 100644 --- a/dkg/modules/asset/async_asset.py +++ b/dkg/modules/asset/async_asset.py @@ -463,10 +463,10 @@ async def create( frequency, ) - local_store_result = {} + local_store_result = None if local_store: retry = 0 - while (not local_store_result.get("status")) and retry < 6: + while (not (local_store_result or {}).get("status")) and retry < 6: local_store_result = await self.node_service.local_store( dataset_root, dataset, blockchain_id, ual ) @@ -499,7 +499,7 @@ async def create( ) ) - _submit_knowledge_asset = Method(BlockchainRequest.submit_knowledge_asset) + _submit_knowledge_asset = Method(BlockchainRequest.submit_knowledge_collection) def submit_to_paranet( self, ual: UAL, paranet_ual: UAL From b63c2336b053703e894e57ab699641bc8412d89c Mon Sep 17 00:00:00 2001 From: Zvonimir Date: Fri, 7 Feb 2025 12:01:51 +0100 Subject: [PATCH 4/5] change req name --- dkg/modules/asset/async_asset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dkg/modules/asset/async_asset.py b/dkg/modules/asset/async_asset.py index 79c15aa..d91c803 100644 --- a/dkg/modules/asset/async_asset.py +++ b/dkg/modules/asset/async_asset.py @@ -499,7 +499,7 @@ async def create( ) ) - _submit_knowledge_asset = Method(BlockchainRequest.submit_knowledge_collection) + _submit_knowledge_asset = Method(BlockchainRequest.submit_knowledge_asset) def submit_to_paranet( self, ual: UAL, paranet_ual: UAL From 6f770028873cb7d01b8b2a15ca055273d78580d3 Mon Sep 17 00:00:00 2001 From: Zvonimir Date: Fri, 7 Feb 2025 12:02:45 +0100 Subject: [PATCH 5/5] change req name --- dkg/modules/asset/asset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dkg/modules/asset/asset.py b/dkg/modules/asset/asset.py index 6b7dc3b..ec5c892 100644 --- a/dkg/modules/asset/asset.py +++ b/dkg/modules/asset/asset.py @@ -487,7 +487,7 @@ def create( ) ) - _submit_knowledge_asset = Method(BlockchainRequest.submit_knowledge_collection) + _submit_knowledge_asset = Method(BlockchainRequest.submit_knowledge_asset) def submit_to_paranet( self, ual: UAL, paranet_ual: UAL