diff --git a/README.md b/README.md index 8aa8d05746a..390f9a3f63c 100644 --- a/README.md +++ b/README.md @@ -85,6 +85,7 @@ Try our demo at [https://demo.ragflow.io](https://demo.ragflow.io). ## πŸ”₯ Latest Updates +- 2025-12-03 Adds WebSocket API for streaming responses, enabling real-time communication for WeChat Mini Programs and other WebSocket clients. - 2025-11-19 Supports Gemini 3 Pro. - 2025-11-12 Supports data synchronization from Confluence, S3, Notion, Discord, Google Drive. - 2025-10-23 Supports MinerU & Docling as document parsing methods. @@ -132,6 +133,7 @@ releases! 🌟 - Configurable LLMs as well as embedding models. - Multiple recall paired with fused re-ranking. - Intuitive APIs for seamless integration with business. +- WebSocket support for real-time streaming responses (ideal for WeChat Mini Programs and mobile apps). ## πŸ”Ž System Architecture diff --git a/api/apps/sdk/websocket.py b/api/apps/sdk/websocket.py new file mode 100644 index 00000000000..54466d2d834 --- /dev/null +++ b/api/apps/sdk/websocket.py @@ -0,0 +1,250 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +WebSocket SDK API for RAGFlow Streaming Responses + +This module provides WebSocket endpoints following the SDK API pattern, +mirroring the structure of session.py for consistency. 
+""" + +import logging +import json +from quart import websocket + +from api.db.services.dialog_service import DialogService +from api.db.services.canvas_service import UserCanvasService +from api.db.services.conversation_service import async_completion as rag_completion +from api.db.services.canvas_service import completion as agent_completion +from api.utils.api_utils import ws_token_required +from common.constants import StatusEnum + + +async def send_ws_error(error_message, code=500): + """Send error message to WebSocket client.""" + error_response = { + "code": code, + "message": error_message, + "data": { + "answer": f"**ERROR**: {error_message}", + "reference": [] + } + } + await websocket.send(json.dumps(error_response, ensure_ascii=False)) + + +async def send_ws_message(data, code=0, message=""): + """Send message to WebSocket client.""" + response = { + "code": code, + "message": message, + "data": data + } + await websocket.send(json.dumps(response, ensure_ascii=False)) + + +@manager.websocket("/ws/chats//completions") # noqa: F821 +@ws_token_required +async def chat_completions_ws(tenant_id, chat_id): + """ + WebSocket endpoint for streaming chat completions. + Follows the same pattern as the HTTP POST /chats//completions endpoint. + Uses /ws/ prefix to avoid routing conflicts with HTTP endpoints. 
+ """ + # Verify chat ownership + if not DialogService.query(tenant_id=tenant_id, id=chat_id, status=StatusEnum.VALID.value): + await send_ws_error(f"You don't own the chat {chat_id}", code=404) + await websocket.close(1008) + return + + logging.info(f"WebSocket chat connection established for chat_id: {chat_id}, tenant: {tenant_id}") + + try: + while True: + message = await websocket.receive() + + try: + req = json.loads(message) + except json.JSONDecodeError as e: + await send_ws_error(f"Invalid JSON format: {str(e)}", code=400) + continue + + question = req.get("question", "") + session_id = req.get("session_id") + stream = req.get("stream", True) + + if question is None: + await send_ws_error("Missing required parameter: question", code=400) + continue + + try: + if stream: + async for response_chunk in rag_completion( + tenant_id=tenant_id, + chat_id=chat_id, + question=question, + session_id=session_id, + stream=True, + **{k: v for k, v in req.items() if k not in ["question", "session_id", "stream"]} + ): + if response_chunk.startswith("data:"): + json_str = response_chunk[5:].strip() + try: + response_data = json.loads(json_str) + await websocket.send(json.dumps(response_data, ensure_ascii=False)) + except json.JSONDecodeError: + continue + + logging.info(f"Chat completion streamed successfully for chat_id: {chat_id}") + else: + response = None + async for resp in rag_completion( + tenant_id=tenant_id, + chat_id=chat_id, + question=question, + session_id=session_id, + stream=False, + **{k: v for k, v in req.items() if k not in ["question", "session_id", "stream"]} + ): + response = resp + break + + if response: + await send_ws_message(response) + else: + await send_ws_error("No response generated", code=500) + + except Exception as e: + logging.exception(f"Error during chat completion: {str(e)}") + await send_ws_error(str(e)) + + except Exception as e: + logging.exception(f"WebSocket error: {str(e)}") + try: + await send_ws_error(str(e)) + except Exception: 
+ pass + await websocket.close(1011) + + finally: + logging.info(f"WebSocket chat connection closed for chat_id: {chat_id}") + + +@manager.websocket("/ws/agents//completions") # noqa: F821 +@ws_token_required +async def agent_completions_ws(tenant_id, agent_id): + """ + WebSocket endpoint for streaming agent completions. + Follows the same pattern as the HTTP POST /agents//completions endpoint. + Uses /ws/ prefix to avoid routing conflicts with HTTP endpoints. + """ + # Verify agent ownership + if not UserCanvasService.query(user_id=tenant_id, id=agent_id): + await send_ws_error(f"You don't own the agent {agent_id}", code=404) + await websocket.close(1008) + return + + logging.info(f"WebSocket agent connection established for agent_id: {agent_id}, tenant: {tenant_id}") + + try: + while True: + message = await websocket.receive() + + try: + req = json.loads(message) + except json.JSONDecodeError as e: + await send_ws_error(f"Invalid JSON format: {str(e)}", code=400) + continue + + question = req.get("question", "") + session_id = req.get("session_id") + stream = req.get("stream", True) + + if not question: + await send_ws_error("Missing required parameter: question", code=400) + continue + + try: + if stream: + async for response_chunk in agent_completion( + tenant_id=tenant_id, + agent_id=agent_id, + question=question, + session_id=session_id, + stream=True, + **{k: v for k, v in req.items() if k not in ["question", "session_id", "stream"]} + ): + if isinstance(response_chunk, str) and response_chunk.startswith("data:"): + json_str = response_chunk[5:].strip() + try: + response_data = json.loads(json_str) + if response_data.get("event") in ["message", "message_end"]: + await websocket.send(json.dumps({ + "code": 0, + "message": "", + "data": response_data + }, ensure_ascii=False)) + except json.JSONDecodeError: + continue + + await send_ws_message(True) + logging.info(f"Agent completion streamed successfully for agent_id: {agent_id}") + else: + full_content = "" + 
reference = {} + final_ans = None + + async for response_chunk in agent_completion( + tenant_id=tenant_id, + agent_id=agent_id, + question=question, + session_id=session_id, + stream=False, + **{k: v for k, v in req.items() if k not in ["question", "session_id", "stream"]} + ): + if isinstance(response_chunk, str) and response_chunk.startswith("data:"): + try: + ans = json.loads(response_chunk[5:]) + if ans["event"] == "message": + full_content += ans["data"]["content"] + if ans.get("data", {}).get("reference", None): + reference.update(ans["data"]["reference"]) + final_ans = ans + except Exception as e: + await send_ws_error(str(e)) + continue + + if final_ans: + final_ans["data"]["content"] = full_content + final_ans["data"]["reference"] = reference + await send_ws_message(final_ans) + else: + await send_ws_error("No response generated", code=500) + + except Exception as e: + logging.exception(f"Error during agent completion: {str(e)}") + await send_ws_error(str(e)) + + except Exception as e: + logging.exception(f"WebSocket error: {str(e)}") + try: + await send_ws_error(str(e)) + except Exception: + pass + await websocket.close(1011) + + finally: + logging.info(f"WebSocket agent connection closed for agent_id: {agent_id}") + diff --git a/api/utils/api_utils.py b/api/utils/api_utils.py index 6518e9c61c1..eef255a1c8d 100644 --- a/api/utils/api_utils.py +++ b/api/utils/api_utils.py @@ -1,5 +1,5 @@ # -# Copyright 2024 The InfiniFlow Authors. All Rights Reserved. +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -31,7 +31,6 @@ jsonify, request ) - from peewee import OperationalError from common.constants import ActiveEnum @@ -283,6 +282,93 @@ async def adecorated_function(*args, **kwargs): return decorated_function +def ws_token_required(func): + """ + WebSocket authentication decorator for SDK endpoints. 
+ Follows the same pattern as token_required but for WebSocket connections. + """ + from quart import websocket + from itsdangerous.url_safe import URLSafeTimedSerializer as Serializer + from api.db.services.user_service import UserService + from common.constants import StatusEnum + + async def get_tenant_id_from_websocket(**kwargs): + """Extract tenant_id from WebSocket authentication.""" + # Method 1: Try API Token authentication from Authorization header + authorization = websocket.headers.get("Authorization", "") + + if authorization: + try: + authorization_parts = authorization.split() + if len(authorization_parts) >= 2: + token = authorization_parts[1] + objs = APIToken.query(token=token) + if objs: + kwargs["tenant_id"] = objs[0].tenant_id + logging.info("WebSocket authenticated via API token") + return True, kwargs + except Exception as e: + logging.error(f"WebSocket API token auth error: {str(e)}") + + # Method 2: Try User Session authentication (JWT) + try: + jwt = Serializer(secret_key=settings.SECRET_KEY) + auth_token = websocket.headers.get("Authorization") or \ + websocket.args.get("authorization") or \ + websocket.args.get("token") + + if auth_token: + try: + if auth_token.startswith("Bearer "): + auth_token = auth_token[7:] + access_token = str(jwt.loads(auth_token)) + if access_token and len(access_token.strip()) >= 32: + user = UserService.query(access_token=access_token, status=StatusEnum.VALID.value) + if user and user[0]: + kwargs["tenant_id"] = user[0].id + logging.info("WebSocket authenticated via user session") + return True, kwargs + except Exception: + pass + except Exception: + pass + + # Method 3: Try query parameter authentication + token_param = websocket.args.get("token") + if token_param: + try: + objs = APIToken.query(token=token_param) + if objs: + kwargs["tenant_id"] = objs[0].tenant_id + logging.info("WebSocket authenticated via query parameter") + return True, kwargs + except Exception: + pass + + return False, "Authentication 
required. Please provide valid API token or user session." + + @wraps(func) + async def adecorated_function(*args, **kwargs): + """Async wrapper for WebSocket endpoint.""" + success, result = await get_tenant_id_from_websocket(**kwargs) + + if not success: + # Authentication failed - send error and close connection + error_response = { + "code": RetCode.AUTHENTICATION_ERROR, + "message": result, + "data": {"answer": f"**ERROR**: {result}", "reference": []} + } + await websocket.send(json.dumps(error_response, ensure_ascii=False)) + await websocket.close(1008, result) # 1008 = Policy Violation + return + + # Authentication successful - call the actual handler + return await func(*args, **result) + + return adecorated_function + + def get_result(code=RetCode.SUCCESS, message="", data=None, total=None): """ Standard API response format: diff --git a/docs/references/websocket_api_reference.md b/docs/references/websocket_api_reference.md new file mode 100644 index 00000000000..741cf0d6f5d --- /dev/null +++ b/docs/references/websocket_api_reference.md @@ -0,0 +1,624 @@ +--- +sidebar_position: 10 +slug: /websocket_api_reference +--- + +# WebSocket API for Streaming Responses + +--- + +:::info KUDOS +This document is contributed by our community contributor [SmartDever02](https://github.com/SmartDever02). πŸ‘ +::: + +## Overview + +RAGFlow now supports WebSocket connections for real-time streaming responses. This feature is particularly useful for platforms like **WeChat Mini Programs** that require persistent bidirectional connections for interactive chat experiences. + +## Why WebSocket? 
+ +Traditional HTTP-based Server-Sent Events (SSE) work well for most web applications, but some platforms have specific requirements: + +- **WeChat Mini Programs** require WebSocket for real-time communication +- **Mobile apps** benefit from persistent connections with lower latency +- **Interactive applications** need bidirectional communication +- **Network efficiency** - reuse connections instead of creating new ones for each message + +## Connection URL + +``` +ws://your-ragflow-host/v1/ws/chat +wss://your-ragflow-host/v1/ws/chat (for SSL/TLS) +``` + +## Authentication + +WebSocket connections support multiple authentication methods: + +### 1. API Token (Recommended for Integrations) + +Pass your API token in the Authorization header or as a query parameter: + +**Header-based (preferred):** +```javascript +const ws = new WebSocket('ws://host/v1/ws/chat', { + headers: { + 'Authorization': 'Bearer ragflow-your-api-token' + } +}); +``` + +**Query parameter (fallback for clients that can't set headers):** +```javascript +const ws = new WebSocket('ws://host/v1/ws/chat?token=ragflow-your-api-token'); +``` + +### 2. 
User Session (For Web Applications) + +If you're already logged in via the web interface, you can use your session JWT: + +```javascript +const ws = new WebSocket('ws://host/v1/ws/chat', { + headers: { + 'Authorization': 'your-jwt-token' + } +}); +``` + +## Message Format + +### Client β†’ Server (Request) + +Send a JSON message to start a chat completion: + +```json +{ + "type": "chat", + "chat_id": "your-dialog-id", + "question": "What is RAGFlow?", + "stream": true, + "session_id": "optional-session-id", + "kb_ids": ["optional-kb-id"], + "doc_ids": "optional-doc-ids" +} +``` + +**Fields:** +- `type` (string, required): Message type, currently only `"chat"` is supported +- `chat_id` (string, required): Your dialog/chat ID +- `question` (string, required): User's question or message +- `stream` (boolean, optional): Enable streaming responses (default: `true`) +- `session_id` (string, optional): Conversation session ID. If not provided, a new session is created +- `kb_ids` (array, optional): Knowledge base IDs to query for RAG +- `doc_ids` (string, optional): Comma-separated document IDs to prioritize +- `files` (array, optional): File IDs attached to this message + +### Server β†’ Client (Response) + +The server sends multiple messages for a streaming response: + +**Streaming chunk:** +```json +{ + "code": 0, + "message": "", + "data": { + "answer": "RAGFlow is an open-source", + "reference": {}, + "id": "message-id", + "session_id": "session-id" + } +} +``` + +**Completion marker:** +```json +{ + "code": 0, + "message": "", + "data": true +} +``` + +**Error message:** +```json +{ + "code": 500, + "message": "Error description", + "data": { + "answer": "**ERROR**: Error details", + "reference": [] + } +} +``` + +## Example Clients + +### JavaScript (Web Browser / Node.js) + +```javascript +// Create WebSocket connection +const ws = new WebSocket('ws://localhost/v1/ws/chat?token=ragflow-your-token'); + +// Connection opened +ws.addEventListener('open', function 
(event) { + console.log('Connected to RAGFlow WebSocket'); + + // Send a chat message + ws.send(JSON.stringify({ + type: 'chat', + chat_id: 'your-chat-id', + question: 'What is artificial intelligence?', + stream: true + })); +}); + +// Listen for messages +ws.addEventListener('message', function (event) { + const response = JSON.parse(event.data); + + // Check for completion + if (response.data === true) { + console.log('Stream completed'); + return; + } + + // Check for errors + if (response.code !== 0) { + console.error('Error:', response.message); + return; + } + + // Display incremental answer + console.log('Received chunk:', response.data.answer); + + // You can append to UI here + // document.getElementById('answer').innerText += response.data.answer; +}); + +// Handle errors +ws.addEventListener('error', function (event) { + console.error('WebSocket error:', event); +}); + +// Handle connection close +ws.addEventListener('close', function (event) { + console.log('WebSocket closed:', event.code, event.reason); +}); +``` + +### WeChat Mini Program + +```javascript +// WeChat Mini Program WebSocket example +const app = getApp(); + +Page({ + data: { + answer: '', + socket: null + }, + + onLoad: function() { + // Connect to WebSocket + const socket = wx.connectSocket({ + url: 'wss://your-ragflow-host/v1/ws/chat?token=ragflow-your-token', + success: () => { + console.log('WebSocket connected'); + } + }); + + // Connection opened + socket.onOpen(() => { + console.log('WebSocket connection established'); + this.setData({ socket: socket }); + + // Send chat message + socket.send({ + data: JSON.stringify({ + type: 'chat', + chat_id: 'your-chat-id', + question: 'δ½ ε₯½οΌŒδ»€δΉˆζ˜―RAGFlow?', + stream: true + }) + }); + }); + + // Receive messages + socket.onMessage((res) => { + const response = JSON.parse(res.data); + + // Check for completion + if (response.data === true) { + console.log('Stream completed'); + return; + } + + // Check for errors + if (response.code 
!== 0) { + wx.showToast({ + title: response.message, + icon: 'none' + }); + return; + } + + // Append incremental answer + this.setData({ + answer: this.data.answer + response.data.answer + }); + }); + + // Handle errors + socket.onError((error) => { + console.error('WebSocket error:', error); + wx.showToast({ + title: 'Connection error', + icon: 'none' + }); + }); + + // Handle close + socket.onClose(() => { + console.log('WebSocket connection closed'); + }); + }, + + onUnload: function() { + // Close WebSocket when leaving page + if (this.data.socket) { + this.data.socket.close(); + } + } +}); +``` + +### Python + +```python +import websocket +import json +import threading + +class RAGFlowWebSocketClient: + def __init__(self, url, token): + self.url = f"{url}?token={token}" + self.ws = None + + def on_message(self, ws, message): + """Handle incoming messages""" + response = json.loads(message) + + # Check for completion + if response['data'] is True: + print('\nStream completed') + return + + # Check for errors + if response['code'] != 0: + print(f"Error: {response['message']}") + return + + # Print incremental answer + print(response['data']['answer'], end='', flush=True) + + def on_error(self, ws, error): + """Handle errors""" + print(f"Error: {error}") + + def on_close(self, ws, close_status_code, close_msg): + """Handle connection close""" + print(f"\nConnection closed: {close_status_code} - {close_msg}") + + def on_open(self, ws): + """Handle connection open""" + print("Connected to RAGFlow") + + # Send chat message in a separate thread + def send_message(): + message = { + 'type': 'chat', + 'chat_id': 'your-chat-id', + 'question': 'What is machine learning?', + 'stream': True + } + ws.send(json.dumps(message)) + + threading.Thread(target=send_message).start() + + def connect(self): + """Establish WebSocket connection""" + self.ws = websocket.WebSocketApp( + self.url, + on_open=self.on_open, + on_message=self.on_message, + on_error=self.on_error, + 
on_close=self.on_close + ) + + # Run forever (blocking) + self.ws.run_forever() + +# Usage +if __name__ == '__main__': + client = RAGFlowWebSocketClient( + url='ws://localhost/v1/ws/chat', + token='ragflow-your-api-token' + ) + client.connect() +``` + +### Go + +```go +package main + +import ( + "encoding/json" + "fmt" + "log" + "github.com/gorilla/websocket" +) + +type ChatRequest struct { + Type string `json:"type"` + ChatID string `json:"chat_id"` + Question string `json:"question"` + Stream bool `json:"stream"` +} + +type ChatResponse struct { + Code int `json:"code"` + Message string `json:"message"` + Data interface{} `json:"data"` +} + +func main() { + // Connect to WebSocket + url := "ws://localhost/v1/ws/chat?token=ragflow-your-token" + conn, _, err := websocket.DefaultDialer.Dial(url, nil) + if err != nil { + log.Fatal("dial:", err) + } + defer conn.Close() + + // Send chat request + request := ChatRequest{ + Type: "chat", + ChatID: "your-chat-id", + Question: "What is deep learning?", + Stream: true, + } + + if err := conn.WriteJSON(request); err != nil { + log.Fatal("write:", err) + } + + // Read responses + for { + var response ChatResponse + if err := conn.ReadJSON(&response); err != nil { + log.Println("read:", err) + return + } + + // Check for completion + if data, ok := response.Data.(bool); ok && data { + fmt.Println("\nStream completed") + break + } + + // Check for errors + if response.Code != 0 { + log.Printf("Error: %s\n", response.Message) + break + } + + // Print incremental answer + if dataMap, ok := response.Data.(map[string]interface{}); ok { + if answer, ok := dataMap["answer"].(string); ok { + fmt.Print(answer) + } + } + } +} +``` + +## Connection Management + +### Persistent Connections + +WebSocket connections are persistent and can handle multiple request/response cycles without reconnecting: + +```javascript +const ws = new WebSocket('ws://host/v1/ws/chat?token=your-token'); + +ws.onopen = () => { + // Send first question + 
ws.send(JSON.stringify({ + type: 'chat', + chat_id: 'chat-id', + question: 'First question?' + })); + + // After receiving the complete response, you can send another question + // without reconnecting +}; + +let responseCount = 0; +ws.onmessage = (event) => { + const response = JSON.parse(event.data); + + if (response.data === true) { + responseCount++; + + // Send next question + if (responseCount === 1) { + ws.send(JSON.stringify({ + type: 'chat', + chat_id: 'chat-id', + session_id: 'same-session-id', // Continue conversation + question: 'Follow-up question?' + })); + } + } +}; +``` + +### Error Handling + +Always implement proper error handling: + +```javascript +ws.onerror = (error) => { + console.error('WebSocket error:', error); + // Implement reconnection logic if needed +}; + +ws.onclose = (event) => { + if (event.code !== 1000) { + // Abnormal closure - implement reconnection + console.log('Reconnecting in 3 seconds...'); + setTimeout(() => { + // Reconnect logic here + }, 3000); + } +}; +``` + +### Close Codes + +Common WebSocket close codes: + +- `1000` - Normal closure +- `1008` - Policy violation (authentication failed) +- `1011` - Internal server error +- `1006` - Abnormal closure (connection lost) + +## Session Management + +### Creating a New Session + +Don't provide a `session_id` in your first message: + +```json +{ + "type": "chat", + "chat_id": "your-chat-id", + "question": "First question" +} +``` + +The server will create a new session and return the `session_id` in the response. 
+ +### Continuing a Session + +Use the `session_id` from previous responses: + +```json +{ + "type": "chat", + "chat_id": "your-chat-id", + "session_id": "returned-session-id", + "question": "Follow-up question" +} +``` + +## Health Check + +Test WebSocket connectivity without authentication: + +```javascript +const ws = new WebSocket('ws://host/v1/ws/health'); + +ws.onopen = () => { + ws.send('ping'); +}; + +ws.onmessage = (event) => { + console.log('Health check:', JSON.parse(event.data)); +}; +``` + +## Troubleshooting + +### Connection Refused + +- Check if RAGFlow server is running +- Verify the correct host and port +- Ensure WebSocket support is enabled + +### Authentication Failed + +- Verify your API token is correct +- Check if the token has expired +- Ensure proper authorization format: `Bearer ` + +### No Response + +- Verify the `chat_id` exists and you have access +- Check if the dialog has knowledge bases configured +- Review server logs for errors + +### Connection Drops + +- Implement reconnection logic +- Use heartbeat/ping messages to keep connection alive +- Check network stability + +## Performance Tips + +1. **Reuse connections**: Don't create new WebSocket for each message +2. **Implement backoff**: Wait before reconnecting after errors +3. **Buffer messages**: Queue messages if connection is temporarily down +4. **Clean up**: Always close WebSocket when done +5. **Monitor latency**: Track round-trip times for optimization + +## Security Considerations + +1. **Always use WSS (WebSocket Secure)** in production +2. **Never expose API tokens** in client-side code +3. **Implement rate limiting** on client side +4. **Validate all inputs** before sending +5. 
**Handle sensitive data** according to your security policies + +## Migration from SSE + +If you're currently using Server-Sent Events (SSE), here's how to migrate: + +**SSE (Old):** +```javascript +const eventSource = new EventSource('/v1/conversation/completion'); +eventSource.onmessage = (event) => { + const data = JSON.parse(event.data); + console.log(data); +}; +``` + +**WebSocket (New):** +```javascript +const ws = new WebSocket('ws://host/v1/ws/chat?token=your-token'); +ws.onmessage = (event) => { + const data = JSON.parse(event.data); + console.log(data); +}; +ws.send(JSON.stringify({ + type: 'chat', + chat_id: 'your-chat-id', + question: 'Your question' +})); +``` + +## Additional Resources + +- [WebSocket API Standard](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) +- [WeChat Mini Program WebSocket](https://developers.weixin.qq.com/miniprogram/dev/api/network/websocket/wx.connectSocket.html) +- [RAGFlow HTTP API Documentation](./http_api_reference.md) + +## Support + +For issues or questions: +- GitHub Issues: https://github.com/infiniflow/ragflow/issues +- Documentation: https://ragflow.io/docs +- Community: Join our Discord/Slack channel + diff --git a/example/websocket/index.html b/example/websocket/index.html new file mode 100644 index 00000000000..6ea1f7150fa --- /dev/null +++ b/example/websocket/index.html @@ -0,0 +1,590 @@ + + + + + + + RAGFlow WebSocket Demo + + + +
+ +
+

