Skip to content

Commit

Permalink
Merge pull request #431 from opencybersecurityalliance/kestrel2-inter…
Browse files Browse the repository at this point in the history
…face-init

initialize interface and cache
  • Loading branch information
subbyte authored Nov 30, 2023
2 parents 379cd68 + 9ab79c4 commit e26dce6
Show file tree
Hide file tree
Showing 12 changed files with 239 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@
from kestrel.cache.data import Cache
43 changes: 43 additions & 0 deletions packages-nextgen/kestrel_core/src/kestrel/cache/abc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from abc import ABC, abstractmethod
from pandas import DataFrame
from uuid import UUID
from typing import Union


class AbstractCache(ABC):
"""Abstract cache class for typing purpose
This class is for internal typing use to avoid circular import.
Use Cache from kestrel.cache.base to subclass concrete cache class.
"""

def __contains__(self, instruction_id: UUID) -> bool:
"""Whether an instruction result is in the cache
Parameters:
instruction_id: id of the instruction
"""
...

@abstractmethod
def __getitem__(self, instruction_id: UUID) -> DataFrame:
"""Get the dataframe for the cached instruction
Parameters:
instruction_id: id of the instruction
Returns:
dataframe of the given (likely Variable) instruction
"""
...

@abstractmethod
def __delitem__(self, instruction_id: UUID):
"""Delete cached item
Parameters:
instruction_id: id of the instruction
"""
...
8 changes: 8 additions & 0 deletions packages-nextgen/kestrel_core/src/kestrel/cache/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from kestrel.cache.abc import AbstractCache
from kestrel.interface.datasource import AbstractDataSourceInterface


class Cache(AbstractDataSourceInterface, AbstractCache):
"""Every concrete Kestrel cache class should be a subclass of this."""

...
8 changes: 0 additions & 8 deletions packages-nextgen/kestrel_core/src/kestrel/cache/data.py

This file was deleted.

41 changes: 41 additions & 0 deletions packages-nextgen/kestrel_core/src/kestrel/cache/inmemory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from pandas import DataFrame
from typeguard import typechecked
from uuid import UUID
from typing import (
Mapping,
Union,
)

from kestrel.cache.base import Cache
from kestrel.ir.graph import IRGraphSoleInterface


@typechecked
class InMemoryCache(Cache):
def __init__(self, initial_cache: Union[None, Mapping[UUID, DataFrame]] = None):
super().__init__()
self.cache: Mapping[UUID, DataFrame] = {}
if initial_cache:
for k, v in initial_cache.items():
self.store(k, v)

def __getitem__(self, instruction_id: UUID) -> DataFrame:
return self.cache[self.cache_catalog[instruction_id]]

def __delitem__(self, instruction_id: UUID):
del self.cache[instruction_id]
del self.cache_catalog[instruction_id]

def store(
self,
instruction_id: UUID,
data: DataFrame,
session_id: Union[None, UUID] = None,
):
self.cache[instruction_id] = data
self.cache_catalog[instruction_id] = instruction_id

def evaluate_graph(
self, g: IRGraphSoleInterface, all_variables_in_return: bool = False
) -> Mapping[UUID, DataFrame]:
...
Empty file.
6 changes: 6 additions & 0 deletions packages-nextgen/kestrel_core/src/kestrel/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,9 @@ class DuplicatedSourceInstruction(KestrelError):

class MultiInterfacesInGraph(KestrelError):
pass

class UnresolvedReference(KestrelError):
pass

class InvalidSerializedDatasourceInterfaceCacheCatalog(KestrelError):
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from kestrel.interface.datasource.abc import AbstractDataSourceInterface
107 changes: 107 additions & 0 deletions packages-nextgen/kestrel_core/src/kestrel/interface/datasource/abc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import json
from abc import ABC, abstractmethod
from pandas import DataFrame
from uuid import UUID
from typing import (
Mapping,
Union,
)

from kestrel.ir.instructions import Reference
from kestrel.ir.graph import IRGraphSoleInterface
from kestrel.exceptions import (
UnresolvedReference,
InvalidSerializedDatasourceInterfaceCacheCatalog,
)


class AbstractDataSourceInterface(ABC):
"""Abstract class for datasource interface
Concepts:
- Think an interface as a datalake
- Think a datasource as a table in the datalake
Attributes:
datasources: map a datasource name to datalake table name
cache_catalog: map a cached item (instruction.id) to datalake table/view name
"""

def __init__(self, serialized_cache_catalog: Union[None, str] = None):
self.datasources: Mapping[str, str] = {}
self.cache_catalog: Mapping[UUID, str] = {}

if serialized_cache_catalog:
try:
self.cache_catalog = json.loads(serialized_cache_catalog)
except:
raise InvalidSerializedDatasourceInterfaceCacheCatalog()

