Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multiple instance #20

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,11 @@ configs explanation:
},

"withholdPercent": 0.005, // coinbase reward withhold percent(0.5% by default), used for tx fee mainly
"rewardEnabled": true, // enabled by default
"rewardInterval": 600, // update miner balances every this many seconds
"confirmationTime": 30600, // 510m by default, you can decrease this if your payment addresses have enough balance

"paymentEnabled": true, // enabled by default
"minPaymentCoins": "3.5", // minimum number of coins that a miner must earn before sending payment
"paymentInterval": 600, // send payment every this many seconds

Expand Down
2 changes: 2 additions & 0 deletions config.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@
},

"withholdPercent": 0.005,
"rewardEnabled": true,
"rewardInterval": 600,
"confirmationTime": 30600,

"paymentEnabled": true,
"minPaymentCoins": "0.5",
"paymentInterval": 3600,
"txConfirmations": {
Expand Down
4 changes: 2 additions & 2 deletions lib/jobManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ function JobManager(jobExpiryPeriod){
difficulty: difficulty,
error: error[1]
});
return {error: error, result: null};
return {error: error};
};

var job = _this.validJobs.getJob(params.jobId);
Expand Down Expand Up @@ -214,7 +214,7 @@ function JobManager(jobExpiryPeriod){
foundBlock: foundBlock
});

return {result: true, error: null, blockHash: hash};
return {error: null};
};
};
JobManager.prototype.__proto__ = events.EventEmitter.prototype;
Expand Down
10 changes: 6 additions & 4 deletions lib/paymentProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -578,10 +578,12 @@ var PaymentProcessor = module.exports = function PaymentProcessor(config, logger
}

this.start = function(){
checkAddress(config.addresses);
loadPublicKey(config.wallet, function(){
setTimeout(payment, config.paymentInterval * 1000);
});
if (config.paymentEnabled){
checkAddress(config.addresses);
loadPublicKey(config.wallet, function(){
setTimeout(payment, config.paymentInterval * 1000);
});
}
}

function loadPublicKey(walletConfig, callback){
Expand Down
6 changes: 3 additions & 3 deletions lib/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ var pool = module.exports = function pool(config, logger){
share_difficulty: shareData.shareDiff,
ip: shareData.ip
}))
_this.shareProcessor.handleShare(shareData);
if (shareData.foundBlock){
logger.info('Found block for chainIndex: ' + chainIndex +
', hash: ' + shareData.blockHash +
Expand All @@ -114,6 +113,7 @@ var pool = module.exports = function pool(config, logger){
}
});
}
_this.shareProcessor.handleShare(shareData);
})
}

