Closes Bears-R-Us#2541 - DataFrame HDF5 Snapshot Format (Bears-R-Us#2599)

* DataFrame storage implementation for snapshotting.

* clean-up

* Formatting updates.

* Updating function signatures to allow DataFrame return for mypy

* Removing random =

* Addressing review feedback.

---------

Co-authored-by: pierce <48131946+pierce314159@users.noreply.github.com>
Ethan-DeBandi99 and stress-tess authored Jul 21, 2023
1 parent 9899248 commit ca6f8e5
Showing 8 changed files with 863 additions and 65 deletions.
114 changes: 108 additions & 6 deletions arkouda/dataframe.py
@@ -22,7 +22,6 @@
from arkouda.groupbyclass import GroupBy as akGroupBy
from arkouda.groupbyclass import unique
from arkouda.index import Index
from arkouda.io import _dict_recombine_segarrays_categoricals, get_filetype, load_all
from arkouda.numeric import cast as akcast
from arkouda.numeric import cumsum
from arkouda.numeric import isnan as akisnan
@@ -247,6 +246,8 @@ class DataFrame(UserDict):

COLUMN_CLASSES = (pdarray, Strings, Categorical, SegArray)

objType = "DataFrame"

def __init__(self, initialdata=None, index=None):
super().__init__()

@@ -350,7 +351,6 @@ def __init__(self, initialdata=None, index=None):
self.update_size()

def __getattr__(self, key):
# print("key =", key)
if key not in self.columns:
raise AttributeError(f"Attribute {key} not found")
# Should this be cached?
@@ -865,7 +865,7 @@ def index(self):
def _set_index(self, value):
if isinstance(value, Index) or value is None:
self._index = value
elif isinstance(value, pdarray):
elif isinstance(value, (pdarray, Strings)):
self._index = Index(value)
elif isinstance(value, list):
self._index = Index(array(value))
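
With this change, a Strings object can back a DataFrame index directly instead of falling through to the TypeError branch. A minimal sketch of the new behavior using the internal setter shown above (assumes a running arkouda server; the values are illustrative):

import arkouda as ak

ak.connect()  # assumes a server is listening on the default host/port
df = ak.DataFrame({"val": ak.arange(3)})
# Strings are now wrapped in an Index just like pdarrays
df._set_index(ak.array(["a", "b", "c"]))
print(df.index)
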
@@ -1497,9 +1497,82 @@ def to_hdf(self, path, index=False, columns=None, file_type="distribute"):
data = self._prep_data(index=index, columns=columns)
to_hdf(data, prefix_path=path, file_type=file_type)

def _to_hdf_snapshot(self, path, dataset="DataFrame", mode="truncate", file_type="distribute"):
"""
Save a dataframe as a group with columns within the group. This allows saving other
datasets in the HDF5 file without impacting the integrity of the dataframe
This is only used for the snapshot workflow
Parameters
----------
path : str
File path to save data
dataset: str
Name to save the dataframe under within the file
Only used when as_dataset=True
mode: str (truncate | append)
Default: trunate
Indicates whether the dataset should truncate the file and write or append
to the file
Only used when as_dataset=True
file_type: str (single | distribute)
Default: distribute
Whether to save to a single file or distribute across Locales
Only used when as_dataset=True
Returns
-------
None
Raises
------
RuntimeError
Raised if a server-side error is thrown saving the pdarray
"""
from arkouda.categorical import Categorical as Categorical_
from arkouda.io import _file_type_to_int, _mode_str_to_int

column_data = [
obj.name
if not isinstance(obj, (Categorical_, SegArray))
else json.dumps(
{
"codes": obj.codes.name,
"categories": obj.categories.name,
"NA_codes": obj._akNAcode.name,
**({"permutation": obj.permutation.name} if obj.permutation is not None else {}),
**({"segments": obj.segments.name} if obj.segments is not None else {}),
}
)
if isinstance(obj, Categorical_)
else json.dumps({"segments": obj.segments, "values": obj.values})
for k, obj in self.items()
]
dtypes = [
str(obj.categories.dtype) if isinstance(obj, Categorical_) else str(obj.dtype)
for obj in self.values()
]
return cast(
str,
generic_msg(
cmd="tohdf",
args={
"filename": path,
"dset": dataset,
"file_format": _file_type_to_int(file_type),
"write_mode": _mode_str_to_int(mode),
"objType": self.objType,
"num_cols": len(self.columns),
"column_names": self.columns,
"column_objTypes": [obj.objType for key, obj in self.items()],
"column_dtypes": dtypes,
"columns": column_data,
"index": self.index.values.name,
},
),
)
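
For context, a hedged sketch of how the snapshot write might be invoked; _to_hdf_snapshot is internal to the snapshot workflow, and the path below is illustrative rather than taken from the PR:

import arkouda as ak

ak.connect()  # assumes a running arkouda server
df = ak.DataFrame({"id": ak.arange(5), "label": ak.array(["a", "b", "c", "d", "e"])})
# "truncate" overwrites the target file; "append" adds the DataFrame group
# alongside datasets already in the file.
df._to_hdf_snapshot("/tmp/df_snapshot", dataset="DataFrame", mode="truncate")
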

def update_hdf(self, prefix_path: str, index=False, columns=None, repack: bool = True):
"""
Overwrite the dataset with the name provided with this pdarray. If
Overwrite the dataset with the name provided with this dataframe. If
the dataset does not exist it is added
Parameters
@@ -1740,6 +1813,12 @@ def load(cls, prefix_path, file_format="INFER"):
Load dataframe from file
file_format needed for consistency with other load functions
"""
from arkouda.io import (
_dict_recombine_segarrays_categoricals,
get_filetype,
load_all,
)

prefix, extension = os.path.splitext(prefix_path)
first_file = f"{prefix}_LOCALE0000{extension}"
filetype = get_filetype(first_file) if file_format.lower() == "infer" else file_format
@@ -2387,7 +2466,7 @@ def _parse_col_name(entryName, dfName):
return colParts[3], colType

@staticmethod
def from_return_msg(repMsg):
def from_attach_msg(repMsg):
"""
Creates and returns a DataFrame based on return components from ak.util.attach
@@ -2440,7 +2519,7 @@ def from_return_msg(repMsg):
i += 3

elif parts[i] == "segarray":
info = json.loads(parts[i+1])
info = json.loads(parts[i + 1])
colName = DataFrame._parse_col_name(info["segments"], dfName)[0]
cols[colName] = SegArray.from_return_msg(parts[i + 1])
i += 1
@@ -2451,6 +2530,29 @@
df.name = dfName
return df

@classmethod
def from_return_msg(cls, rep_msg):
from arkouda.categorical import Categorical as Categorical_

data = json.loads(rep_msg)
idx = None
columns = {}
for k, create_data in data.items():
if k == "index":
idx = Index(create_pdarray(create_data))
else:
comps = create_data.split("+|+")
if comps[0] == pdarray.objType.upper():
columns[k] = create_pdarray(comps[1])
elif comps[0] == Strings.objType.upper():
columns[k] = Strings.from_return_msg(comps[1])
elif comps[0] == Categorical_.objType.upper():
columns[k] = Categorical_.from_return_msg(comps[1])
elif comps[0] == SegArray.objType.upper():
columns[k] = SegArray.from_return_msg(comps[1])

return cls(columns, idx)
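
Each non-index entry in the decoded JSON carries an objType tag and a creation payload joined by "+|+", and the tag selects the matching constructor above. A minimal sketch of that split convention (the payload here is a placeholder; real creation messages are produced by the server):

def split_column_msg(create_data):
    # comps[0] is the objType tag, comps[1] the creation payload
    comps = create_data.split("+|+")
    return comps[0], comps[1]

print(split_column_msg("PDARRAY+|+<server-create-payload>"))
# -> ('PDARRAY', '<server-create-payload>')
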


def sorted(df, column=False):
"""
18 changes: 11 additions & 7 deletions arkouda/index.py
@@ -28,20 +28,24 @@ def __init__(
self.size = values.size
self.dtype = values.dtype
self.name = name if name else values.name
return
elif isinstance(values, pd.Index):
self.values = array(values.values)
self.size = values.size
self.dtype = self.values.dtype
self.name = name if name else values.name
return
elif isinstance(values, List):
values = array(values)

self.values = values
self.size = self.values.size
self.dtype = self.values.dtype
self.name = name
self.values = values
self.size = self.values.size
self.dtype = self.values.dtype
self.name = name
elif isinstance(values, (pdarray, Strings, Categorical)):
self.values = values
self.size = self.values.size
self.dtype = self.values.dtype
self.name = name
else:
raise TypeError(f"Unable to create Index from type {type(values)}")

def __getitem__(self, key):
from arkouda.series import Series
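
The refactor above gives each accepted input type its own branch and rejects everything else. A short sketch of what the constructor now accepts (assuming a connected arkouda session):

import arkouda as ak

ak.connect()  # assumes a running arkouda server
ak.Index(ak.arange(3))               # pdarray-backed
ak.Index(ak.array(["x", "y", "z"]))  # Strings-backed
ak.Index([0, 1, 2])                  # lists are converted with ak.array
try:
    ak.Index(3.14)                   # anything else raises
except TypeError as err:
    print(err)
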
25 changes: 17 additions & 8 deletions arkouda/io.py
@@ -10,9 +10,10 @@
from arkouda.array_view import ArrayView
from arkouda.categorical import Categorical
from arkouda.client import generic_msg
from arkouda.dataframe import DataFrame
from arkouda.groupbyclass import GroupBy
from arkouda.pdarrayclass import create_pdarray, pdarray
from arkouda.pdarraycreation import array, arange
from arkouda.pdarraycreation import arange, array
from arkouda.segarray import SegArray
from arkouda.strings import Strings

@@ -435,7 +436,7 @@ def _parse_errors(rep_msg, allow_errors: bool = False):

def _parse_obj(
obj: Dict,
) -> Union[Strings, pdarray, ArrayView, SegArray, Categorical]:
) -> Union[Strings, pdarray, ArrayView, SegArray, Categorical, DataFrame]:
"""
Helper function to create an Arkouda object from read response
@@ -468,6 +469,8 @@ def _parse_obj(
return Categorical.from_return_msg(obj["created"])
elif GroupBy.objType.upper() == obj["arkouda_type"]:
return GroupBy.from_return_msg(obj["created"])
elif DataFrame.objType.upper() == obj["arkouda_type"]:
return DataFrame.from_return_msg(obj["created"])
else:
raise TypeError(f"Unknown arkouda type:{obj['arkouda_type']}")

@@ -519,7 +522,8 @@ def _build_objects(
SegArray,
ArrayView,
Categorical,
Mapping[str, Union[Strings, pdarray, SegArray, ArrayView, Categorical]],
DataFrame,
Mapping[str, Union[Strings, pdarray, SegArray, ArrayView, Categorical, DataFrame]],
]:
"""
Helper function to create the Arkouda objects from a read operation
@@ -569,7 +573,8 @@ def read_hdf(
SegArray,
ArrayView,
Categorical,
Mapping[str, Union[pdarray, Strings, SegArray, ArrayView, Categorical]],
DataFrame,
Mapping[str, Union[pdarray, Strings, SegArray, ArrayView, Categorical, DataFrame]],
]:
"""
Read Arkouda objects from HDF5 file/s
@@ -698,7 +703,8 @@ def read_parquet(
SegArray,
ArrayView,
Categorical,
Mapping[str, Union[pdarray, Strings, SegArray, ArrayView, Categorical]],
DataFrame,
Mapping[str, Union[pdarray, Strings, SegArray, ArrayView, Categorical, DataFrame]],
]:
"""
Read Arkouda objects from Parquet file/s
@@ -825,7 +831,8 @@ def read_csv(
SegArray,
ArrayView,
Categorical,
Mapping[str, Union[pdarray, Strings, SegArray, ArrayView, Categorical]],
DataFrame,
Mapping[str, Union[pdarray, Strings, SegArray, ArrayView, Categorical, DataFrame]],
]:
"""
Read CSV file(s) into Arkouda objects. If more than one dataset is found, the objects
@@ -1516,7 +1523,8 @@ def load(
SegArray,
ArrayView,
Categorical,
Mapping[str, Union[pdarray, Strings, SegArray, ArrayView, Categorical]],
DataFrame,
Mapping[str, Union[pdarray, Strings, SegArray, ArrayView, Categorical, DataFrame]],
]:
"""
Load a pdarray previously saved with ``pdarray.save()``.
@@ -1707,7 +1715,8 @@ def read(
SegArray,
ArrayView,
Categorical,
Mapping[str, Union[pdarray, Strings, SegArray, ArrayView, Categorical]],
DataFrame,
Mapping[str, Union[pdarray, Strings, SegArray, ArrayView, Categorical, DataFrame]],
]:
"""
Read datasets from files.
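
With DataFrame added to these return unions, a frame written by _to_hdf_snapshot can be materialized again through the generic read path. A hedged round-trip sketch, reusing the illustrative path from the earlier example; per the usual read_hdf behavior, a lone object comes back directly while multiple datasets come back as a mapping:

import arkouda as ak

ak.connect()  # assumes a running arkouda server
df = ak.DataFrame({"id": ak.arange(5)})
df._to_hdf_snapshot("/tmp/df_snapshot", dataset="DataFrame", mode="truncate")
restored = ak.read_hdf("/tmp/df_snapshot*")  # glob matches per-locale files
print(type(restored))  # ak.DataFrame when the file holds only this dataset
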
2 changes: 1 addition & 1 deletion arkouda/util.py
@@ -240,7 +240,7 @@ def attach(name: str, dtype: str = "infer"):
elif repType == "dataframe":
from arkouda.dataframe import DataFrame

return DataFrame.from_return_msg(repMsg)
return DataFrame.from_attach_msg(repMsg)
elif repType == "segarray":
repMsg = repMsg[len(repType) + 1 :]
return SegArray.from_return_msg(repMsg)
6 changes: 2 additions & 4 deletions src/GenSymIO.chpl
@@ -212,10 +212,8 @@ module GenSymIO {
var (segName, nBytes) = id.splitMsgToTuple("+", 2);
create_str = "created " + st.attrib(segName) + "+created bytes.size " + nBytes;
}
else if (akType == ObjType.SEGARRAY || akType == ObjType.CATEGORICAL) {
create_str = id;
}
else if akType == ObjType.GROUPBY {
else if (akType == ObjType.SEGARRAY || akType == ObjType.CATEGORICAL ||
akType == ObjType.GROUPBY || akType == ObjType.DATAFRAME) {
create_str = id;
}
else {