Skip to content

Commit dd1bfaa

Browse files
committed
Fix and test arcticdb reading streaming data
Fixes: - Column filter in static schema - Column ordering when introducing a new column with an incomplete segment Tests: - Column filter in static and dynamic schema - Reading incompletes with different schemas - Compatibility test for reading incompletes from an old environment
1 parent 7b4bfd1 commit dd1bfaa

File tree

4 files changed

+152
-8
lines changed

4 files changed

+152
-8
lines changed

cpp/arcticdb/pipeline/read_pipeline.hpp

+10-5
Original file line numberDiff line numberDiff line change
@@ -141,19 +141,24 @@ inline void generate_filtered_field_descriptors(std::shared_ptr<PipelineContext>
141141
generate_filtered_field_descriptors(*context, columns);
142142
}
143143

144+
inline void get_column_bitset_in_context(
145+
const ReadQuery& query,
146+
const std::shared_ptr<PipelineContext>& pipeline_context) {
147+
pipeline_context->set_selected_columns(query.columns);
148+
pipeline_context->overall_column_bitset_ = overall_column_bitset(pipeline_context->descriptor(),
149+
query.clauses_,
150+
pipeline_context->selected_columns_);
151+
}
152+
144153
template<class ContainerType>
145154
inline std::vector<FilterQuery<ContainerType>> get_column_bitset_and_query_functions(
146155
const ReadQuery& query,
147156
const std::shared_ptr<PipelineContext>& pipeline_context,
148157
bool dynamic_schema,
149158
bool column_groups) {
150159
using namespace arcticdb::pipelines::index;
151-
152160
if(!dynamic_schema || column_groups) {
153-
pipeline_context->set_selected_columns(query.columns);
154-
pipeline_context->overall_column_bitset_ = overall_column_bitset(pipeline_context->descriptor(),
155-
query.clauses_,
156-
pipeline_context->selected_columns_);
161+
get_column_bitset_in_context(query, pipeline_context);
157162
}
158163
return build_read_query_filters<ContainerType>(pipeline_context, query.row_filter, dynamic_schema, column_groups);
159164
}

cpp/arcticdb/version/version_core.cpp

