Skip to content

Commit

Permalink
refactor(function_test): abstract some functions to reduce duplicate …
Browse files Browse the repository at this point in the history
…code (#1756)
  • Loading branch information
acelyc111 authored Dec 11, 2023
1 parent fa82854 commit 4dd2b7a
Show file tree
Hide file tree
Showing 18 changed files with 398 additions and 624 deletions.
20 changes: 10 additions & 10 deletions src/test/function_test/backup_restore/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ set(MY_PROJ_SRC "")
set(MY_SRC_SEARCH_MODE "GLOB")

set(MY_PROJ_LIBS
dsn_client
dsn_replication_common
dsn_utils
gtest
sasl2
gssapi_krb5
krb5
function_test_utils
pegasus_client_static
)
dsn_client
dsn_replication_common
dsn_utils
gtest
sasl2
gssapi_krb5
krb5
function_test_utils
pegasus_client_static
test_utils)

set(MY_BOOST_LIBS Boost::system Boost::filesystem)

Expand Down
151 changes: 31 additions & 120 deletions src/test/function_test/backup_restore/test_backup_and_restore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,19 @@
// under the License.

#include <stdint.h>
#include <unistd.h>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

#include "backup_types.h"
#include "base/pegasus_const.h"
#include "client/replication_ddl_client.h"
#include "dsn.layer2_types.h"
#include "gtest/gtest.h"
#include "include/pegasus/client.h"
#include "include/pegasus/error.h"
#include "runtime/rpc/rpc_address.h"
#include "test/function_test/utils/test_util.h"
#include "test_util/test_util.h"
#include "utils/error_code.h"
#include "utils/errors.h"
#include "utils/test_macros.h"
#include "utils/utils.h"

using namespace dsn;
Expand All @@ -44,135 +40,50 @@ class backup_restore_test : public test_util
public:
void TearDown() override
{
ASSERT_EQ(ERR_OK, ddl_client_->drop_app(app_name_, 0));
ASSERT_EQ(ERR_OK, ddl_client_->drop_app(table_name_, 0));
ASSERT_EQ(ERR_OK, ddl_client_->drop_app(s_new_app_name, 0));
}

bool write_data()
void wait_backup_complete(int64_t backup_id)
{
for (int i = 0; i < s_num_of_rows; ++i) {
int ret = client_->set("hashkey_" + std::to_string(i),
"sortkey_" + std::to_string(i),
"value_" + std::to_string(i));
if (ret != pegasus::PERR_OK) {
std::cout << "write data failed. " << std::endl;
return false;
}
}
return true;
}

bool verify_data(const std::string &app_name)
{
for (int i = 0; i < s_num_of_rows; ++i) {
const std::string &expected_value = "value_" + std::to_string(i);
std::string value;
int ret =
client_->get("hashkey_" + std::to_string(i), "sortkey_" + std::to_string(i), value);
if (ret != pegasus::PERR_OK) {
return false;
}
if (value != expected_value) {
return false;
}
}
return true;
}

start_backup_app_response start_backup(const std::string &user_specified_path = "")
{
return ddl_client_->backup_app(app_id_, s_provider_type, user_specified_path).get_value();
}

query_backup_status_response query_backup(int64_t backup_id)
{
return ddl_client_->query_backup(app_id_, backup_id).get_value();
}

error_code start_restore(int64_t backup_id, const std::string &user_specified_path = "")
{
return ddl_client_->do_restore(s_provider_type,
cluster_name_,
/*policy_name=*/"",
backup_id,
app_name_,
app_id_,
s_new_app_name,
/*skip_bad_partition=*/false,
user_specified_path);
}

bool wait_backup_complete(int64_t backup_id, int max_sleep_seconds)
{
int sleep_sec = 0;
bool is_backup_complete = false;
while (!is_backup_complete && sleep_sec <= max_sleep_seconds) {
std::cout << "sleep a while to wait backup complete." << std::endl;
sleep(s_check_interval_sec);
sleep_sec += s_check_interval_sec;

auto resp = query_backup(backup_id);
if (resp.err != ERR_OK) {
return false;
}
// we got only one backup_item for a certain app_id and backup_id.
auto item = resp.backup_items[0];
is_backup_complete = (item.end_time_ms > 0);
}
return is_backup_complete;
}

