Closes Bears-R-Us#2512 - Snapshot to HDF5 (Bears-R-Us#2606)
* Snapshot restore functionality. Waiting on user feedback for dataframe handling. Need to add docstrings and more detailed comments. Need to test on Multi-Locale.

* Snapshot completed returning dictionary.

* clean up

* Cleanup 2

* Adding docstrings

* Addressing review

* Fixing var location in code

---------

Co-authored-by: pierce <48131946+pierce314159@users.noreply.github.com>
Ethan-DeBandi99 and stress-tess authored Jul 24, 2023
1 parent 1ec91cc commit 4769af9
Showing 5 changed files with 153 additions and 41 deletions.
2 changes: 1 addition & 1 deletion arkouda/dataframe.py
@@ -1543,7 +1543,7 @@ def _to_hdf_snapshot(self, path, dataset="DataFrame", mode="truncate", file_type
                }
            )
            if isinstance(obj, Categorical_)
-           else json.dumps({"segments": obj.segments, "values": obj.values})
+           else json.dumps({"segments": obj.segments.name, "values": obj.values.name})
            for k, obj in self.items()
        ]
        dtypes = [
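The one-line fix above matters because the string built here is sent to the Arkouda server as JSON: `json.dumps` cannot serialize pdarray objects, and the server-side handler looks arrays up by their symbol-table names. A minimal sketch of the distinction, assuming a running arkouda_server (the printed names are illustrative):

import json
import arkouda as ak

ak.connect()

seg = ak.SegArray(ak.array([0, 2]), ak.array([1, 2, 3, 4]))

# json.dumps({"segments": seg.segments, "values": seg.values})
# would raise TypeError: pdarray objects are not JSON serializable.

# Passing the symbol-table names instead yields a payload the server
# can resolve back to the underlying arrays:
payload = json.dumps({"segments": seg.segments.name, "values": seg.values.name})
print(payload)  # e.g. '{"segments": "id_4", "values": "id_5"}'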
64 changes: 64 additions & 0 deletions arkouda/io.py
@@ -38,6 +38,8 @@
"load",
"load_all",
"update_hdf",
"snapshot",
"restore",
]

ARKOUDA_HDF5_FILE_METADATA_GROUP = "_arkouda_metadata"
@@ -1921,3 +1923,65 @@ def read_tagged_data(
raise RuntimeError("CSV does not support tagging data with file name associated.")
else:
raise RuntimeError(f"Invalid File Type detected, {ftype}")


def snapshot(filename):
    """
    Create a snapshot of the current Arkouda namespace. All currently accessible
    variables containing Arkouda objects are written to an HDF5 file.

    Unlike other save/load functions, this maintains the integrity of DataFrames.
    Current variable names are used as the dataset names when saving.

    Parameters
    ----------
    filename : str
        Name to use when storing the file

    Returns
    -------
    None

    See Also
    --------
    ak.restore
    """
    import inspect
    from types import ModuleType

    from arkouda.dataframe import DataFrame

    filename = filename + "_SNAPSHOT"
    mode = "TRUNCATE"
    callers_local_vars = inspect.currentframe().f_back.f_locals.items()
    for name, val in [
        (n, v)
        for n, v in callers_local_vars
        if not n.startswith("__") and not isinstance(v, ModuleType)
    ]:
        if isinstance(val, (pdarray, Categorical, SegArray, Strings, DataFrame, GroupBy)):
            if isinstance(val, DataFrame):
                val._to_hdf_snapshot(filename, dataset=name, mode=mode)
            else:
                val.to_hdf(filename, dataset=name, mode=mode)
            mode = "APPEND"


def restore(filename):
    """
    Return data saved using `ak.snapshot`.

    Parameters
    ----------
    filename : str
        Name used when the snapshot was created

    Returns
    -------
    dict
        Dictionary mapping the original variable names to the restored objects

    Notes
    -----
    Unlike other save/load methods, snapshot saves DataFrames alongside other
    objects in HDF5. Thus, they are returned within the dictionary as DataFrames.
    """
    restore_files = glob.glob(f"{filename}_SNAPSHOT_LOCALE*")
    return read_hdf(restore_files)
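Taken together, `snapshot` and `restore` give a round trip for an interactive session. A minimal usage sketch, assuming a running arkouda_server (the filename "my_session" is illustrative):

import arkouda as ak

ak.connect()

a = ak.arange(10)
s = ak.random_strings_uniform(1, 4, 10)

# Writes my_session_SNAPSHOT_LOCALE* HDF5 files, one dataset per variable,
# using the variable names "a" and "s" as dataset names
ak.snapshot("my_session")

del a, s

# Returns a dictionary keyed by the original variable names
data = ak.restore("my_session")
a, s = data["a"], data["s"]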
26 changes: 13 additions & 13 deletions src/HDF5Msg.chpl
@@ -2181,41 +2181,41 @@ module HDF5Msg {
when DType.Int64 {
    var values = toSymEntry(vals_entry, int);
    // localize values and write dataset
-   writeSegmentedLocalDset(file_id, group, values, segments, true, int);
+   writeSegmentedLocalDset(file_id, "%s/%s".format(group, dset), values, segments, true, int);
    dtype = getDataType(int);
}
when DType.UInt64 {
    var values = toSymEntry(vals_entry, uint);
    // localize values and write dataset
-   writeSegmentedLocalDset(file_id, group, values, segments, true, uint);
+   writeSegmentedLocalDset(file_id, "%s/%s".format(group, dset), values, segments, true, uint);
    dtype = getDataType(uint);
}
when DType.Float64 {
    var values = toSymEntry(vals_entry, real);
    // localize values and write dataset
-   writeSegmentedLocalDset(file_id, group, values, segments, true, real);
+   writeSegmentedLocalDset(file_id, "%s/%s".format(group, dset), values, segments, true, real);
    dtype = getDataType(real);
}
when DType.Bool {
    var values = toSymEntry(vals_entry, bool);
    // localize values and write dataset
-   writeSegmentedLocalDset(file_id, group, values, segments, true, bool);
+   writeSegmentedLocalDset(file_id, "%s/%s".format(group, dset), values, segments, true, bool);
    dtype = getDataType(bool);
}
when (DType.BigInt) {
    var values = toSymEntry(vals_entry, bigint);
    // create the group
-   validateGroup(file_id, f, "%s/%s".format(group, SEGMENTED_VALUE_NAME), overwrite); // stored as group - group uses the dataset name
+   validateGroup(file_id, f, "%s/%s/%s".format(group, dset, SEGMENTED_VALUE_NAME), overwrite); // stored as group - group uses the dataset name
    // localize values and write dataset
    writeSegmentedLocalDset(file_id, group, values, segments, true, bigint);
    dtype = getDataType(uint);
}
when (DType.Strings) {
    var values = toSegStringSymEntry(vals_entry);
    // create the group
-   validateGroup(file_id, f, "%s/%s".format(group, SEGMENTED_VALUE_NAME), overwrite);
+   validateGroup(file_id, f, "%s/%s/%s".format(group, dset, SEGMENTED_VALUE_NAME), overwrite);
    // localize values and write dataset
-   writeNestedSegmentedLocalDset(file_id, group, values, segments, true, uint(8));
+   writeNestedSegmentedLocalDset(file_id, "%s/%s".format(group, dset), values, segments, true, uint(8));
    dtype = getDataType(uint(8));
}
otherwise {
@@ -2410,27 +2410,27 @@
select vals_entry.dtype {
    when DType.Int64 {
        var values = toSymEntry(vals_entry, int);
-       writeSegmentedDistDset(filenames, group, ot: string, overwrite, values.a, segments.a, st, int);
+       writeSegmentedDistDset(filenames, "/%s/%s".format(group, dset), ot: string, overwrite, values.a, segments.a, st, int);
    }
    when DType.UInt64 {
        var values = toSymEntry(vals_entry, uint);
-       writeSegmentedDistDset(filenames, group, ot: string, overwrite, values.a, segments.a, st, uint);
+       writeSegmentedDistDset(filenames, "/%s/%s".format(group, dset), ot: string, overwrite, values.a, segments.a, st, uint);
    }
    when DType.Float64 {
        var values = toSymEntry(vals_entry, real);
-       writeSegmentedDistDset(filenames, group, ot: string, overwrite, values.a, segments.a, st, real);
+       writeSegmentedDistDset(filenames, "/%s/%s".format(group, dset), ot: string, overwrite, values.a, segments.a, st, real);
    }
    when DType.Bool {
        var values = toSymEntry(vals_entry, bool);
-       writeSegmentedDistDset(filenames, group, ot: string, overwrite, values.a, segments.a, st, bool);
+       writeSegmentedDistDset(filenames, "/%s/%s".format(group, dset), ot: string, overwrite, values.a, segments.a, st, bool);
    }
    when DType.BigInt {
        var values = toSymEntry(vals_entry, bigint);
-       writeSegmentedDistDset(filenames, group, ot: string, overwrite, values.a, segments.a, st, bigint, values.max_bits);
+       writeSegmentedDistDset(filenames, "/%s/%s".format(group, dset), ot: string, overwrite, values.a, segments.a, st, bigint, values.max_bits);
    }
    when DType.Strings {
        var values = toSegStringSymEntry(vals_entry);
-       writeNestedSegmentedDistDset(filenames, group, ot: string, overwrite, values, segments.a, st, uint(8));
+       writeNestedSegmentedDistDset(filenames, "/%s/%s".format(group, dset), ot: string, overwrite, values, segments.a, st, uint(8));
    }
    otherwise {
        throw getErrorWithContext(
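The common thread in the Chapel changes above: segmented columns are now written under "group/dset" rather than directly under "group", so the segments and values of different columns in the same DataFrame group no longer collide. A sketch of inspecting the resulting layout with h5py (the filename and column names are hypothetical):

import h5py

# One of the per-locale files written by ak.snapshot
with h5py.File("my_session_SNAPSHOT_LOCALE0000", "r") as f:
    # Expect each segmented column in its own subgroup, e.g.
    #   df/segarr_col/segments
    #   df/segarr_col/values
    f.visit(print)  # prints every group and dataset path in the file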
27 changes: 0 additions & 27 deletions tests/dataframe_test.py
@@ -719,30 +719,3 @@ def test_subset(self):
        self.assertListEqual(df.index.to_list(), df2.index.to_list())
        self.assertListEqual(df["a"].to_list(), df2["a"].to_list())
        self.assertListEqual(df["b"].to_list(), df2["b"].to_list())

    def test_snapshot(self):
        from pandas.testing import assert_frame_equal

        df = build_ak_df()
        # standard index
        column_order = ["userName", "userID", "item", "day", "amount", "bi"]
        with tempfile.TemporaryDirectory(dir=DataFrameTest.df_test_base_tmp) as tmp_dirname:
            fname = tmp_dirname + "/snapshot_test"
            df._to_hdf_snapshot(fname)
            rd_data = ak.read_hdf(fname + "_*")

            self.assertTrue(
                assert_frame_equal(df[column_order].to_pandas(), rd_data[column_order].to_pandas())
                is None
            )

        df._set_index(["A" + str(i) for i in range(len(df))])
        with tempfile.TemporaryDirectory(dir=DataFrameTest.df_test_base_tmp) as tmp_dirname:
            fname = tmp_dirname + "/snapshot_test"
            df._to_hdf_snapshot(fname)
            rd_data = ak.read_hdf(fname + "_*")

            self.assertTrue(
                assert_frame_equal(df[column_order].to_pandas(), rd_data[column_order].to_pandas())
                is None
            )
75 changes: 75 additions & 0 deletions tests/io_test.py
@@ -1217,6 +1217,81 @@ def test_segarray_str_hdf5(self):
            self.assertListEqual(x.segments.to_list(), rd.segments.to_list())
            self.assertListEqual(x.values.to_list(), rd.values.to_list())

    def test_snapshot(self):
        from pandas.testing import assert_frame_equal

        df = ak.DataFrame(
            {
                "int_col": ak.arange(10),
                "uint_col": ak.array([i + 2**63 for i in range(10)], dtype=ak.uint64),
                "float_col": ak.linspace(-3.5, 3.5, 10),
                "bool_col": ak.randint(0, 2, 10, dtype=ak.bool),
                "bigint_col": ak.array([i + 2**200 for i in range(10)], dtype=ak.bigint),
                "segarr_col": ak.SegArray(ak.arange(0, 20, 2), ak.randint(0, 3, 20)),
                "str_col": ak.random_strings_uniform(0, 3, 10),
            }
        )
        df_str_idx = df.copy()
        df_str_idx._set_index(["A" + str(i) for i in range(len(df))])
        col_order = df.columns
        df_ref = df.to_pandas()
        df_str_idx_ref = df_str_idx.to_pandas()
        a = ak.randint(0, 10, 100)
        a_ref = a.to_list()
        s = ak.random_strings_uniform(0, 5, 50)
        s_ref = s.to_list()
        c = ak.Categorical(s)
        c_ref = c.to_list()
        g = ak.GroupBy(a)
        g_ref = {
            "perm": g.permutation.to_list(),
            "keys": g.keys.to_list(),
            "segments": g.segments.to_list(),
        }

        with tempfile.TemporaryDirectory(dir=IOTest.io_test_dir) as tmp_dirname:
            ak.snapshot(f"{tmp_dirname}/arkouda_snapshot_test")
            # delete variables
            del df
            del df_str_idx
            del a
            del s
            del c
            del g

            # verify no longer in the namespace
            with self.assertRaises(NameError):
                self.assertTrue(not df)
            with self.assertRaises(NameError):
                self.assertTrue(not df_str_idx)
            with self.assertRaises(NameError):
                self.assertTrue(not a)
            with self.assertRaises(NameError):
                self.assertTrue(not s)
            with self.assertRaises(NameError):
                self.assertTrue(not c)
            with self.assertRaises(NameError):
                self.assertTrue(not g)

            # restore the variables
            data = ak.restore(f"{tmp_dirname}/arkouda_snapshot_test")
            for vn in ["df", "df_str_idx", "a", "s", "c", "g"]:
                # ensure all variable names returned
                self.assertTrue(vn in data.keys())

            # validate that restored variables are correct
            self.assertTrue(
                assert_frame_equal(df_ref[col_order], data["df"].to_pandas()[col_order]) is None
            )
            self.assertTrue(
                assert_frame_equal(df_str_idx_ref[col_order], data["df_str_idx"].to_pandas()[col_order])
                is None
            )
            self.assertListEqual(a_ref, data["a"].to_list())
            self.assertListEqual(s_ref, data["s"].to_list())
            self.assertListEqual(c_ref, data["c"].to_list())
            self.assertListEqual(g_ref["perm"], data["g"].permutation.to_list())
            self.assertListEqual(g_ref["keys"], data["g"].keys.to_list())
            self.assertListEqual(g_ref["segments"], data["g"].segments.to_list())

    def tearDown(self):
        super(IOTest, self).tearDown()
        for f in glob.glob("{}/*".format(IOTest.io_test_dir)):