🚀 RAGFlow WebSocket Demo

+

Real-time streaming chat with RAGFlow

+
+ + +
+

Connection Settings

+
+ + +
+
+ + +
+
+ + +
+ + Disconnected + +
+ + +
+
+
+ 👆 Configure connection settings above and click Connect
+
+
+ + +
+
+
+ + + + + diff --git a/example/websocket/python_client.py b/example/websocket/python_client.py new file mode 100644 index 00000000000..c5eb733489b --- /dev/null +++ b/example/websocket/python_client.py @@ -0,0 +1,402 @@ +#!/usr/bin/env python3 +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +RAGFlow WebSocket Client Example (Python) + +This example demonstrates how to connect to RAGFlow's WebSocket API +and stream chat responses in real-time. + +Requirements: + pip install websocket-client + +Usage: + python python_client.py --url ws://localhost/v1/ws/chat \ + --token your-api-token \ + --chat-id your-chat-id \ + --question "What is RAGFlow?" +""" + +import argparse +import json +import threading +import websocket + + +class RAGFlowWebSocketClient: + """ + WebSocket client for RAGFlow streaming chat completions. + + This client demonstrates: + - Connection establishment with authentication + - Sending chat requests + - Receiving and displaying streaming responses + - Error handling and reconnection + - Multi-turn conversations + """ + + def __init__(self, url, token, chat_id, debug=False): + """ + Initialize the WebSocket client. 
+ + Args: + url (str): WebSocket URL (e.g., ws://localhost/v1/ws/chat) + token (str): API token for authentication + chat_id (str): Dialog/Chat ID to use + debug (bool): Enable debug output + """ + # Append token to URL for authentication + self.url = f"{url}?token={token}" + self.chat_id = chat_id + self.debug = debug + self.ws = None + self.session_id = None # Track session for multi-turn conversations + self.current_answer = "" # Accumulate streaming chunks + + def on_message(self, ws, message): + """ + Handle incoming WebSocket messages. + + This callback is invoked for each message received from the server. + Messages contain incremental response chunks or completion markers. + + Args: + ws: WebSocket connection object + message (str): JSON message from server + """ + try: + # Parse JSON response + response = json.loads(message) + + if self.debug: + print(f"\n[DEBUG] Received: {json.dumps(response, indent=2)}") + + # Check if this is a completion marker + if response.get('data') is True: + print("\n\nβœ“ Stream completed") + print("-" * 60) + return + + # Check for errors + if response.get('code', 0) != 0: + print(f"\nβœ— Error {response['code']}: {response.get('message', 'Unknown error')}") + return + + # Extract response data + data = response.get('data', {}) + + if isinstance(data, dict): + # Extract answer chunk + answer = data.get('answer', '') + + # Save session ID for multi-turn conversations + if 'session_id' in data and not self.session_id: + self.session_id = data['session_id'] + if self.debug: + print(f"\n[DEBUG] Session ID: {self.session_id}") + + # Display incremental answer + if answer: + print(answer, end='', flush=True) + self.current_answer += answer + + # Display references if available + reference = data.get('reference', {}) + if reference and reference.get('chunks'): + print(f"\n\nπŸ“š References: {len(reference['chunks'])} sources") + if self.debug: + for i, chunk in enumerate(reference['chunks'][:3], 1): + doc_name = chunk.get('doc_name', 
# Callbacks and helpers of the WebSocket client; each takes the client
# instance as ``self`` (presumably RAGFlowWebSocketClient -- its class header
# precedes this section).
def on_error(self, ws, error):
    """
    Report a WebSocket error on the console.

    Args:
        ws: WebSocket connection object
        error: Error object or message
    """
    print(f"\n✗ WebSocket error: {error}")


def on_close(self, ws, close_status_code, close_msg):
    """
    Report that the WebSocket connection was closed.

    Args:
        ws: WebSocket connection object
        close_status_code (int): Close status code
        close_msg (str): Close message
    """
    if close_status_code == 1000:
        # 1000 is the status code treated as a normal closure.
        print("\n✓ Connection closed normally")
    else:
        # Anything else (including a missing code) is reported as abnormal.
        print(f"\n✗ Connection closed: {close_status_code} - {close_msg}")


def on_open(self, ws):
    """
    Report that the WebSocket connection was opened.

    This callback only prints a banner. It does not send anything itself:
    questions are sent with ``send_message`` (``main`` chains a sender
    after this handler in single-question mode).

    Args:
        ws: WebSocket connection object
    """
    print("✓ Connected to RAGFlow")
    print("-" * 60)


def send_message(self, question, session_id=None):
    """
    Send a chat message through the WebSocket.

    Args:
        question (str): User's question or message
        session_id (str, optional): Session ID for continuing a conversation

    Returns:
        bool: True if the request was handed to the socket, False if there
        is no connection object or the send raised.
    """
    if not self.ws:
        print("✗ Not connected")
        return False

    # Construct chat request message
    message = {
        'type': 'chat',
        'chat_id': self.chat_id,
        'question': question,
        'stream': True
    }

    # Include session ID only when continuing a conversation
    if session_id:
        message['session_id'] = session_id

    if self.debug:
        print(f"\n[DEBUG] Sending: {json.dumps(message, indent=2)}")

    # Reset answer accumulator so the next streamed answer starts empty
    self.current_answer = ""

    # Only the send itself is guarded: a console problem while echoing the
    # question must not be reported as a failed send.
    try:
        self.ws.send(json.dumps(message))
    except Exception as e:
        print(f"\n✗ Failed to send message: {e}")
        return False

    print(f"\n💬 Question: {question}\n")
    print("🤖 Answer: ", end='', flush=True)
    return True


def connect(self):
    """
    Establish the WebSocket connection and process events.

    Creates a ``websocket.WebSocketApp`` wired to this client's handlers
    (read from the instance at call time, so handlers replaced on the
    instance beforehand are honoured) and blocks in ``run_forever()`` until
    the connection ends.
    """
    # Enable debug traces if requested
    if self.debug:
        websocket.enableTrace(True)

    # Create WebSocket app with event handlers
    self.ws = websocket.WebSocketApp(
        self.url,
        on_open=self.on_open,
        on_message=self.on_message,
        on_error=self.on_error,
        on_close=self.on_close
    )

    # Run forever (blocking call)
    self.ws.run_forever()


def close(self):
    """Close the WebSocket connection if one has been created."""
    if self.ws:
        self.ws.close()


def _wait_until_connected(client, thread, timeout, poll_interval=0.1):
    """
    Poll until the client's socket reports an established connection.

    Args:
        client: Client whose ``ws`` attribute holds the WebSocketApp
        thread (threading.Thread): Thread running ``client.connect``
        timeout (float): Maximum number of seconds to wait
        poll_interval (float): Seconds to sleep between checks

    Returns:
        bool: True once connected; False if the connection thread ended or
        the timeout elapsed first.
    """
    import time

    deadline = time.monotonic() + timeout
    while True:
        # ``ws`` is assigned by connect() on the background thread and
        # ``ws.sock`` only exists once run_forever() has started, so every
        # step of the lookup may still be missing.
        sock = getattr(getattr(client, 'ws', None), 'sock', None)
        if sock is not None and getattr(sock, 'connected', False):
            return True
        if not thread.is_alive() or time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)


