Skip to content

Commit

Permalink
enh: resource choice for task execution
Browse files Browse the repository at this point in the history
  • Loading branch information
anibalsolon committed Sep 1, 2024
1 parent 35cc975 commit e89f27a
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 72 deletions.
79 changes: 38 additions & 41 deletions api/controllers/resource.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ function mask_enc(resource) {
}

/**
 * Report whether a user carries the amaretti "admin" scope.
 *
 * @param {object} user - user object (e.g. `req.user`); only `user.scopes.amaretti` is read.
 * @returns {boolean} true only when the amaretti scope list includes "admin";
 *                    false when the user, its scopes, or its amaretti scopes are missing.
 */
function is_admin(user) {
    //fail closed: anyone without an amaretti scope list is treated as a non-admin
    const scopes = (user && user.scopes && user.scopes.amaretti) || [];
    //always hand back a strict boolean so callers can compare / serialize it safely
    return scopes.includes("admin");
}

function canedit(user, resource) {
Expand Down Expand Up @@ -65,45 +64,41 @@ router.get('/', common.jwt(), function(req, res, next) {
if(req.query.limit) req.query.limit = parseInt(req.query.limit);
if(req.query.skip) req.query.skip = parseInt(req.query.skip);

//if(!is_admin(req.user) || find.user_id === undefined) {
const gids = req.user.gids||[];
gids.push(config.amaretti.globalGroup);
find["$or"] = [
{user_id: req.user.sub.toString()},
{admins: req.user.sub.toString()},
{gids: {"$in": gids}},
];
/*} else if(find.user_id == null) {
//admin can set it to null and remove user_id / gids filtering
//html get method won't allow empty parameter, so by setting it to null, then I can replace with *undefined*
delete find.user_id;
}*/
if(!is_admin(req.user)) {
const gids = req.user.gids || [];
find["$or"] = [
{user_id: req.user.sub.toString()},
{admins: req.user.sub.toString()},
{gids: {"$in": gids}},
{gids: {"$in": [config.amaretti.globalGroup]}, active: true}, // only active public resources
];
}

let select = null; //select all by default
if(req.query.select) select = req.query.select+" user_id"; //we need user_id at least

db.Resource.find(find)
.select(select)
.limit(req.query.limit || 100)
.skip(req.query.skip || 0)
.sort(req.query.sort)
.lean()
.exec(function(err, resources) {
if(err) return next(err);
resources.forEach(mask_enc);

//add / remove a few more things
resources.forEach(function(resource) {
resource.salts = undefined;
resource._canedit = canedit(req.user, resource)
});

//deprecate this..
db.Resource.countDocuments(find).exec(function(err, count) {
if(err) return next(err);
res.json({resources: resources, count: count});
});
});
.select(select)
.limit(req.query.limit || 100)
.skip(req.query.skip || 0)
.sort(req.query.sort)
.lean()
.exec(function(err, resources) {
if(err) return next(err);
resources.forEach(mask_enc);

//add / remove a few more things
resources.forEach(function(resource) {
resource.salts = undefined;
resource._canedit = canedit(req.user, resource)
});

//deprecate this..
db.Resource.countDocuments(find).exec(function(err, count) {
if(err) return next(err);
res.json({resources: resources, count: count});
});
});
});

/**
Expand Down Expand Up @@ -134,11 +129,13 @@ router.get('/best', common.jwt(), (req, res, next)=>{
};

//make sure user has access to requested gids
if(req.query.gids) task.gids = req.query.gids.map(gid=>parseInt(gid)).filter(gid=>{
if(gid == config.amaretti.globalGroup) return true; //user can select from public resource
if(req.user.gids.includes(gid)) return true; //group member can selec it
return false;
});
if(req.query.gids) {
task.gids = req.query.gids.map(gid=>parseInt(gid)).filter(gid=>{
if(gid == config.amaretti.globalGroup) return true; //user can select from public resource
if(req.user.gids.includes(gid)) return true; //group member can selec it
return false;
});
}

//if(req.query.service) task.service = req.query.service;
resource_lib.select(req.user, task, (err, resource, score, considered)=>{
Expand Down
1 change: 0 additions & 1 deletion api/models.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ mongoose.set("debug", config.amaretti.debug);

exports.init = async function(cb, connectEvent = true) {
if(connectEvent) {
console.log("connecting to amqp/events");
try {
await events.init();
} catch (err) {
Expand Down
95 changes: 69 additions & 26 deletions api/resource.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,23 @@ exports.select = function(user, task, cb) {
return;
}

const isAdmin = (user.scopes?.amaretti || []).includes('admin');
const isPreferred = !!task.preferred_resource_id;

const query = {
status: {$ne: "removed"},
active: true,
gids: {"$in": task.gids},
'config.services.name': task.service,
};

if (isAdmin && isPreferred) {
delete query['config.services.name'];
delete query.active;
}

//now let's start out by all the resources that user has access and app is enabled on
db.Resource.find({
status: {$ne: "removed"},
active: true,
gids: {"$in": task.gids},
'config.services.name': task.service,
}).lean().sort('create_date').exec((err, resources)=>{
db.Resource.find(query).lean().sort('create_date').exec((err, resources)=>{
if(err) return cb(err);

//select the best resource based on the task
Expand All @@ -49,11 +59,6 @@ exports.select = function(user, task, cb) {
var considered = [];
async.eachSeries(resources, (resource, next_resource)=>{
score_resource(user, resource, task, (err, score, detail)=>{
if(score === null) {
//not configured to run on this resource.. ignore
//console.log("no score produced for "+resource.name);
return next_resource();
}

let consider = {
_id: resource._id,
Expand All @@ -75,33 +80,54 @@ exports.select = function(user, task, cb) {

owner: resource.user_id,
};

const isResourcePreferred = task.preferred_resource_id && task.preferred_resource_id == resource._id.toString();
const isPrivate = resource.gids.length === 0;
const hasScore = score > 0;
const isResourceOk = resource.status === 'ok';
const hasTaskDependency = ~dep_resource_ids.indexOf(resource._id.toString());

if (isAdmin && isResourcePreferred) {
consider.detail.msg += "admin override";
considered.push(consider);
best_score = consider.score;
best = resource;
return next_resource("admin override");
}

if (score === null) {
//not configured to run on this resource.. ignore
//console.log("no score produced for "+resource.name);
return next_resource();
}

considered.push(consider);

if(resource.status != 'ok') {
if (!isResourceOk) {
consider.detail.msg += "resource status is not ok";
return next_resource();
}

//if score is 0, assume it's disabled..
if(score === 0) {
if (!hasScore) {
consider.detail.msg+="score is set to 0.. not running here";
return next_resource();
}

//+5 if resource is listed in dep
if(~dep_resource_ids.indexOf(resource._id.toString())) {
if (hasTaskDependency) {
consider.detail.msg+="resource listed in deps/resource_ids.. +5\n";
consider.score = score+5;
}

//+10 if it's private resource
if(resource.gids.length == 0) {
if (isPrivate) {
consider.detail.msg+="private resource.. +10\n";
consider.score = score+10;
}

//+15 score if it's preferred by user (TODO need to make sure this still works)
if(task.preferred_resource_id && task.preferred_resource_id == resource._id.toString()) {
if (isResourcePreferred) {
consider.detail.msg+="user prefers this.. +15\n";
consider.score = score+15;
}
Expand All @@ -116,6 +142,9 @@ exports.select = function(user, task, cb) {
next_resource();
});
}, err=>{
if (err === "admin override") {
err = null;
}
//for debugging
if(best) {
console.debug("best resource chosen:"+best._id+" name:"+best.name+" with score:"+best_score);
Expand Down Expand Up @@ -235,6 +264,8 @@ function check_ssh(resource, cb) {
var conn = new Client();
var ready = false;

const workdir = common.getworkdir(null, resource);

function cb_once(err, status, message) {
if(cb) {
cb(err, status, message);
Expand All @@ -251,30 +282,34 @@ function check_ssh(resource, cb) {
conn.on('ready', function() {
ready = true;

//send test script
const workdir = common.getworkdir(null, resource);
let t1 = setTimeout(()=>{
cb_once(null, "failed", "got ssh connection but sftp timeout");
t1 = null;
}, 15*1000); //10 sec too short for osgconnect

conn.sftp((err, sftp)=>{
if(!t1) return; //timed out already
clearTimeout(t1);

if(err) return cb_once(err);

let to = setTimeout(()=>{
cb_once(null, "failed", "send test script timeout(10sec) - filesystem is offline?");
cb_once(null, "failed", "send test script timeout(60sec) - filesystem is offline?");
to = null;
}, 10*1000);
}, 60*1000);

let readstream = fs.createReadStream(__dirname + "/resource_test.sh");
let writestream = sftp.createWriteStream(workdir + "/resource_test.sh");

let readstream = fs.createReadStream(__dirname+"/resource_test.sh");
let writestream = sftp.createWriteStream(workdir+"/resource_test.sh");
writestream.on('close', ()=>{
if(!to) return; //timed out already
clearTimeout(to);
console.debug("resource_test.sh write stream closed - running resource_test.sh");
conn.exec('cd '+workdir+' && timeout 10 bash resource_test.sh', (err, stream)=>{
if (err) return cb_once(err);
if (err) {
clearTimeout(to);
return cb_once(err);
}
var out = "";
stream.on('close', function(code, signal) {
console.debug(out);
Expand All @@ -290,6 +325,7 @@ function check_ssh(resource, cb) {
});
writestream.on('error', err=>{
console.debug("resource_test.sh write stream errored");
console.debug(err);
if(!to) return; //timed out already
clearTimeout(to);
if(err) return cb_once(null, "failed", "failed to stream resource_test.sh");
Expand All @@ -315,8 +351,13 @@ function check_ssh(resource, cb) {


//clone resource so that decrypted content won't leak out of here
var decrypted_resource = JSON.parse(JSON.stringify(resource));
common.decrypt_resource(decrypted_resource);
let decrypted_resource;
if (resource.decrypted) {
decrypted_resource = resource;
} else {
decrypted_resource = JSON.parse(JSON.stringify(resource));
common.decrypt_resource(decrypted_resource);
}

/*
console.debug("check_ssh - connecting ssh");
Expand All @@ -340,6 +381,8 @@ function check_ssh(resource, cb) {
}
}

exports.check_ssh = check_ssh;

//TODO this is too similar to common.js:ssh_command... can we refactor?
function check_iohost(resource, cb) {
var conn = new Client();
Expand Down Expand Up @@ -374,7 +417,7 @@ function check_iohost(resource, cb) {
let to = setTimeout(()=>{
cb_once(null, "failed", "readdir timeout - filesystem is offline?");
to = null;
}, 3*1000);
}, 3*10000);
sftp.opendir(workdir, function(err, stat) {
if(!to) return; //timed out already
clearTimeout(to);
Expand Down
2 changes: 1 addition & 1 deletion bin/list_users_with_jobcount.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ db.init(err=> {
active: true,
'profile.private.notification.newsletter_general': true,
}),
limit: 5000, //TODO unsustainable?
limit: 6000, //TODO unsustainable?
},
headers: { authorization: "Bearer "+config.wf.jwt },
}, function(err, res, data) {
Expand Down
2 changes: 1 addition & 1 deletion bin/list_users_with_recentjobs.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ function list_users(cb) {
find: JSON.stringify({
sub: {$in: user_ids},
}),
limit: 5000, //TODO unsustainable?
limit: 6000, //TODO unsustainable?
},
headers: { authorization: "Bearer "+config.wf.jwt },
}, function(err, res, _contacts) {
Expand Down
22 changes: 20 additions & 2 deletions bin/task.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ const os = require('os');
const redis = require('redis');
const async = require('async');
const deepmerge = require('deepmerge');
const axios = require('axios');
const yargs = require('yargs');

const stripAnsi = require('strip-ansi');

const config = require('../config');
const db = require('../api/models');
const common = require('../api/common');
const _resource_select = require('../api/resource').select;
const resource_select = require('../api/resource').select;
const _transfer = require('../api/transfer');
const _service = require('../api/service');

Expand Down Expand Up @@ -426,10 +427,27 @@ async function handle_requested(task, next) {
task.start_date = new Date();
await task.save()

_resource_select({
let scopes = [];
try {
const res = await axios.get(
`${config.api.auth}/user/${task.user_id}`,
{
headers: {
authorization: "Bearer " + config.amaretti.jwt
}
}
);
const user = res.data;
scopes = user.scopes;
} catch (error) {
console.error(error);
}

resource_select({
//mock user object
sub: task.user_id,
gids: task.gids,
scopes: scopes,
}, task, async (err, resource, score, considered)=>{
if(err) return next(err);
if(!resource || resource.status == "removed") {
Expand Down

0 comments on commit e89f27a

Please sign in to comment.