bool wait_app_become_healthy(const std::string &app_name, uint32_t max_sleep_seconds)
{
int sleep_sec = 0;
bool is_app_healthy = false;
while (!is_app_healthy && sleep_sec <= max_sleep_seconds) {
std::cout << "sleep a while to wait app become healthy." << std::endl;
sleep(s_check_interval_sec);
sleep_sec += s_check_interval_sec;

int32_t partition_count;
std::vector<partition_configuration> partitions;
auto err = ddl_client_->list_app(app_name, app_id_, partition_count, partitions);
if (err != ERR_OK) {
std::cout << "list app " + app_name + " failed" << std::endl;
return false;
}
int32_t healthy_partition_count = 0;
for (const auto &partition : partitions) {
if (partition.primary.is_invalid()) {
break;
}
if (partition.secondaries.size() + 1 < partition.max_replica_count) {
break;
}
healthy_partition_count++;
}
is_app_healthy = (healthy_partition_count == partition_count);
}
return is_app_healthy;
ASSERT_IN_TIME(
[&] {
auto resp = ddl_client_->query_backup(table_id_, backup_id).get_value();
ASSERT_EQ(dsn::ERR_OK, resp.err);
ASSERT_FALSE(resp.backup_items.empty());
// we got only one backup_item for a certain app_id and backup_id.
ASSERT_GT(resp.backup_items[0].end_time_ms, 0);
},
180);
}

void test_backup_and_restore(const std::string &user_specified_path = "")
{
ASSERT_TRUE(wait_app_become_healthy(app_name_, 180));
NO_FATALS(wait_table_healthy(table_name_));
NO_FATALS(write_data(s_num_of_rows));
NO_FATALS(verify_data(table_name_, s_num_of_rows));

ASSERT_TRUE(write_data());
ASSERT_TRUE(verify_data(app_name_));

auto resp = start_backup(user_specified_path);
auto resp =
ddl_client_->backup_app(table_id_, s_provider_type, user_specified_path).get_value();
ASSERT_EQ(ERR_OK, resp.err);
int64_t backup_id = resp.backup_id;
ASSERT_TRUE(wait_backup_complete(backup_id, 180));
ASSERT_EQ(ERR_OK, start_restore(backup_id, user_specified_path));
ASSERT_TRUE(wait_app_become_healthy(s_new_app_name, 180));

ASSERT_TRUE(verify_data(s_new_app_name));
NO_FATALS(wait_backup_complete(backup_id));
ASSERT_EQ(ERR_OK,
ddl_client_->do_restore(s_provider_type,
cluster_name_,
/* policy_name */ "",
backup_id,
table_name_,
table_id_,
s_new_app_name,
/* skip_bad_partition */ false,
user_specified_path));
NO_FATALS(wait_table_healthy(s_new_app_name));
NO_FATALS(verify_data(s_new_app_name, s_num_of_rows));
}

private:
static const uint32_t s_num_of_rows = 1000;
static const uint8_t s_check_interval_sec = 10;
static const std::string s_new_app_name;
static const std::string s_provider_type;
};
Expand Down
7 changes: 4 additions & 3 deletions src/test/function_test/base_api/integration_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ TEST_F(integration_test, write_corrupt_db)
// cause timeout.
// Force to fetch the latest route table.
client_ =
pegasus_client_factory::get_client(cluster_name_.c_str(), app_name_.c_str());
pegasus_client_factory::get_client(cluster_name_.c_str(), table_name_.c_str());
ASSERT_TRUE(client_ != nullptr);
} else {
ASSERT_TRUE(false) << ret;
Expand All @@ -86,7 +86,8 @@ TEST_F(integration_test, write_corrupt_db)
break;
}
ASSERT_EQ(PERR_NOT_FOUND, ret);
client_ = pegasus_client_factory::get_client(cluster_name_.c_str(), app_name_.c_str());
client_ =
pegasus_client_factory::get_client(cluster_name_.c_str(), table_name_.c_str());
ASSERT_TRUE(client_ != nullptr);

