Skip to content

Commit

Permalink
Merge pull request #123 from FlowFuse/122-fix-deadlock
Browse files Browse the repository at this point in the history
Add locking in the app to avoid deadlock
  • Loading branch information
knolleary authored Aug 8, 2024
2 parents 7ab5d1b + fd67700 commit 7dc5b32
Showing 1 changed file with 137 additions and 96 deletions.
233 changes: 137 additions & 96 deletions forge/context-driver/sequelize.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,40 @@ const path = require('path')

let sequelize, app

/**
*/
const activeLocks = new Map()
/**
* This is a simple instanceId-level locking mechanism that ensures we single-thread
* requests related to a single instance.
*
* This is not scalable, but solves an immediate issue around deadlocks caused by
* parallel requests to update different context scopes for the same instance.
*
* See https://github.com/FlowFuse/file-server/issues/122
* @param {*} instanceId the id of the instance to lock
* @returns a promise that resolves once the lock is held. The promise resolves with a function that must be called to release the lock
*/
async function getInstanceLock (instanceId) {
let lockingPromise
if (!activeLocks.has(instanceId)) {
lockingPromise = Promise.resolve()
activeLocks.set(instanceId, lockingPromise)
} else {
lockingPromise = activeLocks.get(instanceId)
}
let unlockNextPromise
const nextPromise = new Promise(resolve => {
unlockNextPromise = () => {
resolve()
}
return unlockNextPromise
})
const unlockPromise = lockingPromise.then(() => unlockNextPromise)
activeLocks.set(instanceId, lockingPromise.then(() => nextPromise))
return unlockPromise
}

