Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

src: improve nsolid::CustomCommand() #44

Merged
merged 1 commit into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 40 additions & 50 deletions src/nsolid/nsolid_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -712,18 +712,11 @@ int EnvInst::CustomCommand(std::string req_id,
return UV_EEXIST;
}

std::string* req_id_alloced = new std::string(req_id);
if (req_id_alloced == nullptr) {
return UV_ENOMEM;
}

int err = EnvInst::RunCommand(EnvInst::GetInst(thread_id()),
custom_command_,
static_cast<void*>(req_id_alloced),
CommandType::EventLoop);

int err = nsolid::RunCommand(EnvInst::GetInst(thread_id_),
CommandType::EventLoop,
custom_command_,
req_id);
if (err) {
delete req_id_alloced;
custom_command_stor_map_.erase(iter.first);
}

Expand All @@ -734,16 +727,16 @@ int EnvInst::CustomCommand(std::string req_id,
int EnvInst::CustomCommandResponse(const std::string& req_id,
const char* value,
bool is_return) {
custom_command_stor_map_lock_.lock();
auto it = custom_command_stor_map_.find(req_id);
if (it == custom_command_stor_map_.end()) {
custom_command_stor_map_lock_.unlock();
return UV_ENOENT;
}
CustomCommandStor stor;
{
ns_mutex::scoped_lock lock(custom_command_stor_map_lock_);
auto el = custom_command_stor_map_.extract(req_id);
if (el.empty()) {
return UV_ENOENT;
}

const CustomCommandStor stor(std::move(it->second));
custom_command_stor_map_.erase(it);
custom_command_stor_map_lock_.unlock();
stor = std::move(el.mapped());
}

stor.cb(req_id,
stor.command,
Expand Down Expand Up @@ -1714,34 +1707,44 @@ void EnvList::fill_trace_id_q() {
}


void EnvInst::custom_command_(SharedEnvInst envinst_sp, void* data) {
std::string* req_id = static_cast<std::string*>(data);
void EnvInst::custom_command_(SharedEnvInst envinst_sp,
const std::string req_id) {
auto error_cb = [](const std::string& req_id,
const CustomCommandStor& stor,
int err,
SharedEnvInst& envinst_sp) {
trevnorris marked this conversation as resolved.
Show resolved Hide resolved
stor.cb(req_id,
stor.command,
err,
{ false, std::string() },
{ false, std::string() },
stor.data);
ns_mutex::scoped_lock lock(envinst_sp->custom_command_stor_map_lock_);
envinst_sp->custom_command_stor_map_.erase(req_id);
};

CustomCommandStor stor;
{
ns_mutex::scoped_lock lock(envinst_sp->custom_command_stor_map_lock_);
auto it = envinst_sp->custom_command_stor_map_.find(req_id);
CHECK_NE(it, envinst_sp->custom_command_stor_map_.end());
stor = it->second;
}

Environment* env = envinst_sp->env();
if (env->nsolid_on_command_fn().IsEmpty() ||
!envinst_sp->can_call_into_js()) {
delete req_id;
error_cb(req_id, stor, UV_EINVAL, envinst_sp);
return;
}

envinst_sp->custom_command_stor_map_lock_.lock();
auto it = envinst_sp->custom_command_stor_map_.find(*req_id);
if (it == envinst_sp->custom_command_stor_map_.end()) {
envinst_sp->custom_command_stor_map_lock_.unlock();
delete req_id;
return;
}

// A copy of CustomCommandStor is made in order to release the lock asap.
const CustomCommandStor stor(it->second);
envinst_sp->custom_command_stor_map_lock_.unlock();

Isolate* isolate = envinst_sp->isolate();
HandleScope handle_scope(isolate);
Context::Scope context_scope(env->context());
Local<Value> argv[] = {
v8::String::NewFromUtf8(
isolate,
req_id->c_str(),
req_id.c_str(),
v8::NewStringType::kNormal).ToLocalChecked(),
v8::String::NewFromUtf8(
isolate,
Expand All @@ -1757,23 +1760,10 @@ void EnvInst::custom_command_(SharedEnvInst envinst_sp, void* data) {
env->process_object(),
arraysize(argv),
argv).ToLocalChecked();
// delete from map if no custom command listener attached
int r = ret->Int32Value(env->context()).ToChecked();
if (r != 0) {
{
ns_mutex::scoped_lock lock(envinst_sp->custom_command_stor_map_lock_);
envinst_sp->custom_command_stor_map_.erase(*req_id);
}

stor.cb(*req_id,
stor.command,
r,
{ false, std::string() },
{ false, std::string() },
stor.data);
error_cb(req_id, stor, r, envinst_sp);
}

delete req_id;
}


Expand Down
3 changes: 2 additions & 1 deletion src/nsolid/nsolid_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,8 @@ class EnvInst {
static void close_nsolid_loader_(void* ptr);
static void run_nsolid_loader_(nsuv::ns_timer* handle, Environment* env);

static void custom_command_(SharedEnvInst envinst_sp, void* data);
static void custom_command_(SharedEnvInst envinst_sp,
const std::string req_id);

void add_metric_datapoint_(MetricsStream::Type, double);
void send_datapoint(MetricsStream::Type, double);
Expand Down
59 changes: 51 additions & 8 deletions test/addons/nsolid-custom-command/binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ static void custom_command_cb(std::string req_id,
fn->Call(context,
Undefined(isolate),
5,
argv).ToLocalChecked();
argv);

cb_map_.erase(iter);
}
Expand All @@ -103,19 +103,62 @@ static void CustomCommand(const FunctionCallbackInfo<Value>& args) {
Local<String> value_s = args[2].As<String>();
String::Utf8Value value(isolate, value_s);

Global<Function> cb(isolate, args[3].As<Function>());
int err = node::nsolid::CustomCommand(envinst_sp,
*req_id,
*command,
*value,
custom_command_cb);

if (err == 0) {
Global<Function> cb(isolate, args[3].As<Function>());
auto iter = cb_map_.emplace(*req_id, std::move(cb));
assert(iter.second);
}

args.GetReturnValue().Set(err);
}

static void CustomCommandThread(const FunctionCallbackInfo<Value>& args) {
assert(4 == args.Length());
assert(args[0]->IsNumber());
assert(args[1]->IsString());
assert(args[2]->IsString());
assert(args[3]->IsString());
Isolate* isolate = args.GetIsolate();
uint64_t thread_id = args[0].As<v8::Number>()->Value();
Local<String> req_id_s = args[1].As<String>();
String::Utf8Value req_id(isolate, req_id_s);
Local<String> command_s = args[2].As<String>();
String::Utf8Value command(isolate, command_s);
Local<String> value_s = args[3].As<String>();
String::Utf8Value value(isolate, value_s);

auto envinst_sp = node::nsolid::GetEnvInst(thread_id);

int err = node::nsolid::CustomCommand(envinst_sp,
*req_id,
*command,
*value,
custom_command_cb);
args.GetReturnValue().Set(err);
}

static void SetCustomCommandCb(const FunctionCallbackInfo<Value>& args) {
assert(2 == args.Length());
assert(args[0]->IsString());
assert(args[1]->IsFunction());
Isolate* isolate = args.GetIsolate();
Local<String> req_id_s = args[0].As<String>();
String::Utf8Value req_id(isolate, req_id_s);
Global<Function> cb(isolate, args[1].As<Function>());
auto iter = cb_map_.emplace(*req_id, std::move(cb));
assert(iter.second);

node::nsolid::CustomCommand(envinst_sp,
*req_id,
*command,
*value,
custom_command_cb);
}

NODE_MODULE_INIT(/* exports, module, context */) {
NODE_SET_METHOD(exports, "customCommand", CustomCommand);
NODE_SET_METHOD(exports, "customCommandThread", CustomCommandThread);
NODE_SET_METHOD(exports, "setCustomCommandCb", SetCustomCommandCb);
// While NODE_MODULE_INIT will run for every Worker, the first execution
// won't run in parallel with another. So this won't cause a race condition.
if (EnvInst::GetEnvLocalInst(context->GetIsolate())->thread_id() == 0)
Expand Down
106 changes: 106 additions & 0 deletions test/addons/nsolid-custom-command/nsolid-custom-command-workers.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
'use strict';
// Flags: --expose-gc --expose-internals

const common = require('../../common');
const assert = require('assert');
const {
Worker,
isMainThread,
threadId,
parentPort,
} = require('worker_threads');
const bindingPath = require.resolve(`./build/${common.buildType}/binding`);
const {
customCommandThread,
setCustomCommandCb,
} = require(bindingPath);
const nsolid = require('nsolid');
const { internalBinding } = require('internal/test/binding');
const { UV_ESRCH } = internalBinding('uv');

if (process.env.NSOLID_COMMAND)
common.skip('required to run without the Console');

if (!isMainThread && +process.argv[2] !== process.pid)
common.skip('Test must first run as the main thread');

nsolid.on('custom', common.mustCall((request) => {
request.return('goodbye');
}));

if (!isMainThread) {
// This is a Worker thread, so we exit early.
let exit = false;
const interval = setInterval(() => {
if (exit) {
// For gc on worker so EnvInst::CustomCommandReqWeakCallback is called.
global.gc();
clearInterval(interval);
}
}, 300);

const reqId = process.argv[3];
console.log('reqId:', reqId);
setCustomCommandCb(reqId, common.mustCall((id, cmd, st, error, value) => {
assert.strictEqual(id, reqId);
assert.strictEqual(cmd, 'custom');
assert.strictEqual(st, 0);
assert.strictEqual(error, null);
assert.strictEqual(value, '"goodbye"');
exit = true;
}));

parentPort.postMessage('ready');
return;
}

let counter = 0;

const req1 = `${nsolid.id}${++counter}`;
{
// Sending a custom command to a non-existent thread should return UV_ESRCH
const ret = customCommandThread(threadId + 1,
req1,
'custom',
JSON.stringify('hello'));
assert.strictEqual(ret, UV_ESRCH);
}
{
// Sending a custom command with the same request id should succeed as the
// previous did fail.
setCustomCommandCb(req1, common.mustCall((id, cmd, st, error, value) => {
assert.strictEqual(id, req1);
assert.strictEqual(cmd, 'custom');
assert.strictEqual(st, 0);
assert.strictEqual(error, null);
assert.strictEqual(value, '"goodbye"');
}));

const ret = customCommandThread(threadId,
req1,
'custom',
JSON.stringify('hello'));

assert.strictEqual(ret, 0);
}

// It also works with Worker threads.
const req2 = `${nsolid.id}${++counter}`;
const worker = new Worker(__filename, { argv: [process.pid, req2] });
worker.on('message', common.mustCall((msg) => {
assert.strictEqual(msg, 'ready');
const ret = customCommandThread(worker.threadId,
req2,
'custom',
JSON.stringify('hello'));
assert.strictEqual(ret, 0);
}));

worker.on('exit', common.mustCall((code) => {
assert.strictEqual(code, 0);
clearInterval(interval);
}));

// To keep the process alive.
const interval = setInterval(() => {
}, 300);
12 changes: 9 additions & 3 deletions test/addons/nsolid-custom-command/nsolid-custom-command.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const bindingPath = require.resolve(`./build/${common.buildType}/binding`);
const binding = require(bindingPath);
const nsolid = require('nsolid');
const { internalBinding } = require('internal/test/binding');
const { UV_EINVAL, UV_ENOENT } = internalBinding('uv');
const { UV_EEXIST, UV_EINVAL, UV_ENOENT } = internalBinding('uv');

const circularObj = {
a: 1,
Expand Down Expand Up @@ -171,7 +171,7 @@ already returned`,
});

const requestId = `${nsolid.id}${++counter}`;
binding.customCommand(
assert.strictEqual(binding.customCommand(
requestId,
command,
args,
Expand Down Expand Up @@ -215,7 +215,13 @@ already returned`,

assert.ok(false);
},
);
), 0);

assert.strictEqual(binding.customCommand(requestId,
command,
args,
common.mustNotCall()),
UV_EEXIST);
}

setTimeout(() => {
Expand Down
Loading