def __contains__(self, instruction_id: UUID) -> bool:
"""Whether a datasource is in the interface
Parameters:
instruction_id: id of the instruction
"""
return instruction_id in self.cache_catalog

@abstractmethod
def store(
self,
instruction_id: UUID,
data: DataFrame,
session_id: Union[None, UUID] = None,
):
"""Create a new table in the datalake from a dataframe
The name of the table is a function of instruction_id (and session_id)
in case there are conflicting tables in the datalake.
The function can be implemented as a hashtable. If the hash collides
with an existing hash, figure out whether the existing hash/table is
used by the current interface and session. If yes, then replace; if
not, then generate a new random value and record in self.cache_catalog.
This method will update self.cache_catalog.
Parameters:
instruction_id: the key to be placed in `self.cache_catalog`
data: the dataframe to store
session_id: the optional information to derive table name in datalake
"""
...

@abstractmethod
def evaluate_graph(
self, g: IRGraphSoleInterface, all_variables_in_return: bool = False
) -> Mapping[UUID, DataFrame]:
"""Evaluate the IRGraph
Parameters:
g: The IRGraph with zero or one interface
all_variables_in_return: include evaluation results on all variables in return
Returns:
By default, return the dataframes for each sink node in the graph.
If all_variables_in_return == True, also include dataframes for
each variable node in the return.
"""
# requirement: g should not have any Reference node
refs = self.get_nodes_by_type(Reference)
if refs:
raise UnresolvedReference(refs)

def cache_catalog_to_json(self) -> str:
"""Serialize the cache catalog to a JSON string"""
return json.dumps(self.cache_catalog)
6 changes: 3 additions & 3 deletions packages-nextgen/kestrel_core/src/kestrel/ir/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
Return,
instruction_from_dict,
)
from kestrel.cache import Cache
from kestrel.cache.abc import AbstractCache
from kestrel.exceptions import (
InstructionNotFound,
InvalidSeralizedGraph,
Expand Down Expand Up @@ -380,7 +380,7 @@ def duplicate_dependent_subgraph_of_node(self, node: Instruction) -> IRGraph:
return self.subgraph(nodes).copy()

def find_cached_dependent_subgraph_of_node(
self, node: Instruction, cache: Cache
self, node: Instruction, cache: AbstractCache
) -> IRGraph:
"""Return the cached dependent graph of the a node
Expand All @@ -397,7 +397,7 @@ def find_cached_dependent_subgraph_of_node(
return g.duplicate_dependent_subgraph_of_node(node)

def find_simple_dependent_subgraphs_of_node(
self, node: Return, cache: Cache
self, node: Return, cache: AbstractCache
) -> Iterable[IRGraphSoleInterface]:
"""Segment dependent graph of a node and return subgraphs that do not have further dependency
Expand Down
26 changes: 26 additions & 0 deletions packages-nextgen/kestrel_core/tests/test_cache_inmemory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import pytest
from pandas import DataFrame
from uuid import uuid4

from kestrel.cache.inmemory import InMemoryCache


def test_inmemory_cache_set_get_del():
c = InMemoryCache()
idx = uuid4()
df = DataFrame([1, 2, 3])
c.store(idx, df)
assert df.equals(c[idx])
del c[idx]
assert idx not in c


def test_inmemory_cache_constructor():
ids = [uuid4() for i in range(5)]
df = DataFrame([1, 2, 3])
c = InMemoryCache({x:df for x in ids})
for u in ids:
assert df.equals(c[u])
for u in ids:
del c[u]
assert u not in c
7 changes: 4 additions & 3 deletions packages-nextgen/kestrel_core/tests/test_ir_graph.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import pytest
import networkx.utils
from pandas import DataFrame

from kestrel.ir.instructions import (
Variable,
Expand All @@ -9,7 +10,7 @@
TransformingInstruction,
)
from kestrel.ir.graph import IRGraph
from kestrel.cache import Cache
from kestrel.cache.inmemory import InMemoryCache


def test_add_source():
Expand Down Expand Up @@ -146,10 +147,10 @@ def test_find_cached_dependent_subgraph_of_node():
g.add_edge(b4, c1)
c2 = g.add_node(Variable("zxcv"), c1)

g2 = g.find_cached_dependent_subgraph_of_node(c2, Cache())
g2 = g.find_cached_dependent_subgraph_of_node(c2, InMemoryCache())
assert networkx.utils.graphs_equal(g, g2)

g3 = g.find_cached_dependent_subgraph_of_node(c2, Cache({a2.id: object(), b2.id: object()}))
g3 = g.find_cached_dependent_subgraph_of_node(c2, InMemoryCache({a2.id: DataFrame(), b2.id: DataFrame()}))
g.remove_node(a1)
g.remove_node(b1)
assert networkx.utils.graphs_equal(g, g3)

0 comments on commit e26dce6

Please sign in to comment.