Skip to content

Commit

Permalink
IMPALA-10557: Support Kudu's multi-row transaction
Browse files Browse the repository at this point in the history
Kudu added multi-row transaction so Impala could run query that inserts
multiple rows into Kudu's table in the context of a single transaction.
Kudu provides new Java/C++ client APIs to open/commit/rollback
transaction, create session with transaction, serialize/deserialize
metadata of transaction object. Kudu transaction object has built-in
heartbeater.

This patch added Impala support to use Kudu's multiple-row transaction.
 - Added a new query option to enable Kudu's transaction.
 - When the query option is set, a new Kudu transaction should be
   started for "insert", "CTAS" and "UPDATE/UPSERT/DELETE" statements
   by Impala's frontend of coordinator.
 - The Kudu transaction objects are kept in KuduTransactionManager until
   the transactions are going to be aborted or committed.
 - Frontend serialize the transaction metadata into a transaction token
   and pass to executors.
 - Executors deserialize the transaction token and ingest via that
   transaction handle. For Kudu session in the context of a transaction,
   return the first error if there are any pending errors for the Kudu
   session so that the Kudu transaction will be aborted.
   Since Kudu does not support transaction for "UPDATE/UPSERT/DELETE"
   statements now, Kudu returns error which causes transaction to be
   aborted.
 - Coordinator commits the transaction if everything goes well.
   Otherwise, aborts the transaction.

Also changed code to store KuduClient as shared pointer since KuduClient
has to be passed as a shared pointer when KuduTransaction::Deserialize()
is called.

Testing:
 - Added new e-to-e tests for Kudu transaction.
 - Passed core test.

Change-Id: I876ada48991afdff5d61b5d6a0417571aba7cb34
Reviewed-on: http://gerrit.cloudera.org:8080/17553
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
  • Loading branch information
wzhou-code authored and Impala Public Jenkins committed Jun 24, 2021
1 parent 09454ce commit fcaea30
Show file tree
Hide file tree
Showing 29 changed files with 793 additions and 66 deletions.
4 changes: 2 additions & 2 deletions be/src/exec/kudu-scan-node-base.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class KuduScanNodeBase : public ScanNode {

/// Pointer to the KuduClient, which is stored on the QueryState and shared between
/// scanners and fragment instances.
kudu::client::KuduClient* client_ = nullptr;
kudu::client::sp::shared_ptr<kudu::client::KuduClient> client_;

/// Set of scan tokens to be deserialized into Kudu scanners.
std::vector<std::string> scan_tokens_;
Expand All @@ -99,7 +99,7 @@ class KuduScanNodeBase : public ScanNode {
static const std::string KUDU_REMOTE_TOKENS;
static const std::string KUDU_CLIENT_TIME;

kudu::client::KuduClient* kudu_client() { return client_; }
kudu::client::KuduClient* kudu_client() { return client_.get(); }
RuntimeProfile::Counter* kudu_round_trips() const { return kudu_round_trips_; }
RuntimeProfile::Counter* kudu_client_time() const { return kudu_client_time_; }
};
Expand Down
38 changes: 34 additions & 4 deletions be/src/exec/kudu-table-sink.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,20 @@
#include <sstream>

#include <boost/bind.hpp>
#include <kudu/client/client.h>
#include <kudu/client/write_op.h>
#include <thrift/protocol/TDebugProtocol.h>

#include "exec/kudu-util.h"
#include "exprs/scalar-expr.h"
#include "exprs/scalar-expr-evaluator.h"
#include "exprs/scalar-expr.h"
#include "gen-cpp/ImpalaInternalService_constants.h"
#include "gutil/gscoped_ptr.h"
#include "runtime/descriptors.h"
#include "runtime/exec-env.h"
#include "runtime/mem-tracker.h"
#include "runtime/row-batch.h"
#include "util/debug-util.h"
#include "util/runtime-profile-counters.h"

#include "common/names.h"
Expand All @@ -58,6 +60,7 @@ using kudu::client::KuduSchema;
using kudu::client::KuduClient;
using kudu::client::KuduRowResult;
using kudu::client::KuduTable;
using kudu::client::KuduTransaction;
using kudu::client::KuduInsert;
using kudu::client::KuduUpdate;
using kudu::client::KuduError;
Expand Down Expand Up @@ -158,7 +161,24 @@ Status KuduTableSink::Open(RuntimeState* state) {
kudu_column_nullabilities_.push_back(table_->schema().Column(i).is_nullable());
}

session_ = client_->NewSession();
// Inject failure or sleep time before creating Kudu session.
RETURN_IF_ERROR(
DebugAction(state->query_options(), "FIS_KUDU_TABLE_SINK_CREATE_SESSION"));

if (kudu_table_sink_.__isset.kudu_txn_token
&& !kudu_table_sink_.kudu_txn_token.empty()) {
// Deserialize the transaction token and create a session in the context of
// transaction.
KUDU_RETURN_IF_ERROR(
KuduTransaction::Deserialize(client_, kudu_table_sink_.kudu_txn_token, &txn_),
"Couldn't deserialize metadata of Kudu transaction");
KUDU_RETURN_IF_ERROR(txn_->CreateSession(&session_),
"Couldn't create session in the context of transaction");
is_transactional_ = true;
} else {
session_ = client_->NewSession();
}

session_->SetTimeoutMillis(FLAGS_kudu_operation_timeout_ms);

// KuduSession Set* methods here and below return a status for API compatibility.
Expand Down Expand Up @@ -277,8 +297,13 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) {
SCOPED_TIMER(kudu_apply_timer_);
for (auto&& write: write_ops) {
KUDU_RETURN_IF_ERROR(session_->Apply(write.release()), "Error applying Kudu Op.");
// Inject failure when writing partial of row batch.
RETURN_IF_ERROR(
DebugAction(state->query_options(), "FIS_KUDU_TABLE_SINK_WRITE_PARTIAL_ROW"));
}
}
// Inject failure or sleep time in the end of writing row batch.
RETURN_IF_ERROR(DebugAction(state->query_options(), "FIS_KUDU_TABLE_SINK_WRITE_BATCH"));

