From f0faa0c64f7a803ca82979368eb84608fb89d916 Mon Sep 17 00:00:00 2001 From: JaySon Date: Fri, 27 Dec 2024 12:00:30 +0800 Subject: [PATCH] *: Refine the error message when schema mismatch in ExchangeReceiver (#9744) ref pingcap/tiflash#9673 Signed-off-by: JaySon-Huang --- dbms/src/Flash/Coprocessor/CodecUtils.cpp | 36 ++--- dbms/src/Flash/Mpp/ExchangeReceiver.cpp | 69 ++++----- dbms/src/Flash/Mpp/ExchangeReceiver.h | 6 +- dbms/src/Server/Server.cpp | 2 - dbms/src/Server/StorageConfigParser.cpp | 7 +- .../src/Storages/DeltaMerge/RestoreDMFile.cpp | 132 +++++++++++------- dbms/src/Storages/DeltaMerge/Segment.cpp | 2 +- dbms/src/Storages/FormatVersion.cpp | 2 +- 8 files changed, 149 insertions(+), 107 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/CodecUtils.cpp b/dbms/src/Flash/Coprocessor/CodecUtils.cpp index c9a5de7fd60..84f0aa5731d 100644 --- a/dbms/src/Flash/Coprocessor/CodecUtils.cpp +++ b/dbms/src/Flash/Coprocessor/CodecUtils.cpp @@ -12,39 +12,39 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include -namespace DB -{ -namespace ErrorCodes +namespace DB::ErrorCodes { extern const int LOGICAL_ERROR; -} // namespace ErrorCodes +} // namespace DB::ErrorCodes -namespace CodecUtils +namespace DB::CodecUtils { void checkColumnSize(const String & identifier, size_t expected, size_t actual) { - if unlikely (expected != actual) + if (unlikely(expected != actual)) throw Exception( - fmt::format("{} schema size mismatch, expected {}, actual {}.", identifier, expected, actual), - ErrorCodes::LOGICAL_ERROR); + ErrorCodes::LOGICAL_ERROR, + "{} schema size mismatch, expected {}, actual {}.", + identifier, + expected, + actual); } void checkDataTypeName(const String & identifier, size_t column_index, const String & expected, const String & actual) { - if unlikely (expected != actual) + if (unlikely(expected != actual)) throw Exception( - fmt::format( - "{} schema mismatch at column {}, expected {}, actual {}", - identifier, - column_index, - expected, - actual), - ErrorCodes::LOGICAL_ERROR); + ErrorCodes::LOGICAL_ERROR, + "{} schema mismatch at column {}, expected {}, actual {}", + identifier, + column_index, + expected, + actual); } -} // namespace CodecUtils -} // namespace DB +} // namespace DB::CodecUtils diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index ad7fbfbc373..10bc0db9d29 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -32,7 +32,6 @@ #include #include #include -#include namespace DB { @@ -816,7 +815,17 @@ ExchangeReceiverResult ExchangeReceiverBase::toExchangeReceiveResult recv_msg->getReqInfo(), recv_msg->getErrorPtr()->msg()); - return toDecodeResult(stream_id, block_queue, header, recv_msg, decoder_ptr); + try + { + return toDecodeResult(stream_id, block_queue, header, recv_msg, decoder_ptr); + } + catch (DB::Exception & e) + { + // Add the MPPTask identifier and exector_id to the error message, make it easier to + // identify the specific stage within a complex query where the error occurs + e.addMessage(fmt::format("{}", exc_log->identifier())); + e.rethrow(); + } } case ReceiveStatus::eof: return handleUnnormalChannel(block_queue, decoder_ptr); @@ -882,42 +891,36 @@ ExchangeReceiverResult ExchangeReceiverBase::toDecodeResult( { assert(recv_msg != nullptr); const auto * resp_ptr = recv_msg->getRespPtr(stream_id); - if (resp_ptr - != nullptr) /// the data of the last packet is serialized from tipb::SelectResponse including execution summaries. - { - auto select_resp = std::make_shared(); - if (unlikely(!select_resp->ParseFromString(*resp_ptr))) - { - return ExchangeReceiverResult::newError(recv_msg->getSourceIndex(), recv_msg->getReqInfo(), "decode error"); - } - else - { - auto result - = ExchangeReceiverResult::newOk(select_resp, recv_msg->getSourceIndex(), recv_msg->getReqInfo()); - /// If mocking TiFlash as TiDB, we should decode chunks from select_resp. - if (unlikely(!result.resp->chunks().empty())) - { - assert(recv_msg->getChunks(stream_id).empty()); - // Fine grained shuffle should only be enabled when sending data to TiFlash node. - // So all data should be encoded into MPPDataPacket.chunks. - RUNTIME_CHECK_MSG( - !enable_fine_grained_shuffle_flag, - "Data should not be encoded into tipb::SelectResponse.chunks when fine grained shuffle is enabled"); - result.decode_detail = CoprocessorReader::decodeChunks(select_resp, block_queue, header, schema); - } - else if (!recv_msg->getChunks(stream_id).empty()) - { - result.decode_detail = decodeChunks(stream_id, recv_msg, block_queue, decoder_ptr); - } - return result; - } - } - else /// the non-last packets + if (resp_ptr == nullptr) { + /// the non-last packets auto result = ExchangeReceiverResult::newOk(nullptr, recv_msg->getSourceIndex(), recv_msg->getReqInfo()); result.decode_detail = decodeChunks(stream_id, recv_msg, block_queue, decoder_ptr); return result; } + + /// the data of the last packet is serialized from tipb::SelectResponse including execution summaries. + auto select_resp = std::make_shared(); + if (unlikely(!select_resp->ParseFromString(*resp_ptr))) + return ExchangeReceiverResult::newError(recv_msg->getSourceIndex(), recv_msg->getReqInfo(), "decode error"); + + auto result = ExchangeReceiverResult::newOk(select_resp, recv_msg->getSourceIndex(), recv_msg->getReqInfo()); + /// If mocking TiFlash as TiDB, we should decode chunks from select_resp. + if (unlikely(!result.resp->chunks().empty())) + { + assert(recv_msg->getChunks(stream_id).empty()); + // Fine grained shuffle should only be enabled when sending data to TiFlash node. + // So all data should be encoded into MPPDataPacket.chunks. + RUNTIME_CHECK_MSG( + !enable_fine_grained_shuffle_flag, + "Data should not be encoded into tipb::SelectResponse.chunks when fine grained shuffle is enabled"); + result.decode_detail = CoprocessorReader::decodeChunks(select_resp, block_queue, header, schema); + } + else if (!recv_msg->getChunks(stream_id).empty()) + { + result.decode_detail = decodeChunks(stream_id, recv_msg, block_queue, decoder_ptr); + } + return result; } template diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.h b/dbms/src/Flash/Mpp/ExchangeReceiver.h index d195b549946..1adc9db1b16 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.h +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.h @@ -20,10 +20,8 @@ #include #include -#include #include #include -#include namespace DB { @@ -36,8 +34,8 @@ struct ExchangeReceiverResult size_t call_index; String req_info; bool meet_error; - String error_msg; bool eof; + String error_msg; DecodeDetail decode_detail; ExchangeReceiverResult() @@ -74,8 +72,8 @@ struct ExchangeReceiverResult , call_index(call_index_) , req_info(req_info_) , meet_error(meet_error_) - , error_msg(error_msg_) , eof(eof_) + , error_msg(error_msg_) {} }; diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 43583c5c502..4287b5ddb41 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -1021,8 +1021,6 @@ int Server::main(const std::vector & /*args*/) } } - LOG_INFO(log, "Using api_version={}", storage_config.api_version); - // Set whether to use safe point v2. PDClientHelper::enable_safepoint_v2 = config().getBool("enable_safe_point_v2", false); diff --git a/dbms/src/Server/StorageConfigParser.cpp b/dbms/src/Server/StorageConfigParser.cpp index 032abec967b..973736632b5 100644 --- a/dbms/src/Server/StorageConfigParser.cpp +++ b/dbms/src/Server/StorageConfigParser.cpp @@ -258,7 +258,12 @@ void TiFlashStorageConfig::parseMisc(const String & storage_section, const Logge lazily_init_store = get_bool_config_or_default("lazily_init_store", lazily_init_store); - LOG_INFO(log, "format_version {} lazily_init_store {}", format_version, lazily_init_store); + LOG_INFO( + log, + "format_version={} lazily_init_store={} api_version={}", + format_version, + lazily_init_store, + api_version); } Strings TiFlashStorageConfig::getAllNormalPaths() const diff --git a/dbms/src/Storages/DeltaMerge/RestoreDMFile.cpp b/dbms/src/Storages/DeltaMerge/RestoreDMFile.cpp index 44380661ebd..f3fa8059d8d 100644 --- a/dbms/src/Storages/DeltaMerge/RestoreDMFile.cpp +++ b/dbms/src/Storages/DeltaMerge/RestoreDMFile.cpp @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include +#include #include #include #include @@ -29,38 +31,61 @@ DMFilePtr restoreDMFileFromRemoteDataSource( UInt64 file_page_id, UInt64 meta_version) { - auto path_delegate = dm_context.path_pool->getStableDiskDelegator(); - auto wn_ps = dm_context.global_context.getWriteNodePageStorage(); - auto full_page_id = UniversalPageIdFormat::toFullPageId( - UniversalPageIdFormat::toFullPrefix(dm_context.keyspace_id, StorageType::Data, dm_context.physical_table_id), - file_page_id); - auto full_external_id = wn_ps->getNormalPageId(full_page_id); - auto local_external_id = UniversalPageIdFormat::getU64ID(full_external_id); - auto remote_data_location = wn_ps->getCheckpointLocation(full_page_id); - const auto & lock_key_view = S3::S3FilenameView::fromKey(*(remote_data_location->data_file_id)); - auto file_oid = lock_key_view.asDataFile().getDMFileOID(); - auto prepared = remote_data_store->prepareDMFile(file_oid, file_page_id); - auto dmfile = prepared->restore(DMFileMeta::ReadMode::all(), meta_version); - // gc only begin to run after restore so we can safely call addRemoteDTFileIfNotExists here - path_delegate.addRemoteDTFileIfNotExists(local_external_id, dmfile->getBytesOnDisk()); + DMFilePtr dmfile; + try + { + auto path_delegate = dm_context.path_pool->getStableDiskDelegator(); + auto wn_ps = dm_context.global_context.getWriteNodePageStorage(); + auto full_page_id = UniversalPageIdFormat::toFullPageId( + UniversalPageIdFormat::toFullPrefix( + dm_context.keyspace_id, + StorageType::Data, + dm_context.physical_table_id), + file_page_id); + auto full_external_id = wn_ps->getNormalPageId(full_page_id); + auto local_external_id = UniversalPageIdFormat::getU64ID(full_external_id); + auto remote_data_location = wn_ps->getCheckpointLocation(full_page_id); + const auto & lock_key_view = S3::S3FilenameView::fromKey(*(remote_data_location->data_file_id)); + auto file_oid = lock_key_view.asDataFile().getDMFileOID(); + auto prepared = remote_data_store->prepareDMFile(file_oid, file_page_id); + dmfile = prepared->restore(DMFileMeta::ReadMode::all(), meta_version); + // gc only begin to run after restore so we can safely call addRemoteDTFileIfNotExists here + path_delegate.addRemoteDTFileIfNotExists(local_external_id, dmfile->getBytesOnDisk()); + } + catch (DB::Exception & e) + { + e.addMessage(fmt::format("file_page_id={} meta_version={}", file_page_id, meta_version)); + e.rethrow(); + } + assert(dmfile != nullptr); return dmfile; } DMFilePtr restoreDMFileFromLocal(const DMContext & dm_context, UInt64 file_page_id, UInt64 meta_version) { - auto path_delegate = dm_context.path_pool->getStableDiskDelegator(); - auto file_id = dm_context.storage_pool->dataReader()->getNormalPageId(file_page_id); - auto file_parent_path = path_delegate.getDTFilePath(file_id); - auto dmfile = DMFile::restore( - dm_context.global_context.getFileProvider(), - file_id, - file_page_id, - file_parent_path, - DMFileMeta::ReadMode::all(), - meta_version, - dm_context.keyspace_id); - auto res = path_delegate.updateDTFileSize(file_id, dmfile->getBytesOnDisk()); - RUNTIME_CHECK_MSG(res, "update dt file size failed, path={}", dmfile->path()); + DMFilePtr dmfile; + try + { + auto path_delegate = dm_context.path_pool->getStableDiskDelegator(); + auto file_id = dm_context.storage_pool->dataReader()->getNormalPageId(file_page_id); + auto file_parent_path = path_delegate.getDTFilePath(file_id); + dmfile = DMFile::restore( + dm_context.global_context.getFileProvider(), + file_id, + file_page_id, + file_parent_path, + DMFileMeta::ReadMode::all(), + meta_version, + dm_context.keyspace_id); + auto res = path_delegate.updateDTFileSize(file_id, dmfile->getBytesOnDisk()); + RUNTIME_CHECK_MSG(res, "update dt file size failed, path={}", dmfile->path()); + } + catch (DB::Exception & e) + { + e.addMessage(fmt::format("file_page_id={} meta_version={}", file_page_id, meta_version)); + e.rethrow(); + } + assert(dmfile != nullptr); return dmfile; } @@ -72,26 +97,39 @@ DMFilePtr restoreDMFileFromCheckpoint( UInt64 file_page_id, UInt64 meta_version) { - auto full_page_id = UniversalPageIdFormat::toFullPageId( - UniversalPageIdFormat::toFullPrefix(dm_context.keyspace_id, StorageType::Data, dm_context.physical_table_id), - file_page_id); - auto remote_data_location = temp_ps->getCheckpointLocation(full_page_id); - auto data_key_view = S3::S3FilenameView::fromKey(*(remote_data_location->data_file_id)).asDataFile(); - auto file_oid = data_key_view.getDMFileOID(); - auto data_key = data_key_view.toFullKey(); - auto delegator = dm_context.path_pool->getStableDiskDelegator(); - auto new_local_page_id = dm_context.storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); - PS::V3::CheckpointLocation loc{ - .data_file_id = std::make_shared(data_key), - .offset_in_file = 0, - .size_in_file = 0, - }; - wbs.data.putRemoteExternal(new_local_page_id, loc); - auto prepared = remote_data_store->prepareDMFile(file_oid, new_local_page_id); - auto dmfile = prepared->restore(DMFileMeta::ReadMode::all(), meta_version); - wbs.writeLogAndData(); - // new_local_page_id is already applied to PageDirectory so we can safely call addRemoteDTFileIfNotExists here - delegator.addRemoteDTFileIfNotExists(new_local_page_id, dmfile->getBytesOnDisk()); + DMFilePtr dmfile; + try + { + auto full_page_id = UniversalPageIdFormat::toFullPageId( + UniversalPageIdFormat::toFullPrefix( + dm_context.keyspace_id, + StorageType::Data, + dm_context.physical_table_id), + file_page_id); + auto remote_data_location = temp_ps->getCheckpointLocation(full_page_id); + auto data_key_view = S3::S3FilenameView::fromKey(*(remote_data_location->data_file_id)).asDataFile(); + auto file_oid = data_key_view.getDMFileOID(); + auto data_key = data_key_view.toFullKey(); + auto delegator = dm_context.path_pool->getStableDiskDelegator(); + auto new_local_page_id = dm_context.storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); + PS::V3::CheckpointLocation loc{ + .data_file_id = std::make_shared(data_key), + .offset_in_file = 0, + .size_in_file = 0, + }; + wbs.data.putRemoteExternal(new_local_page_id, loc); + auto prepared = remote_data_store->prepareDMFile(file_oid, new_local_page_id); + dmfile = prepared->restore(DMFileMeta::ReadMode::all(), meta_version); + wbs.writeLogAndData(); + // new_local_page_id is already applied to PageDirectory so we can safely call addRemoteDTFileIfNotExists here + delegator.addRemoteDTFileIfNotExists(new_local_page_id, dmfile->getBytesOnDisk()); + } + catch (DB::Exception & e) + { + e.addMessage(fmt::format("file_page_id={} meta_version={}", file_page_id, meta_version)); + e.rethrow(); + } + assert(dmfile != nullptr); return dmfile; } diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 0ecdcd8de68..5dcc9604477 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -440,7 +440,7 @@ SegmentPtr Segment::restoreSegment( // } catch (DB::Exception & e) { - e.addMessage(fmt::format("while restoreSegment, segment_id={}", segment_id)); + e.addMessage(fmt::format("while restoreSegment, segment_id={} ident={}", segment_id, parent_log->identifier())); e.rethrow(); } RUNTIME_CHECK_MSG(false, "unreachable"); diff --git a/dbms/src/Storages/FormatVersion.cpp b/dbms/src/Storages/FormatVersion.cpp index cdc068428e5..6b8f9970049 100644 --- a/dbms/src/Storages/FormatVersion.cpp +++ b/dbms/src/Storages/FormatVersion.cpp @@ -49,7 +49,7 @@ const StorageFormatVersion & toStorageFormat(UInt64 setting) case 103: return STORAGE_FORMAT_V103; default: - throw Exception(ErrorCodes::LOGICAL_ERROR, "Illegal setting value: {}", setting); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Illegal format_version value: {}", setting); } } } // namespace