diff --git a/binding.gyp b/binding.gyp index 76580e04..458d69eb 100644 --- a/binding.gyp +++ b/binding.gyp @@ -25,6 +25,7 @@ "native-src/sync_root_interface/callbacks/CancelFetchData/CancelFetchDataCallback.cpp", "native-src/sync_root_interface/callbacks/FetchData/FetchData.cpp", "native-src/sync_root_interface/callbacks/FetchData/FileCopierWithProgress.cpp", + "native-src/sync_root_interface/callbacks/FetchData/TransferContext.cpp", "native-src/sync_root_interface/callbacks/FetchPlaceholder/FetchPlaceholder.cpp", "native-src/sync_root_interface/callbacks/NotifyDelete/NotifyDeleteCallback.cpp", "native-src/sync_root_interface/callbacks/NotifyFileAdded/NotifyFileAddedCallback.cpp", diff --git a/include/sync_root_interface/TransferContext.h b/include/sync_root_interface/TransferContext.h new file mode 100644 index 00000000..dc8ef415 --- /dev/null +++ b/include/sync_root_interface/TransferContext.h @@ -0,0 +1,38 @@ +#pragma once +#include +#include +#include +#include +#include +#include "stdafx.h" +#include +#include "Logger.h" +#include "Utilities.h" +#include "Placeholders.h" +#include "FileCopierWithProgress.h" + +struct TransferContext { + CF_CONNECTION_KEY connectionKey; + CF_TRANSFER_KEY transferKey; + LARGE_INTEGER fileSize; + LARGE_INTEGER requiredLength; + LARGE_INTEGER requiredOffset; + CF_CALLBACK_INFO callbackInfo; + std::wstring fullClientPath; + + size_t lastReadOffset = 0; + size_t lastSize = 0; + bool loadFinished = false; + + std::mutex mtx; + std::condition_variable cv; + bool ready = false; + + std::wstring fullServerFilePath; +}; + +std::shared_ptr GetOrCreateTransferContext( + CF_CONNECTION_KEY connKey, + CF_TRANSFER_KEY transferKey); + +void RemoveTransferContext(CF_TRANSFER_KEY transferKey); diff --git a/native-src/sync_root_interface/callbacks/FetchData/FetchData.cpp b/native-src/sync_root_interface/callbacks/FetchData/FetchData.cpp index 89abfb66..0a5c987b 100644 --- a/native-src/sync_root_interface/callbacks/FetchData/FetchData.cpp +++ b/native-src/sync_root_interface/callbacks/FetchData/FetchData.cpp @@ -7,7 +7,7 @@ #include #include #include -#include // para std::pai +#include #include #include #include @@ -18,6 +18,7 @@ #include #include #include +#include napi_threadsafe_function g_fetch_data_threadsafe_callback = nullptr; @@ -25,12 +26,8 @@ inline std::mutex mtx; inline std::mutex mtx_download; inline std::condition_variable cv; inline std::condition_variable cv_download; -inline bool ready = false; inline bool ready_download = false; inline bool callbackResult = false; -inline std::wstring fullServerFilePath; - -inline size_t lastSize; #define FIELD_SIZE(type, field) (sizeof(((type *)0)->field)) @@ -42,16 +39,8 @@ inline size_t lastSize; #define CHUNKDELAYMS 250 -CF_CONNECTION_KEY connectionKey; -CF_TRANSFER_KEY transferKey; -LARGE_INTEGER fileSize; -LARGE_INTEGER requiredLength; -LARGE_INTEGER requiredOffset; -CF_CALLBACK_INFO g_callback_info; std::wstring g_full_client_path; -inline bool load_finished = false; -inline size_t lastReadOffset = 0; struct FetchDataArgs { std::wstring fileIdentityArg; @@ -108,302 +97,300 @@ std::string WStringToString(const std::wstring &wstr) } } -size_t file_incremental_reading(napi_env env, const std::wstring &filename, size_t &dataSizeRead, bool final_step, float &progress, napi_value error_callback = nullptr) +static size_t file_incremental_reading(napi_env env, + TransferContext &ctx, + bool final_step, + float &progress) { std::ifstream file; + file.open(ctx.fullServerFilePath, std::ios::in | std::ios::binary); - printf("filename: %s\n", filename.c_str()); - file.open(filename, std::ios::in | std::ios::binary); - - if (!file.is_open()) - { - Logger::getInstance().log("Error al abrir el archivo.", LogLevel::ERROR); - return dataSizeRead; // Retorna el dataSizeRead sin cambios + if (!file.is_open()) { + Logger::getInstance().log("Error al abrir el archivo en file_incremental_reading.", LogLevel::ERROR); + return ctx.lastReadOffset; } - file.clear(); // Limpia los flags de error/end-of-file - file.seekg(0, std::ios::end); // Va al final del archivo - size_t newSize = file.tellg(); - - size_t datasizeAvailableUnread = newSize - dataSizeRead; - - size_t growth = newSize - lastSize; + file.clear(); + file.seekg(0, std::ios::end); + size_t newSize = static_cast(file.tellg()); - try - { + size_t datasizeAvailableUnread = newSize - ctx.lastReadOffset; + size_t growth = newSize - ctx.lastSize; - if ((datasizeAvailableUnread > 0)) // && CHUNK_SIZE < datasizeAvailableUnread && !final_step) || (datasizeAvailableUnread > 0 && final_step) - { // Si el archivo ha crecido + try { + if (datasizeAvailableUnread > 0) { std::vector buffer(CHUNK_SIZE); - file.seekg(dataSizeRead); + file.seekg(ctx.lastReadOffset); file.read(buffer.data(), CHUNK_SIZE); - LARGE_INTEGER startingOffset, length; - - startingOffset.QuadPart = dataSizeRead; + LARGE_INTEGER startingOffset, chunkBufferSize; + startingOffset.QuadPart = ctx.lastReadOffset; - LARGE_INTEGER chunkBufferSize; chunkBufferSize.QuadPart = min(datasizeAvailableUnread, CHUNK_SIZE); HRESULT hr = FileCopierWithProgress::TransferData( - connectionKey, - transferKey, + ctx.connectionKey, + ctx.transferKey, buffer.data(), startingOffset, chunkBufferSize, STATUS_SUCCESS); - dataSizeRead += chunkBufferSize.QuadPart; + ctx.lastReadOffset += chunkBufferSize.QuadPart; - if (FAILED(hr)) - { - wprintf(L"Error in FileCopierWithProgress::TransferData(), HRESULT: %lx\n", hr); - load_finished = true; - - HRESULT hr = FileCopierWithProgress::TransferData( - connectionKey, - transferKey, + if (FAILED(hr)) { + wprintf(L"Error en TransferData(). HRESULT: %lx\n", hr); + ctx.loadFinished = true; + FileCopierWithProgress::TransferData( + ctx.connectionKey, + ctx.transferKey, NULL, - requiredOffset, - requiredLength, + ctx.requiredOffset, + ctx.requiredLength, STATUS_UNSUCCESSFUL); - } - else - { - UINT64 totalSize = static_cast(fileSize.QuadPart); - progress = static_cast(dataSizeRead) / static_cast(totalSize); - Utilities::ApplyTransferStateToFile(g_full_client_path.c_str(), g_callback_info, totalSize, dataSizeRead); - Sleep(CHUNKDELAYMS); + } else { + UINT64 totalSize = static_cast(ctx.fileSize.QuadPart); + progress = static_cast(ctx.lastReadOffset) / static_cast(totalSize); + Utilities::ApplyTransferStateToFile(ctx.fullClientPath.c_str(), + ctx.callbackInfo, + totalSize, + ctx.lastReadOffset); + ::Sleep(CHUNKDELAYMS); } } - } - catch (...) - { - Logger::getInstance().log("Error to read file.", LogLevel::ERROR); - HRESULT hr = FileCopierWithProgress::TransferData( - connectionKey, - transferKey, + } catch (...) { + Logger::getInstance().log("Excepción en file_incremental_reading.", LogLevel::ERROR); + FileCopierWithProgress::TransferData( + ctx.connectionKey, + ctx.transferKey, NULL, - requiredOffset, - requiredLength, + ctx.requiredOffset, + ctx.requiredLength, STATUS_UNSUCCESSFUL); } file.close(); - lastSize = newSize; - return dataSizeRead; + ctx.lastSize = newSize; + return ctx.lastReadOffset; } -napi_value response_callback_fn_fetch_data(napi_env env, napi_callback_info info) +static napi_value response_callback_fn_fetch_data(napi_env env, napi_callback_info info) { Logger::getInstance().log("response_callback_fn_fetch_data called", LogLevel::DEBUG); + size_t argc = 3; napi_value argv[3]; - napi_get_cb_info(env, info, &argc, argv, nullptr, nullptr); + napi_status status = napi_get_cb_info(env, info, &argc, argv, nullptr, nullptr); + if (status != napi_ok) { + Logger::getInstance().log("Failed to get callback info", LogLevel::ERROR); + return create_response(env, true, 0); + } - if (argc < 2) - { + if (argc < 2) { Logger::getInstance().log("This function must receive at least two arguments", LogLevel::ERROR); - load_finished = true; - { - std::lock_guard lock(mtx); - - if (load_finished) - { - ready = true; - cv.notify_one(); - } - } return create_response(env, true, 0); } napi_valuetype valueType; - - // Verificar el primer argumento: debería ser un booleano - napi_typeof(env, argv[0], &valueType); - if (valueType != napi_boolean) - { + status = napi_typeof(env, argv[0], &valueType); + if (status != napi_ok || valueType != napi_boolean) { Logger::getInstance().log("First argument should be boolean", LogLevel::ERROR); - load_finished = true; - { - std::lock_guard lock(mtx); - - if (load_finished) - { - ready = true; - cv.notify_one(); - } - } - return create_response(env, true, 0); } - bool response; + bool response = false; napi_get_value_bool(env, argv[0], &response); - napi_typeof(env, argv[1], &valueType); - if (valueType != napi_string) - { + status = napi_typeof(env, argv[1], &valueType); + if (status != napi_ok || valueType != napi_string) { Logger::getInstance().log("Second argument should be string", LogLevel::ERROR); - load_finished = true; - { - std::lock_guard lock(mtx); - - if (load_finished) - { - ready = true; - cv.notify_one(); - } - } + return create_response(env, true, 0); + } + TransferContext* ctxPtr = nullptr; + napi_value thisArg = nullptr; + status = napi_get_cb_info(env, info, nullptr, nullptr, &thisArg, reinterpret_cast(&ctxPtr)); + if (status != napi_ok || !ctxPtr) { + Logger::getInstance().log("Could not retrieve TransferContext from callback data", LogLevel::ERROR); return create_response(env, true, 0); } - // si el primer argumento es false liberamos mutex - if (!response) - { - Logger::getInstance().log("Response is false", LogLevel::DEBUG); - load_finished = true; - lastReadOffset = 0; - { - std::lock_guard lock(mtx); + if (!response) { + Logger::getInstance().log("JS responded with false; we cancel hydration.", LogLevel::DEBUG); - if (load_finished) - { - ready = true; - cv.notify_one(); - } + ctxPtr->loadFinished = true; + ctxPtr->lastReadOffset = 0; + { + std::lock_guard lock(ctxPtr->mtx); + ctxPtr->ready = true; + ctxPtr->cv.notify_one(); } return create_response(env, true, 0); } - callbackResult = response; - - size_t response_len; + size_t response_len = 0; napi_get_value_string_utf16(env, argv[1], nullptr, 0, &response_len); std::wstring response_wstr(response_len, L'\0'); + napi_get_value_string_utf16(env, argv[1], (char16_t*)response_wstr.data(), response_len + 1, &response_len); + Logger::getInstance().log( + "JS responded with server file path = " + Logger::fromWStringToString(response_wstr), + LogLevel::DEBUG + ); + + ctxPtr->fullServerFilePath = response_wstr; + + float progress = 0.0f; + ctxPtr->lastReadOffset = file_incremental_reading(env, *ctxPtr, /*final_step=*/false, progress); + + if (ctxPtr->lastReadOffset == (size_t)ctxPtr->fileSize.QuadPart) { + Logger::getInstance().log("File fully read.", LogLevel::DEBUG); + ctxPtr->lastReadOffset = 0; + ctxPtr->loadFinished = true; + + Utilities::ApplyTransferStateToFile( + ctxPtr->fullClientPath.c_str(), + ctxPtr->callbackInfo, + ctxPtr->fileSize.QuadPart, + ctxPtr->fileSize.QuadPart + ); + + ::Sleep(CHUNKDELAYMS); + Placeholders::UpdatePinState(ctxPtr->fullClientPath.c_str(), PinState::AlwaysLocal); + } - napi_get_value_string_utf16(env, argv[1], (char16_t *)response_wstr.data(), response_len + 1, &response_len); - Logger::getInstance().log("response_wstr: " + Logger::fromWStringToString(response_wstr), LogLevel::DEBUG); - if (argc == 3) { - napi_valuetype callbackType; - napi_typeof(env, argv[2], &callbackType); - if (callbackType != napi_function) - { - Logger::getInstance().log("Third argument should be a function", LogLevel::ERROR); - load_finished = true; - return create_response(env, true, 0); + std::lock_guard lock(ctxPtr->mtx); + if (ctxPtr->loadFinished) { + ctxPtr->ready = true; + ctxPtr->cv.notify_one(); } } - fullServerFilePath = response_wstr; + Logger::getInstance().log( + "fetch data => finished: " + std::to_string(ctxPtr->loadFinished) + + ", progress: " + std::to_string(progress), + LogLevel::DEBUG + ); - float progress; - Logger::getInstance().log("incremental reading", LogLevel::DEBUG); - lastReadOffset = file_incremental_reading(env, fullServerFilePath, lastReadOffset, false, progress, argc == 3 ? argv[2] : nullptr); + return create_response(env, ctxPtr->loadFinished, progress); +} - std::wstring file_path = fullServerFilePath.c_str(); - std::ifstream file(file_path, std::ios::binary); +static napi_value create_error_response(napi_env env) +{ + Logger::getInstance().log("An error occurred during callback execution", LogLevel::ERROR); + return create_response(env, true, 0); +} - if (!file) +static void handle_cancellation(TransferContext* ctxPtr) +{ + ctxPtr->loadFinished = true; + ctxPtr->lastReadOffset = 0; { - Logger::getInstance().log("This file couldn't be opened in realtime.", LogLevel::WARN); - // load_finished = true; - // return create_response(env, true, 0); + std::lock_guard lock(ctxPtr->mtx); + ctxPtr->ready = true; + ctxPtr->cv.notify_one(); } +} - file.seekg(0, std::ios::end); - LONG total_size = file.tellg(); - file.seekg(0, std::ios::beg); +static void notify_completion(TransferContext* ctxPtr, float progress) +{ + std::lock_guard lock(ctxPtr->mtx); + if (ctxPtr->loadFinished) { + ctxPtr->ready = true; + ctxPtr->cv.notify_one(); + } +} - file.close(); - Logger::getInstance().log("Total size: " + std::to_string(total_size), LogLevel::DEBUG); - Logger::getInstance().log("Last read offset: " + std::to_string(lastReadOffset), LogLevel::DEBUG); - Logger::getInstance().log("fileSize: " + std::to_string(fileSize.QuadPart), LogLevel::DEBUG); +static void notify_fetch_data_call(napi_env env, napi_value js_callback, void *context, void *data) +{ + Logger::getInstance().log("notify_fetch_data_call called context isolated", LogLevel::DEBUG); + napi_status status; + TransferContext *ctx = static_cast(data); + Logger::getInstance().log("notify_fetch_data_call: ctx->fullClientPath = " + Logger::fromWStringToString(ctx->fullClientPath), LogLevel::DEBUG); - if (lastReadOffset == fileSize.QuadPart) + std::wstring fileIdentityWstr; { - Logger::getInstance().log("File has been fully read.", LogLevel::DEBUG); - lastReadOffset = 0; - load_finished = true; - Utilities::ApplyTransferStateToFile(g_full_client_path.c_str(), g_callback_info, fileSize.QuadPart, fileSize.QuadPart); - Sleep(CHUNKDELAYMS); - Placeholders::UpdatePinState(g_full_client_path.c_str(), PinState::AlwaysLocal); - }; - - napi_value resultBool; - napi_get_boolean(env, load_finished, &resultBool); - - napi_value progress_value; - napi_create_double(env, progress, &progress_value); - - Logger::getInstance().log("fetch data result: " + std::to_string(load_finished) + " " + std::to_string(progress) + " " + Logger::fromWStringToString(fullServerFilePath), LogLevel::DEBUG); - + const wchar_t *wchar_ptr = static_cast(ctx->callbackInfo.FileIdentity); + DWORD fileIdentityLength = ctx->callbackInfo.FileIdentityLength / sizeof(wchar_t); + fileIdentityWstr.assign(wchar_ptr, fileIdentityLength); + } + napi_value js_fileIdentityArg; { - std::lock_guard lock(mtx); - - if (load_finished) - { - ready = true; - cv.notify_one(); - } + std::u16string u16_fileIdentity(fileIdentityWstr.begin(), fileIdentityWstr.end()); + napi_create_string_utf16(env, + u16_fileIdentity.c_str(), + u16_fileIdentity.size(), + &js_fileIdentityArg); } - return create_response(env, load_finished, progress); -} - -void notify_fetch_data_call(napi_env env, napi_value js_callback, void *context, void *data) -{ - napi_status status; - FetchDataArgs *args = static_cast(data); - napi_value js_fileIdentityArg, undefined, result; - - std::u16string u16_fileIdentity(args->fileIdentityArg.begin(), args->fileIdentityArg.end()); - - napi_create_string_utf16(env, u16_fileIdentity.c_str(), u16_fileIdentity.size(), &js_fileIdentityArg); napi_value js_response_callback_fn; - napi_create_function(env, "responseCallback", NAPI_AUTO_LENGTH, response_callback_fn_fetch_data, nullptr, &js_response_callback_fn); + napi_create_function(env, + "responseCallback", + NAPI_AUTO_LENGTH, + response_callback_fn_fetch_data, + ctx, + &js_response_callback_fn); - napi_value args_to_js_callback[2] = {js_fileIdentityArg, js_response_callback_fn}; + napi_value args_to_js_callback[2] = { js_fileIdentityArg, js_response_callback_fn }; + Logger::getInstance().log("notify_fetch_data_call: calling JS function", LogLevel::DEBUG); + napi_value undefined, result; status = napi_get_undefined(env, &undefined); - if (status != napi_ok) - { - Logger::getInstance().log("Failed to get undefined value.\n", LogLevel::ERROR); + if (status != napi_ok) { + Logger::getInstance().log("Failed to get undefined in notify_fetch_data_call.", LogLevel::ERROR); return; } - std::unique_lock lock(mtx); - ready = false; - - status = napi_call_function(env, undefined, js_callback, 2, args_to_js_callback, &result); - if (status != napi_ok) + Logger::getInstance().log("notify_fetch_data_call: setting ctx->ready to false", LogLevel::DEBUG); { - Logger::getInstance().log("Failed to call JS function.", LogLevel::ERROR); - Logger::getInstance().log("Failed to call JS function in fetchData.", LogLevel::ERROR); + Logger::getInstance().log("notify_fetch_data_call: locking ctx->mtx", LogLevel::DEBUG); + std::unique_lock lock(ctx->mtx); + ctx->ready = false; + } + + status = napi_call_function(env, + undefined, + js_callback, + 2, + args_to_js_callback, + &result); + if (status != napi_ok) { + Logger::getInstance().log("Failed to call JS function in notify_fetch_data_call.", LogLevel::ERROR); return; } - delete args; + Logger::getInstance().log("Hydration concluded or user signaled to finish in notify_fetch_data_call.", LogLevel::INFO); + + ctx->lastReadOffset = 0; + ctx->loadFinished = false; + ctx->ready = false; + + // RemoveTransferContext(ctx->transferKey); } + void register_threadsafe_fetch_data_callback(const std::string &resource_name, napi_env env, InputSyncCallbacks input) { + Logger::getInstance().log("register_threadsafe_fetch_data_callback called", LogLevel::DEBUG); std::u16string converted_resource_name = std::u16string(resource_name.begin(), resource_name.end()); napi_value resource_name_value; - napi_create_string_utf16(env, converted_resource_name.c_str(), NAPI_AUTO_LENGTH, &resource_name_value); + napi_create_string_utf16(env, + converted_resource_name.c_str(), + NAPI_AUTO_LENGTH, + &resource_name_value); napi_threadsafe_function tsfn_fetch_data; napi_value fetch_data_value; napi_status status_ref = napi_get_reference_value(env, input.fetch_data_callback_ref, &fetch_data_value); + Logger::getInstance().log("status_ref: " + std::to_string(status_ref), LogLevel::DEBUG); + napi_status status = napi_create_threadsafe_function( env, fetch_data_value, @@ -417,71 +404,60 @@ void register_threadsafe_fetch_data_callback(const std::string &resource_name, n notify_fetch_data_call, &tsfn_fetch_data); - if (status != napi_ok) - { - Logger::getInstance().log("Failed to create threadsafe function.\n", LogLevel::ERROR); + if (status != napi_ok) { + Logger::getInstance().log("Failed to create threadsafe function (fetch_data).", LogLevel::ERROR); return; } - setup_global_tsfn_fetch_data(tsfn_fetch_data); + + Logger::getInstance().log("Threadsafe function (fetch_data) created successfully.", LogLevel::DEBUG); + + g_fetch_data_threadsafe_callback = tsfn_fetch_data; } void CALLBACK fetch_data_callback_wrapper( _In_ CONST CF_CALLBACK_INFO *callbackInfo, _In_ CONST CF_CALLBACK_PARAMETERS *callbackParameters) { - connectionKey = callbackInfo->ConnectionKey; - transferKey = callbackInfo->TransferKey; - fileSize = callbackInfo->FileSize; - requiredLength = callbackParameters->FetchData.RequiredLength; - requiredOffset = callbackParameters->FetchData.RequiredFileOffset; - g_callback_info = *callbackInfo; - + Logger::getInstance().log("fetch_data_callback_wrapper called", LogLevel::DEBUG); + + auto ctx = GetOrCreateTransferContext(callbackInfo->ConnectionKey, callbackInfo->TransferKey); + + ctx->fileSize = callbackInfo->FileSize; + ctx->requiredLength = callbackParameters->FetchData.RequiredLength; + ctx->requiredOffset = callbackParameters->FetchData.RequiredFileOffset; + ctx->callbackInfo = *callbackInfo; std::wstring fullClientPath(callbackInfo->VolumeDosName); fullClientPath.append(callbackInfo->NormalizedPath); + ctx->fullClientPath = fullClientPath; - g_full_client_path = fullClientPath; - - Logger::getInstance().log("Full download path: " + Logger::fromWStringToString(fullClientPath), LogLevel::INFO); - - LPCVOID fileIdentity = callbackInfo->FileIdentity; - DWORD fileIdentityLength = callbackInfo->FileIdentityLength; + Logger::getInstance().log("Full download path: " + + Logger::fromWStringToString(fullClientPath), + LogLevel::INFO); - const wchar_t *wchar_ptr = static_cast(fileIdentity); - std::wstring fileIdentityStr(wchar_ptr, fileIdentityLength / sizeof(wchar_t)); - - FetchDataArgs *args = new FetchDataArgs(); - args->fileIdentityArg = fileIdentityStr; - wprintf(L"Callback fetch_data_callback_wrapper called\n"); - wprintf(L"g_fetch_data_threadsafe_callback = %s\n", g_fetch_data_threadsafe_callback); - if (g_fetch_data_threadsafe_callback == nullptr) - { - wprintf(L"Callback fetch_data_callback_wrapper called but g_fetch_data_threadsafe_callback is null\n"); + if (g_fetch_data_threadsafe_callback == nullptr) { + Logger::getInstance().log("fetch_data_callback_wrapper: g_fetch_data_threadsafe_callback is null", + LogLevel::ERROR); return; } - napi_status status = napi_call_threadsafe_function(g_fetch_data_threadsafe_callback, args, napi_tsfn_blocking); - if (status != napi_ok) - { - Logger::getInstance().log("Callback called unsuccessfully.\n", LogLevel::ERROR); - }; + napi_status status = napi_call_threadsafe_function( + g_fetch_data_threadsafe_callback, + ctx.get(), + napi_tsfn_blocking); + if (status != napi_ok) { + Logger::getInstance().log("Callback called unsuccessfully in fetch_data_callback_wrapper.", LogLevel::ERROR); + } - Logger::getInstance().log("log 4.\n", LogLevel::DEBUG); + Logger::getInstance().log("fetch_data_callback_wrapper after napi_call_threadsafe_function", LogLevel::DEBUG); { - std::unique_lock lock(mtx); - Logger::getInstance().log("Mutex await call.\n", LogLevel::DEBUG); - while (!ready) - { - cv.wait(lock); + std::unique_lock lock(ctx->mtx); + while (!ctx->ready) { + ctx->cv.wait(lock); } } + + Logger::getInstance().log("Hydration finish in fetch_data_callback_wrapper", LogLevel::INFO); - Logger::getInstance().log("Hydration Finish\n", LogLevel::INFO); - - // DownloadMutexManager &mutexManager = DownloadMutexManager::getInstance(); - // mutexManager.setReady(true); - - lastReadOffset = 0; - load_finished = false; - ready = false; // Reset ready -} \ No newline at end of file + RemoveTransferContext(ctx->transferKey); +} diff --git a/native-src/sync_root_interface/callbacks/FetchData/TransferContext.cpp b/native-src/sync_root_interface/callbacks/FetchData/TransferContext.cpp new file mode 100644 index 00000000..ac832ff9 --- /dev/null +++ b/native-src/sync_root_interface/callbacks/FetchData/TransferContext.cpp @@ -0,0 +1,37 @@ +#include "TransferContext.h" + +struct CfTransferKeyLess { + bool operator()(const CF_TRANSFER_KEY &a, const CF_TRANSFER_KEY &b) const + { + return a.QuadPart < b.QuadPart; + } +}; + +static std::map, CfTransferKeyLess> g_transferContextMap; + + +static std::mutex g_contextMapMutex; + +std::shared_ptr GetOrCreateTransferContext( + CF_CONNECTION_KEY connKey, + CF_TRANSFER_KEY transferKey) +{ + std::lock_guard lock(g_contextMapMutex); + + auto it = g_transferContextMap.find(transferKey); + if (it != g_transferContextMap.end()) { + return it->second; + } + + auto ctx = std::make_shared(); + ctx->connectionKey = connKey; + ctx->transferKey = transferKey; + g_transferContextMap[transferKey] = ctx; + return ctx; +} + +void RemoveTransferContext(CF_TRANSFER_KEY transferKey) +{ + std::lock_guard lock(g_contextMapMutex); + g_transferContextMap.erase(transferKey); +}