Skip to content

Commit

Permalink
src: improve nsolid::CustomCommand()
Browse files Browse the repository at this point in the history
Make sure `custom_command_proxy_()` is called in every path so there are
no potential leaks.
Simplify/improve code avoiding unnecessary extra heap allocation for the
`req_id`.
Added tests covering worker threads and other cases not previously
checked.

PR-URL: #44
Reviewed-by: Trevor Norris <trev.norris@gmail.com>
  • Loading branch information
santigimeno committed Dec 11, 2023
1 parent c0ed8e7 commit fccddd8
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 62 deletions.
90 changes: 40 additions & 50 deletions src/nsolid/nsolid_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -712,18 +712,11 @@ int EnvInst::CustomCommand(std::string req_id,
return UV_EEXIST;
}

std::string* req_id_alloced = new std::string(req_id);
if (req_id_alloced == nullptr) {
return UV_ENOMEM;
}

int err = EnvInst::RunCommand(EnvInst::GetInst(thread_id()),
custom_command_,
static_cast<void*>(req_id_alloced),
CommandType::EventLoop);

int err = nsolid::RunCommand(EnvInst::GetInst(thread_id_),
CommandType::EventLoop,
custom_command_,
req_id);
if (err) {
delete req_id_alloced;
custom_command_stor_map_.erase(iter.first);
}