module.exports = {
init: async function (_app) {
app = _app
Expand Down Expand Up @@ -44,7 +78,7 @@ module.exports = {

sequelize = new Sequelize(dbOptions)

app.log.info(`FlowForge File Server Sequelize Context connected to ${dbOptions.dialect} on ${dbOptions.host || dbOptions.storage}`)
app.log.info(`FlowFuse File Server Sequelize Context connected to ${dbOptions.dialect} on ${dbOptions.host || dbOptions.storage}`)

const Context = sequelize.define('Context', {
project: { type: DataTypes.STRING, allowNull: false, unique: 'context-project-scope-unique' },
Expand All @@ -56,106 +90,113 @@ module.exports = {
},
/**
* Set the context data for a given scope
* @param {string} projectId - The project id
* @param {string} instanceId - The instance id
* @param {string} scope - The context scope to write to
* @param {[{key:string, value:any}]} input - The context data to write
* @param {boolean} [overwrite=false] - If true, any context data will be overwritten (i.e. for a cache dump). If false, the context data will be merged with the existing data.
* @param {number} quotaOverride - if set overrides the locally configured limit
*/
set: async function (projectId, scope, input, overwrite = false, quotaOverride = 0) {
const { path } = parseScope(scope)
await sequelize.transaction({
type: Sequelize.Transaction.TYPES.IMMEDIATE
},
async (t) => {
// get the existing row of context data from the database (if any)
let existingRow = await this.Context.findOne({
where: {
project: projectId,
scope: path
},
lock: t.LOCK.UPDATE,
transaction: t
})
const quotaLimit = quotaOverride || app.config?.context?.quota || 0
// if quota is set, check if we are over quota or will be after this update
if (quotaLimit > 0) {
// Difficulties implementing this correctly
// - The final size of data can only be determined after the data is stored.
// This is due to the fact that some keys may be deleted and some may be added
// and the size of the data is not the same as the size of the keys.
// This implementation is not ideal, but it is a good approximation and will
// prevent the possibility of runaway storage usage.
let changeSize = 0
let hasValues = false
// if we are overwriting, then we need to remove the existing size to get the final size
if (existingRow) {
if (overwrite) {
changeSize -= getItemSize(existingRow.values || '')
} else {
hasValues = existingRow?.values && Object.keys(existingRow.values).length > 0
set: async function (instanceId, scope, input, overwrite = false, quotaOverride = 1000) {
// Obtain the lock for this instance
const unlock = await getInstanceLock(instanceId)
try {
const { path } = parseScope(scope)
await sequelize.transaction({
type: Sequelize.Transaction.TYPES.IMMEDIATE
},
async (t) => {
// get the existing row of context data from the database (if any)
let existingRow = await this.Context.findOne({
where: {
project: instanceId,
scope: path
},
lock: t.LOCK.UPDATE,
transaction: t
})
const quotaLimit = quotaOverride || app.config?.context?.quota || 0
// if quota is set, check if we are over quota or will be after this update
if (quotaLimit > 0) {
// Difficulties implementing this correctly
// - The final size of data can only be determined after the data is stored.
// This is due to the fact that some keys may be deleted and some may be added
// and the size of the data is not the same as the size of the keys.
// This implementation is not ideal, but it is a good approximation and will
// prevent the possibility of runaway storage usage.
let changeSize = 0
let hasValues = false
// if we are overwriting, then we need to remove the existing size to get the final size
if (existingRow) {
if (overwrite) {
changeSize -= getItemSize(existingRow.values || '')
} else {
hasValues = existingRow?.values && Object.keys(existingRow.values).length > 0
}
}
}
// calculate the change in size
for (const element of input) {
const currentItem = hasValues ? getObjectProperty(existingRow.values, element.key) : undefined
if (currentItem === undefined && element.value !== undefined) {
// this is an addition
changeSize += getItemSize(element.value)
} else if (currentItem !== undefined && element.value === undefined) {
// this is an deletion
changeSize -= getItemSize(currentItem)
} else {
// this is an update
changeSize -= getItemSize(currentItem)
changeSize += getItemSize(element.value)
// calculate the change in size
for (const element of input) {
const currentItem = hasValues ? getObjectProperty(existingRow.values, element.key) : undefined
if (currentItem === undefined && element.value !== undefined) {
// this is an addition
changeSize += getItemSize(element.value)
} else if (currentItem !== undefined && element.value === undefined) {
// this is an deletion
changeSize -= getItemSize(currentItem)
} else {
// this is an update
changeSize -= getItemSize(currentItem)
changeSize += getItemSize(element.value)
}
}
}
// only calculate the current size if we are going to need it
if (changeSize >= 0) {
const currentSize = await this.quota(projectId)
if (currentSize + changeSize > quotaLimit) {
const err = new Error('Over Quota')
err.code = 'over_quota'
err.error = err.message
err.limit = quotaLimit
throw err
// only calculate the current size if we are going to need it
if (changeSize >= 0) {
const currentSize = await this.quota(instanceId)
if (currentSize + changeSize > quotaLimit) {
const err = new Error('Over Quota')
err.code = 'over_quota'
err.error = err.message
err.limit = quotaLimit
throw err
}
}
}
}

// if we are overwriting, then we need to reset the values in the existing row (if any)
if (existingRow && overwrite) {
existingRow.values = {} // reset the values since this is a mem cache -> DB dump
}

// if there is no input, then we are probably deleting the row
if (input?.length > 0) {
if (!existingRow) {
existingRow = await this.Context.create({
project: projectId,
scope: path,
values: {}
},
{
transaction: t
})
// if we are overwriting, then we need to reset the values in the existing row (if any)
if (existingRow && overwrite) {
existingRow.values = {} // reset the values since this is a mem cache -> DB dump
}
for (const i in input) {
const path = input[i].key
const value = input[i].value
util.setMessageProperty(existingRow.values, path, value)

// if there is no input, then we are probably deleting the row
if (input?.length > 0) {
if (!existingRow) {
existingRow = await this.Context.create({
project: instanceId,
scope: path,
values: {}
},
{
transaction: t
})
}
for (const i in input) {
const path = input[i].key
const value = input[i].value
util.setMessageProperty(existingRow.values, path, value)
}
}
}
if (existingRow) {
if (existingRow.values && Object.keys(existingRow.values).length === 0) {
await existingRow.destroy({ transaction: t })
} else {
existingRow.changed('values', true)
await existingRow.save({ transaction: t })
if (existingRow) {
if (existingRow.values && Object.keys(existingRow.values).length === 0) {
await existingRow.destroy({ transaction: t })
} else {
existingRow.changed('values', true)
await existingRow.save({ transaction: t })
}
}
}
})
})
} finally {
// Regardless of the result, release the lock
await unlock()
}
},
/**
* Get the context data for a given scope
Expand Down Expand Up @@ -295,17 +336,17 @@ module.exports = {
}
},
quota: async function (projectId) {
const scopesResults = await this.Context.findAll({
// Sum the lengths in the query
// - note for postgres, we have to cast the values column from JSON to text
const sizeResult = await this.Context.findOne({
where: {
project: projectId
}
})
let size = 0
scopesResults.forEach(scope => {
const strValues = JSON.stringify(scope.values)
size += strValues.length
},
attributes: [
[sequelize.fn('SUM', sequelize.fn('LENGTH', sequelize.cast(sequelize.col('values'), 'text'))), 'length']
]
})
return size
return sizeResult.getDataValue('length') || 0
}
}

Expand Down

0 comments on commit 7dc5b32

Please sign in to comment.