Skip to content

Commit

Permalink
🚀 refactor: Enhance Custom Endpoints, Message Logic, and Payload Hand…
Browse files Browse the repository at this point in the history
…ling (danny-avila#2895)

* chore: use node-fetch for OpenAIClient fetch key to avoid AbortController crashes in the Bun runtime

* chore: variable order

* fix(useSSE): prevent finalHandler call in abortConversation from updating messages/conversation after the user navigated away

* chore: params order

* refactor: organize intermediate message logic and ensure correct variables are passed

* fix: Add stt and tts routes before upload limiters, prevent bans

* fix(abortRun): temp fix to delete unfinished messages to avoid message thread parent relationship issues

* refactor: Update AnthropicClient to use node-fetch for fetch key and add proxy support

* fix(gptPlugins): ensure parentMessageId/messageId relationship is maintained

* feat(BaseClient): custom fetch function to analyze/edit payloads just before sending (also prevents abortController crash on Bun runtime)

* feat: `directEndpoint` and `titleMessageRole` custom endpoint options

* chore: Bump version to 0.6.6 in data-provider package.json
  • Loading branch information
danny-avila authored May 28, 2024
1 parent 0ee060d commit 40685f6
Show file tree
Hide file tree
Showing 17 changed files with 137 additions and 29 deletions.
6 changes: 6 additions & 0 deletions api/app/clients/AnthropicClient.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const Anthropic = require('@anthropic-ai/sdk');
const { HttpsProxyAgent } = require('https-proxy-agent');
const { encoding_for_model: encodingForModel, get_encoding: getEncoding } = require('tiktoken');
const {
getResponseSender,
Expand Down Expand Up @@ -123,9 +124,14 @@ class AnthropicClient extends BaseClient {
getClient() {
/** @type {Anthropic.default.RequestOptions} */
const options = {
fetch: this.fetch,
apiKey: this.apiKey,
};

if (this.options.proxy) {
options.httpAgent = new HttpsProxyAgent(this.options.proxy);
}

if (this.options.reverseProxyUrl) {
options.baseURL = this.options.reverseProxyUrl;
}
Expand Down
18 changes: 18 additions & 0 deletions api/app/clients/BaseClient.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const crypto = require('crypto');
const fetch = require('node-fetch');
const { supportsBalanceCheck, Constants } = require('librechat-data-provider');
const { getConvo, getMessages, saveMessage, updateMessage, saveConvo } = require('~/models');
const { addSpaceIfNeeded, isEnabled } = require('~/server/utils');
Expand All @@ -17,6 +18,7 @@ class BaseClient {
month: 'long',
day: 'numeric',
});
this.fetch = this.fetch.bind(this);
}

setOptions() {
Expand Down Expand Up @@ -54,6 +56,22 @@ class BaseClient {
});
}

/**
* Makes an HTTP request and logs the process.
*
* @param {RequestInfo} url - The URL to make the request to. Can be a string or a Request object.
* @param {RequestInit} [init] - Optional init options for the request.
* @returns {Promise<Response>} - A promise that resolves to the response of the fetch request.
*/
async fetch(_url, init) {
let url = _url;
if (this.options.directEndpoint) {
url = this.options.reverseProxyUrl;
}
logger.debug(`Making request to ${url}`);
return await fetch(url, init);
}

getBuildMessagesOptions() {
throw new Error('Subclasses must implement getBuildMessagesOptions');
}
Expand Down
5 changes: 3 additions & 2 deletions api/app/clients/OpenAIClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ class OpenAIClient extends BaseClient {
let streamResult = null;
this.modelOptions.user = this.user;
const invalidBaseUrl = this.completionsUrl && extractBaseURL(this.completionsUrl) === null;
const useOldMethod = !!(invalidBaseUrl || !this.isChatCompletion || typeof Bun !== 'undefined');
const useOldMethod = !!(invalidBaseUrl || !this.isChatCompletion);
if (typeof opts.onProgress === 'function' && useOldMethod) {
const completionResult = await this.getCompletion(
payload,
Expand Down Expand Up @@ -829,7 +829,7 @@ class OpenAIClient extends BaseClient {

const instructionsPayload = [
{
role: 'system',
role: this.options.titleMessageRole ?? 'system',
content: `Please generate ${titleInstruction}
${convo}
Expand Down Expand Up @@ -1134,6 +1134,7 @@ ${convo}
let chatCompletion;
/** @type {OpenAI} */
const openai = new OpenAI({
fetch: this.fetch,
apiKey: this.apiKey,
...opts,
});
Expand Down
2 changes: 1 addition & 1 deletion api/app/clients/PluginsClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ class PluginsClient extends OpenAIClient {
if (opts.progressCallback) {
opts.onProgress = opts.progressCallback.call(null, {
...(opts.progressOptions ?? {}),
parentMessageId: opts.progressOptions?.parentMessageId ?? userMessage.messageId,
parentMessageId: userMessage.messageId,
messageId: responseMessageId,
});
}
Expand Down
8 changes: 8 additions & 0 deletions api/models/Message.js
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,14 @@ module.exports = {
throw new Error('Failed to save message.');
}
},
async updateMessageText({ messageId, text }) {
try {
await Message.updateOne({ messageId }, { text });
} catch (err) {
logger.error('Error updating message text:', err);
throw new Error('Failed to update message text.');
}
},
async updateMessage(message) {
try {
const { messageId, ...update } = message;
Expand Down
2 changes: 1 addition & 1 deletion api/server/controllers/assistants/chatV2.js
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ const chatV2 = async (req, res) => {
handlers,
thread_id,
attachedFileIds,
parentMessageId,
parentMessageId: userMessageId,
responseMessage: openai.responseMessage,
// streamOptions: {

Expand Down
11 changes: 9 additions & 2 deletions api/server/middleware/abortRun.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const { CacheKeys, RunStatus, isUUID } = require('librechat-data-provider');
const { initializeClient } = require('~/server/services/Endpoints/assistants');
const { checkMessageGaps, recordUsage } = require('~/server/services/Threads');
const { deleteMessages } = require('~/models/Message');
const { getConvo } = require('~/models/Conversation');
const getLogStores = require('~/cache/getLogStores');
const { sendMessage } = require('~/server/utils');
Expand Down Expand Up @@ -66,13 +67,19 @@ async function abortRun(req, res) {
logger.error('[abortRun] Error fetching or processing run', error);
}

/* TODO: a reconciling strategy between the existing intermediate message would be more optimal than deleting it */
await deleteMessages({
user: req.user.id,
unfinished: true,
conversationId,
});
runMessages = await checkMessageGaps({
openai,
run_id,
endpoint,
thread_id,
run_id,
latestMessageId,
conversationId,
latestMessageId,
});

const finalEvent = {
Expand Down
18 changes: 15 additions & 3 deletions api/server/routes/ask/gptPlugins.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,11 @@ router.post(
const pluginMap = new Map();
const onAgentAction = async (action, runId) => {
pluginMap.set(runId, action.tool);
sendIntermediateMessage(res, { plugins });
sendIntermediateMessage(res, {
plugins,
parentMessageId: userMessage.messageId,
messageId: responseMessageId,
});
};

const onToolStart = async (tool, input, runId, parentRunId) => {
Expand All @@ -124,7 +128,11 @@ router.post(
}
const extraTokens = ':::plugin:::\n';
plugins.push(latestPlugin);
sendIntermediateMessage(res, { plugins }, extraTokens);
sendIntermediateMessage(
res,
{ plugins, parentMessageId: userMessage.messageId, messageId: responseMessageId },
extraTokens,
);
};

const onToolEnd = async (output, runId) => {
Expand All @@ -142,7 +150,11 @@ router.post(

const onChainEnd = () => {
saveMessage({ ...userMessage, user });
sendIntermediateMessage(res, { plugins });
sendIntermediateMessage(res, {
plugins,
parentMessageId: userMessage.messageId,
messageId: responseMessageId,
});
};

const getAbortData = () => ({
Expand Down
12 changes: 10 additions & 2 deletions api/server/routes/edit/gptPlugins.js
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,11 @@ router.post(
if (!start) {
saveMessage({ ...userMessage, user });
}
sendIntermediateMessage(res, { plugin });
sendIntermediateMessage(res, {
plugin,
parentMessageId: userMessage.messageId,
messageId: responseMessageId,
});
// logger.debug('PLUGIN ACTION', formattedAction);
};

Expand All @@ -119,7 +123,11 @@ router.post(
plugin.outputs = steps && steps[0].action ? formatSteps(steps) : 'An error occurred.';
plugin.loading = false;
saveMessage({ ...userMessage, user });
sendIntermediateMessage(res, { plugin });
sendIntermediateMessage(res, {
plugin,
parentMessageId: userMessage.messageId,
messageId: responseMessageId,
});
// logger.debug('CHAIN END', plugin.outputs);
};

Expand Down
4 changes: 4 additions & 0 deletions api/server/routes/files/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ const initialize = async () => {
router.use(checkBan);
router.use(uaParser);

/* Important: stt/tts routes must be added before the upload limiters */
router.use('/stt', stt);
router.use('/tts', tts);

const upload = await createMulterInstance();
const { fileUploadIpLimiter, fileUploadUserLimiter } = createFileLimiters();
router.post('*', fileUploadIpLimiter, fileUploadUserLimiter);
Expand Down
2 changes: 2 additions & 0 deletions api/server/services/Endpoints/custom/initializeClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ const initializeClient = async ({ req, res, endpointOption }) => {
modelDisplayLabel: endpointConfig.modelDisplayLabel,
titleMethod: endpointConfig.titleMethod ?? 'completion',
contextStrategy: endpointConfig.summarize ? 'summarize' : null,
directEndpoint: endpointConfig.directEndpoint,
titleMessageRole: endpointConfig.titleMessageRole,
endpointTokenConfig,
};

Expand Down
54 changes: 40 additions & 14 deletions api/server/services/Runs/StreamRunManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ const {
} = require('librechat-data-provider');
const { retrieveAndProcessFile } = require('~/server/services/Files/process');
const { processRequiredActions } = require('~/server/services/ToolService');
const { saveMessage, updateMessageText } = require('~/models/Message');
const { createOnProgress, sendMessage } = require('~/server/utils');
const { processMessages } = require('~/server/services/Threads');
const { saveMessage } = require('~/models');
const { logger } = require('~/config');

/**
Expand Down Expand Up @@ -68,6 +68,8 @@ class StreamRunManager {
this.attachedFileIds = fields.attachedFileIds;
/** @type {undefined | Promise<ChatCompletion>} */
this.visionPromise = fields.visionPromise;
/** @type {boolean} */
this.savedInitialMessage = false;

/**
* @type {Object.<AssistantStreamEvents, (event: AssistantStreamEvent) => Promise<void>>}
Expand Down Expand Up @@ -129,6 +131,33 @@ class StreamRunManager {
sendMessage(this.res, contentData);
}

/* <------------------ Misc. Helpers ------------------> */
/** Returns the latest intermediate text
* @returns {string}
*/
getText() {
return this.intermediateText;
}

/** Saves the initial intermediate message
* @returns {Promise<void>}
*/
async saveInitialMessage() {
return saveMessage({
conversationId: this.finalMessage.conversationId,
messageId: this.finalMessage.messageId,
parentMessageId: this.parentMessageId,
model: this.req.body.assistant_id,
endpoint: this.req.body.endpoint,
isCreatedByUser: false,
user: this.req.user.id,
text: this.getText(),
sender: 'Assistant',
unfinished: true,
error: false,
});
}

/* <------------------ Main Event Handlers ------------------> */

/**
Expand Down Expand Up @@ -530,23 +559,20 @@ class StreamRunManager {
const stepKey = message_creation.message_id;
const index = this.getStepIndex(stepKey);
this.orderedRunSteps.set(index, message_creation);
const getText = () => this.intermediateText;

// Create the Factory Function to stream the message
const { onProgress: progressCallback } = createOnProgress({
onProgress: throttle(
() => {
const text = getText();
saveMessage({
messageId: this.finalMessage.messageId,
conversationId: this.finalMessage.conversationId,
parentMessageId: this.parentMessageId,
model: this.req.body.model,
user: this.req.user.id,
sender: 'Assistant',
unfinished: true,
error: false,
text,
});
if (!this.savedInitialMessage) {
this.saveInitialMessage();
this.savedInitialMessage = true;
} else {
updateMessageText({
messageId: this.finalMessage.messageId,
text: this.getText(),
});
}
},
2000,
{ trailing: false },
Expand Down
Binary file modified bun.lockb
Binary file not shown.
2 changes: 1 addition & 1 deletion client/src/hooks/Audio/usePauseGlobalAudio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import store from '~/store';
function usePauseGlobalAudio(index = 0) {
/* Global Audio Variables */
const setAudioRunId = useSetRecoilState(store.audioRunFamily(index));
const setGlobalIsPlaying = useSetRecoilState(store.globalAudioPlayingFamily(index));
const setIsGlobalAudioFetching = useSetRecoilState(store.globalAudioFetchingFamily(index));
const [globalAudioURL, setGlobalAudioURL] = useRecoilState(store.globalAudioURLFamily(index));
const setGlobalIsPlaying = useSetRecoilState(store.globalAudioPlayingFamily(index));

const pauseGlobalAudio = useCallback(() => {
if (globalAudioURL) {
Expand Down
16 changes: 15 additions & 1 deletion client/src/hooks/SSE/useSSE.ts
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,12 @@ export default function useSSE(submission: TSubmission | null, index = 0) {
setShowStopButton(false);
setCompleted((prev) => new Set(prev.add(submission?.initialResponse?.messageId)));

const currentMessages = getMessages();
// Early return if messages are empty; i.e., the user navigated away
if (!currentMessages?.length) {
return setIsSubmitting(false);
}

// update the messages; if assistants endpoint, client doesn't receive responseMessage
if (runMessages) {
setMessages([...runMessages]);
Expand Down Expand Up @@ -323,7 +329,15 @@ export default function useSSE(submission: TSubmission | null, index = 0) {

setIsSubmitting(false);
},
[genTitle, queryClient, setMessages, setConversation, setIsSubmitting, setShowStopButton],
[
genTitle,
queryClient,
getMessages,
setMessages,
setConversation,
setIsSubmitting,
setShowStopButton,
],
);

const errorHandler = useCallback(
Expand Down
2 changes: 1 addition & 1 deletion packages/data-provider/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "librechat-data-provider",
"version": "0.6.5",
"version": "0.6.6",
"description": "data services for librechat apps",
"main": "dist/index.js",
"module": "dist/index.es.js",
Expand Down
4 changes: 3 additions & 1 deletion packages/data-provider/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ export const endpointSchema = z.object({
addParams: z.record(z.any()).optional(),
dropParams: z.array(z.string()).optional(),
customOrder: z.number().optional(),
directEndpoint: z.boolean().optional(),
titleMessageRole: z.string().optional(),
});

export type TEndpoint = z.infer<typeof endpointSchema>;
Expand Down Expand Up @@ -747,7 +749,7 @@ export enum Constants {
/** Key for the app's version. */
VERSION = 'v0.7.2',
/** Key for the Custom Config's version (librechat.yaml). */
CONFIG_VERSION = '1.1.2',
CONFIG_VERSION = '1.1.3',
/** Standard value for the first message's `parentMessageId` value, to indicate no parent exists. */
NO_PARENT = '00000000-0000-0000-0000-000000000000',
/** Fixed, encoded domain length for Azure OpenAI Assistants Function name parsing. */
Expand Down

0 comments on commit 40685f6

Please sign in to comment.