+11-3
Original file line numberDiff line numberDiff line change
@@ -1085,9 +1085,17 @@ bool read_incompletes_to_pipeline(
10851085
// Mark the start point of the incompletes, so we know that there is no column slicing after this point
10861086
pipeline_context->incompletes_after_ = pipeline_context->slice_and_keys_.size();
10871087

1088-
// If there are only incompletes we need to add the index here
10891088
if(pipeline_context->slice_and_keys_.empty()) {
1089+
// If there are only incompletes we need to do the following (typically done when reading the index key):
1090+
// - add the index columns to query
1091+
// - in case of static schema: populate the descriptor and column_bitset
10901092
add_index_columns_to_query(read_query, seg.index_descriptor());
1093+
if (!dynamic_schema) {
1094+
pipeline_context->desc_ = seg.descriptor();
1095+
get_column_bitset_in_context(
1096+
read_query,
1097+
pipeline_context);
1098+
}
10911099
}
10921100
pipeline_context->slice_and_keys_.insert(std::end(pipeline_context->slice_and_keys_), incomplete_segments.begin(), incomplete_segments.end());
10931101

@@ -1116,9 +1124,9 @@ bool read_incompletes_to_pipeline(
11161124
pipeline_context->staged_descriptor_ =
11171125
merge_descriptors(seg.descriptor(), incomplete_segments, read_query.columns);
11181126
if (pipeline_context->desc_) {
1119-
const std::array fields_ptr = {pipeline_context->desc_->fields_ptr()};
1127+
const std::array staged_fields_ptr = {pipeline_context->staged_descriptor_->fields_ptr()};
11201128
pipeline_context->desc_ =
1121-
merge_descriptors(*pipeline_context->staged_descriptor_, fields_ptr, read_query.columns);
1129+
merge_descriptors(*pipeline_context->desc_, staged_fields_ptr, read_query.columns);
11221130
} else {
11231131
pipeline_context->desc_ = pipeline_context->staged_descriptor_;
11241132
}

python/tests/compat/arcticdb/test_compatibility.py

+47
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import pytest
22
from packaging import version
33
import pandas as pd
4+
import numpy as np
45
from arcticdb.util.test import assert_frame_equal
56
from arcticdb.options import ModifiableEnterpriseLibraryOption
67
from arcticdb.toolbox.library_tool import LibraryTool
@@ -171,3 +172,49 @@ def test_compat_snapshot_metadata_read(old_venv_and_arctic_uri, lib_name):
171172
snaps = curr.lib.list_snapshots()
172173
meta = snaps["old_snap"]
173174
assert meta == {"old_key": "old_value"}
175+
176+
177+
def test_compat_read_incomplete(old_venv_and_arctic_uri, lib_name):
178+
old_venv, arctic_uri = old_venv_and_arctic_uri
179+
sym = "sym"
180+
df = pd.DataFrame({
181+
"col": np.arange(10),
182+
"float_col": np.arange(10, dtype=np.float64),
183+
"str_col": [f"str_{i}" for i in range(10)]
184+
}, pd.date_range("2024-01-01", periods=10))
185+
df_1 = df.iloc[:8]
186+
df_2 = df.iloc[8:]
187+
188+
old_ac = old_venv.create_arctic(arctic_uri)
189+
old_lib = old_ac.create_library(lib_name)
190+
191+
if version.Version(old_venv.version) >= version.Version("5.1.0"):
192+
# In version 5.1.0 (with commit a3b7545) we moved the streaming incomplete python API to the library tool.
193+
old_lib.execute([
194+
"""
195+
lib_tool = lib.library_tool()
196+
lib_tool.append_incomplete("sym", df_1)
197+
lib_tool.append_incomplete("sym", df_2)
198+
"""
199+
], dfs={"df_1": df_1, "df_2": df_2})
200+
else:
201+
old_lib.execute([
202+
"""
203+
lib._nvs.append("sym", df_1, incomplete=True)
204+
lib._nvs.append("sym", df_2, incomplete=True)
205+
"""
206+
], dfs={"df_1": df_1, "df_2": df_2})
207+
208+
209+
with CurrentVersion(arctic_uri, lib_name) as curr:
210+
read_df = curr.lib._nvs.read(sym, date_range=(None, None), incomplete=True).data
211+
assert_frame_equal(read_df, df)
212+
213+
read_df = curr.lib._nvs.read(sym, date_range=(None, None), incomplete=True, columns=["float_col"]).data
214+
assert_frame_equal(read_df, df[["float_col"]])
215+
216+
read_df = curr.lib._nvs.read(sym, date_range=(None, None), incomplete=True, columns=["float_col", "str_col"]).data
217+
assert_frame_equal(read_df, df[["float_col", "str_col"]])
218+
219+
read_df = curr.lib._nvs.read(sym, date_range=(pd.Timestamp(2024, 1, 5), pd.Timestamp(2024, 1, 9)), incomplete=True, columns=["float_col", "str_col"]).data
220+
assert_frame_equal(read_df, df[["float_col", "str_col"]].iloc[4:9])

python/tests/unit/arcticdb/version_store/test_incompletes.py

+84
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from arcticdb.exceptions import MissingDataException
1313
from arcticdb_ext.storage import KeyType
1414

15+
from arcticdb.util.venv import CurrentVersion
1516

1617
@pytest.mark.parametrize("batch", (True, False))
1718
def test_read_incompletes_with_indexed_data(lmdb_version_store_v1, batch):
@@ -80,3 +81,86 @@ def test_read_incompletes_no_chunking(lmdb_version_store_tiny_segment):
8081

8182
ref_keys = lib_tool.find_keys_for_symbol(KeyType.APPEND_REF, sym)
8283
assert len(ref_keys) == 1
84+
85+
@pytest.mark.parametrize("dynamic_schema", [True, False])
86+
def test_read_incompletes_columns_filter(version_store_factory, dynamic_schema):
87+
lib = version_store_factory(dynamic_schema=dynamic_schema)
88+
lib_tool = lib.library_tool()
89+
sym = "sym"
90+
df = pd.DataFrame({
91+
"col": np.arange(20),
92+
"float_col": np.arange(20, dtype=np.float64),
93+
"str_col": [f"str_{i}" for i in range(20)]
94+
}, pd.date_range("2024-01-01", periods=20))
95+
lib_tool.append_incomplete(sym, df.iloc[:5])
96+
lib_tool.append_incomplete(sym, df.iloc[5:8])
97+
lib_tool.append_incomplete(sym, df.iloc[8:10])
98+
99+
date_range = (None, None)
100+
col_df = lib.read(sym, date_range=date_range, incomplete=True, columns=["col"]).data
101+
assert_frame_equal(col_df, df[["col"]].iloc[:10])
102+
103+
float_col_df = lib.read(sym, date_range=date_range, incomplete=True, columns=["float_col"]).data
104+
assert_frame_equal(float_col_df, df[["float_col"]].iloc[:10])
105+
106+
float_and_str_col_df = lib.read(sym, date_range=date_range, incomplete=True, columns=["float_col", "str_col"]).data
107+
assert_frame_equal(float_and_str_col_df, df[["float_col", "str_col"]].iloc[:10])
108+
109+
date_range = (pd.Timestamp(2024, 1, 3), pd.Timestamp(2024, 1, 8))
110+
float_and_str_col_df = lib.read(sym, date_range=date_range, incomplete=True, columns=["float_col", "str_col"]).data
111+
assert_frame_equal(float_and_str_col_df, df[["float_col", "str_col"]].iloc[2:8])
112+
113+
# Compact and add the rest of the df
114+
lib.compact_incomplete(sym, append=True, convert_int_to_float=False, via_iteration=False)
115+
lib_tool.append_incomplete(sym, df.iloc[10:17])
116+
lib_tool.append_incomplete(sym, df.iloc[17:])
117+
118+
date_range = (None, None)
119+
float_col_df = lib.read(sym, date_range=date_range, incomplete=True, columns=["float_col"]).data
120+
assert_frame_equal(float_col_df, df[["float_col"]])
121+
122+
float_and_str_col_df = lib.read(sym, date_range=date_range, incomplete=True, columns=["float_col", "str_col"]).data
123+
assert_frame_equal(float_and_str_col_df, df[["float_col", "str_col"]])
124+
125+
# Only incomplete range
126+
date_range = (pd.Timestamp(2024, 1, 12), pd.Timestamp(2024, 1, 18))
127+
float_and_str_col_df = lib.read(sym, date_range=date_range, incomplete=True, columns=["float_col", "str_col"]).data
128+
assert_frame_equal(float_and_str_col_df, df[["float_col", "str_col"]].iloc[11:18])
129+
130+
131+
def test_read_incompletes_dynamic(lmdb_version_store_dynamic_schema_v1):
132+
lib = lmdb_version_store_dynamic_schema_v1
133+
lib_tool = lib.library_tool()
134+
sym = "sym"
135+
136+
def get_date(days_after_epoch):
137+
return pd.Timestamp(0) + pd.DateOffset(days=days_after_epoch)
138+
139+
def get_index(days_after_epoch, num_days):
140+
return pd.date_range(get_date(days_after_epoch), periods=num_days, freq="d")
141+
142+
df_1 = pd.DataFrame({"col_1": [1., 2., 3.], "col_2": [1., 2., 3.]}, index=get_index(0, 3))
143+
df_2 = pd.DataFrame({"col_2": [4., 5.], "col_3": [1., 2.]}, index=get_index(3, 2))
144+
df_3 = pd.DataFrame({"col_3": [3., 4.], "col_4": [1., 2.]}, index=get_index(5, 2))
145+
146+
lib_tool.append_incomplete(sym, df_1)
147+
lib_tool.append_incomplete(sym, df_2)
148+
149+
df = lib.read(sym, date_range=(None, None), incomplete=True).data
150+
assert_frame_equal(df, pd.concat([df_1, df_2]))
151+
152+
# If reading just a single incomplete we will get the result in its own schema
153+
df = lib.read(sym, date_range = (get_date(3), None), incomplete=True).data
154+
assert_frame_equal(df, df_2)
155+
156+
lib.compact_incomplete(sym, append=True, convert_int_to_float=False, via_iteration=False)
157+
158+
df = lib.read(sym, date_range=(None, None), incomplete=True).data
159+
assert_frame_equal(df, pd.concat([df_1, df_2]))
160+
161+
lib_tool.append_incomplete(sym, df_3)
162+
df = lib.read(sym, date_range=(None, None), incomplete=True).data
163+
assert_frame_equal(df, pd.concat([df_1, df_2, df_3]))
164+
165+
df_col_filter = lib.read(sym, date_range=(None, None), incomplete=True, columns=["col_2", "col_4"]).data
166+
assert_frame_equal(df_col_filter, pd.concat([df_1, df_2, df_3])[["col_2", "col_4"]])

0 commit comments

Comments
 (0)