Additional asv tests #2185

Closed · wants to merge 27 commits
Changes from 16 commits
1,084 changes: 1,042 additions & 42 deletions python/.asv/results/benchmarks.json

Large diffs are not rendered by default.

429 changes: 398 additions & 31 deletions python/arcticdb/util/environment_setup.py

Large diffs are not rendered by default.

125 changes: 118 additions & 7 deletions python/arcticdb/util/utils.py
@@ -20,10 +20,19 @@
from arcticdb.util.test import create_datetime_index, get_sample_dataframe, random_integers, random_string
from arcticdb.version_store.library import Library


# Types supported by arctic
ArcticIntType = Union[np.uint8, np.uint16, np.uint32, np.uint64, np.int8, np.int16, np.int32, np.int64]
ArcticFloatType = Union[np.float64, np.float32]
ArcticTypes = Union[ArcticIntType, ArcticFloatType, str]
if sys.version_info >= (3, 8):
supported_int_types_list = list(get_args(ArcticIntType))
supported_float_types_list = list(get_args(ArcticFloatType))
supported_types_list = list(get_args(ArcticTypes))
else:
supported_int_types_list = [np.uint8, np.uint16, np.uint32, np.uint64, np.int8, np.int16, np.int32, np.int64]
supported_float_types_list = [np.float64, np.float32]
supported_types_list = [str] + supported_int_types_list + supported_float_types_list
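
For context, a minimal sketch of what the `get_args` branch relies on, and why the version guard is needed (`typing.get_args` was added in Python 3.8):

from typing import Union, get_args
import numpy as np

ArcticFloatType = Union[np.float64, np.float32]
# On Python >= 3.8, get_args unpacks the members of a Union:
assert get_args(ArcticFloatType) == (np.float64, np.float32)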

class TimestampNumber:
"""
@@ -529,15 +538,10 @@ def generate_random_dataframe(cls, rows: int, cols: int, indexed: bool = True, s
cols=int(cols)
rows=int(rows)
np.random.seed(seed)
if sys.version_info >= (3, 8):
dtypes = np.random.choice(list(get_args(ArcticTypes)), cols)
else:
dtypes = [np.uint8, np.uint16, np.uint32, np.uint64,
np.int8, np.int16, np.int32, np.int64,
np.float32, np.float16, str]
dtypes = supported_types_list
gen = DFGenerator(size=rows, seed=seed)
for i in range(cols):
dtype = dtypes[i]
dtype = dtypes[i % len(dtypes)]
if 'int' in str(dtype):
gen.add_int_col(f"col_{i}", dtype)
pass
@@ -553,3 +557,110 @@ def generate_random_dataframe(cls, rows: int, cols: int, indexed: bool = True, s
gen.add_timestamp_index("index", "s", pd.Timestamp(0))
return gen.generate_dataframe()


@classmethod
def generate_random_int_dataframe(cls, start_name_prefix: str,
num_rows:int, num_cols:int,
dtype: ArcticIntType = np.int64, min_value: int = None, max_value: int = None,
seed: int = 3432) -> pd.DataFrame:
"""
To be used to generate large number of same type columns, when generation time is
critical
"""
np.random.seed(seed=seed)
platform_int_info = np.iinfo("int_")
iinfo = np.iinfo(dtype)
if min_value is None:
min_value = max(iinfo.min, platform_int_info.min)
if max_value is None:
max_value = min(iinfo.max, platform_int_info.max)

data = np.random.randint(min_value, max_value, size=(num_rows, num_cols), dtype=dtype)
columns = [f"{start_name_prefix}_{n}" for n in range(num_cols)]

return pd.DataFrame(data=data, columns=columns)

@classmethod
def generate_random_float_dataframe(cls, start_name_prefix: str, num_rows: int, num_cols: int,
dtype: ArcticFloatType = np.float64,
min_value: float = None, max_value: float = None, round_at: int = None,
seed: int = 54675) -> pd.DataFrame:
"""
Generates `num_cols` columns of the same float dtype; intended for cases where
generation time is critical.
"""
# Values outside roughly (-1e307, 1e307) trigger overflow in numpy's uniform
# Get the minimum and maximum values for np.float32
info = np.finfo(np.float32)
_max = info.max
_min = info.min
np.random.seed(seed)
if min_value is None:
min_value = max(-1e307, -sys.float_info.max, _min)
if max_value is None:
max_value = min(1e307, sys.float_info.max, _max)
data = np.random.uniform(min_value, max_value, size=(num_rows, num_cols)).astype(dtype)
if round_at is not None:
data = np.round(data, round_at)

columns = [f"{start_name_prefix}_{n}" for n in range(num_cols)]

return pd.DataFrame(data=data, columns=columns)

@classmethod
def generate_random_strings_dataframe(cls, start_name_prefix: str, num_rows: int, num_cols: int,
column_sizes=None, seed: int = 4543):
"""
To be used to generate large number of same type columns, when generation time is
critical
If `column_sizes` not supplied default 10 will be used
"""
if column_sizes is None:
column_sizes = [10] * num_cols
Review comment (Collaborator): Why do we want different column sizes? Is it to have nulls? I.e. what's the point in column_sizes=[10, 5]?

Review comment (Collaborator): Missed this as well?

np.random.seed(seed=seed)
data = [[random_string(column_sizes[col])
for col in range(num_cols)]
for _ in range(num_rows)]

columns = [f"{start_name_prefix}_{n}" for n in range(num_cols)]
return pd.DataFrame(data=data, columns=columns)

@classmethod
def generate_wide_dataframe(cls, num_rows: int, num_cols: int,
num_string_cols: int,
start_time: pd.Timestamp = None,
freq: Union[str, timedelta, pd.Timedelta, pd.DateOffset] = 's',
seed = 23445):
"""
Generates as fast as possible specified number of columns.
Uses random arrays generation in numpy to do that
As the strings generation is slowest always be mindful to pass number between 1-1000 max
The generated dataframe will have also index starting at specified `start_time`
"""

cols, mod = divmod(num_cols - num_string_cols,
len(supported_int_types_list + supported_float_types_list))  # divide by the number of supported numeric dtypes

frames = []
for dtype in supported_int_types_list:
frame = cls.generate_random_int_dataframe(dtype.__name__, num_rows=num_rows, num_cols=cols,
dtype=dtype, seed=seed)
frames.append(frame)

for dtype in supported_float_types_list:
frame = cls.generate_random_float_dataframe(dtype.__name__, num_rows=num_rows, num_cols=cols,
dtype=dtype, seed=seed)
frames.append(frame)

str_frame = cls.generate_random_strings_dataframe("str", num_rows=num_rows, num_cols=num_string_cols)
frames.append(str_frame)

frame: pd.DataFrame = pd.concat(frames, axis=1) # Concatenate horizontally

if start_time:
date_index = pd.date_range(start=start_time, periods=frame.shape[0], freq=freq, name='index')
frame.index = date_index

return frame
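
A minimal usage sketch of the new generator (assuming DFGenerator is the enclosing class in python/arcticdb/util/utils.py, as this hunk suggests; the sizes below are illustrative only):

import pandas as pd
from arcticdb.util.utils import DFGenerator

# 200 columns total: the non-string columns are split evenly across the supported
# int/float dtypes, plus 10 string columns; the index starts at start_time.
df = DFGenerator.generate_wide_dataframe(
    num_rows=1_000,
    num_cols=200,
    num_string_cols=10,
    start_time=pd.Timestamp("2025-01-01"),
    freq="s",
)
assert df.shape[0] == 1_000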


4 changes: 3 additions & 1 deletion python/benchmarks/local_query_builder.py
@@ -11,13 +11,15 @@
from .common import *


PARAMS_QUERY_BUILDER = [1_000_000, 10_000_000]

class LocalQueryBuilderFunctions:
number = 5
timeout = 6000
LIB_NAME = "query_builder"
CONNECTION_STRING = "lmdb://query_builder?map_size=5GB"

params = [1_000_000, 10_000_000]
params = PARAMS_QUERY_BUILDER
param_names = ["num_rows"]

def setup_cache(self):
130 changes: 130 additions & 0 deletions python/benchmarks/real_batch_functions.py
@@ -0,0 +1,130 @@
"""
Copyright 2025 Man Group Operations Limited

Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.

As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
"""

import os
import numpy as np
import pandas as pd
from arcticdb.util.environment_setup import Storage, SetupMultipleLibraries
from arcticdb.util.utils import TimestampNumber
from arcticdb.version_store.library import Library, ReadRequest, WritePayload


class AWSBatchBasicFunctions:
"""
This is similar test to :class:`BatchBasicFunctions`
Note that because batch functions are silent we do check if they work correctly along with
peakmem test where this will not influence result in any meaningful way
"""

rounds = 1
number = 3 # the benchmark function runs 3 times between each setup/teardown
repeat = 1 # number of setup/teardown cycles (measurement samples)
min_run_count = 1
warmup_time = 0

timeout = 1200

SETUP_CLASS = (SetupMultipleLibraries(storage=Storage.AMAZON,
prefix="BASIC_BATCH")
.set_params([[500, 1000], [25_000, 50_000]]) # For test purposes
.set_number_symbols_parameter_index(0)
.set_number_rows_parameter_index(1)
.set_number_columns_parameter_index(None))
params = SETUP_CLASS.get_parameter_list()
param_names = ["num_symbols", "num_rows"]

def setup_cache(self):
set_env = AWSBatchBasicFunctions.SETUP_CLASS
set_env.setup_environment()
info = set_env.get_storage_info()
# NOTE: use only the logger defined by the setup class
set_env.logger().info(f"storage info object: {info}")
return info

def teardown(self, storage_info, num_symbols, num_rows):
## Delete this process's own modifiable library
self.setup_env.delete_modifiable_library(os.getpid())

def setup(self, storage_info, num_symbols, num_rows):
self.setup_env = SetupMultipleLibraries.from_storage_info(storage_info)

self.lib: Library = self.setup_env.get_library(num_symbols)

# Get generated symbol names
self.symbols = []
for num_symb_idx in range(num_symbols):
sym_name = self.setup_env.get_symbol_name(num_symb_idx, num_rows, self.setup_env.default_number_cols)
self.symbols.append(sym_name)

# Construct read requests (one per symbol)
self.read_reqs = [ReadRequest(symbol) for symbol in self.symbols]

# Construct read requests that select only 2 columns instead of the whole DF (one per symbol)
COLS = self.setup_env.generate_dataframe(0, self.setup_env.default_number_cols).columns[2:4]
self.read_reqs_with_cols = [ReadRequest(symbol, columns=COLS) for symbol in self.symbols]

# Construct the dataframe that will be used for write requests (one payload per symbol)
self.df: pd.DataFrame = self.setup_env.generate_dataframe(num_rows, self.setup_env.default_number_cols)

# Construct read requests with a date_range (one per symbol)
self.date_range = self.get_last_x_percent_date_range(num_rows, 0.05)
Review comment (Collaborator): What is the intention of this date_range? It should be a tuple of start, end date. I think the get_last_x_percent_date_range returns a list of dates with an equal frequency?

Review comment (Collaborator): I'm actually surprised this benchmark even works. Have you tried running them?

self.read_reqs_date_range = [ReadRequest(symbol, date_range=self.date_range) for symbol in self.symbols]

## Make sure each process has its own write library
self.fresh_lib: Library = self.setup_env.get_modifiable_library(os.getpid())

def get_last_x_percent_date_range(self, num_rows, percents):
"""
Returns a date range selecting last X% of rows of dataframe
pass percents as 0.0-1.0
"""
Review comment (Collaborator): This function is probably useful in general and we should move to utils.py?

freq = self.setup_env.index_freq
start = TimestampNumber.from_timestamp(self.setup_env.start_timestamp, freq)
percent_5 = int(num_rows * percents)
end_range: TimestampNumber = start + num_rows
start_range: TimestampNumber = end_range - percent_5
date_index = pd.date_range(start=start_range.to_timestamp(), end=end_range.to_timestamp(), freq=freq)
return date_index

def time_read_batch(self, storage_info, num_symbols, num_rows):
read_batch_result = self.lib.read_batch(self.read_reqs)

def time_write_batch(self, storage_info, num_symbols, num_rows):
payloads = [WritePayload(symbol, self.df) for symbol in self.symbols]
write_batch_result = self.fresh_lib.write_batch(payloads)

def time_read_batch_with_columns(self, storage_info, num_symbols, num_rows):
read_batch_result = self.lib.read_batch(self.read_reqs_with_cols)

def peakmem_write_batch(self, storage_info, num_symbols, num_rows):
payloads = [WritePayload(symbol, self.df) for symbol in self.symbols]
write_batch_result = self.fresh_lib.write_batch(payloads)
# Quick check all is ok (will not affect benchmarks)
assert write_batch_result[0].symbol in self.symbols
assert write_batch_result[-1].symbol in self.symbols

def peakmem_read_batch(self, storage_info, num_symbols, num_rows):
read_batch_result = self.lib.read_batch(self.read_reqs)
# Quick check all is ok (will not affect benchmarks)
assert read_batch_result[0].data.shape[0] == num_rows
assert read_batch_result[-1].data.shape[0] == num_rows

def peakmem_read_batch_with_columns(self, storage_info, num_symbols, num_rows):
read_batch_result = self.lib.read_batch(self.read_reqs_with_cols)
# Quick check all is ok (will not affect benchmarks)
assert read_batch_result[0].data.shape[0] == num_rows
assert read_batch_result[-1].data.shape[0] == num_rows

def time_read_batch_with_date_ranges(self, storage_info, num_symbols, num_rows):
self.lib.read_batch(self.read_reqs_date_range)

def peakmem_read_batch_with_date_ranges(self, storage_info, num_symbols, num_rows):
read_batch_result = self.lib.read_batch(self.read_reqs_date_range)
# Quick check all is ok (will not affect benchmarks)
assert read_batch_result[0].data.shape[0] > 2
assert read_batch_result[-1].data.shape[0] > 2
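
On the reviewer's point above: `ReadRequest.date_range` is normally a (start, end) tuple of timestamps rather than a full `pd.date_range`. A minimal sketch of building the last-X% window as a tuple (the helper name, the symbol name, and the 1-second index frequency are assumptions for illustration):

import pandas as pd
from arcticdb.version_store.library import ReadRequest

def last_fraction_range(start_ts: pd.Timestamp, num_rows: int, fraction: float):
    # Assumes a 1-second index frequency for the stored symbol.
    end = start_ts + pd.Timedelta(seconds=num_rows - 1)
    start = end - pd.Timedelta(seconds=int(num_rows * fraction))
    return (start, end)

date_range = last_fraction_range(pd.Timestamp(0), num_rows=100_000, fraction=0.05)
read_req = ReadRequest("symbol_0", date_range=date_range)  # a tuple, not a DatetimeIndex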
85 changes: 85 additions & 0 deletions python/benchmarks/real_finalize_staged_data.py
@@ -0,0 +1,85 @@
"""
Copyright 2025 Man Group Operations Limited

Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.

As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
"""


import os
from arcticdb.util.environment_setup import Storage, NoSetup
from arcticdb.util.utils import CachedDFGenerator, TimestampNumber, stage_chunks
from arcticdb.version_store.library import StagedDataFinalizeMethod


class AWSFinalizeStagedData:
"""
Checks finalizing staged data. Note, that staged symbols can be finalized only twice,
therefore certain design decisions must be taken in advance so each process sets up
environment for exactly one test
"""

rounds = 1
number = 1
repeat = 1
min_run_count = 1
warmup_time = 0

timeout = 1200

SETUP_CLASS = (NoSetup(storage=Storage.AMAZON,
prefix="FINALIZE"))

params = [500, 1000] # For test purposes use [10, 20]
param_names = ["num_chunks"]

def setup_cache(self):
# Preconditions for this test
assert AWSFinalizeStagedData.number == 1
assert AWSFinalizeStagedData.repeat == 1
assert AWSFinalizeStagedData.rounds == 1
assert AWSFinalizeStagedData.warmup_time == 0

set_env = AWSFinalizeStagedData.SETUP_CLASS
info = set_env.get_storage_info()
set_env.logger().info(f"storage info object: {info}")
df_cache = CachedDFGenerator(500000, [5])
cache = {
"storage_info" : info,
"df_cache" : df_cache
}
return cache

def setup(self, cache, num_chunks: int):
self.df_cache: CachedDFGenerator = cache["df_cache"]
self.set_env = NoSetup.from_storage_info(cache["storage_info"])

self.pid = os.getpid()
self.lib = self.set_env.get_modifiable_library(self.pid)

INITIAL_TIMESTAMP: TimestampNumber = TimestampNumber(
0, self.df_cache.TIME_UNIT
) # Synchronize index frequency

df = self.df_cache.generate_dataframe_timestamp_indexed(200, 0, self.df_cache.TIME_UNIT)
list_of_chunks = [10000] * num_chunks
self.symbol = self.set_env.get_symbol_name_template(self.pid)

self.lib.write(self.symbol, data=df, prune_previous_versions=True)
stage_chunks(self.lib, self.symbol, self.df_cache, INITIAL_TIMESTAMP, list_of_chunks)

def teardown(self, cache: dict, param: int):
self.set_env.delete_modifiable_library(self.pid)

def time_finalize_staged_data(self, cache: dict, param: int):
self.set_env.logger().info(f"Library: {self.lib}")
self.set_env.logger().info(f"Symbol: {self.symbol}")
assert self.symbol in self.lib.get_staged_symbols()
self.lib.finalize_staged_data(self.symbol, mode=StagedDataFinalizeMethod.WRITE)

def peakmem_finalize_staged_data(self, cache: dict, param: int):
self.set_env.logger().info(f"Library: {self.lib}")
self.set_env.logger().info(f"Symbol: {self.symbol}")
assert self.symbol in self.lib.get_staged_symbols()
self.lib.finalize_staged_data(self.symbol, mode=StagedDataFinalizeMethod.WRITE)
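
For reference, a minimal sketch of the stage-then-finalize flow this benchmark measures, against a local LMDB library (the URI, library and symbol names are illustrative; `staged=True` is the public Library.write flag for staging, and `get_staged_symbols` / `StagedDataFinalizeMethod.WRITE` are the calls used in this file):

import numpy as np
import pandas as pd
from arcticdb import Arctic
from arcticdb.version_store.library import StagedDataFinalizeMethod

ac = Arctic("lmdb://staging_demo")                      # illustrative local storage
lib = ac.get_library("demo", create_if_missing=True)

idx = pd.date_range("2025-01-01", periods=1_000, freq="s")
chunk = pd.DataFrame({"a": np.arange(1_000)}, index=idx)

lib.write("staged_sym", chunk, staged=True)             # stage the chunk instead of writing it
assert "staged_sym" in lib.get_staged_symbols()
lib.finalize_staged_data("staged_sym", mode=StagedDataFinalizeMethod.WRITE)
assert lib.read("staged_sym").data.shape[0] == 1_000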