Skip to content

Commit c318fc5

Browse files
AllenFang, 0xi4o, HenryHengZJ
authored
Feat: Support Google Cloud Storage (#4061)
* support google cloud storage * update example and docs for supporting google cloud storage * recover the indent of pnpm-lock.yaml * populate the logs to google logging * normalize gcs storage paths --------- Co-authored-by: Ilango <[email protected]> Co-authored-by: Henry <[email protected]>
1 parent d53b1b6 commit c318fc5

File tree

11 files changed

+529
-160
lines changed

11 files changed

+529
-160
lines changed

CONTRIBUTING.md

+43-39
Large diffs are not rendered by default.

docker/.env.example

+5-1
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,18 @@ BLOB_STORAGE_PATH=/root/.flowise/storage
4848
# see https://raw.githubusercontent.com/FlowiseAI/Flowise/main/packages/components/models.json for the format
4949
# MODEL_LIST_CONFIG_JSON=/your_model_list_config_file_path
5050

51-
# STORAGE_TYPE=local (local | s3)
51+
# STORAGE_TYPE=local (local | s3 | gcs)
5252
# BLOB_STORAGE_PATH=/your_storage_path/.flowise/storage
5353
# S3_STORAGE_BUCKET_NAME=flowise
5454
# S3_STORAGE_ACCESS_KEY_ID=<your-access-key>
5555
# S3_STORAGE_SECRET_ACCESS_KEY=<your-secret-key>
5656
# S3_STORAGE_REGION=us-west-2
5757
# S3_ENDPOINT_URL=<custom-s3-endpoint-url>
5858
# S3_FORCE_PATH_STYLE=false
59+
# GOOGLE_CLOUD_STORAGE_CREDENTIAL=/the/keyfilename/path
60+
# GOOGLE_CLOUD_STORAGE_PROJ_ID=<your-gcp-project-id>
61+
# GOOGLE_CLOUD_STORAGE_BUCKET_NAME=<the-bucket-name>
62+
# GOOGLE_CLOUD_UNIFORM_BUCKET_ACCESS=true
5963

6064
# SHOW_COMMUNITY_NODES=true
6165
# DISABLED_NODES=bufferMemory,chatOpenAI (comma separated list of node names to disable)

i18n/README-TW.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
<a href="https://github.com/FlowiseAI/Flowise">
1717
<img width="100%" src="https://github.com/FlowiseAI/Flowise/blob/main/images/flowise.gif?raw=true"></a>
1818

19-
## ⚡快速開始
19+
## 快速開始
2020

2121
下載並安裝 [NodeJS](https://nodejs.org/en/download) >= 18.15.0
2222

packages/components/package.json

+1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
"@gomomento/sdk": "^1.51.1",
3636
"@gomomento/sdk-core": "^1.51.1",
3737
"@google-ai/generativelanguage": "^2.5.0",
38+
"@google-cloud/storage": "^7.15.2",
3839
"@google/generative-ai": "^0.15.0",
3940
"@huggingface/inference": "^2.6.1",
4041
"@langchain/anthropic": "0.3.14",

packages/components/src/storageUtils.ts

+110
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
S3Client,
99
S3ClientConfig
1010
} from '@aws-sdk/client-s3'
11+
import { Storage } from '@google-cloud/storage'
1112
import { Readable } from 'node:stream'
1213
import { getUserHome } from './utils'
1314
import sanitize from 'sanitize-filename'
@@ -34,6 +35,25 @@ export const addBase64FilesToStorage = async (fileBase64: string, chatflowid: st
3435
})
3536
await s3Client.send(putObjCmd)
3637

