Skip to content

Commit

Permalink
Bugfix/issue 3807 setting not apply in v8 heap limit (#3816)
Browse files Browse the repository at this point in the history
* switch to the context passed as a function argument, instead of the thread context

* switch the default v8 heap soft limit from 100 MB to 200 MB

* move the log to the ctor. The output looks like:
```
2024.03.01 23:52:39.428662 [ 200546 ] {0b772a75-c715-4ffc-8a9f-c980c835842c} <Information> JavaScriptAggregateFunction: udf name=test_sec_large, javascript_max_memory_bytes=23
```
  • Loading branch information
yokofly committed Dec 11, 2024
1 parent 2b89ef1 commit 855596d
Show file tree
Hide file tree
Showing 10 changed files with 35 additions and 23 deletions.
2 changes: 1 addition & 1 deletion programs/server/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -980,7 +980,7 @@ settings:
part_commit_pool_size: 8 # Total shared thread pool size for building and committing parts for Stream
max_idempotent_ids: 1000 # Maximum idempotent IDs to keep in memory and on disk for idempotent data ingestion
_tp_internal_system_open_sesame: true # Control the access to system.* streams
javascript_max_memory_bytes: 104857600 #Maximum heap size of javascript UDA/UDF in bytes, default is 100*1024*1024 bytes
javascript_max_memory_bytes: 209715200 # Maximum heap size of javascript UDA/UDF in bytes, default is 200*1024*1024 bytes
recovery_policy: "strict" # Recovery policy for materialized view. strict or best_effort
recovery_retry_for_sn_failure: 3 # retry times for sn failure. this value only apply if the `recovery_policy` is `best_effort`
max_block_size: 65409 # 65536 - (PADDING_FOR_SIMD - 1)
Expand Down
2 changes: 1 addition & 1 deletion programs/server/embedded.xml
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@
<part_commit_pool_size>8</part_commit_pool_size>
<max_idempotent_ids>1000</max_idempotent_ids>
<_tp_internal_system_open_sesame>true</_tp_internal_system_open_sesame>
<javascript_max_memory_bytes>104857600</javascript_max_memory_bytes>
<!-- Maximum heap size of javascript UDA/UDF in bytes: 200 * 1024 * 1024 -->
<javascript_max_memory_bytes>209715200</javascript_max_memory_bytes>
</global>
<stream>
<default_shards>1</default_shards>
Expand Down
11 changes: 7 additions & 4 deletions src/AggregateFunctions/AggregateFunctionFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ AggregateFunctionPtr AggregateFunctionFactory::get(
const DataTypes & argument_types,
const Array & parameters,
AggregateFunctionProperties & out_properties,
ContextPtr context,
bool is_changelog_input) const
/// proton: ends
{
Expand All @@ -98,7 +99,7 @@ AggregateFunctionPtr AggregateFunctionFactory::get(
[](const auto & type) { return type->onlyNull(); });

AggregateFunctionPtr nested_function = getImpl(
name, nested_types, nested_parameters, out_properties, has_null_arguments, is_changelog_input);
name, nested_types, nested_parameters, out_properties, has_null_arguments, context, is_changelog_input);

// Pure window functions are not real aggregate functions. Applying
// combinators doesn't make sense for them, they must handle the
Expand All @@ -109,7 +110,7 @@ AggregateFunctionPtr AggregateFunctionFactory::get(
return combinator->transformAggregateFunction(nested_function, out_properties, types_without_low_cardinality, parameters);
}

auto with_original_arguments = getImpl(name, types_without_low_cardinality, parameters, out_properties, false, is_changelog_input);
auto with_original_arguments = getImpl(name, types_without_low_cardinality, parameters, out_properties, false, context, is_changelog_input);

if (!with_original_arguments)
throw Exception("Logical error: AggregateFunctionFactory returned nullptr", ErrorCodes::LOGICAL_ERROR);
Expand All @@ -123,6 +124,7 @@ AggregateFunctionPtr AggregateFunctionFactory::getImpl(
const Array & parameters,
AggregateFunctionProperties & out_properties,
bool has_null_arguments,
ContextPtr context,
bool is_changelog_input) const
/// proton: ends
{
Expand Down Expand Up @@ -202,7 +204,7 @@ AggregateFunctionPtr AggregateFunctionFactory::getImpl(
}

/// proton: starts. Check user defined aggr function
auto aggr = UserDefinedFunctionFactory::getAggregateFunction(name, argument_types, parameters, out_properties, is_changelog_input);
auto aggr = UserDefinedFunctionFactory::getAggregateFunction(name, argument_types, parameters, out_properties, context, is_changelog_input);
if (aggr)
return aggr;
/// proton: ends
Expand All @@ -225,12 +227,13 @@ AggregateFunctionPtr AggregateFunctionFactory::tryGet(
const DataTypes & argument_types,
const Array & parameters,
AggregateFunctionProperties & out_properties,
ContextPtr context,
bool is_changelog_input) const
/// proton: ends
{
return isAggregateFunctionName(name)
/// proton: starts
? get(name, argument_types, parameters, out_properties, is_changelog_input)
? get(name, argument_types, parameters, out_properties, context, is_changelog_input)
/// proton: ends
: nullptr;
}
Expand Down
3 changes: 3 additions & 0 deletions src/AggregateFunctions/AggregateFunctionFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class AggregateFunctionFactory final : private boost::noncopyable, public IFacto
const DataTypes & argument_types,
const Array & parameters,
AggregateFunctionProperties & out_properties,
ContextPtr context = nullptr,
bool is_changelog_input = false) const;

/// Returns nullptr if not found.
Expand All @@ -77,6 +78,7 @@ class AggregateFunctionFactory final : private boost::noncopyable, public IFacto
const DataTypes & argument_types,
const Array & parameters,
AggregateFunctionProperties & out_properties,
ContextPtr context = nullptr,
bool is_changelog_input = false) const;
/// proton: ends

