
Commit 47e9a5e

Merge branch 'master' into 2221-arcticdb-writes-dont-handle-numpy-slicesviews-correctly
2 parents: 35a5e86 + 7b4bfd1

4 files changed: +289 -4 lines

environment_unix.yml (+1 -1)

@@ -36,7 +36,7 @@ dependencies:
   - azure-core-cpp
   - azure-identity-cpp
   - azure-storage-blobs-cpp
-  - fmt
+  - fmt <11.1
   - folly==2023.09.25.00
   - unordered_dense
   # Vendored build dependencies (see `cpp/thirdparty` and `cpp/vcpkg.json`)

python/arcticdb/util/utils.py (+126)

@@ -553,3 +553,129 @@ def generate_random_dataframe(cls, rows: int, cols: int, indexed: bool = True, s
         gen.add_timestamp_index("index", "s", pd.Timestamp(0))
         return gen.generate_dataframe()
+
+    @classmethod
+    def generate_random_int_dataframe(cls, start_name_prefix: str,
+                                      num_rows: int, num_cols: int,
+                                      dtype: ArcticIntType = np.int64, min_value: int = None, max_value: int = None,
+                                      seed: int = 3432) -> pd.DataFrame:
+        """
+        To be used to generate a large number of columns of the same type when generation time is critical.
+        """
+        np.random.seed(seed=seed)
+        platform_int_info = np.iinfo("int_")
+        iinfo = np.iinfo(dtype)
+        if min_value is None:
+            min_value = max(iinfo.min, platform_int_info.min)
+        if max_value is None:
+            max_value = min(iinfo.max, platform_int_info.max)
+
+        data = np.random.randint(min_value, max_value, size=(num_rows, num_cols), dtype=dtype)
+        columns = [f"{start_name_prefix}_{n}" for n in range(num_cols)]
+
+        return pd.DataFrame(data=data, columns=columns)
+
+    @classmethod
+    def generate_random_float_dataframe(cls, start_name_prefix: str, num_rows: int, num_cols: int,
+                                        dtype: ArcticFloatType = np.float64,
+                                        min_value: float = None, max_value: float = None, round_at: int = None,
+                                        seed: int = 54675) -> pd.DataFrame:
+        """
+        To be used to generate a large number of columns of the same type when generation time is critical.
+        """
+        # Higher bounds would trigger overflow in numpy's uniform (-1e307 - 1e307),
+        # so clamp to the minimum and maximum values of np.float32
+        info = np.finfo(np.float32)
+        _max = info.max
+        _min = info.min
+        np.random.seed(seed)
+        if min_value is None:
+            min_value = max(-1e307, -sys.float_info.max, _min)
+        if max_value is None:
+            max_value = min(1e307, sys.float_info.max, _max)
+        if round_at is None:
+            data = np.random.uniform(min_value, max_value, size=(num_rows, num_cols)).astype(dtype)
+        else:
+            data = np.round(np.random.uniform(min_value, max_value,
+                                              size=(num_rows, num_cols)), round_at).astype(dtype)
+
+        columns = [f"{start_name_prefix}_{n}" for n in range(num_cols)]
+
+        return pd.DataFrame(data=data, columns=columns)
+
+    @classmethod
+    def generate_random_strings_dataframe(cls, start_name_prefix: str, num_rows: int, num_cols: int,
+                                          column_sizes=None, seed: int = 4543):
+        """
+        To be used to generate a large number of columns of the same type when generation time is critical.
+        If `column_sizes` is not supplied, a default string length of 10 is used.
+        """
+        if column_sizes is None:
+            column_sizes = [10] * num_cols
+        np.random.seed(seed=seed)
+        data = [[random_string(column_sizes[col])
+                 for col in range(num_cols)]
+                for _ in range(num_rows)]
+
+        columns = [f"{start_name_prefix}_{n}" for n in range(num_cols)]
+        return pd.DataFrame(data=data, columns=columns)
+
+    @classmethod
+    def generate_wide_dataframe(cls, num_rows: int, num_cols: int,
+                                num_string_cols: int,
+                                start_time: pd.Timestamp = None,
+                                freq: Union[str, timedelta, pd.Timedelta, pd.DateOffset] = 's',
+                                seed=23445):
+        """
+        Generates the specified number of columns as fast as possible, using numpy random array generation.
+        As string generation is the slowest part, be mindful to keep `num_string_cols` between 1 and 1000.
+        The generated dataframe will also have an index starting at the specified `start_time`.
+        """
+
+        cols, mod = divmod(num_cols - num_string_cols, 10)  # divide by the number of distinct numeric frame types
+
+        int_frame1 = cls.generate_random_int_dataframe("int8", num_rows=num_rows, num_cols=cols,
+                                                       dtype=np.int8, seed=seed)
+
+        int_frame2 = cls.generate_random_int_dataframe("int16", num_rows=num_rows, num_cols=cols,
+                                                       dtype=np.int16, seed=seed)
+
+        int_frame3 = cls.generate_random_int_dataframe("int32", num_rows=num_rows, num_cols=cols,
+                                                       dtype=np.int32, seed=seed)
+
+        int_frame4 = cls.generate_random_int_dataframe("int64", num_rows=num_rows, num_cols=cols + mod,
+                                                       dtype=np.int64, seed=seed)
+
+        uint_frame1 = cls.generate_random_int_dataframe("uint8", num_rows=num_rows, num_cols=cols,
+                                                        dtype=np.uint8, seed=seed)
+
+        uint_frame2 = cls.generate_random_int_dataframe("uint16", num_rows=num_rows, num_cols=cols,
+                                                        dtype=np.uint16, seed=seed)
+
+        uint_frame3 = cls.generate_random_int_dataframe("uint32", num_rows=num_rows, num_cols=cols,
+                                                        dtype=np.uint32, seed=seed)
+
+        uint_frame4 = cls.generate_random_int_dataframe("uint64", num_rows=num_rows, num_cols=cols,
+                                                        dtype=np.uint64, seed=seed)
+
+        float_frame1 = cls.generate_random_float_dataframe("float32", num_rows=num_rows, num_cols=cols,
+                                                           dtype=np.float32, seed=seed)
+
+        float_frame2 = cls.generate_random_float_dataframe("float64", num_rows=num_rows, num_cols=cols,
+                                                           dtype=np.float64, seed=seed)
+
+        str_frame = cls.generate_random_strings_dataframe("str", num_rows=num_rows, num_cols=num_string_cols)
+
+        frame: pd.DataFrame = pd.concat([int_frame1, int_frame2, int_frame3, int_frame4,
+                                         uint_frame1, uint_frame2, uint_frame3, uint_frame4,
+                                         float_frame1, float_frame2, str_frame], axis=1)  # concatenate horizontally
+
+        if start_time:
+            date_range = pd.date_range(start=start_time, periods=frame.shape[0], freq=freq, name='index')
+            frame.index = date_range
+
+        return frame
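For orientation, a minimal usage sketch of the helpers added above (not part of the commit; the sizes, column prefixes and assertion are illustrative):

```python
import numpy as np
import pandas as pd
from arcticdb.util.utils import DFGenerator

# 100 rows x 60 columns: the ten numeric dtype groups plus 5 string columns,
# indexed by a timestamp range starting at the epoch.
df = DFGenerator.generate_wide_dataframe(num_rows=100, num_cols=60, num_string_cols=5,
                                         start_time=pd.Timestamp(0))
assert df.shape == (100, 60)

# Single-dtype frames can also be generated directly when only one type is needed.
ints = DFGenerator.generate_random_int_dataframe("int32_col", num_rows=10, num_cols=3, dtype=np.int32)
floats = DFGenerator.generate_random_float_dataframe("float_col", num_rows=10, num_cols=3, round_at=2)
```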

python/tests/conftest.py (+1 -1)

@@ -129,7 +129,7 @@ def lmdb_storage(tmp_path) -> Generator[LmdbStorageFixture, None, None]:


 @pytest.fixture
-def lmdb_library(lmdb_storage, lib_name) -> Library:
+def lmdb_library(lmdb_storage, lib_name) -> Generator[Library, None, None]:
     return lmdb_storage.create_arctic().create_library(lib_name)

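For context, `Generator[Library, None, None]` is the typing convention for fixtures that yield their value; a minimal, hypothetical illustration of that pattern (not code from the repository):

```python
from typing import Generator

import pytest
from arcticdb.version_store.library import Library

@pytest.fixture
def example_library(lmdb_storage, lib_name) -> Generator[Library, None, None]:
    # Yield the library to the test; anything after the yield would run as teardown.
    yield lmdb_storage.create_arctic().create_library(lib_name)
```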

python/tests/stress/arcticdb/version_store/test_mem_leaks.py (+161 -2)
@@ -22,17 +22,28 @@
 import time
 import pandas as pd

-from typing import Generator, Tuple
+from typing import Generator, List, Tuple
+from arcticdb.encoding_version import EncodingVersion
+from arcticdb.options import LibraryOptions
 from arcticdb.util.test import get_sample_dataframe, random_string
+from arcticdb.util.utils import DFGenerator
 from arcticdb.version_store.library import Library, ReadRequest
 from arcticdb.version_store.processing import QueryBuilder
 from arcticdb.version_store._store import NativeVersionStore
-from tests.util.mark import MACOS, SLOW_TESTS_MARK, WINDOWS, MEMRAY_SUPPORTED, MEMRAY_TESTS_MARK, SKIP_CONDA_MARK
+from arcticdb_ext.version_store import PythonVersionStoreReadOptions
+from tests.util.mark import LINUX, MACOS, SLOW_TESTS_MARK, WINDOWS, MEMRAY_SUPPORTED, MEMRAY_TESTS_MARK, SKIP_CONDA_MARK


 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger("Memory_tests")

+## IMPORTANT !!!
+##
+## All memory tests MUST be done with fixtures that return a Library object
+## and not a NativeVersionStore. The reason is that the latter is a thick wrapper
+## which can hide some possible problems, therefore all tests have to be done
+## against what the customer actually works with.
+

 # region HELPER functions for non-memray tests
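As a hedged illustration of the note above (the fixture names exist in the repository; the test body itself is purely illustrative): a memory test should take a fixture that provides a `Library`, such as `lmdb_library`, so leaks are measured against the same public API a customer uses.

```python
import pandas as pd
from arcticdb.version_store.library import Library

def test_mem_example(lmdb_library):
    lib: Library = lmdb_library  # the public Library API, not NativeVersionStore internals
    lib.write("example_sym", pd.DataFrame({"a": [1, 2, 3]}))
    assert lib.read("example_sym").data.shape == (3, 1)
```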

@@ -590,6 +601,30 @@ def is_relevant(stack: Stack) -> bool:
         # do something to check if we need this to be added
         # as mem leak
         # print(f"SAMPLE >>> {frame.filename}:{frame.function}[{frame.lineno}]")
+        frame_info_str = f"{frame.filename}:{frame.function}:[{frame.lineno}]"
+
+        if "folly::CPUThreadPoolExecutor::CPUTask" in frame_info_str:
+            logger.warning(f"Frame excluded : {frame_info_str}")
+            logger.warning(f'''Explanation : These are on purpose, and they come from the interaction of
+                multi-threading and forking. When Python forks, the task scheduler has a linked list
+                of tasks to execute, but there is a global lock held that protects the thread-local state.
+                We can't free the list without accessing the global thread-local storage singleton,
+                and that is protected by a lock which is now held by a thread that is in a different
+                process, so it will never be unlocked in the child. As a workaround we intentionally
+                leak the task scheduler and replace it with a new one, in this method:
+                https://github.com/man-group/ArcticDB/blob/master/cpp/arcticdb/async/task_scheduler.cpp#L34
+
+                It's actually due to a bug in folly, because the lock around the thread-local
+                storage manager has a compile-time token that should be used to differentiate it
+                from other locks, but it has been constructed with type void, as have other locks.
+                It's possible they might fix it at some point, in which case we can free the memory.
+                Or we do have a vague intention to move away from folly for async if we
+                find something better.
+
+                Great that it is catching this, as it's the one case in the whole project where I know
+                for certain that it does leak memory (and only because there's no alternative).''')
+            return False
+
         pass
     return True

@@ -727,3 +762,127 @@ def test_mem_leak_read_all_arctic_lib_memray(library_with_big_symbol_):
     logger.info(f"Test took : {time.time() - st}")

     gc.collect()
+
+@pytest.fixture
+def lmdb_library(lmdb_storage, lib_name, request) -> Generator[Library, None, None]:
+    """
+    Allows passing library creation parameters as parameters of the test or other fixture.
+    Example:
+
+        @pytest.mark.parametrize("lmdb_library_any", [
+            {'library_options': LibraryOptions(rows_per_segment=100, columns_per_segment=100)}
+        ], indirect=True)
+        def test_my_test(lmdb_library_any):
+            .....
+    """
+    params = request.param if hasattr(request, 'param') else {}
+    yield lmdb_storage.create_arctic().create_library(name=lib_name, **params)
+
+
+@pytest.fixture
+def prepare_head_tails_symbol(lmdb_library):
+    """
+    This fixture is part of the test `test_mem_leak_head_tail_memray`.
+
+    It creates a symbol with several versions and a snapshot for each version.
+    It inserts dataframes which are large relative to the segment size of the library,
+    and if dynamic schema is used each version has more columns than the previous one.
+
+    Should not be reused.
+    """
+    lib: Library = lmdb_library
+    opts = lib.options()
+
+    total_number_columns = 1002
+    symbol = "asdf12345"
+    num_rows_list = [279, 199, 1, 350, 999, 0, 1001]
+    snapshot_names = []
+    for rows in num_rows_list:
+        st = time.time()
+        df = DFGenerator.generate_wide_dataframe(num_rows=rows, num_cols=total_number_columns, num_string_cols=25,
+                                                 start_time=pd.Timestamp(0), seed=64578)
+        lib.write(symbol, df)
+        snap = f"{symbol}_{rows}"
+        lib.snapshot(snap)
+        snapshot_names.append(snap)
+        logger.info(f"Generated {rows} rows in {time.time() - st} sec")
+        if opts.dynamic_schema:
+            # Dynamic-schema libraries are dynamic by nature, so the test should cover that
+            # characteristic
+            total_number_columns += 20
+            logger.info(f"Total number of columns increased to {total_number_columns}")
+
+    all_columns = df.columns.to_list()
+    yield (lib, symbol, num_rows_list, snapshot_names, all_columns)
+    lib.delete(symbol=symbol)
+
+
+@MEMRAY_TESTS_MARK
+@SLOW_TESTS_MARK
+## Linux is showing quite a large leak per location; there will be a separate issue to investigate why
+@pytest.mark.limit_leaks(location_limit="1000 KB" if LINUX else "52 KB", filter_fn=is_relevant)
+@pytest.mark.parametrize("lmdb_library", [
+    {'library_options': LibraryOptions(rows_per_segment=233, columns_per_segment=197, dynamic_schema=True, encoding_version=EncodingVersion.V2)},
+    {'library_options': LibraryOptions(rows_per_segment=99, columns_per_segment=99, dynamic_schema=False, encoding_version=EncodingVersion.V1)}
+], indirect=True)
+def test_mem_leak_head_tail_memray(prepare_head_tails_symbol):
+    """
+    This test checks whether the `head` and `tail` functions leak memory.
+    The creation of the initial symbol (the test environment precondition) is in a specialized fixture
+    so that memray does not track memory there, as that would slow the process many times over.
+    """
+
+    symbol: str
+    num_rows_list: List[int]
+    store: NativeVersionStore = None
+    snapshot_names: List[str]
+    all_columns: List[str]
+    (store, symbol, num_rows_list, snapshot_names, all_columns) = prepare_head_tails_symbol
+
+    start_test: float = time.time()
+    max_rows: int = max(num_rows_list)
+
+    np.random.seed(959034)
+    # construct a list of head and tail row counts to be selected
+    num_rows_to_select = []
+    important_values = [0, 1, -1, 2, -2, max_rows, -max_rows]  # some boundary cases
+    num_rows_to_select.extend(important_values)
+    num_rows_to_select.extend(np.random.randint(low=5, high=99, size=7))  # add 7 more random values
+    # the number of iterations will be the list length/size
+    iterations = len(num_rows_to_select)
+    # construct a random list of snapshot names, one for each iteration
+    snapshots_list: List[str] = np.random.choice(snapshot_names, iterations)
+    # construct a random list of version numbers, one for each iteration
+    versions_list: List[int] = np.random.randint(0, len(num_rows_list) - 1, iterations)
+    # construct a random list of numbers of columns to be selected
+    number_columns_for_selection_list: List[int] = np.random.randint(0, len(all_columns) - 1, iterations)
+
+    count: int = 0
+    # We will execute all head/tail operations several times with a specific number of columns.
+    # The number of columns consists of random columns and the boundary cases defined above.
+    for rows in num_rows_to_select:
+        selected_columns: List[str] = np.random.choice(all_columns, number_columns_for_selection_list[count], replace=False).tolist()
+        snap: str = snapshots_list[count]
+        ver: int = int(versions_list[count])
+        logger.info(f"rows {rows} / snapshot {snap}")
+        df1: pd.DataFrame = store.head(n=rows, as_of=snap, symbol=symbol).data
+        df2: pd.DataFrame = store.tail(n=rows, as_of=snap, symbol=symbol).data
+        df3: pd.DataFrame = store.head(n=rows, as_of=ver, symbol=symbol, columns=selected_columns).data
+        difference = list(set(df3.columns.to_list()).difference(set(selected_columns)))
+        assert len(difference) == 0, f"Columns not included : {difference}"
+        df4: pd.DataFrame = store.tail(n=rows, as_of=ver, symbol=symbol, columns=selected_columns).data
+        difference = list(set(df4.columns.to_list()).difference(set(selected_columns)))
+        assert len(difference) == 0, f"Columns not included : {difference}"
+
+        logger.info(f"Iteration {count} / {iterations} completed")
+        count += 1
+        del selected_columns, df1, df2, df3, df4
+
+    del store, symbol, num_rows_list, snapshot_names, all_columns
+    del num_rows_to_select, important_values, snapshots_list, versions_list, number_columns_for_selection_list
+    gc.collect()
+    time.sleep(10)  # collection is not immediate
+    logger.info(f"Test completed in {time.time() - start_test}")

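For completeness, a minimal sketch of how the leak filter and the `limit_leaks` marker used above fit together, assuming pytest-memray is installed and the tests are run with its `--memray` option; the symbol name and the 10 KB budget are illustrative, and the `Stack` import path is an assumption rather than something taken from this commit:

```python
import pandas as pd
import pytest
from pytest_memray import Stack  # assumed location of the Stack type used by filter callbacks

def ignore_known_folly_frames(stack: Stack) -> bool:
    # Return False to exclude allocations whose stack contains the known, intentional folly leak.
    for frame in stack.frames:
        if "folly::CPUThreadPoolExecutor::CPUTask" in f"{frame.filename}:{frame.function}":
            return False
    return True

@pytest.mark.limit_leaks(location_limit="10 KB", filter_fn=ignore_known_folly_frames)
def test_write_read_does_not_leak(lmdb_library):
    lmdb_library.write("example_sym", pd.DataFrame({"a": range(1000)}))
    lmdb_library.read("example_sym")
```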