def interactive_mode(client, connect_timeout=10.0):
    """
    Run interactive mode for multi-turn conversations.

    Runs ``client.connect`` on a daemon thread, waits for the connection,
    then reads questions from stdin and sends them until the user types
    'quit', 'exit' or 'q', presses Ctrl-C, or stdin is closed. The client
    is always closed on exit.

    Args:
        client (RAGFlowWebSocketClient): WebSocket client instance
        connect_timeout (float): Seconds to wait for the connection before
            giving up
    """
    import time

    print("\n" + "=" * 60)
    print("Interactive Mode - Type 'quit' or 'exit' to end")
    print("=" * 60)

    # connect() blocks, so it runs in the background while this thread
    # handles user input.
    thread = threading.Thread(target=client.connect, daemon=True)
    thread.start()

    try:
        # Wait for the real connection state instead of a fixed sleep.
        if not _wait_until_connected(client, thread, connect_timeout):
            print("\n✗ Could not connect to the server")
            return

        while True:
            # Get user input
            question = input("\n\n👤 You: ").strip()

            if not question:
                continue

            if question.lower() in ['quit', 'exit', 'q']:
                print("\n👋 Goodbye!")
                break

            # Send question (continue session if available)
            client.send_message(question, session_id=client.session_id)

            # Give the streamed response a moment before prompting again.
            # In production, you'd use proper async/event handling.
            time.sleep(1)

    except (KeyboardInterrupt, EOFError):
        # Ctrl-C or closed stdin both end the session quietly.
        print("\n\n👋 Goodbye!")

    finally:
        client.close()


