From aa25d5fee3edbd0e05372340eeece0e51d696c74 Mon Sep 17 00:00:00 2001 From: Xiaokui Shu Date: Fri, 24 Nov 2023 10:28:48 -0500 Subject: [PATCH 1/5] init datasource interface --- .../kestrel_core/src/kestrel/exceptions.py | 3 + .../kestrel/interface/datasource/__init__.py | 1 + .../src/kestrel/interface/datasource/abc.py | 95 +++++++++++++++++++ 3 files changed, 99 insertions(+) create mode 100644 packages-nextgen/kestrel_core/src/kestrel/interface/datasource/abc.py diff --git a/packages-nextgen/kestrel_core/src/kestrel/exceptions.py b/packages-nextgen/kestrel_core/src/kestrel/exceptions.py index af55b363..3a6b1684 100644 --- a/packages-nextgen/kestrel_core/src/kestrel/exceptions.py +++ b/packages-nextgen/kestrel_core/src/kestrel/exceptions.py @@ -40,3 +40,6 @@ class DuplicatedSourceInstruction(KestrelError): class MultiInterfacesInGraph(KestrelError): pass + +class InvalidSerializedDatasourceInterfaceCacheCatalog(KestrelError): + pass diff --git a/packages-nextgen/kestrel_core/src/kestrel/interface/datasource/__init__.py b/packages-nextgen/kestrel_core/src/kestrel/interface/datasource/__init__.py index e69de29b..e84d9d0b 100644 --- a/packages-nextgen/kestrel_core/src/kestrel/interface/datasource/__init__.py +++ b/packages-nextgen/kestrel_core/src/kestrel/interface/datasource/__init__.py @@ -0,0 +1 @@ +from kestrel.interface.datasource.abc import AbstractDataSourceInterface diff --git a/packages-nextgen/kestrel_core/src/kestrel/interface/datasource/abc.py b/packages-nextgen/kestrel_core/src/kestrel/interface/datasource/abc.py new file mode 100644 index 00000000..80bab257 --- /dev/null +++ b/packages-nextgen/kestrel_core/src/kestrel/interface/datasource/abc.py @@ -0,0 +1,95 @@ +import json +from abc import ABC, abstractmethod +from pandas import DataFrame +from uuid import UUID +from typing import ( + Mapping, + Union, +) + +from kestrel.ir.instructions import Reference +from kestrel.ir.graph import IRGraphSoleInterface +from kestrel.exceptions import ( + UnresolvedReference, + InvalidSerializedDatasourceInterfaceCacheCatalog, +) +from kestrel.config import InterfaceConfig + + +class AbstractDataSourceInterface(ABC): + """Abstract class for datasource interface + + Concepts: + + - Think an interface as a datalake + - Think a datasource as a table in the datalake + + Attributes: + + datasources: map a datasource name to datalake table name + + cache: map a cached item (instruction.id) to datalake table name + """ + + def __init__( + self, config: InterfaceConfig, serialized_cache_catalog: Union[None, str] = None + ): + self.datasources: Mapping[str, str] = {} + self.cache: Mapping[UUID, str] = {} + self.__init_from_config(config) + + if serialized_interface_catalog: + try: + self.cache = json.loads(serialized_cache_catalog) + except: + raise InvalidSerializedDatasourceInterfaceCacheCatalog() + + def __init_from_config(self, config: InterfaceConfig): + # TODO: fill self.datasources from config + # TODO: create attributes like self.connection from config + ... + + def __contains__(self, instruction_id: UUID) -> bool: + """Whether a datasource is in the interface + + Parameters: + instruction_id: id of the instruction + """ + return instruction_id in self.cache + + @abstractmethod + def create( + self, datasource: str, df: DataFrame, session_id: Union[None, UUID] = None + ): + """Create datasource (table) given a dataframe + + In the implementation, recommend use `session_id` in the table + name/path to isolate intermediate results of one session from another. + + Need to update self.cache at the end. + """ + ... + + @abstractmethod + def evaluate_graph( + self, g: IRGraphSoleInterface, all_variables_in_return: bool = False + ) -> Mapping[UUID, DataFrame]: + """Evaluate the IRGraph + + Parameters: + g: The IRGraph with zero or one interface + all_variables_in_return: include evaluation results on all variables in return + + Returns: + By default, return the dataframes for each sink node in the graph. + If all_variables_in_return == True, also include dataframes for + each variable node in the return. + """ + # requirement: g should not have any Reference node + refs = self.get_nodes_by_type(Reference) + if refs: + raise UnresolvedReference(refs) + + def cache_catalog_to_json(self) -> str: + """Serialize the cache catalog to a JSON string""" + return json.dumps(self.cache) From df58cfce2f61c0ee062e01273d98610f7e50edab Mon Sep 17 00:00:00 2001 From: Xiaokui Shu Date: Fri, 24 Nov 2023 10:35:41 -0500 Subject: [PATCH 2/5] minor structure edit --- packages-nextgen/kestrel_core/src/kestrel/cache/parquet.py | 0 packages-nextgen/kestrel_core/src/kestrel/cache/sqlite.py | 0 .../kestrel_core/src/kestrel/interface/datasource/abc.py | 4 ++-- 3 files changed, 2 insertions(+), 2 deletions(-) create mode 100644 packages-nextgen/kestrel_core/src/kestrel/cache/parquet.py create mode 100644 packages-nextgen/kestrel_core/src/kestrel/cache/sqlite.py diff --git a/packages-nextgen/kestrel_core/src/kestrel/cache/parquet.py b/packages-nextgen/kestrel_core/src/kestrel/cache/parquet.py new file mode 100644 index 00000000..e69de29b diff --git a/packages-nextgen/kestrel_core/src/kestrel/cache/sqlite.py b/packages-nextgen/kestrel_core/src/kestrel/cache/sqlite.py new file mode 100644 index 00000000..e69de29b diff --git a/packages-nextgen/kestrel_core/src/kestrel/interface/datasource/abc.py b/packages-nextgen/kestrel_core/src/kestrel/interface/datasource/abc.py index 80bab257..e34f5351 100644 --- a/packages-nextgen/kestrel_core/src/kestrel/interface/datasource/abc.py +++ b/packages-nextgen/kestrel_core/src/kestrel/interface/datasource/abc.py @@ -28,7 +28,7 @@ class AbstractDataSourceInterface(ABC): datasources: map a datasource name to datalake table name - cache: map a cached item (instruction.id) to datalake table name + cache: map a cached item (instruction.id) to datalake table/view name """ def __init__( @@ -61,7 +61,7 @@ def __contains__(self, instruction_id: UUID) -> bool: def create( self, datasource: str, df: DataFrame, session_id: Union[None, UUID] = None ): - """Create datasource (table) given a dataframe + """Create cached entries (table) from a dataframe In the implementation, recommend use `session_id` in the table name/path to isolate intermediate results of one session from another. From f96e2622186861bdf87c6f43ec01ec3e706248da Mon Sep 17 00:00:00 2001 From: Xiaokui Shu Date: Wed, 29 Nov 2023 17:44:11 -0500 Subject: [PATCH 3/5] implement basic kestrel.cache.inmemory --- .../src/kestrel/cache/__init__.py | 1 - .../kestrel_core/src/kestrel/cache/abc.py | 43 ++++++++++++++ .../kestrel_core/src/kestrel/cache/base.py | 8 +++ .../kestrel_core/src/kestrel/cache/data.py | 8 --- .../src/kestrel/cache/inmemory.py | 39 +++++++++++++ .../kestrel_core/src/kestrel/cache/parquet.py | 0 .../kestrel_core/src/kestrel/exceptions.py | 3 + .../src/kestrel/interface/datasource/abc.py | 56 +++++++++++-------- .../kestrel_core/src/kestrel/ir/graph.py | 6 +- .../kestrel_core/tests/test_ir_graph.py | 7 ++- 10 files changed, 134 insertions(+), 37 deletions(-) create mode 100644 packages-nextgen/kestrel_core/src/kestrel/cache/abc.py create mode 100644 packages-nextgen/kestrel_core/src/kestrel/cache/base.py delete mode 100644 packages-nextgen/kestrel_core/src/kestrel/cache/data.py create mode 100644 packages-nextgen/kestrel_core/src/kestrel/cache/inmemory.py delete mode 100644 packages-nextgen/kestrel_core/src/kestrel/cache/parquet.py diff --git a/packages-nextgen/kestrel_core/src/kestrel/cache/__init__.py b/packages-nextgen/kestrel_core/src/kestrel/cache/__init__.py index 0294be40..e69de29b 100644 --- a/packages-nextgen/kestrel_core/src/kestrel/cache/__init__.py +++ b/packages-nextgen/kestrel_core/src/kestrel/cache/__init__.py @@ -1 +0,0 @@ -from kestrel.cache.data import Cache diff --git a/packages-nextgen/kestrel_core/src/kestrel/cache/abc.py b/packages-nextgen/kestrel_core/src/kestrel/cache/abc.py new file mode 100644 index 00000000..523544a6 --- /dev/null +++ b/packages-nextgen/kestrel_core/src/kestrel/cache/abc.py @@ -0,0 +1,43 @@ +from abc import ABC, abstractmethod +from pandas import DataFrame +from uuid import UUID +from typing import Union + + +class AbstractCache(ABC): + """Abstract cache class for typing purpose + + This class is for internal typing use to avoid circular import. + + Use Cache from kestrel.cache.base to subclass concrete cache class. + """ + + def __contains__(self, instruction_id: UUID) -> bool: + """Whether an instruction result is in the cache + + Parameters: + + instruction_id: id of the instruction + """ + ... + + @abstractmethod + def __getitem__(self, instruction_id: UUID) -> DataFrame: + """Get the dataframe for the cached instruction + + Parameters: + instruction_id: id of the instruction + + Returns: + dataframe of the given (likely Variable) instruction + """ + ... + + @abstractmethod + def __delitem__(self, instruction_id: UUID): + """Delete cached item + + Parameters: + instruction_id: id of the instruction + """ + ... diff --git a/packages-nextgen/kestrel_core/src/kestrel/cache/base.py b/packages-nextgen/kestrel_core/src/kestrel/cache/base.py new file mode 100644 index 00000000..20df7d4b --- /dev/null +++ b/packages-nextgen/kestrel_core/src/kestrel/cache/base.py @@ -0,0 +1,8 @@ +from kestrel.cache.abc import AbstractCache +from kestrel.interface.datasource import AbstractDataSourceInterface + + +class Cache(AbstractDataSourceInterface, AbstractCache): + """Every concrete Kestrel cache class should be a subclass of this.""" + + ... diff --git a/packages-nextgen/kestrel_core/src/kestrel/cache/data.py b/packages-nextgen/kestrel_core/src/kestrel/cache/data.py deleted file mode 100644 index 2a2e2ffe..00000000 --- a/packages-nextgen/kestrel_core/src/kestrel/cache/data.py +++ /dev/null @@ -1,8 +0,0 @@ -class Cache(dict): - # node id to cache object mapping - - def dump(self): - ... - - def load(self): - ... diff --git a/packages-nextgen/kestrel_core/src/kestrel/cache/inmemory.py b/packages-nextgen/kestrel_core/src/kestrel/cache/inmemory.py new file mode 100644 index 00000000..821a8244 --- /dev/null +++ b/packages-nextgen/kestrel_core/src/kestrel/cache/inmemory.py @@ -0,0 +1,39 @@ +from pandas import DataFrame +from uuid import UUID +from typing import ( + Mapping, + Union, +) + +from kestrel.cache.base import Cache +from kestrel.ir.graph import IRGraphSoleInterface + + +class InMemoryCache(Cache): + def __init__(self, initial_cache: Union[None, Mapping[UUID, DataFrame]] = None): + super().__init__() + self.cache: Mapping[UUID, DataFrame] = {} + if initial_cache: + for k, v in initial_cache.items(): + self.store(k, v) + + def __getitem__(self, instruction_id: UUID) -> DataFrame: + return self.cache[self.cache_catalog[instruction_id]] + + def __delitem__(self, instruction_id: UUID): + del self.cache[instruction_id] + del self.cache_catelog[instruction_id] + + def store( + self, + instruction_id: UUID, + data: DataFrame, + session_id: Union[None, UUID] = None, + ): + self.cache[instruction_id] = data + self.cache_catalog[instruction_id] = instruction_id + + def evaluate_graph( + self, g: IRGraphSoleInterface, all_variables_in_return: bool = False + ) -> Mapping[UUID, DataFrame]: + ... diff --git a/packages-nextgen/kestrel_core/src/kestrel/cache/parquet.py b/packages-nextgen/kestrel_core/src/kestrel/cache/parquet.py deleted file mode 100644 index e69de29b..00000000 diff --git a/packages-nextgen/kestrel_core/src/kestrel/exceptions.py b/packages-nextgen/kestrel_core/src/kestrel/exceptions.py index 3a6b1684..481e1328 100644 --- a/packages-nextgen/kestrel_core/src/kestrel/exceptions.py +++ b/packages-nextgen/kestrel_core/src/kestrel/exceptions.py @@ -41,5 +41,8 @@ class DuplicatedSourceInstruction(KestrelError): class MultiInterfacesInGraph(KestrelError): pass +class UnresolvedReference(KestrelError): + pass + class InvalidSerializedDatasourceInterfaceCacheCatalog(KestrelError): pass diff --git a/packages-nextgen/kestrel_core/src/kestrel/interface/datasource/abc.py b/packages-nextgen/kestrel_core/src/kestrel/interface/datasource/abc.py index e34f5351..e02fd2ea 100644 --- a/packages-nextgen/kestrel_core/src/kestrel/interface/datasource/abc.py +++ b/packages-nextgen/kestrel_core/src/kestrel/interface/datasource/abc.py @@ -13,7 +13,6 @@ UnresolvedReference, InvalidSerializedDatasourceInterfaceCacheCatalog, ) -from kestrel.config import InterfaceConfig class AbstractDataSourceInterface(ABC): @@ -22,51 +21,61 @@ class AbstractDataSourceInterface(ABC): Concepts: - Think an interface as a datalake + - Think a datasource as a table in the datalake Attributes: datasources: map a datasource name to datalake table name - cache: map a cached item (instruction.id) to datalake table/view name + cache_catalog: map a cached item (instruction.id) to datalake table/view name """ - def __init__( - self, config: InterfaceConfig, serialized_cache_catalog: Union[None, str] = None - ): + def __init__(self, serialized_cache_catalog: Union[None, str] = None): self.datasources: Mapping[str, str] = {} - self.cache: Mapping[UUID, str] = {} - self.__init_from_config(config) + self.cache_catalog: Mapping[UUID, str] = {} - if serialized_interface_catalog: + if serialized_cache_catalog: try: - self.cache = json.loads(serialized_cache_catalog) + self.cache_catalog = json.loads(serialized_cache_catalog) except: raise InvalidSerializedDatasourceInterfaceCacheCatalog() - def __init_from_config(self, config: InterfaceConfig): - # TODO: fill self.datasources from config - # TODO: create attributes like self.connection from config - ... - def __contains__(self, instruction_id: UUID) -> bool: """Whether a datasource is in the interface Parameters: + instruction_id: id of the instruction """ - return instruction_id in self.cache + return instruction_id in self.cache_catalog @abstractmethod - def create( - self, datasource: str, df: DataFrame, session_id: Union[None, UUID] = None + def store( + self, + instruction_id: UUID, + data: DataFrame, + session_id: Union[None, UUID] = None, ): - """Create cached entries (table) from a dataframe + """Create a new table in the datalake from a dataframe + + The name of the table is a function of instruction_id (and session_id) + in case there are conflicting tables in the datalake. + + The function can be implemented as a hashtable. If the hash collides + with an existing hash, figure out whether the existing hash/table is + used by the current interface and session. If yes, then replace; if + not, then generate a new random value and record in self.cache_catalog. - In the implementation, recommend use `session_id` in the table - name/path to isolate intermediate results of one session from another. + This method will update self.cache_catalog. - Need to update self.cache at the end. + Parameters: + + instruction_id: the key to be placed in `self.cache_catalog` + + data: the dataframe to store + + session_id: the optional information to derive table name in datalake """ ... @@ -77,10 +86,13 @@ def evaluate_graph( """Evaluate the IRGraph Parameters: + g: The IRGraph with zero or one interface + all_variables_in_return: include evaluation results on all variables in return Returns: + By default, return the dataframes for each sink node in the graph. If all_variables_in_return == True, also include dataframes for each variable node in the return. @@ -92,4 +104,4 @@ def evaluate_graph( def cache_catalog_to_json(self) -> str: """Serialize the cache catalog to a JSON string""" - return json.dumps(self.cache) + return json.dumps(self.cache_catalog) diff --git a/packages-nextgen/kestrel_core/src/kestrel/ir/graph.py b/packages-nextgen/kestrel_core/src/kestrel/ir/graph.py index 223d9548..d555c2f6 100644 --- a/packages-nextgen/kestrel_core/src/kestrel/ir/graph.py +++ b/packages-nextgen/kestrel_core/src/kestrel/ir/graph.py @@ -23,7 +23,7 @@ Return, instruction_from_dict, ) -from kestrel.cache import Cache +from kestrel.cache.abc import AbstractCache from kestrel.exceptions import ( InstructionNotFound, InvalidSeralizedGraph, @@ -380,7 +380,7 @@ def duplicate_dependent_subgraph_of_node(self, node: Instruction) -> IRGraph: return self.subgraph(nodes).copy() def find_cached_dependent_subgraph_of_node( - self, node: Instruction, cache: Cache + self, node: Instruction, cache: AbstractCache ) -> IRGraph: """Return the cached dependent graph of the a node @@ -397,7 +397,7 @@ def find_cached_dependent_subgraph_of_node( return g.duplicate_dependent_subgraph_of_node(node) def find_simple_dependent_subgraphs_of_node( - self, node: Return, cache: Cache + self, node: Return, cache: AbstractCache ) -> Iterable[IRGraphSoleInterface]: """Segment dependent graph of a node and return subgraphs that do not have further dependency diff --git a/packages-nextgen/kestrel_core/tests/test_ir_graph.py b/packages-nextgen/kestrel_core/tests/test_ir_graph.py index 294c87ec..bed6e8d8 100644 --- a/packages-nextgen/kestrel_core/tests/test_ir_graph.py +++ b/packages-nextgen/kestrel_core/tests/test_ir_graph.py @@ -1,5 +1,6 @@ import pytest import networkx.utils +from pandas import DataFrame from kestrel.ir.instructions import ( Variable, @@ -9,7 +10,7 @@ TransformingInstruction, ) from kestrel.ir.graph import IRGraph -from kestrel.cache import Cache +from kestrel.cache.inmemory import InMemoryCache def test_add_source(): @@ -146,10 +147,10 @@ def test_find_cached_dependent_subgraph_of_node(): g.add_edge(b4, c1) c2 = g.add_node(Variable("zxcv"), c1) - g2 = g.find_cached_dependent_subgraph_of_node(c2, Cache()) + g2 = g.find_cached_dependent_subgraph_of_node(c2, InMemoryCache()) assert networkx.utils.graphs_equal(g, g2) - g3 = g.find_cached_dependent_subgraph_of_node(c2, Cache({a2.id: object(), b2.id: object()})) + g3 = g.find_cached_dependent_subgraph_of_node(c2, InMemoryCache({a2.id: DataFrame(), b2.id: DataFrame()})) g.remove_node(a1) g.remove_node(b1) assert networkx.utils.graphs_equal(g, g3) From fd1671e089913a1df521c4dc3c751cbedf3f8957 Mon Sep 17 00:00:00 2001 From: Xiaokui Shu Date: Wed, 29 Nov 2023 21:27:14 -0500 Subject: [PATCH 4/5] add tests for InMemoryCache --- .../src/kestrel/cache/inmemory.py | 2 +- .../kestrel_core/tests/test_cache_inmemory.py | 26 +++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) create mode 100644 packages-nextgen/kestrel_core/tests/test_cache_inmemory.py diff --git a/packages-nextgen/kestrel_core/src/kestrel/cache/inmemory.py b/packages-nextgen/kestrel_core/src/kestrel/cache/inmemory.py index 821a8244..ded3c729 100644 --- a/packages-nextgen/kestrel_core/src/kestrel/cache/inmemory.py +++ b/packages-nextgen/kestrel_core/src/kestrel/cache/inmemory.py @@ -22,7 +22,7 @@ def __getitem__(self, instruction_id: UUID) -> DataFrame: def __delitem__(self, instruction_id: UUID): del self.cache[instruction_id] - del self.cache_catelog[instruction_id] + del self.cache_catalog[instruction_id] def store( self, diff --git a/packages-nextgen/kestrel_core/tests/test_cache_inmemory.py b/packages-nextgen/kestrel_core/tests/test_cache_inmemory.py new file mode 100644 index 00000000..0b8d249a --- /dev/null +++ b/packages-nextgen/kestrel_core/tests/test_cache_inmemory.py @@ -0,0 +1,26 @@ +import pytest +from pandas import DataFrame +from uuid import uuid4 + +from kestrel.cache.inmemory import InMemoryCache + + +def test_inmemory_cache_set_get_del(): + c = InMemoryCache() + idx = uuid4() + df = DataFrame([1, 2, 3]) + c.store(idx, df) + assert df.equals(c[idx]) + del c[idx] + assert idx not in c + + +def test_inmemory_cache_constructor(): + ids = [uuid4() for i in range(5)] + df = DataFrame([1, 2, 3]) + c = InMemoryCache({x:df for x in ids}) + for u in ids: + assert df.equals(c[u]) + for u in ids: + del c[u] + assert u not in c From 9ab79c492a508334434c791bb5e1d8f3af979656 Mon Sep 17 00:00:00 2001 From: Xiaokui Shu Date: Wed, 29 Nov 2023 21:53:06 -0500 Subject: [PATCH 5/5] add typeguard for InMemoryCache --- packages-nextgen/kestrel_core/src/kestrel/cache/inmemory.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages-nextgen/kestrel_core/src/kestrel/cache/inmemory.py b/packages-nextgen/kestrel_core/src/kestrel/cache/inmemory.py index ded3c729..c12a38e9 100644 --- a/packages-nextgen/kestrel_core/src/kestrel/cache/inmemory.py +++ b/packages-nextgen/kestrel_core/src/kestrel/cache/inmemory.py @@ -1,4 +1,5 @@ from pandas import DataFrame +from typeguard import typechecked from uuid import UUID from typing import ( Mapping, @@ -9,6 +10,7 @@ from kestrel.ir.graph import IRGraphSoleInterface +@typechecked class InMemoryCache(Cache): def __init__(self, initial_cache: Union[None, Mapping[UUID, DataFrame]] = None): super().__init__()