38+
fileNames.push(sanitizedFilename)
39+
return 'FILE-STORAGE::' + JSON.stringify(fileNames)
40+
} else if (storageType === 'gcs') {
41+
const { bucket } = getGcsClient()
42+
const splitDataURI = fileBase64.split(',')
43+
const filename = splitDataURI.pop()?.split(':')[1] ?? ''
44+
const bf = Buffer.from(splitDataURI.pop() || '', 'base64')
45+
const mime = splitDataURI[0].split(':')[1].split(';')[0]
46+
const sanitizedFilename = _sanitizeFilename(filename)
47+
const normalizedChatflowid = chatflowid.replace(/\\/g, '/')
48+
const normalizedFilename = sanitizedFilename.replace(/\\/g, '/')
49+
const filePath = `${normalizedChatflowid}/${normalizedFilename}`
50+
const file = bucket.file(filePath)
51+
await new Promise<void>((resolve, reject) => {
52+
file.createWriteStream({ contentType: mime, metadata: { contentEncoding: 'base64' } })
53+
.on('error', (err) => reject(err))
54+
.on('finish', () => resolve())
55+
.end(bf)
56+
})
3757
fileNames.push(sanitizedFilename)
3858
return 'FILE-STORAGE::' + JSON.stringify(fileNames)
3959
} else {
@@ -76,6 +96,20 @@ export const addArrayFilesToStorage = async (mime: string, bf: Buffer, fileName:
7696
await s3Client.send(putObjCmd)
7797
fileNames.push(sanitizedFilename)
7898
return 'FILE-STORAGE::' + JSON.stringify(fileNames)
99+
} else if (storageType === 'gcs') {
100+
const { bucket } = getGcsClient()
101+
const normalizedPaths = paths.map((p) => p.replace(/\\/g, '/'))
102+
const normalizedFilename = sanitizedFilename.replace(/\\/g, '/')
103+
const filePath = [...normalizedPaths, normalizedFilename].join('/')
104+
const file = bucket.file(filePath)
105+
await new Promise<void>((resolve, reject) => {
106+
file.createWriteStream()
107+
.on('error', (err) => reject(err))
108+
.on('finish', () => resolve())
109+
.end(bf)
110+
})
111+
fileNames.push(sanitizedFilename)
112+
return 'FILE-STORAGE::' + JSON.stringify(fileNames)
79113
} else {
80114
const dir = path.join(getStoragePath(), ...paths.map(_sanitizeFilename))
81115
if (!fs.existsSync(dir)) {
@@ -109,6 +143,19 @@ export const addSingleFileToStorage = async (mime: string, bf: Buffer, fileName:
109143
})
110144
await s3Client.send(putObjCmd)
111145
return 'FILE-STORAGE::' + sanitizedFilename
146+
} else if (storageType === 'gcs') {
147+
const { bucket } = getGcsClient()
148+
const normalizedPaths = paths.map((p) => p.replace(/\\/g, '/'))
149+
const normalizedFilename = sanitizedFilename.replace(/\\/g, '/')
150+
const filePath = [...normalizedPaths, normalizedFilename].join('/')
151+
const file = bucket.file(filePath)
152+
await new Promise<void>((resolve, reject) => {
153+
file.createWriteStream({ contentType: mime, metadata: { contentEncoding: 'base64' } })
154+
.on('error', (err) => reject(err))
155+
.on('finish', () => resolve())
156+
.end(bf)
157+
})
158+
return 'FILE-STORAGE::' + sanitizedFilename
112159
} else {
113160
const dir = path.join(getStoragePath(), ...paths.map(_sanitizeFilename))
114161
if (!fs.existsSync(dir)) {
@@ -146,6 +193,11 @@ export const getFileFromUpload = async (filePath: string): Promise<Buffer> => {
146193
// @ts-ignore
147194
const buffer = Buffer.concat(response.Body.toArray())
148195
return buffer
196+
} else if (storageType === 'gcs') {
197+
const { bucket } = getGcsClient()
198+
const file = bucket.file(filePath)
199+
const [buffer] = await file.download()
200+
return buffer
149201
} else {
150202
return fs.readFileSync(filePath)
151203
}
@@ -179,6 +231,14 @@ export const getFileFromStorage = async (file: string, ...paths: string[]): Prom
179231
// @ts-ignore
180232
const buffer = Buffer.concat(response.Body.toArray())
181233
return buffer
234+
} else if (storageType === 'gcs') {
235+
const { bucket } = getGcsClient()
236+
const normalizedPaths = paths.map((p) => p.replace(/\\/g, '/'))
237+
const normalizedFilename = sanitizedFilename.replace(/\\/g, '/')
238+
const filePath = [...normalizedPaths, normalizedFilename].join('/')
239+
const file = bucket.file(filePath)
240+
const [buffer] = await file.download()
241+
return buffer
182242
} else {
183243
const fileInStorage = path.join(getStoragePath(), ...paths.map(_sanitizeFilename), sanitizedFilename)
184244
return fs.readFileSync(fileInStorage)
@@ -208,6 +268,10 @@ export const removeFilesFromStorage = async (...paths: string[]) => {
208268
Key = Key.substring(1)
209269
}
210270
await _deleteS3Folder(Key)
271+
} else if (storageType === 'gcs') {
272+
const { bucket } = getGcsClient()
273+
const normalizedPath = paths.map((p) => p.replace(/\\/g, '/')).join('/')
274+
await bucket.deleteFiles({ prefix: `${normalizedPath}/` })
211275
} else {
212276
const directory = path.join(getStoragePath(), ...paths.map(_sanitizeFilename))
213277
_deleteLocalFolderRecursive(directory)
@@ -223,6 +287,9 @@ export const removeSpecificFileFromUpload = async (filePath: string) => {
223287
Key = Key.substring(1)
224288
}
225289
await _deleteS3Folder(Key)
290+
} else if (storageType === 'gcs') {
291+
const { bucket } = getGcsClient()
292+
await bucket.file(filePath).delete()
226293
} else {
227294
fs.unlinkSync(filePath)
228295
}
@@ -237,6 +304,15 @@ export const removeSpecificFileFromStorage = async (...paths: string[]) => {
237304
Key = Key.substring(1)
238305
}
239306
await _deleteS3Folder(Key)
307+
} else if (storageType === 'gcs') {
308+
const { bucket } = getGcsClient()
309+
const fileName = paths.pop()
310+
if (fileName) {
311+
const sanitizedFilename = _sanitizeFilename(fileName)
312+
paths.push(sanitizedFilename)
313+
}
314+
const normalizedPath = paths.map((p) => p.replace(/\\/g, '/')).join('/')
315+
await bucket.file(normalizedPath).delete()
240316
} else {
241317
const fileName = paths.pop()
242318
if (fileName) {
@@ -257,6 +333,10 @@ export const removeFolderFromStorage = async (...paths: string[]) => {
257333
Key = Key.substring(1)
258334
}
259335
await _deleteS3Folder(Key)
336+
} else if (storageType === 'gcs') {
337+
const { bucket } = getGcsClient()
338+
const normalizedPath = paths.map((p) => p.replace(/\\/g, '/')).join('/')
339+
await bucket.deleteFiles({ prefix: `${normalizedPath}/` })
260340
} else {
261341
const directory = path.join(getStoragePath(), ...paths.map(_sanitizeFilename))
262342
_deleteLocalFolderRecursive(directory, true)
@@ -355,6 +435,14 @@ export const streamStorageFile = async (
355435
const blob = await body.transformToByteArray()
356436
return Buffer.from(blob)
357437
}
438+
} else if (storageType === 'gcs') {
439+
const { bucket } = getGcsClient()
440+
const normalizedChatflowId = chatflowId.replace(/\\/g, '/')
441+
const normalizedChatId = chatId.replace(/\\/g, '/')
442+
const normalizedFilename = sanitizedFilename.replace(/\\/g, '/')
443+
const filePath = `${normalizedChatflowId}/${normalizedChatId}/${normalizedFilename}`
444+
const [buffer] = await bucket.file(filePath).download()
445+
return buffer
358446
} else {
359447
const filePath = path.join(getStoragePath(), chatflowId, chatId, sanitizedFilename)
360448
//raise error if file path is not absolute
@@ -372,6 +460,28 @@ export const streamStorageFile = async (
372460
}
373461
}
374462

463+
export const getGcsClient = () => {
464+
const pathToGcsCredential = process.env.GOOGLE_CLOUD_STORAGE_CREDENTIAL
465+
const projectId = process.env.GOOGLE_CLOUD_STORAGE_PROJ_ID
466+
const bucketName = process.env.GOOGLE_CLOUD_STORAGE_BUCKET_NAME
467+
468+
if (!pathToGcsCredential) {
469+
throw new Error('GOOGLE_CLOUD_STORAGE_CREDENTIAL env variable is required')
470+
}
471+
if (!bucketName) {
472+
throw new Error('GOOGLE_CLOUD_STORAGE_BUCKET_NAME env variable is required')
473+
}
474+
475+
const storageConfig = {
476+
keyFilename: pathToGcsCredential,
477+
...(projectId ? { projectId } : {})
478+
}
479+
480+
const storage = new Storage(storageConfig)
481+
const bucket = storage.bucket(bucketName)
482+
return { storage, bucket }
483+
}
484+
375485
export const getS3Config = () => {
376486
const accessKeyId = process.env.S3_STORAGE_ACCESS_KEY_ID
377487
const secretAccessKey = process.env.S3_STORAGE_SECRET_ACCESS_KEY

packages/server/.env.example

+4
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ PORT=3000
5454
# S3_STORAGE_REGION=us-west-2
5555
# S3_ENDPOINT_URL=<custom-s3-endpoint-url>
5656
# S3_FORCE_PATH_STYLE=false
57+
# GOOGLE_CLOUD_STORAGE_CREDENTIAL=/the/keyfilename/path
58+
# GOOGLE_CLOUD_STORAGE_PROJ_ID=<your-gcp-project-id>
59+
# GOOGLE_CLOUD_STORAGE_BUCKET_NAME=<the-bucket-name>
60+
# GOOGLE_CLOUD_UNIFORM_BUCKET_ACCESS=true
5761

5862
# SHOW_COMMUNITY_NODES=true
5963
# DISABLED_NODES=bufferMemory,chatOpenAI (comma separated list of node names to disable)

packages/server/package.json

+2
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
"license": "SEE LICENSE IN LICENSE.md",
5858
"dependencies": {
5959
"@aws-sdk/client-secrets-manager": "^3.699.0",
60+
"@google-cloud/logging-winston": "^6.0.0",
6061
"@oclif/core": "4.0.7",
6162
"@opentelemetry/api": "^1.3.0",
6263
"@opentelemetry/auto-instrumentations-node": "^0.52.0",
@@ -95,6 +96,7 @@
9596
"moment": "^2.29.3",
9697
"moment-timezone": "^0.5.34",
9798
"multer": "^1.4.5-lts.1",
99+
"multer-cloud-storage": "^4.0.0",
98100
"multer-s3": "^3.0.1",
99101
"mysql2": "^3.11.3",
100102
"flowise-nim-container-manager": "^1.0.11",

packages/server/src/commands/base.ts

+9
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ export abstract class BaseCommand extends Command {
4949
S3_STORAGE_REGION: Flags.string(),
5050
S3_ENDPOINT_URL: Flags.string(),
5151
S3_FORCE_PATH_STYLE: Flags.string(),
52+
GOOGLE_CLOUD_STORAGE_CREDENTIAL: Flags.string(),
53+
GOOGLE_CLOUD_STORAGE_PROJ_ID: Flags.string(),
54+
GOOGLE_CLOUD_STORAGE_BUCKET_NAME: Flags.string(),
55+
GOOGLE_CLOUD_UNIFORM_BUCKET_ACCESS: Flags.string(),
5256
SHOW_COMMUNITY_NODES: Flags.string(),
5357
SECRETKEY_STORAGE_TYPE: Flags.string(),
5458
SECRETKEY_PATH: Flags.string(),
@@ -184,6 +188,11 @@ export abstract class BaseCommand extends Command {
184188
if (flags.S3_STORAGE_REGION) process.env.S3_STORAGE_REGION = flags.S3_STORAGE_REGION
185189
if (flags.S3_ENDPOINT_URL) process.env.S3_ENDPOINT_URL = flags.S3_ENDPOINT_URL
186190
if (flags.S3_FORCE_PATH_STYLE) process.env.S3_FORCE_PATH_STYLE = flags.S3_FORCE_PATH_STYLE
191+
if (flags.GOOGLE_CLOUD_STORAGE_CREDENTIAL) process.env.GOOGLE_CLOUD_STORAGE_CREDENTIAL = flags.GOOGLE_CLOUD_STORAGE_CREDENTIAL
192+
if (flags.GOOGLE_CLOUD_STORAGE_PROJ_ID) process.env.GOOGLE_CLOUD_STORAGE_PROJ_ID = flags.GOOGLE_CLOUD_STORAGE_PROJ_ID
193+
if (flags.GOOGLE_CLOUD_STORAGE_BUCKET_NAME) process.env.GOOGLE_CLOUD_STORAGE_BUCKET_NAME = flags.GOOGLE_CLOUD_STORAGE_BUCKET_NAME
194+
if (flags.GOOGLE_CLOUD_UNIFORM_BUCKET_ACCESS)
195+
process.env.GOOGLE_CLOUD_UNIFORM_BUCKET_ACCESS = flags.GOOGLE_CLOUD_UNIFORM_BUCKET_ACCESS
187196

188197
// Queue
189198
if (flags.MODE) process.env.MODE = flags.MODE

packages/server/src/utils/index.ts

+11
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import { randomBytes } from 'crypto'
4343
import { AES, enc } from 'crypto-js'
4444
import multer from 'multer'
4545
import multerS3 from 'multer-s3'
46+
import MulterGoogleCloudStorage from 'multer-cloud-storage'
4647
import { ChatFlow } from '../database/entities/ChatFlow'
4748
import { ChatMessage } from '../database/entities/ChatMessage'
4849
import { Credential } from '../database/entities/Credential'
@@ -1799,6 +1800,16 @@ export const getMulterStorage = () => {
17991800
})
18001801
})
18011802
return upload
1803+
} else if (storageType === 'gcs') {
1804+
return multer({
1805+
storage: new MulterGoogleCloudStorage({
1806+
projectId: process.env.GOOGLE_CLOUD_STORAGE_PROJ_ID,
1807+
bucket: process.env.GOOGLE_CLOUD_STORAGE_BUCKET_NAME,
1808+
keyFilename: process.env.GOOGLE_CLOUD_STORAGE_CREDENTIAL,
1809+
uniformBucketLevelAccess: Boolean(process.env.GOOGLE_CLOUD_UNIFORM_BUCKET_ACCESS) ?? true,
1810+
destination: `uploads/${getOrgId()}`
1811+
})
1812+
})
18021813
} else {
18031814
return multer({ dest: getUploadPath() })
18041815
}

packages/server/src/utils/logger.ts

+37-4
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import config from './config' // should be replaced by node-config or similar
55
import { createLogger, transports, format } from 'winston'
66
import { NextFunction, Request, Response } from 'express'
77
import { S3ClientConfig } from '@aws-sdk/client-s3'
8+
import { LoggingWinston } from '@google-cloud/logging-winston'
89

910
const { S3StreamLogger } = require('s3-streamlogger')
1011

@@ -13,6 +14,11 @@ const { combine, timestamp, printf, errors } = format
1314
let s3ServerStream: any
1415
let s3ErrorStream: any
1516
let s3ServerReqStream: any
17+
18+
let gcsServerStream: any
19+
let gcsErrorStream: any
20+
let gcsServerReqStream: any
21+
1622
if (process.env.STORAGE_TYPE === 's3') {
1723
const accessKeyId = process.env.S3_STORAGE_ACCESS_KEY_ID
1824
const secretAccessKey = process.env.S3_STORAGE_SECRET_ACCESS_KEY
@@ -60,6 +66,29 @@ if (process.env.STORAGE_TYPE === 's3') {
6066
})
6167
}
6268

69+
if (process.env.STORAGE_TYPE === 'gcs') {
70+
const config = {
71+
projectId: process.env.GOOGLE_CLOUD_STORAGE_PROJ_ID,
72+
keyFilename: process.env.GOOGLE_CLOUD_STORAGE_CREDENTIAL,
73+
defaultCallback: (err: any) => {
74+
if (err) {
75+
console.error('Error logging to GCS: ' + err)
76+
}
77+
}
78+
}
79+
gcsServerStream = new LoggingWinston({
80+
...config,
81+
logName: 'server'
82+
})
83+
gcsErrorStream = new LoggingWinston({
84+
...config,
85+
logName: 'error'
86+
})
87+
gcsServerReqStream = new LoggingWinston({
88+
...config,
89+
logName: 'requests'
90+
})
91+
}
6392
// expect the log dir be relative to the projects root
6493
const logDir = config.logging.dir
6594

@@ -101,7 +130,8 @@ const logger = createLogger({
101130
stream: s3ServerStream
102131
})
103132
]
104-
: [])
133+
: []),
134+
...(process.env.STORAGE_TYPE === 'gcs' ? [gcsServerStream] : [])
105135
],
106136
exceptionHandlers: [
107137
...(!process.env.STORAGE_TYPE || process.env.STORAGE_TYPE === 'local'
@@ -117,7 +147,8 @@ const logger = createLogger({
117147
stream: s3ErrorStream
118148
})
119149
]
120-
: [])
150+
: []),
151+
...(process.env.STORAGE_TYPE === 'gcs' ? [gcsErrorStream] : [])
121152
],
122153
rejectionHandlers: [
123154
...(!process.env.STORAGE_TYPE || process.env.STORAGE_TYPE === 'local'
@@ -133,7 +164,8 @@ const logger = createLogger({
133164
stream: s3ErrorStream
134165
})
135166
]
136-
: [])
167+
: []),
168+
...(process.env.STORAGE_TYPE === 'gcs' ? [gcsErrorStream] : [])
137169
]
138170
})
139171

@@ -168,7 +200,8 @@ export function expressRequestLogger(req: Request, res: Response, next: NextFunc
168200
stream: s3ServerReqStream
169201
})
170202
]
171-
: [])
203+
: []),
204+
...(process.env.STORAGE_TYPE === 'gcs' ? [gcsServerReqStream] : [])
172205
]
173206
})
174207

0 commit comments

Comments
 (0)