Expand All @@ -97,6 +99,7 @@ class AggregateFunctionFactory final : private boost::noncopyable, public IFacto
const Array & parameters,
AggregateFunctionProperties & out_properties,
bool has_null_arguments,
ContextPtr context,
bool is_changelog_input = false) const;
/// proton: ends

Expand Down
2 changes: 2 additions & 0 deletions src/AggregateFunctions/AggregateFunctionJavaScriptAdapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,9 @@ AggregateFunctionJavaScriptAdapter::AggregateFunctionJavaScriptAdapter(
, is_changelog_input(is_changelog_input_)
, max_v8_heap_size_in_bytes(max_v8_heap_size_in_bytes_)
, blueprint(config->name, config->source)
, logger(&Poco::Logger::get("JavaScriptAggregateFunction"))
{
LOG_INFO(logger, "udf name={}, javascript_max_memory_bytes={}", config->name, max_v8_heap_size_in_bytes);
}

String AggregateFunctionJavaScriptAdapter::getName() const
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class AggregateFunctionJavaScriptAdapter final : public IAggregateFunctionHelper
bool is_changelog_input = false;
size_t max_v8_heap_size_in_bytes;
JavaScriptBlueprint blueprint;

Poco::Logger * logger;
public:
AggregateFunctionJavaScriptAdapter(
JavaScriptUserDefinedFunctionConfigurationPtr config_,
Expand Down
2 changes: 1 addition & 1 deletion src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(UInt64, aysnc_ingest_max_outstanding_blocks, 10000, "Max outstanding blocks to be committed during async ingestion", 0) \
M(Bool, _tp_internal_system_open_sesame, true, "Control the access to system.* streams", 0) \
M(Bool, is_internal, false, "Control the statistics of select query", 0) \
M(UInt64, javascript_max_memory_bytes, 100 * 1024 * 1024, "Maximum heap size of javascript UDA/UDF in bytes", 0) \
M(UInt64, javascript_max_memory_bytes, 200 * 1024 * 1024, "Maximum heap size of javascript UDA/UDF in bytes", 0) \
M(Bool, enable_dependency_check, true, "Enable the dependency check of view/materialized view", 0) \
M(RecoveryPolicy, recovery_policy, RecoveryPolicy::Strict, "Default recovery policy for materialized view when inner query failed. 'strict': always recover from checkpointed; 'best_effort': attempts to recover from checkpointed and allow skipping of some data with permanent errors;", 0) \
M(UInt64, recovery_retry_for_sn_failure, 3, "Default retry times for sn failure. only apply for 'best_effort': attempts to recover from checkpointed and allow skipping of some data with permanent errors;", 0) \
Expand Down
23 changes: 10 additions & 13 deletions src/Functions/UserDefined/UserDefinedFunctionFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ extern const int UNKNOWN_FUNCTION;
/// proton: ends
}

/// Constructs the factory and binds its logger to the
/// "UserDefinedFunctionFactory" Poco logging channel.
UserDefinedFunctionFactory::UserDefinedFunctionFactory()
    : logger(&Poco::Logger::get("UserDefinedFunctionFactory"))
{
}

UserDefinedFunctionFactory & UserDefinedFunctionFactory::instance()
{
static UserDefinedFunctionFactory result;
Expand Down Expand Up @@ -68,6 +72,7 @@ AggregateFunctionPtr UserDefinedFunctionFactory::getAggregateFunction(
const DataTypes & types,
const Array & parameters,
AggregateFunctionProperties & /*properties*/,
ContextPtr context,
bool is_changelog_input)
{
const auto & loader = ExternalUserDefinedFunctionsLoader::instance(nullptr);
Expand Down Expand Up @@ -107,18 +112,14 @@ AggregateFunctionPtr UserDefinedFunctionFactory::getAggregateFunction(
size_t num_of_args = config->arguments.size();
validate_arguments(types.back()->getName() == "int8" ? num_of_args : num_of_args - 1);

ContextPtr query_context;
if (CurrentThread::isInitialized())
query_context = CurrentThread::get().getQueryContext();

if (!query_context || !query_context->getSettingsRef().javascript_max_memory_bytes)
if (!context || !context->getSettingsRef().javascript_max_memory_bytes)
{
LOG_ERROR(&Poco::Logger::get("UserDefinedFunctionFactory"), "query_context is invalid");
LOG_ERROR(instance().getLogger(), "query_context is invalid");
return nullptr;
}

return std::make_shared<AggregateFunctionJavaScriptAdapter>(
config, types, parameters, is_changelog_input, query_context->getSettingsRef().javascript_max_memory_bytes);
config, types, parameters, is_changelog_input, context->getSettingsRef().javascript_max_memory_bytes);
}

return nullptr;
Expand Down Expand Up @@ -160,7 +161,7 @@ FunctionOverloadResolverPtr UserDefinedFunctionFactory::tryGet(const String & fu
/// proton: starts
try
{
return get(function_name,std::move(context));
return get(function_name, std::move(context));
}
catch (Exception &)
{
Expand Down Expand Up @@ -196,11 +197,7 @@ std::vector<String> UserDefinedFunctionFactory::getRegisteredNames(ContextPtr co

/// proton: starts
bool UserDefinedFunctionFactory::registerFunction(
ContextPtr context,
const String & function_name,
Poco::JSON::Object::Ptr json_func,
bool throw_if_exists,
bool replace_if_exists)
ContextPtr context, const String & function_name, Poco::JSON::Object::Ptr json_func, bool throw_if_exists, bool replace_if_exists)
{
Streaming::validateUDFName(function_name);

Expand Down
7 changes: 7 additions & 0 deletions src/Functions/UserDefined/UserDefinedFunctionFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class UserDefinedFunctionFactory
const DataTypes & types,
const Array & parameters,
AggregateFunctionProperties & properties,
ContextPtr context,
/// whether input of aggregation function is changelog, aggregate function does not pass _tp_delta column to UDA if it is false
bool is_changelog_input = false);

Expand All @@ -53,6 +54,12 @@ class UserDefinedFunctionFactory
static bool has(const String & function_name, ContextPtr context);

static std::vector<String> getRegisteredNames(ContextPtr context);

/// Returns the logger pointer held by this factory instance.
Poco::Logger * getLogger() const { return logger; }

private:
UserDefinedFunctionFactory();
Poco::Logger * logger;
};

}
4 changes: 2 additions & 2 deletions src/Interpreters/ExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,9 @@ AggregateFunctionPtr getAggregateFunction(
/// Examples: Translate `quantile(x, 0.5)` to `quantile(0.5)(x)`
tryTranslateToParametricAggregateFunction(node, types, parameters, argument_names, context);
if (throw_if_empty)
return AggregateFunctionFactory::instance().get(node->name, types, parameters, properties, is_changelog_input);
return AggregateFunctionFactory::instance().get(node->name, types, parameters, properties, context, is_changelog_input);
else
return AggregateFunctionFactory::instance().tryGet(node->name, types, parameters, properties, is_changelog_input);
return AggregateFunctionFactory::instance().tryGet(node->name, types, parameters, properties, context, is_changelog_input);
}
/// proton: ends.
}
Expand Down

0 comments on commit 855596d

Please sign in to comment.