Expand Down Expand Up @@ -204,14 +204,14 @@ var pool = module.exports = function pool(config, logger){
_this.varDiff.manageClient(client);

client.on('submit', function(params, resultCallback){
var result =_this.jobManager.processShare(
var result = _this.jobManager.processShare(
params,
client.previousDifficulty,
client.difficulty,
client.remoteAddress,
client.socket.localPort
);
resultCallback(result.error, result.result ? true : null);
resultCallback(result.error);

}).on('malformedMessage', function (message) {
logger.warn('Malformed message from ' + client.getLabel() + ': ' + message);
Expand Down
63 changes: 44 additions & 19 deletions lib/shareProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const { Pool } = require('pg');
var ShareProcessor = module.exports = function ShareProcessor(config, logger){
var confirmationTime = config.confirmationTime * 1000;
var rewardPercent = 1 - config.withholdPercent;
var shareExpiryPeriod = 15;

var _this = this;
this.redisClient = new Redis(config.redis.port, config.redis.host);
Expand Down Expand Up @@ -71,11 +72,11 @@ var ShareProcessor = module.exports = function ShareProcessor(config, logger){
createTables(_this.db);
_this.handleShare = function(share){
persistShare(_this.db, share);
_this._handleShare(share);
_this._handleShare(share, _ => {});
}
}
else {
_this.handleShare = share => _this._handleShare(share);
_this.handleShare = share => _this._handleShare(share, _ => {});
}

this.currentRoundKey = function(fromGroup, toGroup){
Expand All @@ -91,28 +92,50 @@ var ShareProcessor = module.exports = function ShareProcessor(config, logger){
var hashrateKey = 'hashrate';
var balancesKey = 'balances';

this._handleShare = function(share){
var redisTx = _this.redisClient.multi();
var currentMs = Date.now();
this.shareCacheKey = function(fromGroup, toGroup, hash){
return fromGroup + ':' + toGroup + ':hashes:' + hash;
}

this._handleShare = function(share, callback){
var fromGroup = share.job.fromGroup;
var toGroup = share.job.toGroup;
var blockHash = share.blockHash;
var currentRound = _this.currentRoundKey(fromGroup, toGroup);
redisTx.hincrbyfloat(currentRound, share.workerAddress, share.difficulty);
var hashKey = _this.shareCacheKey(fromGroup, toGroup, blockHash);

var currentTs = Math.floor(currentMs / 1000);
redisTx.zadd(hashrateKey, currentTs, [fromGroup, toGroup, share.worker, share.difficulty, currentMs].join(':'));
_this.redisClient.set(hashKey, true, 'EX', shareExpiryPeriod, 'NX', function(error, result){
if (error){
logger.error('Check share duplicated failed, error: ' + error);
callback(error);
return;
}

if (share.foundBlock){
var blockHash = share.blockHash;
var newKey = _this.roundKey(fromGroup, toGroup, blockHash);
var blockWithTs = blockHash + ':' + currentMs.toString();
if (result == null){
logger.error('Ignore duplicated share, key: ' + hashKey);
callback('duplicated share');
return;
}

redisTx.rename(currentRound, newKey);
redisTx.sadd(pendingBlocksKey, blockWithTs);
redisTx.hset(foundBlocksKey, blockHash, share.workerAddress)
}
redisTx.exec(function(error, _){
if (error) logger.error('Handle share failed, error: ' + error);
var redisTx = _this.redisClient.multi();
redisTx.hincrbyfloat(currentRound, share.workerAddress, share.difficulty);

var currentMs = Date.now();
var currentTs = Math.floor(currentMs / 1000);
redisTx.zadd(hashrateKey, currentTs, [fromGroup, toGroup, share.worker, share.difficulty, currentMs].join(':'));

if (share.foundBlock){
var blockHash = share.blockHash;
var newKey = _this.roundKey(fromGroup, toGroup, blockHash);
var blockWithTs = blockHash + ':' + currentMs.toString();

redisTx.rename(currentRound, newKey);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamenx should probably be used here, otherwise in case of found block sent with delay (but still not expired) by one worker, the sharecache key would have been deleted and the shares for the block overwritten by new currentRound.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great catch, thanks, the remaining commands will continue to execute even if renamenx failed in redis transaction, so my thought is that we can use set hash true ex expiryPeriod nx to check if the share is duplicated, which expiryPeriod > jobExpiryPeriod, how do you think?

redisTx.sadd(pendingBlocksKey, blockWithTs);
redisTx.hset(foundBlocksKey, blockHash, share.workerAddress)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hsetnx should be used here, so that "paternity" of the block is kept to the first worker who submitted it (in case of same corner case than above)

}
redisTx.exec(function(error, _){
if (error) logger.error('Handle share failed, error: ' + error);
callback(error);
});
});
}

Expand Down Expand Up @@ -265,6 +288,8 @@ var ShareProcessor = module.exports = function ShareProcessor(config, logger){
}

this.start = function(){
setInterval(scanBlocks, config.rewardInterval * 1000);
if (config.rewardEnabled){
setInterval(scanBlocks, config.rewardInterval * 1000);
}
}
}
10 changes: 4 additions & 6 deletions lib/stratum.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,13 @@ var StratumClient = function(params){
function handleSubmit(message){
_this.emit('submit',
message.params,
function(error, result){
if (!error && result){
_this.emit('submitAccepted');
}
if (!considerBan(result)){
function(error){
var accepted = !error;
if (!considerBan(accepted)){
sendJson({
id: message.id,
method: 'mining.submit_result',
result: result,
result: accepted,
error: error
});
}
Expand Down
117 changes: 85 additions & 32 deletions test/shareProcessorTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,56 @@ describe('test share processor', function(){
})
})

it('should ignore duplicated shares', function(done){
var shareProcessor = new ShareProcessor(test.config, test.logger);
shareProcessor.redisClient = redisClient;

var share1 = {job: {fromGroup: 0, toGroup: 1}, foundBlock: false, blockHash: 'hash1', difficulty: 1, workerAddress: 'miner1'};
var share2 = {job: {fromGroup: 0, toGroup: 1}, foundBlock: false, blockHash: 'hash2', difficulty: 2, workerAddress: 'miner1'};
var invalidShare1 = {job: {fromGroup: 0, toGroup: 1}, foundBlock: false, blockHash: 'hash1', difficulty: 3, workerAddress: 'miner1'};
var invalidShare2 = {job: {fromGroup: 0, toGroup: 1}, foundBlock: false, blockHash: 'hash1', difficulty: 3, workerAddress: 'miner2'};

var checkState = function(roundKey, callback){
redisClient
.multi()
.hget(roundKey, 'miner1')
.hget(roundKey, 'miner2')
.exec(function(error, results){
if (error) assert.fail('Test failed: ' + error);
var difficulty1 = results[0][1];
var difficulty2 = results[1][1];
callback(difficulty1, difficulty2);
});
};

var currentRoundKey = shareProcessor.currentRoundKey(0, 1);
var key1 = shareProcessor.shareCacheKey(0, 1, share1.blockHash);
var key2 = shareProcessor.shareCacheKey(0, 1, share2.blockHash);
util.executeForEach([share1, share2, invalidShare1, invalidShare2], (share, callback) => {
shareProcessor._handleShare(share, callback);
}, function(){
checkState(currentRoundKey, function(diff1, diff2){
expect(parseFloat(diff1)).equal(share1.difficulty + share2.difficulty);
expect(diff2).equal(null);

var blockShare = {job: {fromGroup: 0, toGroup: 1}, foundBlock: true, blockHash: 'hash3', difficulty: 3, workerAddress: 'miner1'};
shareProcessor._handleShare(blockShare, function(){
var roundKey = shareProcessor.roundKey(0, 1, blockShare.blockHash);
checkState(roundKey, function(diff1, diff2){
expect(parseFloat(diff1)).equal(share1.difficulty + share2.difficulty + blockShare.difficulty);
expect(diff2).equal(null);

redisClient.exists(key1, key2, function(error, result){
if (error) assert.fail('Test failed: ' + error);
expect(result).equal(2);
done();
});
});
});
});
});
})

it('should process shares', function(done){
var shareProcessor = new ShareProcessor(test.config, test.logger);
shareProcessor.redisClient = redisClient;
Expand All @@ -38,44 +88,47 @@ describe('test share processor', function(){
foundBlock: false
};

shareProcessor.handleShare(shareData);
var currentRoundKey = shareProcessor.currentRoundKey(
shareData.job.fromGroup,
shareData.job.toGroup
);

redisClient.hget(currentRoundKey, shareData.workerAddress, function(error, res){
if (error) assert.fail('Test failed: ' + error);
expect(parseFloat(res)).equal(shareData.difficulty);

shareData.foundBlock = true;
var blockHashHex = '0011';
shareData.blockHash = blockHashHex;
shareProcessor.handleShare(shareData);
shareProcessor._handleShare(shareData, function(){

var roundKey = shareProcessor.roundKey(
var currentRoundKey = shareProcessor.currentRoundKey(
shareData.job.fromGroup,
shareData.job.toGroup,
blockHashHex
shareData.job.toGroup
);

redisClient
.multi()
.hget(roundKey, shareData.workerAddress)
.smembers('pendingBlocks')
.hget('foundBlocks', blockHashHex)
.exec(function(error, result){
if (error) assert.fail('Test failed: ' + error);
var difficulty = result[0][1];
var pendingBlocks = result[1][1];
var blockMiner = result[2][1];
redisClient.hget(currentRoundKey, shareData.workerAddress, function(error, res){
if (error) assert.fail('Test failed: ' + error);
expect(parseFloat(res)).equal(shareData.difficulty);

expect(parseFloat(difficulty)).equal(shareData.difficulty * 2);
expect(pendingBlocks.length).equal(1);
expect(pendingBlocks[0].startsWith(blockHashHex));
expect(blockMiner).equal(shareData.workerAddress);
done();
shareData.foundBlock = true;
var blockHashHex = '0011';
shareData.blockHash = blockHashHex;
shareProcessor._handleShare(shareData, function(){

var roundKey = shareProcessor.roundKey(
shareData.job.fromGroup,
shareData.job.toGroup,
blockHashHex
);

redisClient
.multi()
.hget(roundKey, shareData.workerAddress)
.smembers('pendingBlocks')
.hget('foundBlocks', blockHashHex)
.exec(function(error, result){
if (error) assert.fail('Test failed: ' + error);
var difficulty = result[0][1];
var pendingBlocks = result[1][1];
var blockMiner = result[2][1];

expect(parseFloat(difficulty)).equal(shareData.difficulty * 2);
expect(pendingBlocks.length).equal(1);
expect(pendingBlocks[0].startsWith(blockHashHex));
expect(blockMiner).equal(shareData.workerAddress);
done();
});
});
});
});
})

Expand Down
2 changes: 1 addition & 1 deletion test/stratumTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ describe('test stratum server', function(){

stratumClient.on('submit', function(params, callback){
expect(params).equal(submitMessage.params);
callback(null, false);
callback('invalid share');
});
});

Expand Down