def main():
    """
    Main entry point for the WebSocket client example.

    Parses command-line arguments and runs the client in either
    single-question or interactive mode. Exits through ``parser.error``
    when neither --question nor --interactive is given.
    """
    # Parse command-line arguments
    parser = argparse.ArgumentParser(
        description='RAGFlow WebSocket Client Example',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Single question
  python python_client.py --url ws://localhost/v1/ws/chat \\
      --token your-token \\
      --chat-id your-chat-id \\
      --question "What is RAGFlow?"

  # Interactive mode
  python python_client.py --url ws://localhost/v1/ws/chat \\
      --token your-token \\
      --chat-id your-chat-id \\
      --interactive
        """
    )

    parser.add_argument(
        '--url',
        required=True,
        help='WebSocket URL (e.g., ws://localhost/v1/ws/chat)'
    )

    parser.add_argument(
        '--token',
        required=True,
        help='API token for authentication'
    )

    parser.add_argument(
        '--chat-id',
        required=True,
        help='Dialog/Chat ID to use'
    )

    parser.add_argument(
        '--question',
        help='Question to ask (single question mode)'
    )

    parser.add_argument(
        '--session-id',
        help='Session ID to continue existing conversation'
    )

    parser.add_argument(
        '--interactive',
        action='store_true',
        help='Enable interactive mode for multi-turn conversations'
    )

    parser.add_argument(
        '--debug',
        action='store_true',
        help='Enable debug output'
    )

    args = parser.parse_args()

    # Validate arguments
    if not args.interactive and not args.question:
        parser.error("Either --question or --interactive must be specified")

    # Create client
    client = RAGFlowWebSocketClient(
        url=args.url,
        token=args.token,
        chat_id=args.chat_id,
        debug=args.debug
    )

    print("\n" + "=" * 60)
    print("RAGFlow WebSocket Client")
    print("=" * 60)

    # Run in appropriate mode
    if args.interactive:
        # Interactive mode - ongoing conversation
        interactive_mode(client)
    else:
        # Single question mode.
        # Capture the original handler BEFORE replacing it: looking up
        # client.on_open inside the wrapper would find the wrapper itself
        # and recurse until RecursionError.
        default_on_open = client.on_open

        def send_after_connect(ws):
            """Run the default open handler, then send the question."""
            default_on_open(ws)
            client.send_message(args.question, session_id=args.session_id)

        # connect() reads on_open from the instance, so this takes effect.
        client.on_open = send_after_connect

        # Connect and run (blocking)
        try:
            client.connect()
        except KeyboardInterrupt:
            print("\n\n👋 Interrupted")
        finally:
            client.close()


if __name__ == '__main__':
    main()
b/pyproject.toml @@ -170,6 +170,7 @@ test = [ "reportlab>=4.4.1", "requests>=2.32.2", "requests-toolbelt>=1.0.0", + "websockets>=14.0", ] [[tool.uv.index]]