diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc index 8c5e978703..262f47a6f8 100644 --- a/be/src/runtime/query-exec-mgr.cc +++ b/be/src/runtime/query-exec-mgr.cc @@ -53,6 +53,8 @@ DEFINE_int32(query_exec_mgr_cancellation_thread_pool_size, 1, "(Advanced) Size of the QueryExecMgr thread-pool processing cancellations due to " "coordinator failure"); +DECLARE_int32(krpc_port); + const uint32_t QUERY_EXEC_MGR_MAX_CANCELLATION_QUEUE_SIZE = 65536; QueryExecMgr::QueryExecMgr() { @@ -78,6 +80,10 @@ Status QueryExecMgr::StartQuery(const ExecQueryFInstancesRequestPB* request, bool dummy; QueryState* qs = GetOrCreateQueryState(query_ctx, request->per_backend_mem_limit(), &dummy); + RETURN_IF_ERROR(DebugAction(query_ctx.client_request.query_options.debug_action, + "QUERY_STATE_BEFORE_INIT_GLOBAL")); + RETURN_IF_ERROR(DebugAction(query_ctx.client_request.query_options.debug_action, + "QUERY_STATE_BEFORE_INIT", {std::to_string(FLAGS_krpc_port)})); Status status = qs->Init(request, fragment_info); if (!status.ok()) { qs->ReleaseBackendResourceRefcount(); // Release refcnt acquired in Init(). 
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc index bf61b26e31..60c043ccfa 100644 --- a/be/src/runtime/query-state.cc +++ b/be/src/runtime/query-state.cc @@ -841,6 +841,11 @@ bool QueryState::codegen_cache_enabled() const { && ExecEnv::GetInstance()->codegen_cache_enabled(); } +bool QueryState::is_initialized() { + std::lock_guard l(init_lock_); + return is_initialized_; +} + bool QueryState::StartFInstances() { VLOG(2) << "StartFInstances(): query_id=" << PrintId(query_id()) << " #instances=" << fragment_info_.fragment_instance_ctxs.size(); diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h index 15195d64d5..b907fc8c11 100644 --- a/be/src/runtime/query-state.h +++ b/be/src/runtime/query-state.h @@ -168,6 +168,7 @@ class QueryState { return query_ctx_.client_request.query_options; } bool codegen_cache_enabled() const; + bool is_initialized(); MemTracker* query_mem_tracker() const { return query_mem_tracker_; } RuntimeProfile* host_profile() const { return host_profile_; } const NodeToFileSchedulings* node_to_file_schedulings() const { @@ -320,6 +321,16 @@ class QueryState { return fragment_state_map_; } + /// Returns true if the query has reached a terminal state. + bool IsTerminalState() const { + // Read into local variable to protect against concurrent modification + // of backend_exec_state_. + BackendExecState exec_state = backend_exec_state_; + return exec_state == BackendExecState::FINISHED + || exec_state == BackendExecState::CANCELLED + || exec_state == BackendExecState::ERROR; + } + private: friend class QueryExecMgr; @@ -548,13 +559,6 @@ class QueryState { return !overall_status_.ok() && !overall_status_.IsCancelled(); } - /// Returns true if the query has reached a terminal state. 
- bool IsTerminalState() const { - return backend_exec_state_ == BackendExecState::FINISHED - || backend_exec_state_ == BackendExecState::CANCELLED - || backend_exec_state_ == BackendExecState::ERROR; - } - /// Updates the BackendExecState based on 'overall_status_'. Should only be called when /// the current state is a non-terminal state. The transition can either be to the next /// legal state or ERROR if 'overall_status_' is an error. Called by the query state diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc index e7b1490505..cf41fbf48f 100644 --- a/be/src/runtime/runtime-filter-bank.cc +++ b/be/src/runtime/runtime-filter-bank.cc @@ -557,6 +557,10 @@ void RuntimeFilterBank::DistributeCompleteFilter( ++num_inflight_rpcs_; } + int32_t remaining_wait_time_ms = + max(0, GetRuntimeFilterWaitTime() - complete_filter->TimeSinceRegistrationMs()); + params.set_remaining_filter_wait_time_ms(remaining_wait_time_ms); + if (to_coordinator) { proxy->UpdateFilterAsync(params, res, controller, boost::bind( @@ -743,10 +747,7 @@ vector> RuntimeFilterBank::LockAllFilters() { } void RuntimeFilterBank::SendIncompleteFilters() { - int32_t wait_time_ms = FLAGS_runtime_filter_wait_time_ms; - if (query_state_->query_options().runtime_filter_wait_time_ms > 0) { - wait_time_ms = query_state_->query_options().runtime_filter_wait_time_ms; - } + int32_t wait_time_ms = GetRuntimeFilterWaitTime(); bool try_wait_aggregation = !cancelled_; for (auto& entry : filters_) { @@ -845,6 +846,14 @@ void RuntimeFilterBank::Close() { filter_mem_tracker_->Close(); } +int32_t RuntimeFilterBank::GetRuntimeFilterWaitTime() const { + int32_t wait_time_ms = FLAGS_runtime_filter_wait_time_ms; + if (query_state_->query_options().runtime_filter_wait_time_ms > 0) { + wait_time_ms = query_state_->query_options().runtime_filter_wait_time_ms; + } + return wait_time_ms; +} + RuntimeFilterBank::ProducedFilter::ProducedFilter( int pending_producers, int pending_remotes, 
RuntimeFilter* result_filter) : result_filter(result_filter), diff --git a/be/src/runtime/runtime-filter-bank.h b/be/src/runtime/runtime-filter-bank.h index b656deedda..d013a15ab6 100644 --- a/be/src/runtime/runtime-filter-bank.h +++ b/be/src/runtime/runtime-filter-bank.h @@ -373,6 +373,8 @@ class RuntimeFilterBank { /// Disable a bloom filter by replacing it with an ALWAYS_TRUE_FILTER. /// Return a pointer to the new runtime filter. RuntimeFilter* DisableBloomFilter(std::unique_ptr& bloom_filter); + + int32_t GetRuntimeFilterWaitTime() const; }; } diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc index b3465b36d9..6cc0022334 100644 --- a/be/src/scheduling/scheduler.cc +++ b/be/src/scheduling/scheduler.cc @@ -56,6 +56,10 @@ using namespace apache::thrift; using namespace org::apache::impala::fb; using namespace strings; +DEFINE_bool_hidden(sort_runtime_filter_aggregator_candidates, false, + "Control whether to sort intermediate runtime filter aggregator candidates based on " + "their KRPC address. 
Only used for testing."); + namespace impala { static const string LOCAL_ASSIGNMENTS_KEY("simple-scheduler.local-assignments.total"); @@ -334,7 +338,18 @@ void Scheduler::ComputeRandomKrpcForAggregation(const ExecutorConfig& executor_c int num_agg = (int)ceil((double)num_non_coordinator_host / num_filters_per_host); DCHECK_GT(num_agg, 0); - std::shuffle(instance_groups.begin(), instance_groups.end(), *state->rng()); + if (UNLIKELY(FLAGS_sort_runtime_filter_aggregator_candidates)) { + sort(instance_groups.begin(), instance_groups.end(), + [src_state](const InstanceToAggPairs& a, const InstanceToAggPairs& b) { + int idx_a = a[0].first; + int idx_b = b[0].first; + return CompareNetworkAddressPB(src_state->instance_states[idx_a].krpc_host, + src_state->instance_states[idx_b].krpc_host) + < 0; + }); + } else { + std::shuffle(instance_groups.begin(), instance_groups.end(), *state->rng()); + } if (coordinator_instances.size() > 0) { // Put coordinator group behind so that coordinator won't be selected as intermediate // aggregator. diff --git a/be/src/service/data-stream-service.cc b/be/src/service/data-stream-service.cc index f26c67cbff..8a497f7b2f 100644 --- a/be/src/service/data-stream-service.cc +++ b/be/src/service/data-stream-service.cc @@ -51,6 +51,9 @@ DEFINE_string(datastream_service_queue_mem_limit, "5%", QUEUE_LIMIT_MSG.c_str()) DEFINE_int32(datastream_service_num_svc_threads, 0, "Number of threads for processing " "datastream services' RPCs. If left at default value 0, it will be set to number of " "CPU cores. 
Set it to a positive value to change from the default."); +DEFINE_int32_hidden(update_filter_min_wait_time_ms, 500, + "Minimum time for UpdateFilterFromRemote RPC to wait until destination QueryState is " + "ready."); DECLARE_string(debug_actions); namespace impala { @@ -128,20 +131,52 @@ void DataStreamService::UpdateFilterFromRemote( DCHECK(req->has_query_id()); DCHECK( req->has_bloom_filter() || req->has_min_max_filter() || req->has_in_list_filter()); - QueryState::ScopedRef qs(ProtoToQueryId(req->query_id())); - - if (qs.get() != nullptr) { - qs->UpdateFilterFromRemote(*req, context); - RespondAndReleaseRpc(Status::OK(), resp, context, mem_tracker_.get()); - } else { - // Query state for requested query_id might have been cancelled or closed. - // i.e., RUNTIME_FILTER_WAIT_TIME_MS has passed and all fragment instances of - // query_id has complete their execution. - string err_msg = Substitute("Query State not found for query_id=$0", - PrintId(ProtoToQueryId(req->query_id()))); - LOG(INFO) << err_msg; - RespondAndReleaseRpc(Status(err_msg), resp, context, mem_tracker_.get()); + int64_t arrival_time = MonotonicMillis(); + + // Loop until destination QueryState is ready to accept filter update from remote. + // Sleep a few milliseconds in-between; give up after update_filter_min_wait_time_ms. + // The grace period is short so that RPC thread is not blocked for too long. + // This is a much simpler mechanism than KrpcDataStreamMgr::AddData. + // TODO: Revisit this with more sophisticated deferral mechanism if needed. 
+ bool query_found = false; + int64_t total_wait_time = 0; + int32_t sleep_duration_ms = 2; + if (req->remaining_filter_wait_time_ms() < FLAGS_update_filter_min_wait_time_ms) { + LOG(INFO) << "UpdateFilterFromRemote RPC called with remaining wait time " + << req->remaining_filter_wait_time_ms() << " ms, less than " + << FLAGS_update_filter_min_wait_time_ms << " ms minimum wait time."; } + + do { + { + QueryState::ScopedRef qs(ProtoToQueryId(req->query_id())); + query_found |= (qs.get() != nullptr); + if (query_found) { + if (qs.get() == nullptr || qs->IsTerminalState()) { + // Query was found, but now is either missing or in terminal state. + // Break the loop and respond with an error. + break; + } else if (qs->is_initialized()) { + qs->UpdateFilterFromRemote(*req, context); + RespondAndReleaseRpc(Status::OK(), resp, context, mem_tracker_.get()); + return; + } + } + } + usleep(sleep_duration_ms * 1000); + // Double the sleep time for the next iteration, up to 128ms. + if (2 * sleep_duration_ms <= 128) sleep_duration_ms *= 2; + total_wait_time = MonotonicMillis() - arrival_time; + } while (total_wait_time < FLAGS_update_filter_min_wait_time_ms); + + // Query state for requested query_id might have been cancelled, closed, or not ready. + // i.e., RUNTIME_FILTER_WAIT_TIME_MS has passed and all fragment instances of + // query_id have completed their execution. + string err_msg = Substitute("QueryState for query_id=$0 $1 after $2 ms", + PrintId(ProtoToQueryId(req->query_id())), + query_found ? 
"no longer running" : "not found", total_wait_time); + LOG(INFO) << err_msg; + RespondAndReleaseRpc(Status(err_msg), resp, context, mem_tracker_.get()); } void DataStreamService::PublishFilter( diff --git a/be/src/service/query-state-record.cc b/be/src/service/query-state-record.cc index d054cf6443..db1b6c928b 100644 --- a/be/src/service/query-state-record.cc +++ b/be/src/service/query-state-record.cc @@ -221,10 +221,7 @@ QueryStateExpanded::QueryStateExpanded(const ClientRequestState& exec_state, // Per-Host Metrics for (int i =0; i < exec_state.schedule()->backend_exec_params_size(); i++) { const BackendExecParamsPB& b = exec_state.schedule()->backend_exec_params(i); - TNetworkAddress host; - host.hostname = b.address().hostname(); - host.uds_address = b.address().uds_address(); - host.port = b.address().port(); + TNetworkAddress host = FromNetworkAddressPB(b.address()); PerHostState state; state.fragment_instance_count = b.instance_params_size(); @@ -445,4 +442,4 @@ EventsTimelineIterator EventsTimelineIterator::end() { return EventsTimelineIterator(labels_, timestamps_, labels_->size()); } -} // namespace impala \ No newline at end of file +} // namespace impala diff --git a/be/src/util/network-util-test.cc b/be/src/util/network-util-test.cc index c255466cc2..035b9b7eb8 100644 --- a/be/src/util/network-util-test.cc +++ b/be/src/util/network-util-test.cc @@ -41,6 +41,12 @@ TEST(NetworkUtil, NetAddrCompHostnameDiff) { ASSERT_TRUE(fixture(first, second)); ASSERT_FALSE(fixture(second, first)); + ASSERT_TRUE( + CompareNetworkAddressPB(FromTNetworkAddress(first), FromTNetworkAddress(second)) + < 0); + ASSERT_TRUE( + CompareNetworkAddressPB(FromTNetworkAddress(second), FromTNetworkAddress(first)) + > 0); } // Assert where host fields are equal but port is different. 
@@ -59,6 +65,12 @@ TEST(NetworkUtil, NetAddrCompPortDiff) { ASSERT_TRUE(fixture(first, second)); ASSERT_FALSE(fixture(second, first)); + ASSERT_TRUE( + CompareNetworkAddressPB(FromTNetworkAddress(first), FromTNetworkAddress(second)) + < 0); + ASSERT_TRUE( + CompareNetworkAddressPB(FromTNetworkAddress(second), FromTNetworkAddress(first)) + > 0); } // Assert where host and port fields are equal but uds address is different. @@ -77,6 +89,12 @@ TEST(NetworkUtil, NetAddrCompUDSAddrDiff) { ASSERT_TRUE(fixture(first, second)); ASSERT_FALSE(fixture(second, first)); + ASSERT_TRUE( + CompareNetworkAddressPB(FromTNetworkAddress(first), FromTNetworkAddress(second)) + < 0); + ASSERT_TRUE( + CompareNetworkAddressPB(FromTNetworkAddress(second), FromTNetworkAddress(first)) + > 0); } // Assert where all three comparison fields are equal. @@ -95,6 +113,81 @@ TEST(NetworkUtil, NetAddrUDSAddrSame) { ASSERT_FALSE(fixture(first, second)); ASSERT_FALSE(fixture(second, first)); + ASSERT_TRUE( + CompareNetworkAddressPB(FromTNetworkAddress(first), FromTNetworkAddress(second)) + == 0); + ASSERT_TRUE( + CompareNetworkAddressPB(FromTNetworkAddress(second), FromTNetworkAddress(first)) + == 0); +} + +// Assert where host and port fields are equal first address does not have +// uds address set. +TEST(NetworkUtil, NetAddrOneMissUDSAddr) { + TNetworkAddressComparator fixture; + TNetworkAddress first; + TNetworkAddress second; + + first.__set_hostname("host"); + first.__set_port(0); + + second.__set_hostname("host"); + second.__set_port(0); + second.__set_uds_address(""); + + ASSERT_TRUE(fixture(first, second)); + ASSERT_FALSE(fixture(second, first)); + ASSERT_TRUE( + CompareNetworkAddressPB(FromTNetworkAddress(first), FromTNetworkAddress(second)) + < 0); + ASSERT_TRUE( + CompareNetworkAddressPB(FromTNetworkAddress(second), FromTNetworkAddress(first)) + > 0); +} + +// Assert where host and port fields are equal and both address does not have +// uds address set. 
+TEST(NetworkUtil, NetAddrAllMissUDSAddr) { + TNetworkAddressComparator fixture; + TNetworkAddress first; + TNetworkAddress second; + + first.__set_hostname("host"); + first.__set_port(0); + + second.__set_hostname("host"); + second.__set_port(0); + + ASSERT_FALSE(fixture(first, second)); + ASSERT_FALSE(fixture(second, first)); + ASSERT_TRUE( + CompareNetworkAddressPB(FromTNetworkAddress(first), FromTNetworkAddress(second)) + == 0); + ASSERT_TRUE( + CompareNetworkAddressPB(FromTNetworkAddress(second), FromTNetworkAddress(first)) + == 0); +} + +void CheckTranslation(TNetworkAddress thrift_address) { + NetworkAddressPB proto_address = FromTNetworkAddress(thrift_address); + TNetworkAddress thrift_address2 = FromNetworkAddressPB(proto_address); + NetworkAddressPB proto_address2 = FromTNetworkAddress(thrift_address2); + + TNetworkAddressComparator fixture; + ASSERT_FALSE(fixture(thrift_address, thrift_address2)); + ASSERT_FALSE(fixture(thrift_address2, thrift_address)); + ASSERT_TRUE(CompareNetworkAddressPB(proto_address, proto_address2) == 0); + ASSERT_TRUE(CompareNetworkAddressPB(proto_address2, proto_address) == 0); +} + +// Assert consistent translation between TNetworkAddress and NetworkAddressPB. 
+TEST(NetworkUtil, NetAddrTranslation) { + TNetworkAddress addr; + addr.__set_hostname("host"); + addr.__set_port(0); + CheckTranslation(addr); + addr.__set_uds_address("uds"); + CheckTranslation(addr); } } // namespace impala diff --git a/be/src/util/network-util.cc b/be/src/util/network-util.cc index 80b2cde0a8..b59cd408b7 100644 --- a/be/src/util/network-util.cc +++ b/be/src/util/network-util.cc @@ -240,7 +240,7 @@ TNetworkAddress FromNetworkAddressPB(const NetworkAddressPB& address) { TNetworkAddress t_address; t_address.__set_hostname(address.hostname()); t_address.__set_port(address.port()); - t_address.__set_uds_address(address.uds_address()); + if (address.has_uds_address()) t_address.__set_uds_address(address.uds_address()); return t_address; } @@ -248,7 +248,7 @@ NetworkAddressPB FromTNetworkAddress(const TNetworkAddress& address) { NetworkAddressPB address_pb; address_pb.set_hostname(address.hostname); address_pb.set_port(address.port); - address_pb.set_uds_address(address.uds_address); + if (address.__isset.uds_address) address_pb.set_uds_address(address.uds_address); return address_pb; } @@ -270,7 +270,16 @@ bool TNetworkAddressComparator::operator()(const TNetworkAddress& a, } // Hostnames and ports were the same, compare on uds address. - return a.uds_address.compare(b.uds_address) < 0; + if (a.__isset.uds_address) { + if (b.__isset.uds_address) { + return a.uds_address.compare(b.uds_address) < 0; + } else { + return false; + } + } else if (b.__isset.uds_address) { + return true; + } + return false; } /// Pick a random port in the range of ephemeral ports diff --git a/be/src/util/network-util.h b/be/src/util/network-util.h index 359cc11f74..0facbb191f 100644 --- a/be/src/util/network-util.h +++ b/be/src/util/network-util.h @@ -109,10 +109,29 @@ struct TNetworkAddressComparator { /// a free ephemeral port can't be found after 100 tries. int FindUnusedEphemeralPort(); +/// Compare function for two NetworkAddressPB. 
+/// The order is decided first by hostname, then by port, then by uds address. +inline int CompareNetworkAddressPB( + const NetworkAddressPB& lhs, const NetworkAddressPB& rhs) { + int comp = lhs.hostname().compare(rhs.hostname()); + if (comp == 0) comp = lhs.port() - rhs.port(); + if (comp == 0) { + if (lhs.has_uds_address()) { + if (rhs.has_uds_address()) { + comp = lhs.uds_address().compare(rhs.uds_address()); + } else { + comp = 1; // rhs has no uds address, so rhs sorts before lhs + } + } else if (rhs.has_uds_address()) { + comp = -1; // lhs has no uds address, so lhs sorts before rhs + } + } + return comp; +} + /// Return true if two NetworkAddressPB are match. inline bool KrpcAddressEqual(const NetworkAddressPB& lhs, const NetworkAddressPB& rhs) { - return lhs.hostname() == rhs.hostname() && lhs.port() == rhs.port() - && lhs.uds_address() == rhs.uds_address(); + return CompareNetworkAddressPB(lhs, rhs) == 0; } extern const std::string LOCALHOST_IP_STR; diff --git a/common/protobuf/data_stream_service.proto b/common/protobuf/data_stream_service.proto index 6bca8e4d7d..46c17beb05 100644 --- a/common/protobuf/data_stream_service.proto +++ b/common/protobuf/data_stream_service.proto @@ -124,6 +124,9 @@ message UpdateFilterParamsPB { optional MinMaxFilterPB min_max_filter = 4; optional InListFilterPB in_list_filter = 5; + + // Remaining filter wait time as understood by sender. 
+ optional int32 remaining_filter_wait_time_ms = 6; } message UpdateFilterResultPB { diff --git a/tests/custom_cluster/test_runtime_filter_aggregation.py b/tests/custom_cluster/test_runtime_filter_aggregation.py index da00e53678..d1397cd9b9 100644 --- a/tests/custom_cluster/test_runtime_filter_aggregation.py +++ b/tests/custom_cluster/test_runtime_filter_aggregation.py @@ -21,7 +21,10 @@ from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.environ import build_flavor_timeout, ImpalaTestClusterProperties -from tests.common.test_dimensions import add_mandatory_exec_option +from tests.common.test_dimensions import ( + add_mandatory_exec_option, + add_exec_option_dimension +) # slow_build_timeout is set to 200000 to avoid failures like IMPALA-8064 where the # runtime filters don't arrive in time. @@ -43,6 +46,9 @@ def get_workload(cls): def add_test_dimensions(cls): super(TestRuntimeFilterAggregation, cls).add_test_dimensions() add_mandatory_exec_option(cls, 'max_num_filters_aggregated_per_host', 2) + # Exercise small, non-fatal jitters. 
+ add_exec_option_dimension( + cls, 'debug_action', ['', 'QUERY_STATE_BEFORE_INIT_GLOBAL:JITTER@200']) # Enable query option ASYNC_CODEGEN for slow build if build_runs_slowly: add_mandatory_exec_option(cls, "async_codegen", 1) @@ -60,3 +66,64 @@ def test_basic_filters(self, vector): } self.run_test_case('QueryTest/runtime_filters', vector, test_file_vars=vars) self.run_test_case('QueryTest/bloom_filters', vector) + + +class TestLateQueryStateInit(CustomClusterTestSuite): + """Test that distributed runtime filter aggregation still works + when remote query state of intermediate aggregator node is late to initialize.""" + _wait_time = WAIT_TIME_MS // 20 + _init_delay = [100, _wait_time] + + @classmethod + def get_workload(cls): + return 'functional-query' + + @classmethod + def add_test_dimensions(cls): + super(TestLateQueryStateInit, cls).add_test_dimensions() + add_mandatory_exec_option(cls, 'max_num_filters_aggregated_per_host', 2) + add_mandatory_exec_option(cls, 'runtime_filter_wait_time_ms', cls._wait_time) + # Inject sleep in second impalad since sort_runtime_filter_aggregator_candidates=true + # and the first one (coordinator) will never be selected as intermediate aggregator. 
+ actions = ["QUERY_STATE_BEFORE_INIT:27001:SLEEP@{0}".format(d) for d in + cls._init_delay] + add_exec_option_dimension(cls, 'debug_action', actions) + # Enable query option ASYNC_CODEGEN for slow build + if build_runs_slowly: + add_mandatory_exec_option(cls, "async_codegen", 1) + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args="--sort_runtime_filter_aggregator_candidates=true --logbuflevel=-1") + def test_late_query_state_init(self, vector): + """Test that distributed runtime filter aggregation still works + when remote query state of intermediate aggregator node is late to initialize.""" + query = ('select count(*) from functional.alltypes p ' + 'join [SHUFFLE] functional.alltypestiny b ' + 'on p.month = b.int_col and b.month = 1 and b.string_col = "1"') + exec_options = vector.get_value('exec_option') + result = self.execute_query_expect_success(self.client, query, exec_options) + assert result.data[0] == '620' + + # Expect no log printed in short init delay scenario. + # In long init delay scenario, two possible situations can happen: + # 1. The build scanner assigned in first impalad exchange all build rows (1) to + # the second impalad that is blocked at QueryExecMgr::StartQuery. This indirectly + # delay all impalad because all JOIN BUILD fragment need to wait for EOS signal + # from exchange sender. The probability for this case is 1/3. + # 2. The build scanner exchange all build rows to other impalads than the second one. + # The JOIN BUILD fragment in first and third impalads immediately receive EOS + # signal from exchange sender, complete build, and send their filter update to + # the second impalad. The second impalad stay blocked at QueryExecMgr::StartQuery + # and filter update need to wait until it gives up. The probability for this case + # is 2/3. 
+ expected = -1 if str(self._init_delay[-1]) in exec_options['debug_action'] else 0 + all_blocked = 'UpdateFilterFromRemote RPC called with remaining wait time' + preagg_blocked = 'QueryState for query_id={0} no'.format(result.query_id) + log_pattern = '({0}|{1})'.format(all_blocked, preagg_blocked) + if expected == -1: + if 'Filter 0 inflight for final aggregation' in result.runtime_profile: + log_pattern = all_blocked # case 1. + else: + log_pattern = preagg_blocked # case 2. + self.assert_log_contains('impalad_node1', 'INFO', log_pattern, expected) diff --git a/tests/util/workload_management.py b/tests/util/workload_management.py index 4cdacfe548..6eb554e845 100644 --- a/tests/util/workload_management.py +++ b/tests/util/workload_management.py @@ -359,8 +359,10 @@ def assert_query(query_tbl, client, expected_cluster_id, raw_profile=None, impal perhost_frags = re.search(r'\n\s+Per Host Number of Fragment Instances:\s+(.*?)\n', profile_text) assert perhost_frags is not None - assert data[index] == ",".join(sorted(perhost_frags.group(1).replace("(", "=") - .replace(")", "").split(" "))), "per-host fragment instances incorrect" + expected = ",".join(sorted(perhost_frags.group(1).replace("(", "=") + .replace(")", "").split(" "))) + assert data[index] == expected, ('per-host fragment instances incorrect.' + ' expected="{0}" actual="{1}"').format(expected, data[index]) # Backends Count index += 1