-
Notifications
You must be signed in to change notification settings - Fork 33
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support multiple instance #20
base: master
Are you sure you want to change the base?
Changes from 4 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -71,11 +71,11 @@ var ShareProcessor = module.exports = function ShareProcessor(config, logger){ | |
createTables(_this.db); | ||
_this.handleShare = function(share){ | ||
persistShare(_this.db, share); | ||
_this._handleShare(share); | ||
_this._handleShare(share, _ => {}); | ||
} | ||
} | ||
else { | ||
_this.handleShare = share => _this._handleShare(share); | ||
_this.handleShare = share => _this._handleShare(share, _ => {}); | ||
} | ||
|
||
this.currentRoundKey = function(fromGroup, toGroup){ | ||
|
@@ -86,33 +86,56 @@ var ShareProcessor = module.exports = function ShareProcessor(config, logger){ | |
return fromGroup + ':' + toGroup + ':shares:' + blockHash; | ||
} | ||
|
||
this.shareCacheKey = function(fromGroup, toGroup){ | ||
return fromGroup + ':' + toGroup + ':sharecache'; | ||
} | ||
|
||
var pendingBlocksKey = 'pendingBlocks'; | ||
var foundBlocksKey = 'foundBlocks'; | ||
var hashrateKey = 'hashrate'; | ||
var balancesKey = 'balances'; | ||
|
||
this._handleShare = function(share){ | ||
var redisTx = _this.redisClient.multi(); | ||
var currentMs = Date.now(); | ||
this._handleShare = function(share, callback){ | ||
var fromGroup = share.job.fromGroup; | ||
var toGroup = share.job.toGroup; | ||
var blockHash = share.blockHash; | ||
var currentRound = _this.currentRoundKey(fromGroup, toGroup); | ||
redisTx.hincrbyfloat(currentRound, share.workerAddress, share.difficulty); | ||
var cacheKey = _this.shareCacheKey(fromGroup, toGroup); | ||
|
||
var currentTs = Math.floor(currentMs / 1000); | ||
redisTx.zadd(hashrateKey, currentTs, [fromGroup, toGroup, share.worker, share.difficulty, currentMs].join(':')); | ||
_this.redisClient.sadd(cacheKey, blockHash, function(error, result){ | ||
if (error){ | ||
logger.error('Check share duplicated failed, error: ' + error); | ||
callback(error); | ||
return; | ||
} | ||
|
||
if (share.foundBlock){ | ||
var blockHash = share.blockHash; | ||
var newKey = _this.roundKey(fromGroup, toGroup, blockHash); | ||
var blockWithTs = blockHash + ':' + currentMs.toString(); | ||
if (result === 0){ | ||
logger.error('Ignore duplicated share'); | ||
callback('Duplicated share'); | ||
return; | ||
} | ||
|
||
redisTx.rename(currentRound, newKey); | ||
redisTx.sadd(pendingBlocksKey, blockWithTs); | ||
redisTx.hset(foundBlocksKey, blockHash, share.workerAddress) | ||
} | ||
redisTx.exec(function(error, _){ | ||
if (error) logger.error('Handle share failed, error: ' + error); | ||
var redisTx = _this.redisClient.multi(); | ||
var currentMs = Date.now(); | ||
redisTx.hincrbyfloat(currentRound, share.workerAddress, share.difficulty); | ||
|
||
var currentTs = Math.floor(currentMs / 1000); | ||
redisTx.zadd(hashrateKey, currentTs, [fromGroup, toGroup, share.worker, share.difficulty, currentMs].join(':')); | ||
|
||
if (share.foundBlock){ | ||
var blockHash = share.blockHash; | ||
var newKey = _this.roundKey(fromGroup, toGroup, blockHash); | ||
var blockWithTs = blockHash + ':' + currentMs.toString(); | ||
|
||
redisTx.rename(currentRound, newKey); | ||
redisTx.sadd(pendingBlocksKey, blockWithTs); | ||
redisTx.hset(foundBlocksKey, blockHash, share.workerAddress) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
redisTx.del(cacheKey); | ||
} | ||
redisTx.exec(function(error, _){ | ||
if (error) logger.error('Handle share failed, error: ' + error); | ||
callback(error); | ||
}); | ||
}); | ||
} | ||
|
||
|
@@ -265,6 +288,8 @@ var ShareProcessor = module.exports = function ShareProcessor(config, logger){ | |
} | ||
|
||
this.start = function(){ | ||
setInterval(scanBlocks, config.rewardInterval * 1000); | ||
if (config.rewardEnabled){ | ||
setInterval(scanBlocks, config.rewardInterval * 1000); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,49 @@ describe('test share processor', function(){ | |
}) | ||
}) | ||
|
||
it('should ignore duplicated shares', function(done){ | ||
var shareProcessor = new ShareProcessor(test.config, test.logger); | ||
shareProcessor.redisClient = redisClient; | ||
|
||
var share1 = {job: {fromGroup: 0, toGroup: 1}, foundBlock: false, blockHash: 'hash1', difficulty: 1, workerAddress: 'miner1'}; | ||
var share2 = {job: {fromGroup: 0, toGroup: 1}, foundBlock: false, blockHash: 'hash2', difficulty: 2, workerAddress: 'miner1'}; | ||
var invalidShare = {job: {fromGroup: 0, toGroup: 1}, foundBlock: false, blockHash: 'hash1', difficulty: 3, workerAddress: 'miner1'}; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't it more relevant to be from a different worker address? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now we check if a share is duplicated based on hash, but it's good for testing, thanks. |
||
|
||
var checkState = function(cacheKey, roundKey, callback){ | ||
redisClient | ||
.multi() | ||
.hget(roundKey, 'miner1') | ||
.smembers(cacheKey) | ||
.exec(function(error, results){ | ||
if (error) assert.fail('Test failed: ' + error); | ||
var difficulty = results[0][1]; | ||
var shareHashes = results[1][1]; | ||
callback(difficulty, shareHashes); | ||
}); | ||
}; | ||
|
||
var cacheKey = shareProcessor.shareCacheKey(0, 1); | ||
var currentRoundKey = shareProcessor.currentRoundKey(0, 1); | ||
util.executeForEach([share1, share2, invalidShare], (share, callback) => { | ||
shareProcessor._handleShare(share, callback); | ||
}, function(){ | ||
checkState(cacheKey, currentRoundKey, function(difficulty, shareHashes){ | ||
expect(parseFloat(difficulty)).equal(share1.difficulty + share2.difficulty); | ||
expect(shareHashes).to.deep.equal([share1.blockHash, share2.blockHash]); | ||
|
||
var blockShare = {job: {fromGroup: 0, toGroup: 1}, foundBlock: true, blockHash: 'hash3', difficulty: 3, workerAddress: 'miner1'}; | ||
shareProcessor._handleShare(blockShare, function(){ | ||
var roundKey = shareProcessor.roundKey(0, 1, blockShare.blockHash); | ||
checkState(cacheKey, roundKey, function(difficulty, shareHashes){ | ||
expect(parseFloat(difficulty)).equal(share1.difficulty + share2.difficulty + blockShare.difficulty); | ||
expect(shareHashes).to.deep.equal([]); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}) | ||
|
||
it('should process shares', function(done){ | ||
var shareProcessor = new ShareProcessor(test.config, test.logger); | ||
shareProcessor.redisClient = redisClient; | ||
|
@@ -38,44 +81,47 @@ describe('test share processor', function(){ | |
foundBlock: false | ||
}; | ||
|
||
shareProcessor.handleShare(shareData); | ||
var currentRoundKey = shareProcessor.currentRoundKey( | ||
shareData.job.fromGroup, | ||
shareData.job.toGroup | ||
); | ||
|
||
redisClient.hget(currentRoundKey, shareData.workerAddress, function(error, res){ | ||
if (error) assert.fail('Test failed: ' + error); | ||
expect(parseFloat(res)).equal(shareData.difficulty); | ||
|
||
shareData.foundBlock = true; | ||
var blockHashHex = '0011'; | ||
shareData.blockHash = blockHashHex; | ||
shareProcessor.handleShare(shareData); | ||
shareProcessor._handleShare(shareData, function(){ | ||
|
||
var roundKey = shareProcessor.roundKey( | ||
var currentRoundKey = shareProcessor.currentRoundKey( | ||
shareData.job.fromGroup, | ||
shareData.job.toGroup, | ||
blockHashHex | ||
shareData.job.toGroup | ||
); | ||
|
||
redisClient | ||
.multi() | ||
.hget(roundKey, shareData.workerAddress) | ||
.smembers('pendingBlocks') | ||
.hget('foundBlocks', blockHashHex) | ||
.exec(function(error, result){ | ||
if (error) assert.fail('Test failed: ' + error); | ||
var difficulty = result[0][1]; | ||
var pendingBlocks = result[1][1]; | ||
var blockMiner = result[2][1]; | ||
redisClient.hget(currentRoundKey, shareData.workerAddress, function(error, res){ | ||
if (error) assert.fail('Test failed: ' + error); | ||
expect(parseFloat(res)).equal(shareData.difficulty); | ||
|
||
expect(parseFloat(difficulty)).equal(shareData.difficulty * 2); | ||
expect(pendingBlocks.length).equal(1); | ||
expect(pendingBlocks[0].startsWith(blockHashHex)); | ||
expect(blockMiner).equal(shareData.workerAddress); | ||
done(); | ||
shareData.foundBlock = true; | ||
var blockHashHex = '0011'; | ||
shareData.blockHash = blockHashHex; | ||
shareProcessor._handleShare(shareData, function(){ | ||
|
||
var roundKey = shareProcessor.roundKey( | ||
shareData.job.fromGroup, | ||
shareData.job.toGroup, | ||
blockHashHex | ||
); | ||
|
||
redisClient | ||
.multi() | ||
.hget(roundKey, shareData.workerAddress) | ||
.smembers('pendingBlocks') | ||
.hget('foundBlocks', blockHashHex) | ||
.exec(function(error, result){ | ||
if (error) assert.fail('Test failed: ' + error); | ||
var difficulty = result[0][1]; | ||
var pendingBlocks = result[1][1]; | ||
var blockMiner = result[2][1]; | ||
|
||
expect(parseFloat(difficulty)).equal(shareData.difficulty * 2); | ||
expect(pendingBlocks.length).equal(1); | ||
expect(pendingBlocks[0].startsWith(blockHashHex)); | ||
expect(blockMiner).equal(shareData.workerAddress); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
renamenx
should probably be used here, otherwise in case of found block sent with delay (but still not expired) by one worker, thesharecache
key would have been deleted and the shares for the block overwritten by new currentRound.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great catch, thanks, the remaining commands will continue to execute even if
renamenx
failed in redis transaction, so my thought is that we can useset hash true ex expiryPeriod nx
to check if the share is duplicated, whichexpiryPeriod > jobExpiryPeriod
, how do you think?