ret = client_->get(hkey, skey, got_value);
Expand Down Expand Up @@ -179,7 +180,7 @@ TEST_F(integration_test, read_corrupt_db)
// a new read operation on the primary replica it ever held will cause timeout.
// Force to fetch the latest route table.
client_ =
pegasus_client_factory::get_client(cluster_name_.c_str(), app_name_.c_str());
pegasus_client_factory::get_client(cluster_name_.c_str(), table_name_.c_str());
ASSERT_TRUE(client_ != nullptr);
} else {
ASSERT_TRUE(false) << ret;
Expand Down
2 changes: 1 addition & 1 deletion src/test/function_test/base_api/test_batch_get.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class batch_get : public test_util
TEST_F(batch_get, set_and_then_batch_get)
{
auto rrdb_client =
new ::dsn::apps::rrdb_client(cluster_name_.c_str(), meta_list_, app_name_.c_str());
new ::dsn::apps::rrdb_client(cluster_name_.c_str(), meta_list_, table_name_.c_str());

int test_data_count = 100;
int test_timeout_milliseconds = 3000;
Expand Down
4 changes: 2 additions & 2 deletions src/test/function_test/base_api/test_recall.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ TEST_F(drop_and_recall, simple)
}

// drop the table
ASSERT_EQ(dsn::ERR_OK, ddl_client_->drop_app(app_name_, 0));
ASSERT_EQ(dsn::ERR_OK, ddl_client_->drop_app(table_name_, 0));

// wait for all elements to be dropped
for (int i = 0; i < partition_count_; ++i) {
Expand All @@ -91,7 +91,7 @@ TEST_F(drop_and_recall, simple)
}

// then recall table
ASSERT_EQ(dsn::ERR_OK, ddl_client_->recall_app(app_id_, ""));
ASSERT_EQ(dsn::ERR_OK, ddl_client_->recall_app(table_id_, ""));

// then read all keys
for (int i = 0; i < kv_count; ++i) {
Expand Down
25 changes: 7 additions & 18 deletions src/test/function_test/base_api/test_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@
#include <string.h>
#include <time.h>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <iterator>
#include <map>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include <vector>

Expand All @@ -36,14 +34,13 @@
#include "client/replication_ddl_client.h"
#include "gtest/gtest.h"
#include "include/pegasus/client.h"
#include "meta_admin_types.h"
#include "pegasus/error.h"
#include "test/function_test/utils/test_util.h"
#include "test/function_test/utils/utils.h"
#include "utils/error_code.h"
#include "utils/errors.h"
#include "utils/fmt_logging.h"
#include "utils/synchronize.h"
#include "utils/test_macros.h"

using namespace ::pegasus;

Expand All @@ -53,14 +50,14 @@ class scan_test : public test_util
void SetUp() override
{
test_util::SetUp();
ASSERT_EQ(dsn::ERR_OK, ddl_client_->drop_app(app_name_, 0));
ASSERT_EQ(dsn::ERR_OK, ddl_client_->create_app(app_name_, "pegasus", 8, 3, {}, false));
client_ = pegasus_client_factory::get_client(cluster_name_.c_str(), app_name_.c_str());
ASSERT_EQ(dsn::ERR_OK, ddl_client_->drop_app(table_name_, 0));
ASSERT_EQ(dsn::ERR_OK, ddl_client_->create_app(table_name_, "pegasus", 8, 3, {}, false));
client_ = pegasus_client_factory::get_client(cluster_name_.c_str(), table_name_.c_str());
ASSERT_TRUE(client_ != nullptr);
ASSERT_NO_FATAL_FAILURE(fill_database());
}

void TearDown() override { ASSERT_EQ(dsn::ERR_OK, ddl_client_->drop_app(app_name_, 0)); }
void TearDown() override { ASSERT_EQ(dsn::ERR_OK, ddl_client_->drop_app(table_name_, 0)); }

// REQUIRED: 'buffer_' has been filled with random chars.
const std::string random_string() const
Expand Down Expand Up @@ -425,12 +422,7 @@ TEST_F(scan_test, REQUEST_EXPIRE_TS)
TEST_F(scan_test, ITERATION_TIME_LIMIT)
{
// update iteration threshold to 1ms
auto response = ddl_client_->set_app_envs(
client_->get_app_name(), {ROCKSDB_ITERATION_THRESHOLD_TIME_MS}, {std::to_string(1)});
ASSERT_EQ(true, response.is_ok());
ASSERT_EQ(dsn::ERR_OK, response.get_value().err);
// wait envs to be synced.
std::this_thread::sleep_for(std::chrono::seconds(30));
NO_FATALS(update_table_env({ROCKSDB_ITERATION_THRESHOLD_TIME_MS}, {std::to_string(1)}));

// write data into table
int32_t i = 0;
Expand All @@ -450,8 +442,5 @@ TEST_F(scan_test, ITERATION_TIME_LIMIT)
ASSERT_EQ(-1, count);

// set iteration threshold to 100ms
response = ddl_client_->set_app_envs(
client_->get_app_name(), {ROCKSDB_ITERATION_THRESHOLD_TIME_MS}, {std::to_string(100)});
ASSERT_TRUE(response.is_ok());
ASSERT_EQ(dsn::ERR_OK, response.get_value().err);
NO_FATALS(update_table_env({ROCKSDB_ITERATION_THRESHOLD_TIME_MS}, {std::to_string(100)}));
}
Loading

0 comments on commit 4dd2b7a

Please sign in to comment.