Expand All @@ -734,16 +727,16 @@ int EnvInst::CustomCommand(std::string req_id,
int EnvInst::CustomCommandResponse(const std::string& req_id,
const char* value,
bool is_return) {
custom_command_stor_map_lock_.lock();
auto it = custom_command_stor_map_.find(req_id);
if (it == custom_command_stor_map_.end()) {
custom_command_stor_map_lock_.unlock();
return UV_ENOENT;
}
CustomCommandStor stor;
{
ns_mutex::scoped_lock lock(custom_command_stor_map_lock_);
auto el = custom_command_stor_map_.extract(req_id);
if (el.empty()) {
return UV_ENOENT;
}

const CustomCommandStor stor(std::move(it->second));
custom_command_stor_map_.erase(it);
custom_command_stor_map_lock_.unlock();
stor = std::move(el.mapped());
}

stor.cb(req_id,
stor.command,
Expand Down Expand Up @@ -1714,34 +1707,44 @@ void EnvList::fill_trace_id_q() {
}


void EnvInst::custom_command_(SharedEnvInst envinst_sp, void* data) {
std::string* req_id = static_cast<std::string*>(data);
void EnvInst::custom_command_(SharedEnvInst envinst_sp,
const std::string req_id) {
auto error_cb = [](const std::string& req_id,
const CustomCommandStor& stor,
int err,
SharedEnvInst& envinst_sp) {
stor.cb(req_id,
stor.command,
err,
{ false, std::string() },
{ false, std::string() },
stor.data);
ns_mutex::scoped_lock lock(envinst_sp->custom_command_stor_map_lock_);
envinst_sp->custom_command_stor_map_.erase(req_id);
};

CustomCommandStor stor;
{
ns_mutex::scoped_lock lock(envinst_sp->custom_command_stor_map_lock_);
auto it = envinst_sp->custom_command_stor_map_.find(req_id);
CHECK_NE(it, envinst_sp->custom_command_stor_map_.end());
stor = it->second;
}

Environment* env = envinst_sp->env();
if (env->nsolid_on_command_fn().IsEmpty() ||
!envinst_sp->can_call_into_js()) {
delete req_id;
error_cb(req_id, stor, UV_EINVAL, envinst_sp);
return;
}

envinst_sp->custom_command_stor_map_lock_.lock();
auto it = envinst_sp->custom_command_stor_map_.find(*req_id);
if (it == envinst_sp->custom_command_stor_map_.end()) {
envinst_sp->custom_command_stor_map_lock_.unlock();
delete req_id;
return;
}

// A copy of CustomCommandStor is made in order to release the lock asap.
const CustomCommandStor stor(it->second);
envinst_sp->custom_command_stor_map_lock_.unlock();

Isolate* isolate = envinst_sp->isolate();
HandleScope handle_scope(isolate);
Context::Scope context_scope(env->context());
Local<Value> argv[] = {
v8::String::NewFromUtf8(
isolate,
req_id->c_str(),
req_id.c_str(),
v8::NewStringType::kNormal).ToLocalChecked(),
v8::String::NewFromUtf8(
isolate,
Expand All @@ -1757,23 +1760,10 @@ void EnvInst::custom_command_(SharedEnvInst envinst_sp, void* data) {
env->process_object(),
arraysize(argv),
argv).ToLocalChecked();
// delete from map if no custom command listener attached
int r = ret->Int32Value(env->context()).ToChecked();
if (r != 0) {
{
ns_mutex::scoped_lock lock(envinst_sp->custom_command_stor_map_lock_);
envinst_sp->custom_command_stor_map_.erase(*req_id);
}

stor.cb(*req_id,
stor.command,
r,
{ false, std::string() },
{ false, std::string() },
stor.data);
error_cb(req_id, stor, r, envinst_sp);
}

delete req_id;
}


Expand Down
3 changes: 2 additions & 1 deletion src/nsolid/nsolid_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,8 @@ class EnvInst {
static void close_nsolid_loader_(void* ptr);
static void run_nsolid_loader_(nsuv::ns_timer* handle, Environment* env);

static void custom_command_(SharedEnvInst envinst_sp, void* data);
static void custom_command_(SharedEnvInst envinst_sp,
const std::string req_id);

void add_metric_datapoint_(MetricsStream::Type, double);
void send_datapoint(MetricsStream::Type, double);
Expand Down
59 changes: 51 additions & 8 deletions test/addons/nsolid-custom-command/binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ static void custom_command_cb(std::string req_id,
fn->Call(context,
Undefined(isolate),
5,
argv).ToLocalChecked();
argv);

cb_map_.erase(iter);
}
Expand All @@ -103,19 +103,62 @@ static void CustomCommand(const FunctionCallbackInfo<Value>& args) {
Local<String> value_s = args[2].As<String>();
String::Utf8Value value(isolate, value_s);

Global<Function> cb(isolate, args[3].As<Function>());
int err = node::nsolid::CustomCommand(envinst_sp,
*req_id,
*command,
*value,
custom_command_cb);

if (err == 0) {
Global<Function> cb(isolate, args[3].As<Function>());
auto iter = cb_map_.emplace(*req_id, std::move(cb));
assert(iter.second);
}

args.GetReturnValue().Set(err);
}

static void CustomCommandThread(const FunctionCallbackInfo<Value>& args) {
assert(4 == args.Length());
assert(args[0]->IsNumber());
assert(args[1]->IsString());
assert(args[2]->IsString());
assert(args[3]->IsString());
Isolate* isolate = args.GetIsolate();
uint64_t thread_id = args[0].As<v8::Number>()->Value();
Local<String> req_id_s = args[1].As<String>();
String::Utf8Value req_id(isolate, req_id_s);
Local<String> command_s = args[2].As<String>();
String::Utf8Value command(isolate, command_s);
Local<String> value_s = args[3].As<String>();
String::Utf8Value value(isolate, value_s);

auto envinst_sp = node::nsolid::GetEnvInst(thread_id);

int err = node::nsolid::CustomCommand(envinst_sp,
*req_id,
*command,
*value,
custom_command_cb);
args.GetReturnValue().Set(err);
}

static void SetCustomCommandCb(const FunctionCallbackInfo<Value>& args) {
assert(2 == args.Length());
assert(args[0]->IsString());
assert(args[1]->IsFunction());
Isolate* isolate = args.GetIsolate();
Local<String> req_id_s = args[0].As<String>();
String::Utf8Value req_id(isolate, req_id_s);
Global<Function> cb(isolate, args[1].As<Function>());
auto iter = cb_map_.emplace(*req_id, std::move(cb));
assert(iter.second);

node::nsolid::CustomCommand(envinst_sp,
*req_id,
*command,
*value,
custom_command_cb);
}

NODE_MODULE_INIT(/* exports, module, context */) {
NODE_SET_METHOD(exports, "customCommand", CustomCommand);
NODE_SET_METHOD(exports, "customCommandThread", CustomCommandThread);
NODE_SET_METHOD(exports, "setCustomCommandCb", SetCustomCommandCb);
// While NODE_MODULE_INIT will run for every Worker, the first execution
// won't run in parallel with another. So this won't cause a race condition.
if (EnvInst::GetEnvLocalInst(context->GetIsolate())->thread_id() == 0)
Expand Down
106 changes: 106 additions & 0 deletions test/addons/nsolid-custom-command/nsolid-custom-command-workers.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
'use strict';
// Flags: --expose-gc --expose-internals

const common = require('../../common');
const assert = require('assert');
const {
Worker,
isMainThread,
threadId,
parentPort,
} = require('worker_threads');
const bindingPath = require.resolve(`./build/${common.buildType}/binding`);
const {
customCommandThread,
setCustomCommandCb,
} = require(bindingPath);
const nsolid = require('nsolid');
const { internalBinding } = require('internal/test/binding');
const { UV_ESRCH } = internalBinding('uv');

if (process.env.NSOLID_COMMAND)
common.skip('required to run without the Console');

if (!isMainThread && +process.argv[2] !== process.pid)
common.skip('Test must first run as the main thread');

nsolid.on('custom', common.mustCall((request) => {
request.return('goodbye');
}));

if (!isMainThread) {
// This is a Worker thread, so we exit early.
let exit = false;
const interval = setInterval(() => {
if (exit) {
// For gc on worker so EnvInst::CustomCommandReqWeakCallback is called.
global.gc();
clearInterval(interval);
}
}, 300);

const reqId = process.argv[3];
console.log('reqId:', reqId);
setCustomCommandCb(reqId, common.mustCall((id, cmd, st, error, value) => {
assert.strictEqual(id, reqId);
assert.strictEqual(cmd, 'custom');
assert.strictEqual(st, 0);
assert.strictEqual(error, null);
assert.strictEqual(value, '"goodbye"');
exit = true;
}));

parentPort.postMessage('ready');
return;
}

let counter = 0;

const req1 = `${nsolid.id}${++counter}`;
{
// Sending a custom command to a non-existent thread should return UV_ESRCH
const ret = customCommandThread(threadId + 1,
req1,
'custom',
JSON.stringify('hello'));
assert.strictEqual(ret, UV_ESRCH);
}
{
// Sending a custom command with the same request id should succeed as the
// previous did fail.
setCustomCommandCb(req1, common.mustCall((id, cmd, st, error, value) => {
assert.strictEqual(id, req1);
assert.strictEqual(cmd, 'custom');
assert.strictEqual(st, 0);
assert.strictEqual(error, null);
assert.strictEqual(value, '"goodbye"');
}));

const ret = customCommandThread(threadId,
req1,
'custom',
JSON.stringify('hello'));

assert.strictEqual(ret, 0);
}

// It also works with Worker threads.
const req2 = `${nsolid.id}${++counter}`;
const worker = new Worker(__filename, { argv: [process.pid, req2] });
worker.on('message', common.mustCall((msg) => {
assert.strictEqual(msg, 'ready');
const ret = customCommandThread(worker.threadId,
req2,
'custom',
JSON.stringify('hello'));
assert.strictEqual(ret, 0);
}));

worker.on('exit', common.mustCall((code) => {
assert.strictEqual(code, 0);
clearInterval(interval);
}));

// To keep the process alive.
const interval = setInterval(() => {
}, 300);
12 changes: 9 additions & 3 deletions test/addons/nsolid-custom-command/nsolid-custom-command.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const bindingPath = require.resolve(`./build/${common.buildType}/binding`);
const binding = require(bindingPath);
const nsolid = require('nsolid');
const { internalBinding } = require('internal/test/binding');
const { UV_EINVAL, UV_ENOENT } = internalBinding('uv');
const { UV_EEXIST, UV_EINVAL, UV_ENOENT } = internalBinding('uv');

const circularObj = {
a: 1,
Expand Down Expand Up @@ -171,7 +171,7 @@ already returned`,
});

const requestId = `${nsolid.id}${++counter}`;
binding.customCommand(
assert.strictEqual(binding.customCommand(
requestId,
command,
args,
Expand Down Expand Up @@ -215,7 +215,13 @@ already returned`,

assert.ok(false);
},
);
), 0);

assert.strictEqual(binding.customCommand(requestId,
command,
args,
common.mustNotCall()),
UV_EEXIST);
}

setTimeout(() => {
Expand Down

0 comments on commit fccddd8

Please sign in to comment.