Skip to content

Feature/Multer S3 #3850

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions packages/components/src/storageUtils.ts
Original file line number Diff line number Diff line change
@@ -11,6 +11,64 @@ import {
import { Readable } from 'node:stream'
import { getUserHome } from './utils'
import sanitize from 'sanitize-filename'
import multer from 'multer'
const multerS3 = require('multer-s3')

/**
* Get user settings file
* TODO: move env variables to settings json file, easier configuration
*/
export const getUserSettingsFilePath = () => {
if (process.env.SECRETKEY_PATH) return path.join(process.env.SECRETKEY_PATH, 'settings.json')
const checkPaths = [path.join(getUserHome(), '.flowise', 'settings.json')]
for (const checkPath of checkPaths) {
if (fs.existsSync(checkPath)) {
return checkPath
}
}
return ''
}

export const getOrgId = () => {
const settingsContent = fs.readFileSync(getUserSettingsFilePath(), 'utf8')
try {
const settings = JSON.parse(settingsContent)
return settings.instanceId
} catch (error) {
return ''
}
}

const getUploadPath = (): string => {
return process.env.BLOB_STORAGE_PATH
? path.join(process.env.BLOB_STORAGE_PATH, 'uploads', getOrgId())
: path.join(getUserHome(), '.flowise', 'uploads', getOrgId())
}

export const getMulterStorage = () => {
const storageType = getStorageType()

if (storageType === 's3') {
const s3Client = getS3Config().s3Client
const Bucket = getS3Config().Bucket

const upload = multer({
storage: multerS3({
s3: s3Client,
bucket: Bucket,
metadata: function (req: Request, file: Express.Multer.File, cb: (error: any, metadata: any) => void) {
cb(null, { fieldName: file.fieldname, originalName: file.originalname, orgId: getOrgId() })
},
key: function (req: Request, file: Express.Multer.File, cb: (error: any, metadata: any) => void) {
cb(null, `${getOrgId()}/${Date.now().toString()}`)
}
})
})
return upload
} else {
return multer({ dest: getUploadPath() })
}
}

export const addBase64FilesToStorage = async (fileBase64: string, chatflowid: string, fileNames: string[]) => {
const storageType = getStorageType()
@@ -120,6 +178,37 @@ export const addSingleFileToStorage = async (mime: string, bf: Buffer, fileName:
}
}

export const getFileFromUpload = async (filePath: string): Promise<Buffer> => {
const storageType = getStorageType()
if (storageType === 's3') {
const { s3Client, Bucket } = getS3Config()

let Key = filePath
// remove the first '/' if it exists
if (Key.startsWith('/')) {
Key = Key.substring(1)
}
const getParams = {
Bucket,
Key
}

const response = await s3Client.send(new GetObjectCommand(getParams))
const body = response.Body
if (body instanceof Readable) {
const streamToString = await body.transformToString('base64')
if (streamToString) {
return Buffer.from(streamToString, 'base64')
}
}
// @ts-ignore
const buffer = Buffer.concat(response.Body.toArray())
return buffer
} else {
return fs.readFileSync(filePath)
}
}

export const getFileFromStorage = async (file: string, ...paths: string[]): Promise<Buffer> => {
const storageType = getStorageType()
const sanitizedFilename = _sanitizeFilename(file)
@@ -183,6 +272,20 @@ export const removeFilesFromStorage = async (...paths: string[]) => {
}
}

export const removeSpecificFileFromUpload = async (filePath: string) => {
const storageType = getStorageType()
if (storageType === 's3') {
let Key = filePath
// remove the first '/' if it exists
if (Key.startsWith('/')) {
Key = Key.substring(1)
}
await _deleteS3Folder(Key)
} else {
fs.unlinkSync(filePath)
}
}

export const removeSpecificFileFromStorage = async (...paths: string[]) => {
const storageType = getStorageType()
if (storageType === 's3') {
1 change: 1 addition & 0 deletions packages/server/package.json
Original file line number Diff line number Diff line change
@@ -91,6 +91,7 @@
"moment": "^2.29.3",
"moment-timezone": "^0.5.34",
"multer": "^1.4.5-lts.1",
"multer-s3": "^3.0.1",
"mysql2": "^3.11.3",
"openai": "^4.57.3",
"pg": "^8.11.1",
Original file line number Diff line number Diff line change
@@ -143,7 +143,7 @@ const uploadFilesToAssistantVectorStore = async (req: Request, res: Response, ne
// Address file name with special characters: https://github.com/expressjs/multer/issues/1104
file.originalname = Buffer.from(file.originalname, 'latin1').toString('utf8')
uploadFiles.push({
filePath: file.path,
filePath: file.path ?? (file as any).key,
fileName: file.originalname
})
}
2 changes: 1 addition & 1 deletion packages/server/src/controllers/openai-assistants/index.ts
Original file line number Diff line number Diff line change
@@ -84,7 +84,7 @@ const uploadAssistantFiles = async (req: Request, res: Response, next: NextFunct
// Address file name with special characters: https://github.com/expressjs/multer/issues/1104
file.originalname = Buffer.from(file.originalname, 'latin1').toString('utf8')
uploadFiles.push({
filePath: file.path,
filePath: file.path ?? (file as any).key,
fileName: file.originalname
})
}
7 changes: 2 additions & 5 deletions packages/server/src/routes/attachments/index.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
import express from 'express'
import multer from 'multer'
import attachmentsController from '../../controllers/attachments'
import { getUploadPath } from '../../utils'
import { getMulterStorage } from 'flowise-components'

const router = express.Router()

const upload = multer({ dest: getUploadPath() })

// CREATE
router.post('/:chatflowId/:chatId', upload.array('files'), attachmentsController.createAttachment)
router.post('/:chatflowId/:chatId', getMulterStorage().array('files'), attachmentsController.createAttachment)

export default router
6 changes: 2 additions & 4 deletions packages/server/src/routes/documentstore/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import express from 'express'
import multer from 'multer'
import { getUploadPath } from '../../utils'
import documentStoreController from '../../controllers/documentstore'
import { getMulterStorage } from 'flowise-components'

const router = express.Router()
const upload = multer({ dest: getUploadPath() })

router.post(['/upsert/', '/upsert/:id'], upload.array('files'), documentStoreController.upsertDocStoreMiddleware)
router.post(['/upsert/', '/upsert/:id'], getMulterStorage().array('files'), documentStoreController.upsertDocStoreMiddleware)

router.post(['/refresh/', '/refresh/:id'], documentStoreController.refreshDocStoreMiddleware)

6 changes: 2 additions & 4 deletions packages/server/src/routes/openai-assistants-files/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import express from 'express'
import multer from 'multer'
import openaiAssistantsController from '../../controllers/openai-assistants'
import { getUploadPath } from '../../utils'
import { getMulterStorage } from 'flowise-components'

const router = express.Router()
const upload = multer({ dest: getUploadPath() })

router.post('/download/', openaiAssistantsController.getFileFromAssistant)
router.post('/upload/', upload.array('files'), openaiAssistantsController.uploadAssistantFiles)
router.post('/upload/', getMulterStorage().array('files'), openaiAssistantsController.uploadAssistantFiles)

export default router
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import express from 'express'
import multer from 'multer'
import openaiAssistantsVectorStoreController from '../../controllers/openai-assistants-vector-store'
import { getUploadPath } from '../../utils'
import { getMulterStorage } from 'flowise-components'

const router = express.Router()
const upload = multer({ dest: getUploadPath() })

// CREATE
router.post('/', openaiAssistantsVectorStoreController.createAssistantVectorStore)
@@ -22,7 +20,7 @@ router.put(['/', '/:id'], openaiAssistantsVectorStoreController.updateAssistantV
router.delete(['/', '/:id'], openaiAssistantsVectorStoreController.deleteAssistantVectorStore)

// POST
router.post('/:id', upload.array('files'), openaiAssistantsVectorStoreController.uploadFilesToAssistantVectorStore)
router.post('/:id', getMulterStorage().array('files'), openaiAssistantsVectorStoreController.uploadFilesToAssistantVectorStore)

// DELETE
router.patch(['/', '/:id'], openaiAssistantsVectorStoreController.deleteFilesFromAssistantVectorStore)
12 changes: 7 additions & 5 deletions packages/server/src/routes/predictions/index.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import express from 'express'
import multer from 'multer'
import predictionsController from '../../controllers/predictions'
import { getUploadPath } from '../../utils'
import { getMulterStorage } from 'flowise-components'

const router = express.Router()

const upload = multer({ dest: getUploadPath() })

// CREATE
router.post(['/', '/:id'], upload.array('files'), predictionsController.getRateLimiterMiddleware, predictionsController.createPrediction)
router.post(
['/', '/:id'],
getMulterStorage().array('files'),
predictionsController.getRateLimiterMiddleware,
predictionsController.createPrediction
)

export default router
9 changes: 3 additions & 6 deletions packages/server/src/routes/vectors/index.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
import express from 'express'
import multer from 'multer'
import vectorsController from '../../controllers/vectors'
import { getUploadPath } from '../../utils'
import { getMulterStorage } from 'flowise-components'

const router = express.Router()

const upload = multer({ dest: getUploadPath() })

// CREATE
router.post(
['/upsert/', '/upsert/:id'],
upload.array('files'),
getMulterStorage().array('files'),
vectorsController.getRateLimiterMiddleware,
vectorsController.upsertVectorMiddleware
)
router.post(['/internal-upsert/', '/internal-upsert/:id'], upload.array('files'), vectorsController.createInternalUpsert)
router.post(['/internal-upsert/', '/internal-upsert/:id'], getMulterStorage().array('files'), vectorsController.createInternalUpsert)

export default router
9 changes: 5 additions & 4 deletions packages/server/src/services/documentstore/index.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import { getRunningExpressApp } from '../../utils/getRunningExpressApp'
import { DocumentStore } from '../../database/entities/DocumentStore'
import * as fs from 'fs'
import * as path from 'path'
import {
addArrayFilesToStorage,
addSingleFileToStorage,
getFileFromStorage,
getFileFromUpload,
ICommonObject,
IDocument,
mapExtToInputField,
mapMimeTypeToInputField,
removeFilesFromStorage,
removeSpecificFileFromStorage
removeSpecificFileFromStorage,
removeSpecificFileFromUpload
} from 'flowise-components'
import {
addLoaderSource,
@@ -1441,7 +1442,7 @@ const upsertDocStoreMiddleware = async (
const filesLoaderConfig: ICommonObject = {}
for (const file of files) {
const fileNames: string[] = []
const fileBuffer = fs.readFileSync(file.path)
const fileBuffer = await getFileFromUpload(file.path ?? (file as any).key)
// Address file name with special characters: https://github.com/expressjs/multer/issues/1104
file.originalname = Buffer.from(file.originalname, 'latin1').toString('utf8')

@@ -1481,7 +1482,7 @@ const upsertDocStoreMiddleware = async (
filesLoaderConfig[fileInputField] = JSON.stringify([storagePath])
}

fs.unlinkSync(file.path)
await removeSpecificFileFromUpload(file.path ?? (file as any).key)
}

loaderConfig = {
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import OpenAI from 'openai'
import { StatusCodes } from 'http-status-codes'
import fs from 'fs'
import { Credential } from '../../database/entities/Credential'
import { InternalFlowiseError } from '../../errors/internalFlowiseError'
import { getErrorMessage } from '../../errors/utils'
import { getRunningExpressApp } from '../../utils/getRunningExpressApp'
import { decryptCredentialData } from '../../utils'
import { getFileFromUpload, removeSpecificFileFromUpload } from 'flowise-components'

const getAssistantVectorStore = async (credentialId: string, vectorStoreId: string) => {
try {
@@ -178,13 +178,14 @@ const uploadFilesToAssistantVectorStore = async (
const openai = new OpenAI({ apiKey: openAIApiKey })
const uploadedFiles = []
for (const file of files) {
const toFile = await OpenAI.toFile(fs.readFileSync(file.filePath), file.fileName)
const fileBuffer = await getFileFromUpload(file.filePath)
const toFile = await OpenAI.toFile(fileBuffer, file.fileName)
const createdFile = await openai.files.create({
file: toFile,
purpose: 'assistants'
})
uploadedFiles.push(createdFile)
fs.unlinkSync(file.filePath)
await removeSpecificFileFromUpload(file.filePath)
}

const file_ids = [...uploadedFiles.map((file) => file.id)]
7 changes: 4 additions & 3 deletions packages/server/src/services/openai-assistants/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import OpenAI from 'openai'
import fs from 'fs'
import { StatusCodes } from 'http-status-codes'
import { decryptCredentialData } from '../../utils'
import { getRunningExpressApp } from '../../utils/getRunningExpressApp'
import { Credential } from '../../database/entities/Credential'
import { InternalFlowiseError } from '../../errors/internalFlowiseError'
import { getErrorMessage } from '../../errors/utils'
import { getFileFromUpload, removeSpecificFileFromUpload } from 'flowise-components'

// ----------------------------------------
// Assistants
@@ -101,13 +101,14 @@ const uploadFilesToAssistant = async (credentialId: string, files: { filePath: s
const uploadedFiles = []

for (const file of files) {
const toFile = await OpenAI.toFile(fs.readFileSync(file.filePath), file.fileName)
const fileBuffer = await getFileFromUpload(file.filePath)
const toFile = await OpenAI.toFile(fileBuffer, file.fileName)
const createdFile = await openai.files.create({
file: toFile,
purpose: 'assistants'
})
uploadedFiles.push(createdFile)
fs.unlinkSync(file.filePath)
await removeSpecificFileFromUpload(file.filePath)
}

return uploadedFiles
9 changes: 5 additions & 4 deletions packages/server/src/utils/buildChatflow.ts
Original file line number Diff line number Diff line change
@@ -9,7 +9,9 @@ import {
mapMimeTypeToInputField,
mapExtToInputField,
generateFollowUpPrompts,
IServerSideEventStreamer
IServerSideEventStreamer,
removeSpecificFileFromUpload,
getFileFromUpload
} from 'flowise-components'
import { StatusCodes } from 'http-status-codes'
import {
@@ -49,7 +51,6 @@ import { validateChatflowAPIKey } from './validateKey'
import { databaseEntities } from '.'
import { v4 as uuidv4 } from 'uuid'
import { omit } from 'lodash'
import * as fs from 'fs'
import logger from './logger'
import { utilAddChatMessage } from './addChatMesage'
import { buildAgentGraph } from './buildAgentGraph'
@@ -162,7 +163,7 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals
const overrideConfig: ICommonObject = { ...req.body }
const fileNames: string[] = []
for (const file of files) {
const fileBuffer = fs.readFileSync(file.path)
const fileBuffer = await getFileFromUpload(file.path ?? (file as any).key)
// Address file name with special characters: https://github.com/expressjs/multer/issues/1104
file.originalname = Buffer.from(file.originalname, 'latin1').toString('utf8')
const storagePath = await addArrayFilesToStorage(file.mimetype, fileBuffer, file.originalname, fileNames, chatflowid)
@@ -195,7 +196,7 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals
overrideConfig[fileInputField] = storagePath
}

fs.unlinkSync(file.path)
await removeSpecificFileFromUpload(file.path ?? (file as any).key)
}
if (overrideConfig.vars && typeof overrideConfig.vars === 'string') {
overrideConfig.vars = JSON.parse(overrideConfig.vars)
14 changes: 10 additions & 4 deletions packages/server/src/utils/createAttachment.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import { Request } from 'express'
import * as path from 'path'
import * as fs from 'fs'
import { addArrayFilesToStorage, IDocument, mapExtToInputField, mapMimeTypeToInputField } from 'flowise-components'
import {
addArrayFilesToStorage,
getFileFromUpload,
removeSpecificFileFromUpload,
IDocument,
mapExtToInputField,
mapMimeTypeToInputField
} from 'flowise-components'
import { getRunningExpressApp } from './getRunningExpressApp'
import { getErrorMessage } from '../errors/utils'

@@ -41,7 +47,7 @@ export const createFileAttachment = async (req: Request) => {
if (files.length) {
const isBase64 = req.body.base64
for (const file of files) {
const fileBuffer = fs.readFileSync(file.path)
const fileBuffer = await getFileFromUpload(file.path ?? (file as any).key)
const fileNames: string[] = []

// Address file name with special characters: https://github.com/expressjs/multer/issues/1104
@@ -63,7 +69,7 @@ export const createFileAttachment = async (req: Request) => {
fileInputField = fileInputFieldFromExt
}

fs.unlinkSync(file.path)
await removeSpecificFileFromUpload(file.path ?? (file as any).key)

try {
const nodeData = {
21 changes: 0 additions & 21 deletions packages/server/src/utils/index.ts
Original file line number Diff line number Diff line change
@@ -1690,21 +1690,6 @@ export const getTelemetryFlowObj = (nodes: IReactFlowNode[], edges: IReactFlowEd
return { nodes: nodeData, edges: edgeData }
}

/**
* Get user settings file
* TODO: move env variables to settings json file, easier configuration
*/
export const getUserSettingsFilePath = () => {
if (process.env.SECRETKEY_PATH) return path.join(process.env.SECRETKEY_PATH, 'settings.json')
const checkPaths = [path.join(getUserHome(), '.flowise', 'settings.json')]
for (const checkPath of checkPaths) {
if (fs.existsSync(checkPath)) {
return checkPath
}
}
return ''
}

/**
* Get app current version
*/
@@ -1773,9 +1758,3 @@ export const getAPIOverrideConfig = (chatflow: IChatFlow) => {
return { nodeOverrides: {}, variableOverrides: [], apiOverrideStatus: false }
}
}

export const getUploadPath = (): string => {
return process.env.BLOB_STORAGE_PATH
? path.join(process.env.BLOB_STORAGE_PATH, 'uploads')
: path.join(getUserHome(), '.flowise', 'uploads')
}
3 changes: 2 additions & 1 deletion packages/server/src/utils/telemetry.ts
Original file line number Diff line number Diff line change
@@ -2,7 +2,8 @@ import { v4 as uuidv4 } from 'uuid'
import { PostHog } from 'posthog-node'
import path from 'path'
import fs from 'fs'
import { getUserHome, getUserSettingsFilePath } from '.'
import { getUserHome } from '.'
import { getUserSettingsFilePath } from 'flowise-components'

export class Telemetry {
postHog?: PostHog
15 changes: 11 additions & 4 deletions packages/server/src/utils/upsertVector.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
import { Request } from 'express'
import * as fs from 'fs'
import * as path from 'path'
import { cloneDeep, omit } from 'lodash'
import { ICommonObject, IMessage, addArrayFilesToStorage, mapMimeTypeToInputField, mapExtToInputField } from 'flowise-components'
import {
ICommonObject,
IMessage,
addArrayFilesToStorage,
mapMimeTypeToInputField,
mapExtToInputField,
getFileFromUpload,
removeSpecificFileFromUpload
} from 'flowise-components'
import logger from '../utils/logger'
import {
buildFlow,
@@ -57,7 +64,7 @@ export const upsertVector = async (req: Request, isInternal: boolean = false) =>
const overrideConfig: ICommonObject = { ...req.body }
for (const file of files) {
const fileNames: string[] = []
const fileBuffer = fs.readFileSync(file.path)
const fileBuffer = await getFileFromUpload(file.path ?? (file as any).key)
// Address file name with special characters: https://github.com/expressjs/multer/issues/1104
file.originalname = Buffer.from(file.originalname, 'latin1').toString('utf8')
const storagePath = await addArrayFilesToStorage(file.mimetype, fileBuffer, file.originalname, fileNames, chatflowid)
@@ -90,7 +97,7 @@ export const upsertVector = async (req: Request, isInternal: boolean = false) =>
overrideConfig[fileInputField] = storagePath
}

fs.unlinkSync(file.path)
await removeSpecificFileFromUpload(file.path ?? (file as any).key)
}
if (overrideConfig.vars && typeof overrideConfig.vars === 'string') {
overrideConfig.vars = JSON.parse(overrideConfig.vars)
61 changes: 61 additions & 0 deletions pnpm-lock.yaml