Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified dist/addon.node
Binary file not shown.
13 changes: 5 additions & 8 deletions include/sync_root_interface/TransferContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,18 @@
#include <string>
#include "stdafx.h"
#include <cfapi.h>
#include "Logger.h"
#include "Utilities.h"
#include "Placeholders.h"
#include "FileCopierWithProgress.h"

struct TransferContext {
struct TransferContext
{
CF_CONNECTION_KEY connectionKey;
CF_TRANSFER_KEY transferKey;
LARGE_INTEGER fileSize;
LARGE_INTEGER requiredLength;
LARGE_INTEGER requiredOffset;
CF_CALLBACK_INFO callbackInfo;
std::wstring fullClientPath;
std::wstring path;

size_t lastReadOffset = 0;
size_t lastSize = 0;
Expand All @@ -31,8 +30,6 @@ struct TransferContext {
std::wstring fullServerFilePath;
};

std::shared_ptr<TransferContext> GetOrCreateTransferContext(
CF_CONNECTION_KEY connKey,
CF_TRANSFER_KEY transferKey);

std::shared_ptr<TransferContext> CreateTransferContext(CF_TRANSFER_KEY transferKey);
std::shared_ptr<TransferContext> GetTransferContext(CF_TRANSFER_KEY transferKey);
void RemoveTransferContext(CF_TRANSFER_KEY transferKey);
Original file line number Diff line number Diff line change
@@ -1,57 +1,50 @@
#include "stdafx.h"
#include <Callbacks.h>
#include <Logger.h>
#include <cfapi.h>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <filesystem>
#include <TransferContext.h>

napi_threadsafe_function g_cancel_fetch_data_threadsafe_callback = nullptr;

struct CallbackContext {
std::mutex mtx;
std::condition_variable cv;
bool ready = false;
};

struct CancelFetchDataArgs {
std::wstring fileIdentityArg;
CallbackContext* context;

CancelFetchDataArgs(const std::wstring& fileId, CallbackContext* ctx)
: fileIdentityArg(fileId), context(ctx) {}
};

void notify_cancel_fetch_data_call(napi_env env, napi_value js_callback, void *context, void *data)
{
CancelFetchDataArgs *args = static_cast<CancelFetchDataArgs *>(data);
std::u16string u16_fileIdentity(args->fileIdentityArg.begin(), args->fileIdentityArg.end());
std::unique_ptr<std::wstring> path(static_cast<std::wstring *>(data));

napi_value js_string;
napi_create_string_utf16(env, u16_fileIdentity.c_str(), u16_fileIdentity.size(), &js_string);
napi_value js_path;
napi_create_string_utf16(env, (char16_t *)path->c_str(), path->length(), &js_path);

napi_value args_to_js_callback_cancel_fetch[1] = {js_string};
std::array<napi_value, 1> args_to_js_callback = {js_path};

napi_value undefined;
napi_get_undefined(env, &undefined);
napi_value result;
napi_call_function(env, undefined, js_callback, 1, args_to_js_callback.data(), nullptr);
}

Logger::getInstance().log("Executed to call JS function in cancelFetchCallback.", LogLevel::ERROR);
napi_status status = napi_call_function(env, undefined, js_callback, 1, args_to_js_callback_cancel_fetch, &result);
if (status != napi_ok)
{
fprintf(stderr, "Failed to call JS function.\n");
Logger::getInstance().log("Failed to call JS function in cancelFetchCallback.", LogLevel::ERROR);
}
void CALLBACK cancel_fetch_data_callback_wrapper(_In_ CONST CF_CALLBACK_INFO *callbackInfo, _In_ CONST CF_CALLBACK_PARAMETERS *)
{
wprintf(L"ConnectionKey: %lld, TransferKey: %lld\n",
std::bit_cast<long long>(callbackInfo->ConnectionKey),
callbackInfo->TransferKey.QuadPart);

auto ctx = GetTransferContext(callbackInfo->TransferKey);

if (!ctx)
return;

auto path = std::make_unique<std::wstring>(ctx->path);
wprintf(L"Cancel fetch data path: %s\n", path->c_str());

{
std::lock_guard<std::mutex> lock(args->context->mtx);
args->context->ready = true;
std::scoped_lock lock(ctx->mtx);
ctx->ready = true;
}

args->context->cv.notify_one();
delete args;
ctx->cv.notify_one();

napi_call_threadsafe_function(g_cancel_fetch_data_threadsafe_callback, path.release(), napi_tsfn_blocking);
}

void register_threadsafe_cancel_fetch_data_callback(const std::string &resource_name, napi_env env, InputSyncCallbacks input)
Expand All @@ -61,11 +54,11 @@ void register_threadsafe_cancel_fetch_data_callback(const std::string &resource_
napi_value resource_name_value;
napi_create_string_utf16(env, converted_resource_name.c_str(), NAPI_AUTO_LENGTH, &resource_name_value);

napi_threadsafe_function tsfn_cancel_fetch_data;
napi_value cancel_fetch_data_value;
napi_get_reference_value(env, input.cancel_fetch_data_callback_ref, &cancel_fetch_data_value);

napi_status status = napi_create_threadsafe_function(
napi_threadsafe_function tsfn_cancel_fetch_data;
napi_create_threadsafe_function(
env,
cancel_fetch_data_value,
NULL,
Expand All @@ -78,46 +71,5 @@ void register_threadsafe_cancel_fetch_data_callback(const std::string &resource_
notify_cancel_fetch_data_call,
&tsfn_cancel_fetch_data);

if (status != napi_ok)
{
napi_throw_error(env, nullptr, "Failed to create cancel fetch data threadsafe function");
return;
}

g_cancel_fetch_data_threadsafe_callback = tsfn_cancel_fetch_data;
}

void CALLBACK cancel_fetch_data_callback_wrapper(
_In_ CONST CF_CALLBACK_INFO *callbackInfo,
_In_ CONST CF_CALLBACK_PARAMETERS *callbackParameters)
{
printf("cancel_fetch_data_callback_wrapper called\n");

LPCVOID fileIdentity = callbackInfo->FileIdentity;
DWORD fileIdentityLength = callbackInfo->FileIdentityLength;

const wchar_t *wchar_ptr = static_cast<const wchar_t *>(fileIdentity);
std::wstring fileIdentityStr(wchar_ptr, fileIdentityLength / sizeof(wchar_t));

if (g_cancel_fetch_data_threadsafe_callback == nullptr)
{
wprintf(L"Callback fetch_data_callback_wrapper called but g_fetch_data_threadsafe_callback is null\n");
return;
}

CallbackContext context;
CancelFetchDataArgs *args = new CancelFetchDataArgs(fileIdentityStr, &context);

napi_call_threadsafe_function(g_cancel_fetch_data_threadsafe_callback, args, napi_tsfn_blocking);

{
std::unique_lock<std::mutex> lock(context.mtx);
auto timeout = std::chrono::seconds(30);

if (context.cv.wait_for(lock, timeout, [&context] { return context.ready; })) {
wprintf(L"Cancel fetch completed\n");
} else {
wprintf(L"Cancel fetch timed out\n");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ static size_t file_incremental_reading(napi_env env,
} else {
UINT64 totalSize = static_cast<UINT64>(ctx.fileSize.QuadPart);
progress = static_cast<float>(ctx.lastReadOffset) / static_cast<float>(totalSize);
Utilities::ApplyTransferStateToFile(ctx.fullClientPath.c_str(),
Utilities::ApplyTransferStateToFile(ctx.path.c_str(),
ctx.callbackInfo,
totalSize,
ctx.lastReadOffset);
Expand Down Expand Up @@ -206,15 +206,15 @@ static napi_value response_callback_fn_fetch_data(napi_env env, napi_callback_in
ctxPtr->loadFinished = true;

Utilities::ApplyTransferStateToFile(
ctxPtr->fullClientPath.c_str(),
ctxPtr->path.c_str(),
ctxPtr->callbackInfo,
ctxPtr->fileSize.QuadPart,
ctxPtr->fileSize.QuadPart
);

::Sleep(CHUNKDELAYMS);

auto fileHandle = Placeholders::OpenFileHandle(ctxPtr->fullClientPath.c_str(), FILE_WRITE_ATTRIBUTES, true);
auto fileHandle = Placeholders::OpenFileHandle(ctxPtr->path.c_str(), FILE_WRITE_ATTRIBUTES, true);
CfSetPinState(fileHandle.get(), CF_PIN_STATE_PINNED, CF_SET_PIN_FLAG_NONE, nullptr);
}

Expand All @@ -240,7 +240,7 @@ static void notify_fetch_data_call(napi_env env, napi_value js_callback, void *c
Logger::getInstance().log("notify_fetch_data_call called context isolated", LogLevel::DEBUG);
napi_status status;
TransferContext *ctx = static_cast<TransferContext *>(data);
Logger::getInstance().log("notify_fetch_data_call: ctx->fullClientPath = " + Logger::fromWStringToString(ctx->fullClientPath), LogLevel::DEBUG);
Logger::getInstance().log("notify_fetch_data_call: ctx->path = " + Logger::fromWStringToString(ctx->path), LogLevel::DEBUG);

std::wstring fileIdentityWstr;
{
Expand Down Expand Up @@ -345,19 +345,20 @@ void CALLBACK fetch_data_callback_wrapper(
{
Logger::getInstance().log("fetch_data_callback_wrapper called", LogLevel::DEBUG);

auto ctx = GetOrCreateTransferContext(callbackInfo->ConnectionKey, callbackInfo->TransferKey);
auto ctx = GetTransferContext(callbackInfo->TransferKey);

ctx->connectionKey = callbackInfo->ConnectionKey;
ctx->fileSize = callbackInfo->FileSize;
ctx->requiredLength = callbackParameters->FetchData.RequiredLength;
ctx->requiredOffset = callbackParameters->FetchData.RequiredFileOffset;
ctx->callbackInfo = *callbackInfo;

std::wstring fullClientPath(callbackInfo->VolumeDosName); // e.g., "C:"
fullClientPath.append(callbackInfo->NormalizedPath); // e.g., "\Users\file.txt"
ctx->fullClientPath = fullClientPath; // Result: "C:\Users\file.txt"
std::wstring path(callbackInfo->VolumeDosName); // e.g., "C:"
path.append(callbackInfo->NormalizedPath); // e.g., "\Users\file.txt"
ctx->path = path; // Result: "C:\Users\file.txt"

Logger::getInstance().log("Full download path: "
+ Logger::fromWStringToString(fullClientPath),
+ Logger::fromWStringToString(path),
LogLevel::INFO);

if (g_fetch_data_threadsafe_callback == nullptr) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#include "TransferContext.h"
#include <TransferContext.h>

struct CfTransferKeyLess {
bool operator()(const CF_TRANSFER_KEY &a, const CF_TRANSFER_KEY &b) const {
struct CfTransferKeyLess
{
bool operator()(const CF_TRANSFER_KEY &a, const CF_TRANSFER_KEY &b) const
{
return a.QuadPart < b.QuadPart;
}
};
Expand All @@ -10,22 +12,30 @@ static std::map<CF_TRANSFER_KEY, std::shared_ptr<TransferContext>, CfTransferKey

static std::mutex g_contextMapMutex;

std::shared_ptr<TransferContext> GetOrCreateTransferContext(CF_CONNECTION_KEY connKey, CF_TRANSFER_KEY transferKey) {
std::lock_guard<std::mutex> lock(g_contextMapMutex);

auto it = g_transferContextMap.find(transferKey);
if (it != g_transferContextMap.end()) {
return it->second;
}

std::shared_ptr<TransferContext> CreateTransferContext(CF_TRANSFER_KEY transferKey)
{
auto ctx = std::make_shared<TransferContext>();
ctx->connectionKey = connKey;
ctx->transferKey = transferKey;

std::scoped_lock lock(g_contextMapMutex);
g_transferContextMap[transferKey] = ctx;
return ctx;
}

void RemoveTransferContext(CF_TRANSFER_KEY transferKey) {
std::lock_guard<std::mutex> lock(g_contextMapMutex);
std::shared_ptr<TransferContext> GetTransferContext(CF_TRANSFER_KEY transferKey)
{
std::scoped_lock lock(g_contextMapMutex);

if (auto it = g_transferContextMap.find(transferKey); it != g_transferContextMap.end())
{
return it->second;
}

return nullptr;
}

void RemoveTransferContext(CF_TRANSFER_KEY transferKey)
{
std::scoped_lock lock(g_contextMapMutex);
g_transferContextMap.erase(transferKey);
}