Skip to content

Commit

Permalink
IMPALA-13040
Browse files Browse the repository at this point in the history
  • Loading branch information
Asmoday committed May 30, 2024
1 parent ccb5a85 commit 749f4d1
Show file tree
Hide file tree
Showing 14 changed files with 304 additions and 38 deletions.
6 changes: 6 additions & 0 deletions be/src/runtime/query-exec-mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ DEFINE_int32(query_exec_mgr_cancellation_thread_pool_size, 1,
"(Advanced) Size of the QueryExecMgr thread-pool processing cancellations due to "
"coordinator failure");

DECLARE_int32(krpc_port);

const uint32_t QUERY_EXEC_MGR_MAX_CANCELLATION_QUEUE_SIZE = 65536;

QueryExecMgr::QueryExecMgr() {
Expand All @@ -78,6 +80,10 @@ Status QueryExecMgr::StartQuery(const ExecQueryFInstancesRequestPB* request,
bool dummy;
QueryState* qs =
GetOrCreateQueryState(query_ctx, request->per_backend_mem_limit(), &dummy);
RETURN_IF_ERROR(DebugAction(query_ctx.client_request.query_options.debug_action,
"QUERY_STATE_BEFORE_INIT_GLOBAL"));
RETURN_IF_ERROR(DebugAction(query_ctx.client_request.query_options.debug_action,
"QUERY_STATE_BEFORE_INIT", {std::to_string(FLAGS_krpc_port)}));
Status status = qs->Init(request, fragment_info);
if (!status.ok()) {
qs->ReleaseBackendResourceRefcount(); // Release refcnt acquired in Init().
Expand Down
5 changes: 5 additions & 0 deletions be/src/runtime/query-state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,11 @@ bool QueryState::codegen_cache_enabled() const {
&& ExecEnv::GetInstance()->codegen_cache_enabled();
}

/// Returns the current value of 'is_initialized_'. The flag is read while holding
/// 'init_lock_'; the lock is released when this function returns.
bool QueryState::is_initialized() {
  std::lock_guard<std::mutex> init_guard(init_lock_);
  const bool initialized = is_initialized_;
  return initialized;
}

bool QueryState::StartFInstances() {
VLOG(2) << "StartFInstances(): query_id=" << PrintId(query_id())
<< " #instances=" << fragment_info_.fragment_instance_ctxs.size();
Expand Down
18 changes: 11 additions & 7 deletions be/src/runtime/query-state.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ class QueryState {
return query_ctx_.client_request.query_options;
}
bool codegen_cache_enabled() const;
bool is_initialized();
MemTracker* query_mem_tracker() const { return query_mem_tracker_; }
RuntimeProfile* host_profile() const { return host_profile_; }
const NodeToFileSchedulings* node_to_file_schedulings() const {
Expand Down Expand Up @@ -320,6 +321,16 @@ class QueryState {
return fragment_state_map_;
}

/// Returns true if the query has reached a terminal state.
bool IsTerminalState() const {
// Read into local variable to protect against concurrent modification
// of backend_exec_state_.
BackendExecState exec_state = backend_exec_state_;
return exec_state == BackendExecState::FINISHED
|| exec_state == BackendExecState::CANCELLED
|| exec_state == BackendExecState::ERROR;
}

private:
friend class QueryExecMgr;

Expand Down Expand Up @@ -548,13 +559,6 @@ class QueryState {
return !overall_status_.ok() && !overall_status_.IsCancelled();
}

/// Returns true if the query has reached a terminal state.
bool IsTerminalState() const {
return backend_exec_state_ == BackendExecState::FINISHED
|| backend_exec_state_ == BackendExecState::CANCELLED
|| backend_exec_state_ == BackendExecState::ERROR;
}

/// Updates the BackendExecState based on 'overall_status_'. Should only be called when
/// the current state is a non-terminal state. The transition can either be to the next
/// legal state or ERROR if 'overall_status_' is an error. Called by the query state
Expand Down
17 changes: 13 additions & 4 deletions be/src/runtime/runtime-filter-bank.cc
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,10 @@ void RuntimeFilterBank::DistributeCompleteFilter(
++num_inflight_rpcs_;
}

int32_t remaining_wait_time_ms =
max(0, GetRuntimeFilterWaitTime() - complete_filter->TimeSinceRegistrationMs());
params.set_remaining_filter_wait_time_ms(remaining_wait_time_ms);

if (to_coordinator) {
proxy->UpdateFilterAsync(params, res, controller,
boost::bind(
Expand Down Expand Up @@ -743,10 +747,7 @@ vector<unique_lock<SpinLock>> RuntimeFilterBank::LockAllFilters() {
}

void RuntimeFilterBank::SendIncompleteFilters() {
int32_t wait_time_ms = FLAGS_runtime_filter_wait_time_ms;
if (query_state_->query_options().runtime_filter_wait_time_ms > 0) {
wait_time_ms = query_state_->query_options().runtime_filter_wait_time_ms;
}
int32_t wait_time_ms = GetRuntimeFilterWaitTime();

bool try_wait_aggregation = !cancelled_;
for (auto& entry : filters_) {
Expand Down Expand Up @@ -845,6 +846,14 @@ void RuntimeFilterBank::Close() {
filter_mem_tracker_->Close();
}

/// Returns the runtime filter wait time in milliseconds for this query: the
/// 'runtime_filter_wait_time_ms' query option when it is positive, otherwise the value
/// of the 'runtime_filter_wait_time_ms' flag.
int32_t RuntimeFilterBank::GetRuntimeFilterWaitTime() const {
  const auto option_wait_ms = query_state_->query_options().runtime_filter_wait_time_ms;
  // A non-positive query option falls back to the flag value.
  return option_wait_ms > 0 ? option_wait_ms : FLAGS_runtime_filter_wait_time_ms;
}

RuntimeFilterBank::ProducedFilter::ProducedFilter(
int pending_producers, int pending_remotes, RuntimeFilter* result_filter)
: result_filter(result_filter),
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/runtime-filter-bank.h
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,8 @@ class RuntimeFilterBank {
/// Disable a bloom filter by replacing it with an ALWAYS_TRUE_FILTER.
/// Return a pointer to the new runtime filter.
RuntimeFilter* DisableBloomFilter(std::unique_ptr<RuntimeFilter>& bloom_filter);

int32_t GetRuntimeFilterWaitTime() const;
};

}
Expand Down
17 changes: 16 additions & 1 deletion be/src/scheduling/scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ using namespace apache::thrift;
using namespace org::apache::impala::fb;
using namespace strings;

DEFINE_bool_hidden(sort_runtime_filter_aggregator_candidates, false,
"Control whether to sort intermediate runtime filter aggregator candidates based on "
"their KRPC address. Only used for testing.");

namespace impala {

static const string LOCAL_ASSIGNMENTS_KEY("simple-scheduler.local-assignments.total");
Expand Down Expand Up @@ -334,7 +338,18 @@ void Scheduler::ComputeRandomKrpcForAggregation(const ExecutorConfig& executor_c
int num_agg = (int)ceil((double)num_non_coordinator_host / num_filters_per_host);
DCHECK_GT(num_agg, 0);

std::shuffle(instance_groups.begin(), instance_groups.end(), *state->rng());
if (UNLIKELY(FLAGS_sort_runtime_filter_aggregator_candidates)) {
sort(instance_groups.begin(), instance_groups.end(),
[src_state](InstanceToAggPairs a, InstanceToAggPairs b) {
int idx_a = a[0].first;
int idx_b = b[0].first;
return CompareNetworkAddressPB(src_state->instance_states[idx_a].krpc_host,
src_state->instance_states[idx_b].krpc_host)
< 0;
});
} else {
std::shuffle(instance_groups.begin(), instance_groups.end(), *state->rng());
}
if (coordinator_instances.size() > 0) {
// Put coordinator group behind so that coordinator won't be selected as intermediate
// aggregator.
Expand Down
61 changes: 48 additions & 13 deletions be/src/service/data-stream-service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ DEFINE_string(datastream_service_queue_mem_limit, "5%", QUEUE_LIMIT_MSG.c_str())
DEFINE_int32(datastream_service_num_svc_threads, 0, "Number of threads for processing "
"datastream services' RPCs. If left at default value 0, it will be set to number of "
"CPU cores. Set it to a positive value to change from the default.");
DEFINE_int32_hidden(update_filter_min_wait_time_ms, 500,
"Minimum time for UpdateFilterFromRemote RPC to wait until destination QueryState is "
"ready.");
DECLARE_string(debug_actions);

namespace impala {
Expand Down Expand Up @@ -128,20 +131,52 @@ void DataStreamService::UpdateFilterFromRemote(
DCHECK(req->has_query_id());
DCHECK(
req->has_bloom_filter() || req->has_min_max_filter() || req->has_in_list_filter());
QueryState::ScopedRef qs(ProtoToQueryId(req->query_id()));

if (qs.get() != nullptr) {
qs->UpdateFilterFromRemote(*req, context);
RespondAndReleaseRpc(Status::OK(), resp, context, mem_tracker_.get());
} else {
// Query state for requested query_id might have been cancelled or closed.
// i.e., RUNTIME_FILTER_WAIT_TIME_MS has passed and all fragment instances of
// query_id have completed their execution.
string err_msg = Substitute("Query State not found for query_id=$0",
PrintId(ProtoToQueryId(req->query_id())));
LOG(INFO) << err_msg;
RespondAndReleaseRpc(Status(err_msg), resp, context, mem_tracker_.get());
int64_t arrival_time = MonotonicMillis();

// Loop until destination QueryState is ready to accept filter update from remote.
// Sleep for a few milliseconds in between and break after the 500ms grace period has passed.
// The grace period is short so that RPC thread is not blocked for too long.
// This is a much simpler mechanism than KrpcDataStreamMgr::AddData.
// TODO: Revisit this with more sophisticated deferral mechanism if needed.
bool query_found = false;
int64_t total_wait_time = 0;
int32_t sleep_duration_ms = 2;
if (req->remaining_filter_wait_time_ms() < FLAGS_update_filter_min_wait_time_ms) {
LOG(INFO) << "UpdateFilterFromRemote RPC called with remaining wait time "
<< req->remaining_filter_wait_time_ms() << " ms, less than "
<< FLAGS_update_filter_min_wait_time_ms << " ms minimum wait time.";
}

do {
{
QueryState::ScopedRef qs(ProtoToQueryId(req->query_id()));
query_found |= (qs.get() != nullptr);
if (query_found) {
if (qs.get() == nullptr || qs->IsTerminalState()) {
// Query was found, but now is either missing or in terminal state.
// Break the loop and respond with an error.
break;
} else if (qs->is_initialized()) {
qs->UpdateFilterFromRemote(*req, context);
RespondAndReleaseRpc(Status::OK(), resp, context, mem_tracker_.get());
return;
}
}
}
usleep(sleep_duration_ms * 1000);
// double sleep time for next iteration up to 128ms.
if (2 * sleep_duration_ms <= 128) sleep_duration_ms *= 2;
total_wait_time = MonotonicMillis() - arrival_time;
} while (total_wait_time < FLAGS_update_filter_min_wait_time_ms);

// Query state for requested query_id might have been cancelled, closed, or not ready.
// i.e., RUNTIME_FILTER_WAIT_TIME_MS has passed and all fragment instances of
// query_id have completed their execution.
string err_msg = Substitute("QueryState for query_id=$0 $1 after $2 ms",
PrintId(ProtoToQueryId(req->query_id())),
query_found ? "no longer running" : "not found", total_wait_time);
LOG(INFO) << err_msg;
RespondAndReleaseRpc(Status(err_msg), resp, context, mem_tracker_.get());
}

void DataStreamService::PublishFilter(
Expand Down
7 changes: 2 additions & 5 deletions be/src/service/query-state-record.cc
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,7 @@ QueryStateExpanded::QueryStateExpanded(const ClientRequestState& exec_state,
// Per-Host Metrics
for (int i =0; i < exec_state.schedule()->backend_exec_params_size(); i++) {
const BackendExecParamsPB& b = exec_state.schedule()->backend_exec_params(i);
TNetworkAddress host;
host.hostname = b.address().hostname();
host.uds_address = b.address().uds_address();
host.port = b.address().port();
TNetworkAddress host = FromNetworkAddressPB(b.address());

PerHostState state;
state.fragment_instance_count = b.instance_params_size();
Expand Down Expand Up @@ -445,4 +442,4 @@ EventsTimelineIterator EventsTimelineIterator::end() {
return EventsTimelineIterator(labels_, timestamps_, labels_->size());
}

} // namespace impala
} // namespace impala
93 changes: 93 additions & 0 deletions be/src/util/network-util-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ TEST(NetworkUtil, NetAddrCompHostnameDiff) {

ASSERT_TRUE(fixture(first, second));
ASSERT_FALSE(fixture(second, first));
ASSERT_TRUE(
CompareNetworkAddressPB(FromTNetworkAddress(first), FromTNetworkAddress(second))
< 0);
ASSERT_TRUE(
CompareNetworkAddressPB(FromTNetworkAddress(second), FromTNetworkAddress(first))
> 0);
}

// Assert where host fields are equal but port is different.
Expand All @@ -59,6 +65,12 @@ TEST(NetworkUtil, NetAddrCompPortDiff) {

ASSERT_TRUE(fixture(first, second));
ASSERT_FALSE(fixture(second, first));
ASSERT_TRUE(
CompareNetworkAddressPB(FromTNetworkAddress(first), FromTNetworkAddress(second))
< 0);
ASSERT_TRUE(
CompareNetworkAddressPB(FromTNetworkAddress(second), FromTNetworkAddress(first))
> 0);
}

// Assert where host and port fields are equal but uds address is different.
Expand All @@ -77,6 +89,12 @@ TEST(NetworkUtil, NetAddrCompUDSAddrDiff) {

ASSERT_TRUE(fixture(first, second));
ASSERT_FALSE(fixture(second, first));
ASSERT_TRUE(
CompareNetworkAddressPB(FromTNetworkAddress(first), FromTNetworkAddress(second))
< 0);
ASSERT_TRUE(
CompareNetworkAddressPB(FromTNetworkAddress(second), FromTNetworkAddress(first))
> 0);
}

// Assert where all three comparison fields are equal.
Expand All @@ -95,6 +113,81 @@ TEST(NetworkUtil, NetAddrUDSAddrSame) {

ASSERT_FALSE(fixture(first, second));
ASSERT_FALSE(fixture(second, first));
ASSERT_TRUE(
CompareNetworkAddressPB(FromTNetworkAddress(first), FromTNetworkAddress(second))
== 0);
ASSERT_TRUE(
CompareNetworkAddressPB(FromTNetworkAddress(second), FromTNetworkAddress(first))
== 0);
}

// Assert where host and port fields are equal, but the first address does not have a
// uds address set while the second has it set to the empty string. The first address
// is expected to order before the second under both comparators.
TEST(NetworkUtil, NetAddrOneMissUDSAddr) {
  TNetworkAddress uds_unset;
  uds_unset.__set_hostname("host");
  uds_unset.__set_port(0);

  TNetworkAddress uds_empty;
  uds_empty.__set_hostname("host");
  uds_empty.__set_port(0);
  uds_empty.__set_uds_address("");

  // Thrift-side ordering.
  TNetworkAddressComparator thrift_less;
  ASSERT_TRUE(thrift_less(uds_unset, uds_empty));
  ASSERT_FALSE(thrift_less(uds_empty, uds_unset));

  // Protobuf-side ordering must agree.
  const NetworkAddressPB uds_unset_pb = FromTNetworkAddress(uds_unset);
  const NetworkAddressPB uds_empty_pb = FromTNetworkAddress(uds_empty);
  ASSERT_TRUE(CompareNetworkAddressPB(uds_unset_pb, uds_empty_pb) < 0);
  ASSERT_TRUE(CompareNetworkAddressPB(uds_empty_pb, uds_unset_pb) > 0);
}

// Assert where host and port fields are equal and neither address has a uds address
// set. Both comparators are expected to treat the two addresses as equivalent.
TEST(NetworkUtil, NetAddrAllMissUDSAddr) {
  TNetworkAddress lhs;
  lhs.__set_hostname("host");
  lhs.__set_port(0);

  TNetworkAddress rhs;
  rhs.__set_hostname("host");
  rhs.__set_port(0);

  // Thrift-side: neither address orders before the other.
  TNetworkAddressComparator thrift_less;
  ASSERT_FALSE(thrift_less(lhs, rhs));
  ASSERT_FALSE(thrift_less(rhs, lhs));

  // Protobuf-side: comparison yields 0 in both directions.
  const NetworkAddressPB lhs_pb = FromTNetworkAddress(lhs);
  const NetworkAddressPB rhs_pb = FromTNetworkAddress(rhs);
  ASSERT_TRUE(CompareNetworkAddressPB(lhs_pb, rhs_pb) == 0);
  ASSERT_TRUE(CompareNetworkAddressPB(rhs_pb, lhs_pb) == 0);
}

void CheckTranslation(TNetworkAddress thrift_address) {
NetworkAddressPB proto_address = FromTNetworkAddress(thrift_address);
TNetworkAddress thrift_address2 = FromNetworkAddressPB(proto_address);
NetworkAddressPB proto_address2 = FromTNetworkAddress(thrift_address2);

TNetworkAddressComparator fixture;
ASSERT_FALSE(fixture(thrift_address, thrift_address2));
ASSERT_FALSE(fixture(thrift_address2, thrift_address));
ASSERT_TRUE(CompareNetworkAddressPB(proto_address, proto_address2) == 0);
ASSERT_TRUE(CompareNetworkAddressPB(proto_address2, proto_address) == 0);
}

// Assert that translating between TNetworkAddress and NetworkAddressPB is consistent,
// both without and with a uds address set.
TEST(NetworkUtil, NetAddrTranslation) {
  TNetworkAddress address;
  address.__set_hostname("host");
  address.__set_port(0);
  // First pass: hostname and port only.
  CheckTranslation(address);

  // Second pass: same address with a uds address added.
  address.__set_uds_address("uds");
  CheckTranslation(address);
}

} // namespace impala
Loading

0 comments on commit 749f4d1

Please sign in to comment.