Skip to content

Commit a82d36c

Browse files
authored
Merge branch 'main' into fix/cohere-rerank-custom-model
2 parents cb66cd9 + 34aa82e commit a82d36c

12 files changed

Lines changed: 413 additions & 530 deletions

File tree

packages/server/src/controllers/internal-predictions/index.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ const createInternalPrediction = async (req: Request, res: Response, next: NextF
3333
const createAndStreamInternalPrediction = async (req: Request, res: Response, next: NextFunction) => {
3434
const chatId = req.body.chatId
3535
const sseStreamer = getRunningExpressApp().sseStreamer
36+
const isQueueMode = process.env.MODE === MODE.QUEUE
3637

3738
try {
3839
sseStreamer.addClient(chatId, res)
@@ -42,8 +43,8 @@ const createAndStreamInternalPrediction = async (req: Request, res: Response, ne
4243
res.setHeader('X-Accel-Buffering', 'no') //nginx config: https://serverfault.com/a/801629
4344
res.flushHeaders()
4445

45-
if (process.env.MODE === MODE.QUEUE) {
46-
getRunningExpressApp().redisSubscriber.subscribe(chatId)
46+
if (isQueueMode) {
47+
await getRunningExpressApp().redisSubscriber.subscribe(chatId)
4748
}
4849

4950
const apiResponse = await utilBuildChatflow(req, true)
@@ -54,6 +55,9 @@ const createAndStreamInternalPrediction = async (req: Request, res: Response, ne
5455
}
5556
next(error)
5657
} finally {
58+
if (isQueueMode && chatId) {
59+
await getRunningExpressApp().redisSubscriber.unsubscribe(chatId)
60+
}
5761
sseStreamer.removeClient(chatId)
5862
}
5963
}

packages/server/src/controllers/predictions/index.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ const createPrediction = async (req: Request, res: Response, next: NextFunction)
6464
chatId = req.body.chatId ?? req.body.overrideConfig?.sessionId ?? uuidv4()
6565
req.body.chatId = chatId
6666
}
67+
const isQueueMode = process.env.MODE === MODE.QUEUE
6768
try {
6869
sseStreamer.addExternalClient(chatId, res)
6970
res.setHeader('Content-Type', 'text/event-stream')
@@ -72,8 +73,8 @@ const createPrediction = async (req: Request, res: Response, next: NextFunction)
7273
res.setHeader('X-Accel-Buffering', 'no') //nginx config: https://serverfault.com/a/801629
7374
res.flushHeaders()
7475

75-
if (process.env.MODE === MODE.QUEUE) {
76-
getRunningExpressApp().redisSubscriber.subscribe(chatId)
76+
if (isQueueMode) {
77+
await getRunningExpressApp().redisSubscriber.subscribe(chatId)
7778
}
7879

7980
const apiResponse = await predictionsServices.buildChatflow(req)
@@ -84,6 +85,9 @@ const createPrediction = async (req: Request, res: Response, next: NextFunction)
8485
}
8586
next(error)
8687
} finally {
88+
if (isQueueMode && chatId) {
89+
await getRunningExpressApp().redisSubscriber.unsubscribe(chatId)
90+
}
8791
sseStreamer.removeClient(chatId)
8892
}
8993
} else {

packages/server/src/controllers/variables/index.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,12 @@ const createVariable = async (req: Request, res: Response, next: NextFunction) =
2222
throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `Error: toolsController.createTool - workspace ${workspaceId} not found!`)
2323
}
2424
const body = req.body
25-
body.workspaceId = workspaceId
25+
// Explicit allowlist — id/workspaceId/timestamps must not be overrideable by client
2626
const newVariable = new Variable()
27-
Object.assign(newVariable, body)
27+
if (body.name !== undefined) newVariable.name = body.name
28+
if (body.value !== undefined) newVariable.value = body.value
29+
if (body.type !== undefined) newVariable.type = body.type
30+
newVariable.workspaceId = workspaceId
2831
const apiResponse = await variablesService.createVariable(newVariable, orgId)
2932
return res.json(apiResponse)
3033
} catch (error) {
@@ -91,8 +94,11 @@ const updateVariable = async (req: Request, res: Response, next: NextFunction) =
9194
return res.status(404).send('Variable not found in the database')
9295
}
9396
const body = req.body
97+
// Explicit allowlist — id/workspaceId/timestamps must not be overrideable by client
9498
const updatedVariable = new Variable()
95-
Object.assign(updatedVariable, body)
99+
if (body.name !== undefined) updatedVariable.name = body.name
100+
if (body.value !== undefined) updatedVariable.value = body.value
101+
if (body.type !== undefined) updatedVariable.type = body.type
96102
const apiResponse = await variablesService.updateVariable(variable, updatedVariable)
97103
return res.json(apiResponse)
98104
} catch (error) {

packages/server/src/enterprise/controllers/user.controller.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,11 @@ export class UserController {
5656
if (!currentUser) {
5757
throw new InternalFlowiseError(StatusCodes.UNAUTHORIZED, UserErrorMessage.USER_NOT_FOUND)
5858
}
59-
const { id } = req.body
59+
const { id, name, oldPassword, newPassword, confirmPassword } = req.body
6060
if (currentUser.id !== id) {
6161
throw new InternalFlowiseError(StatusCodes.FORBIDDEN, UserErrorMessage.USER_NOT_FOUND)
6262
}
63-
const user = await userService.updateUser(req.body)
63+
const user = await userService.updateUser({ id, name, updatedBy: currentUser.id, oldPassword, newPassword, confirmPassword })
6464
return res.status(StatusCodes.OK).json(user)
6565
} catch (error) {
6666
next(error)

packages/server/src/enterprise/services/user.service.ts

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -150,16 +150,10 @@ export class UserService {
150150
if (!updateUserData) throw new InternalFlowiseError(StatusCodes.NOT_FOUND, UserErrorMessage.USER_NOT_FOUND)
151151
}
152152

153-
newUserData.createdBy = oldUserData.createdBy
154-
155153
if (newUserData.name) {
156154
this.validateUserName(newUserData.name)
157155
}
158156

159-
if (newUserData.status) {
160-
this.validateUserStatus(newUserData.status)
161-
}
162-
163157
if (newUserData.oldPassword && newUserData.newPassword && newUserData.confirmPassword) {
164158
if (!oldUserData.credential) {
165159
throw new InternalFlowiseError(StatusCodes.BAD_REQUEST, UserErrorMessage.INVALID_USER_CREDENTIAL)
@@ -176,7 +170,23 @@ export class UserService {
176170
newUserData.tokenExpiry = undefined
177171
}
178172

179-
updatedUser = queryRunner.manager.merge(User, oldUserData, newUserData)
173+
const safePatch: Partial<User> = {
174+
createdBy: oldUserData.createdBy // always preserve from DB
175+
}
176+
177+
if (newUserData.name) {
178+
safePatch.name = newUserData.name
179+
}
180+
181+
safePatch.updatedBy = newUserData.updatedBy // always set (controller forces req.user.id)
182+
if (newUserData.oldPassword && newUserData.newPassword && newUserData.confirmPassword) {
183+
// credential/tempToken/tokenExpiry were set by the validated workflow above
184+
safePatch.credential = newUserData.credential
185+
safePatch.tempToken = newUserData.tempToken
186+
safePatch.tokenExpiry = newUserData.tokenExpiry
187+
}
188+
189+
updatedUser = queryRunner.manager.merge(User, oldUserData, safePatch)
180190
await queryRunner.startTransaction()
181191
await this.saveUser(updatedUser, queryRunner)
182192
await queryRunner.commitTransaction()

packages/server/src/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ export class App {
128128

129129
// Initialize SSE Streamer
130130
this.sseStreamer = new SSEStreamer()
131+
this.sseStreamer.startHeartbeat()
131132
logger.info('🌊 [server]: SSE Streamer initialized successfully')
132133

133134
// Init Queues
@@ -148,6 +149,7 @@ export class App {
148149

149150
this.redisSubscriber = new RedisEventSubscriber(this.sseStreamer)
150151
await this.redisSubscriber.connect()
152+
this.redisSubscriber.startPeriodicCleanup()
151153
logger.info('🔗 [server]: Redis event subscriber connected successfully')
152154
}
153155

@@ -356,6 +358,7 @@ export class App {
356358

357359
async stopApp() {
358360
try {
361+
this.sseStreamer.stopHeartbeat()
359362
const removePromises: any[] = []
360363
removePromises.push(this.telemetry.flush())
361364
if (this.queueManager) {

packages/server/src/queue/PredictionQueue.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ export class PredictionQueue extends BaseQueue {
6363
}
6464

6565
async processJob(data: IExecuteFlowParams | IGenerateAgentflowv2Params) {
66+
if (this.redisPublisher) {
67+
await this.redisPublisher.connect()
68+
}
6669
if (this.appDataSource) data.appDataSource = this.appDataSource
6770
if (this.telemetry) data.telemetry = this.telemetry
6871
if (this.cachePool) data.cachePool = this.cachePool

0 commit comments

Comments
 (0)