// Increment for all rows received by the sink, including errors.
COUNTER_ADD(total_rows_, batch->num_rows());
Expand All @@ -301,7 +326,11 @@ Status KuduTableSink::CheckForErrors(RuntimeState* state) {
// we can't be sure all errors can be ignored, so an error status will be reported.
bool error_overflow = false;
session_->GetPendingErrors(&errors, &error_overflow);
if (UNLIKELY(error_overflow)) {
if (is_transactional_) {
// Return the first error so that the transaction will be aborted.
status = Status(
strings::Substitute("Kudu reported error: $0", errors[0]->status().ToString()));
} else if (UNLIKELY(error_overflow)) {
status = Status("Error overflow in Kudu session.");
}

Expand Down Expand Up @@ -352,7 +381,8 @@ void KuduTableSink::Close(RuntimeState* state) {
if (closed_) return;
session_.reset();
mem_tracker_->Release(client_tracked_bytes_);
client_ = nullptr;
txn_.reset();
client_.reset();
SCOPED_TIMER(profile()->total_time_counter());
DataSink::Close(state);
closed_ = true;
Expand Down
13 changes: 9 additions & 4 deletions be/src/exec/kudu-table-sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class KuduTableSinkConfig : public DataSinkConfig {
/// buffer for a particular destination (of the 10MB of the total mutation buffer space)
/// because Kudu currently has some 8MB buffer limits.
///
/// Kudu doesn't have transactions yet, so some rows may fail to write while others are
/// If Kudu's transaction is not enabled, some rows may fail to write while others are
/// successful. The Kudu client reports errors, some of which are treated as warnings and
/// will not fail the query: PK already exists on INSERT, key not found on UPDATE/DELETE,
/// NULL in a non-nullable column, and PK specifying rows in an uncovered range.
Expand Down Expand Up @@ -96,11 +96,13 @@ class KuduTableSink : public DataSink {
/// The descriptor of the KuduTable being written to. Set on Prepare().
const KuduTableDescriptor* table_desc_;

/// The Kudu client, owned by the ExecEnv.
kudu::client::KuduClient* client_ = nullptr;
/// The Kudu table and session.
/// Pointer to the Kudu client, shared among ExecEnv and other actors which hold the
/// pointer.
kudu::client::sp::shared_ptr<kudu::client::KuduClient> client_;
/// The Kudu table, session, and transaction.
kudu::client::sp::shared_ptr<kudu::client::KuduTable> table_;
kudu::client::sp::shared_ptr<kudu::client::KuduSession> session_;
kudu::client::sp::shared_ptr<kudu::client::KuduTransaction> txn_;

/// A cache of the nullability of each Kudu column. The Kudu schema accessor
/// is not inlined and actually creates a copy (see IMPALA-8284).
Expand Down Expand Up @@ -133,6 +135,9 @@ class KuduTableSink : public DataSink {
/// Rate at which the sink consumes and processes rows, i.e. writing rows to Kudu or
/// skipping rows that are known to violate nullability constraints.
RuntimeProfile::Counter* rows_processed_rate_ = nullptr;

/// True if it's in Kudu transaction. It's valid only after Open() succeeds.
bool is_transactional_ = false;
};

} // namespace impala
Expand Down
5 changes: 2 additions & 3 deletions be/src/exprs/kudu-partition-expr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,9 @@ Status KuduPartitionExpr::Init(
DCHECK(dynamic_cast<KuduTableDescriptor*>(table_desc))
<< "Target table for KuduPartitioner must be a Kudu table.";
table_desc_ = static_cast<KuduTableDescriptor*>(table_desc);
kudu::client::KuduClient* client;
RETURN_IF_ERROR(ExecEnv::GetInstance()->GetKuduClient(
table_desc_->kudu_master_addresses(), &client));
KUDU_RETURN_IF_ERROR(client->OpenTable(table_desc_->table_name(), &table_),
table_desc_->kudu_master_addresses(), &client_));
KUDU_RETURN_IF_ERROR(client_->OpenTable(table_desc_->table_name(), &table_),
"Failed to open Kudu table.");
return Status::OK();
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/exprs/kudu-partition-expr.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ class KuduPartitionExpr : public ScalarExpr {
private:
TKuduPartitionExpr tkudu_partition_expr_;

/// Pointer to the Kudu client, shared among ExecEnv and other actors which hold the
/// pointer.
kudu::client::sp::shared_ptr<kudu::client::KuduClient> client_;

/// Descriptor of the table to use the partiitoning scheme from. Set in Prepare().
KuduTableDescriptor* table_desc_;

Expand Down
22 changes: 9 additions & 13 deletions be/src/runtime/exec-env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include <boost/algorithm/string.hpp>
#include <gflags/gflags.h>
#include <gutil/strings/substitute.h>
#include <kudu/client/client.h>

#include "catalog/catalog-service-client-wrapper.h"
#include "common/logging.h"
Expand Down Expand Up @@ -190,10 +189,6 @@ void SendClusterMembershipToFrontend(

namespace impala {

struct ExecEnv::KuduClientPtr {
kudu::client::sp::shared_ptr<kudu::client::KuduClient> kudu_client;
};

ExecEnv* ExecEnv::exec_env_ = nullptr;

ExecEnv::ExecEnv(bool external_fe)
Expand Down Expand Up @@ -581,20 +576,21 @@ void ExecEnv::InitSystemStateInfo() {
});
}

Status ExecEnv::GetKuduClient(
const vector<string>& master_addresses, kudu::client::KuduClient** client) {
Status ExecEnv::GetKuduClient(const vector<string>& master_addresses,
kudu::client::sp::shared_ptr<kudu::client::KuduClient>* client) {
string master_addr_concat = join(master_addresses, ",");
lock_guard<SpinLock> l(kudu_client_map_lock_);
auto kudu_client_map_it = kudu_client_map_.find(master_addr_concat);
if (kudu_client_map_it == kudu_client_map_.end()) {
// KuduClient doesn't exist, create it
KuduClientPtr* kudu_client_ptr = new KuduClientPtr;
RETURN_IF_ERROR(CreateKuduClient(master_addresses, &kudu_client_ptr->kudu_client));
kudu_client_map_[master_addr_concat].reset(kudu_client_ptr);
*client = kudu_client_ptr->kudu_client.get();
// KuduClient doesn't exist, create it.
LOG(INFO) << "Creating a new KuduClient for masters=" << master_addr_concat;
kudu::client::sp::shared_ptr<kudu::client::KuduClient> kudu_client;
RETURN_IF_ERROR(CreateKuduClient(master_addresses, &kudu_client));
kudu_client_map_.insert(make_pair(master_addr_concat, kudu_client));
*client = kudu_client;
} else {
// Return existing KuduClient
*client = kudu_client_map_it->second->kudu_client.get();
*client = kudu_client_map_it->second;
}
return Status::OK();
}
Expand Down
18 changes: 9 additions & 9 deletions be/src/runtime/exec-env.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <unordered_map>

#include <boost/scoped_ptr.hpp>
#include <kudu/client/client.h>

// NOTE: try not to add more headers here: exec-env.h is included in many many files.
#include "common/global-types.h"
Expand Down Expand Up @@ -172,10 +173,11 @@ class ExecEnv {

/// Gets a KuduClient for this list of master addresses. It will look up and share
/// an existing KuduClient if possible. Otherwise, it will create a new KuduClient
/// internally and return a pointer to it. All KuduClients accessed through this
/// interface are owned by the ExecEnv. Thread safe.
/// internally and return a shared pointer to it. All KuduClients accessed through this
/// interface are shared among ExecEnv and other actors which hold the returned handle.
/// Thread safe.
Status GetKuduClient(const std::vector<std::string>& master_addrs,
kudu::client::KuduClient** client) WARN_UNUSED_RESULT;
kudu::client::sp::shared_ptr<kudu::client::KuduClient>* client) WARN_UNUSED_RESULT;

int64_t admit_mem_limit() const { return admit_mem_limit_; }
int64_t admission_slots() const { return admission_slots_; }
Expand Down Expand Up @@ -262,14 +264,12 @@ class ExecEnv {

SpinLock kudu_client_map_lock_; // protects kudu_client_map_

/// Opaque type for storing the pointer to the KuduClient. This allows us
/// to avoid including Kudu header files.
struct KuduClientPtr;

/// Map from the master addresses string for a Kudu table to the KuduClientPtr for
/// Map from the master addresses string for a Kudu table to the KuduClient for
/// accessing that table. The master address string is constructed by joining
/// the sorted master address list entries with a comma separator.
typedef std::unordered_map<std::string, std::unique_ptr<KuduClientPtr>> KuduClientMap;
typedef std::unordered_map<std::string,
kudu::client::sp::shared_ptr<kudu::client::KuduClient>>
KuduClientMap;

/// Map for sharing KuduClients across the ExecEnv. This map requires that the master
/// address lists be identical in order to share a KuduClient.
Expand Down
55 changes: 53 additions & 2 deletions be/src/service/client-request-state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -822,7 +822,11 @@ Status ClientRequestState::Finalize(bool check_inflight, const Status* cause) {
}

// If the transaction didn't get committed by this point then we should just abort it.
if (InTransaction()) AbortTransaction();
if (InTransaction()) {
AbortTransaction();
} else if (InKuduTransaction()) {
AbortKuduTransaction();
}

UpdateEndTime();

Expand Down Expand Up @@ -947,7 +951,11 @@ Status ClientRequestState::WaitInternal() {
if (!child_queries.empty()) query_events_->MarkEvent("Child queries finished");

if (GetCoordinator() != NULL) {
RETURN_IF_ERROR(GetCoordinator()->Wait());
Status status = GetCoordinator()->Wait();
if (UNLIKELY(!status.ok())) {
if (InKuduTransaction()) AbortKuduTransaction();
return status;
}
RETURN_IF_ERROR(UpdateCatalog());
}

Expand Down Expand Up @@ -1332,6 +1340,17 @@ Status ClientRequestState::UpdateCatalog() {
RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(resp.result,
exec_request_->query_options.sync_ddl));
}
} else if (InKuduTransaction()) {
// Commit the Kudu transaction. Clear transaction state if it's successful.
// Otherwise, abort the Kudu transaction and clear transaction state.
// Note that TQueryExecRequest.finalize_params is not set for inserting rows to Kudu
// table.
Status status = CommitKuduTransaction();
if (UNLIKELY(!status.ok())) {
AbortKuduTransaction();
LOG(ERROR) << "ERROR Finalizing DML: " << status.GetDetail();
return status;
}
}
query_events_->MarkEvent("DML Metastore update finished");
return Status::OK();
Expand Down Expand Up @@ -1610,6 +1629,38 @@ void ClientRequestState::ClearTransactionState() {
transaction_closed_ = true;
}

bool ClientRequestState::InKuduTransaction() const {
// If Kudu transaction is opened, TQueryExecRequest.query_ctx.is_kudu_transactional
// is set as true by Frontend.doCreateExecRequest().
DCHECK(exec_request_ != nullptr);
return (exec_request_->query_exec_request.query_ctx.is_kudu_transactional
&& !transaction_closed_);
}

void ClientRequestState::AbortKuduTransaction() {
DCHECK(InKuduTransaction());
if (frontend_->AbortKuduTransaction(query_ctx_.query_id).ok()) {
query_events_->MarkEvent("Kudu transaction aborted");
} else {
VLOG(1) << Substitute("Unable to abort Kudu transaction with query-id: $0",
PrintId(query_ctx_.query_id));
}
transaction_closed_ = true;
}

Status ClientRequestState::CommitKuduTransaction() {
DCHECK(InKuduTransaction());
Status status = frontend_->CommitKuduTransaction(query_ctx_.query_id);
if (status.ok()) {
query_events_->MarkEvent("Kudu transaction committed");
transaction_closed_ = true;
} else {
VLOG(1) << Substitute("Unable to commit Kudu transaction with query-id: $0",
PrintId(query_ctx_.query_id));
}
return status;
}

void ClientRequestState::LogQueryEvents() {
// Wait until the results are available. This guarantees the completion of non QUERY
// statements like DDL/DML etc. Query events are logged if the query reaches a FINISHED
Expand Down
20 changes: 15 additions & 5 deletions be/src/service/client-request-state.h
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,8 @@ class ClientRequestState {

/// Gather and publish all required updates to the metastore.
/// For transactional queries:
/// If everything goes well the transaction is committed by the Catalogd.
/// If everything goes well the Hive transaction is committed by the Catalogd,
/// but the Kudu transaction is committed by this function.
/// If an error occurs the transaction gets aborted by this function. Either way
/// the transaction will be closed when this function returns.
Status UpdateCatalog() WARN_UNUSED_RESULT;
Expand Down Expand Up @@ -748,19 +749,28 @@ class ClientRequestState {
/// 'SET' and all of them for 'SET ALL'
void PopulateResultForSet(bool is_set_all);

/// Returns the transaction id for this client request. 'InTransaction()' must be
/// Returns the Hive transaction id for this client request. 'InTransaction()' must be
/// true when invoked.
int64_t GetTransactionId() const;

/// Returns true if there is an open transaction for this client request.
/// Returns true if there is an open Hive transaction for this client request.
bool InTransaction() const;

/// Aborts the transaction of this client request.
/// Aborts the Hive transaction of this client request.
void AbortTransaction();

/// Invoke this function when the transaction is committed or aborted.
/// Invoke this function when the Hive transaction is committed or aborted.
void ClearTransactionState();

/// Returns true if there is an open Kudu transaction for this client request.
bool InKuduTransaction() const;

/// Aborts the Kudu transaction of this client request.
void AbortKuduTransaction();

/// Commits the Kudu transaction of this client request.
Status CommitKuduTransaction();

/// helper that logs the audit record for this query id. Takes the query_status
/// as input parameter so that it operates on the same status polled in the
/// beginning of LogQueryEvents().
Expand Down
Loading

0 comments on commit fcaea30

Please sign in to comment.