diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 12549dc31c7..b67539352ae 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1615,8 +1615,14 @@ IOStatus DBImpl::SyncWalImpl(bool include_current_wal, for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to_number; ++it) { auto& log = *it; + // Ensure the head of logs_ is marked as getting_synced if any is. log.PrepareForSync(); - wals_to_sync.push_back(log.writer); + // If last sync failed on a later WAL, this could be a fully synced + // and closed WAL that just needs to be recorded as synced in the + // manifest. + if (log.writer->file()) { + wals_to_sync.push_back(log.writer); + } } need_wal_dir_sync = !log_dir_synced_; diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 6e6671f8a82..7ecf53b40a9 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1722,8 +1722,11 @@ class DBImpl : public DB { return w; } Status ClearWriter() { - // TODO: plumb Env::IOActivity, Env::IOPriority - Status s = writer->WriteBuffer(WriteOptions()); + Status s; + if (writer->file()) { + // TODO: plumb Env::IOActivity, Env::IOPriority + s = writer->WriteBuffer(WriteOptions()); + } delete writer; writer = nullptr; return s; @@ -1738,10 +1741,16 @@ class DBImpl : public DB { void PrepareForSync() { assert(!getting_synced); - // Size is expected to be monotonically increasing. - assert(writer->file()->GetFlushedSize() >= pre_sync_size); + // Ensure the head of logs_ is marked as getting_synced if any is. getting_synced = true; - pre_sync_size = writer->file()->GetFlushedSize(); + // If last sync failed on a later WAL, this could be a fully synced + // and closed WAL that just needs to be recorded as synced in the + // manifest. + if (writer->file()) { + // Size is expected to be monotonically increasing. + assert(writer->file()->GetFlushedSize() >= pre_sync_size); + pre_sync_size = writer->file()->GetFlushedSize(); + } } void FinishSync() { diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index a4976c3ed35..494298be7e6 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -14,6 +14,7 @@ #include "port/stack_trace.h" #include "rocksdb/file_system.h" #include "test_util/sync_point.h" +#include "util/defer.h" #include "util/udt_util.h" #include "utilities/fault_injection_env.h" #include "utilities/fault_injection_fs.h" @@ -1471,6 +1472,93 @@ TEST_F(DBWALTest, SyncMultipleLogs) { ASSERT_OK(dbfull()->SyncWAL()); } +TEST_F(DBWALTest, SyncWalPartialFailure) { + class MyTestFileSystem : public FileSystemWrapper { + public: + explicit MyTestFileSystem(std::shared_ptr base) + : FileSystemWrapper(std::move(base)) {} + + static const char* kClassName() { return "MyTestFileSystem"; } + const char* Name() const override { return kClassName(); } + IOStatus NewWritableFile(const std::string& fname, + const FileOptions& file_opts, + std::unique_ptr* result, + IODebugContext* dbg) override { + IOStatus s = target()->NewWritableFile(fname, file_opts, result, dbg); + if (s.ok()) { + *result = + std::make_unique(std::move(*result), *this); + } + return s; + } + + AcqRelAtomic syncs_before_failure_{UINT32_MAX}; + + protected: + class MyTestWritableFile : public FSWritableFileOwnerWrapper { + public: + MyTestWritableFile(std::unique_ptr&& file, + MyTestFileSystem& fs) + : FSWritableFileOwnerWrapper(std::move(file)), fs_(fs) {} + + IOStatus Sync(const IOOptions& options, IODebugContext* dbg) override { + int prev_val = fs_.syncs_before_failure_.FetchSub(1); + if (prev_val == 0) { + return IOStatus::IOError("fault"); + } else { + return target()->Sync(options, dbg); + } + } + + protected: + MyTestFileSystem& fs_; + }; + }; + + Options options = CurrentOptions(); + options.max_write_buffer_number = 4; + options.track_and_verify_wals_in_manifest = true; + options.max_bgerror_resume_count = 0; // manual resume + + auto custom_fs = + std::make_shared(options.env->GetFileSystem()); + std::unique_ptr fault_fs_env(NewCompositeEnv(custom_fs)); + options.env = fault_fs_env.get(); + Reopen(options); + Defer closer([this]() { Close(); }); + + // This is the simplest way to get + // * one inactive WAL, synced + // * one inactive WAL, not synced, and + // * one active WAL, not synced + // with a single thread, to exercise as much logic as we reasonably can. + ASSERT_OK(static_cast_with_check(db_)->PauseBackgroundWork()); + ASSERT_OK(Put("key1", "val1")); + ASSERT_OK(static_cast_with_check(db_)->TEST_SwitchMemtable()); + ASSERT_OK(db_->SyncWAL()); + ASSERT_OK(Put("key2", "val2")); + ASSERT_OK(static_cast_with_check(db_)->TEST_SwitchMemtable()); + ASSERT_OK(Put("key3", "val3")); + + // Allow 1 of the WALs to sync, but another won't + custom_fs->syncs_before_failure_.Store(1); + ASSERT_NOK(db_->SyncWAL()); + + // Stuck in this state. (This could previously cause a segfault.) + ASSERT_NOK(db_->SyncWAL()); + + // Can't Resume because WAL write failure is considered non-recoverable, + // regardless of the IOStatus itself. (Can/should be fixed?) + ASSERT_NOK(db_->Resume()); + + // Verify no data loss after reopen. + // Also Close() could previously crash in this state. + Reopen(options); + ASSERT_EQ("val1", Get("key1")); + ASSERT_EQ("val2", Get("key2")); + ASSERT_EQ("val3", Get("key3")); +} + // Github issue 1339. Prior the fix we read sequence id from the first log to // a local variable, then keep increase the variable as we replay logs, // ignoring actual sequence id of the records. This is incorrect if some writes