Feat/direct message api #252
## Walkthrough

The pull request introduces several updates to the API and conversation modules. A new endpoint in the API allows creating a conversation and posting a message in a single request. The response handling for message posting is modified to return a single message chunk instead of a streaming response. Additionally, a new data type for structured chat message responses is introduced and integrated into the conversation controller, service, and router, with stream handling and error processing adjustments. A new request class for direct messages is also added.
## Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant API_Router as "API Router"
    participant ConversationService as "Conversation Service"
    Client->>API_Router: POST /project/{project_id}/message/ with DirectMessageRequest
    API_Router->>API_Router: Validate request (check agent_id, default if needed)
    API_Router->>ConversationService: Create conversation (CreateConversationRequest)
    ConversationService-->>API_Router: Conversation created
    API_Router->>ConversationService: Post message to the conversation
    ConversationService-->>API_Router: Return first chunk (ChatMessageResponse)
    API_Router->>Client: Return response (ChatMessageResponse)
```

```mermaid
sequenceDiagram
    participant Client
    participant ConversationsRouter as "Conversations Router"
    participant ConversationController as "Conversation Controller"
    participant ConversationService as "Conversation Service"
    Client->>ConversationsRouter: Request to post/regenerate message
    ConversationsRouter->>ConversationController: Invoke post_message/regenerate_last_message
    ConversationController->>ConversationService: Process message and stream response
    ConversationService-->>ConversationController: Yield AsyncGenerator[ChatMessageResponse]
    ConversationController-->>ConversationsRouter: Yield AsyncGenerator[ChatMessageResponse]
    ConversationsRouter->>Client: Stream JSON chunks via get_stream
```
Actionable comments posted: 6
🧹 Nitpick comments (3)

**app/modules/conversations/conversation/conversation_schema.py (1)**

`50-52`: **Consider making citations optional.**

The `citations` field is currently required. Since not all messages may have citations, consider making it optional with a default empty list.

```diff
 class ChatMessageResponse(BaseModel):
     message: str
-    citations: List[str]
+    citations: List[str] = []
```

**app/api/router.py (2)**

`120-121`: **Consider extracting message validation.**

This validation logic is duplicated from the `post_message` endpoint. Consider extracting it into a reusable function.

```diff
+def validate_message_content(content: str):
+    if content == "" or content is None or content.isspace():
+        raise HTTPException(status_code=400, detail="Message content cannot be empty")
+
 @router.post("/project/{project_id}/message/")
 async def create_conversation_and_message(
     project_id: str,
     message: DirectMessageRequest,
     db: Session = Depends(get_db),
     user=Depends(get_api_key_user),
 ):
-    if message.content == "" or message.content is None or message.content.isspace():
-        raise HTTPException(status_code=400, detail="Message content cannot be empty")
+    validate_message_content(message.content)
```

`125-127`: **Consider moving default agent_id to configuration.**

The default agent_id is hardcoded. Consider moving it to a configuration file or environment variable for better maintainability.
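One way to act on this suggestion is to read the default from an environment variable with a fallback. This is a minimal sketch only: the variable name `DEFAULT_AGENT_ID`, the helper name, and the fallback value are illustrative assumptions, not names from this codebase.

```python
import os

def get_default_agent_id(fallback: str = "default-chat-agent") -> str:
    # Prefer an operator-supplied override; fall back to a build-time default.
    return os.getenv("DEFAULT_AGENT_ID", fallback)
```

The router would then call `get_default_agent_id()` instead of embedding the literal agent id, so deployments can swap agents without a code change.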
📜 Review details

- Configuration used: CodeRabbit UI
- Review profile: CHILL
- Plan: Pro

📒 Files selected for processing (6)

- app/api/router.py (2 hunks)
- app/modules/conversations/conversation/conversation_controller.py (3 hunks)
- app/modules/conversations/conversation/conversation_schema.py (1 hunks)
- app/modules/conversations/conversation/conversation_service.py (12 hunks)
- app/modules/conversations/conversations_router.py (4 hunks)
- app/modules/conversations/message/message_schema.py (1 hunks)
🧰 Additional context used

🪛 Ruff (0.8.2)

app/api/router.py

- 117-117: Do not perform function call `Depends` in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable (B008)
- 118-118: Do not perform function call `Depends` in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable (B008)
🔇 Additional comments (6)

**app/modules/conversations/message/message_schema.py (1)**

`19-22`: **LGTM! Well-structured request class.**

The `DirectMessageRequest` class follows consistent patterns with existing request classes and properly uses Pydantic's type system.

**app/api/router.py (1)**

`109-110`: **Document the change in response behavior.**

The endpoint now returns the first chunk instead of streaming the response. This breaking change should be documented in the API documentation.

**app/modules/conversations/conversation/conversation_controller.py (1)**

`86-86`: **LGTM! Consistent return type updates.**

The return type changes to `ChatMessageResponse` are consistent across both methods and align with the new structured response format.

Also applies to: 104-104

**app/modules/conversations/conversation/conversation_service.py (3)**

`507-526`: **Clarify TODO comment and message storage functionality.**

The code has the following issues:

- There is a TODO comment with no clear purpose.
- The commented-out code suggests that message storage functionality might be missing.

Please clarify:

- What is the purpose of the TODO comment?
- Is message storage functionality still needed? If yes, why is it commented out?

`621-636`: **Clarify message storage functionality.**

The commented-out code suggests that message storage functionality might be missing. Please clarify whether it is still needed, and if so, why it is commented out.

`155-158`: **LGTM!**

The agent selection logic is clear, readable, and properly handles both agent ID and status checks.
```python
message_stream = controller.post_message(
    conversation_id=res.conversation_id,
    message=MessageRequest(content=message.content, node_ids=message.node_ids),
    stream=False,
)

async for chunk in message_stream:
    return chunk
```

**Add error handling for message posting.**

The message posting call lacks error handling. Consider wrapping it in a try-except block to handle potential errors.

```diff
-    message_stream = controller.post_message(
-        conversation_id=res.conversation_id,
-        message=MessageRequest(content=message.content, node_ids=message.node_ids),
-        stream=False,
-    )
-    async for chunk in message_stream:
-        return chunk
+    try:
+        message_stream = controller.post_message(
+            conversation_id=res.conversation_id,
+            message=MessageRequest(content=message.content, node_ids=message.node_ids),
+            stream=False,
+        )
+        async for chunk in message_stream:
+            return chunk
+    except Exception as e:
+        # Clean up the created conversation on failure
+        await controller.delete_conversation(res.conversation_id)
+        raise HTTPException(status_code=500, detail=str(e))
```

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
try:
    message_stream = controller.post_message(
        conversation_id=res.conversation_id,
        message=MessageRequest(content=message.content, node_ids=message.node_ids),
        stream=False,
    )
    async for chunk in message_stream:
        return chunk
except Exception as e:
    # Clean up the created conversation on failure
    await controller.delete_conversation(res.conversation_id)
    raise HTTPException(status_code=500, detail=str(e))
```
```python
async def get_stream(data_stream: AsyncGenerator[Any, None]):
    async for chunk in data_stream:
        yield json.dumps(chunk.dict())
```

🛠️ Refactor suggestion

**Add error handling for the `dict()` method call.**

The function assumes that each chunk has a `dict()` method. Add error handling to gracefully handle cases where the chunk doesn't support the `dict()` method.

Apply this diff to add error handling:

```diff
 async def get_stream(data_stream: AsyncGenerator[Any, None]):
     async for chunk in data_stream:
-        yield json.dumps(chunk.dict())
+        try:
+            yield json.dumps(chunk.dict())
+        except (AttributeError, TypeError) as e:
+            logger.error(f"Failed to serialize chunk: {e}")
+            raise ConversationServiceError("Failed to serialize response") from e
```
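The failure mode this guards against can be exercised in isolation. The sketch below is self-contained demo code, not code from the PR: `Chunk` is a toy stand-in for a Pydantic response model, and the fallback error payload replaces the service's real logging-and-raise behavior.

```python
import asyncio
import json

class Chunk:
    """Toy stand-in for a Pydantic response chunk with a .dict() method."""
    def __init__(self, message: str) -> None:
        self.message = message

    def dict(self) -> dict:
        return {"message": self.message}

async def get_stream(data_stream):
    async for chunk in data_stream:
        try:
            yield json.dumps(chunk.dict())
        except (AttributeError, TypeError):
            # The real service would log and raise ConversationServiceError here.
            yield json.dumps({"error": "unserializable chunk"})

async def demo():
    async def source():
        yield Chunk("hello")
        yield "plain string"  # str has no .dict() -> AttributeError path
    return [c async for c in get_stream(source())]

chunks = asyncio.run(demo())
```

Without the `try`/`except`, the bare string would abort the whole stream mid-response; with it, the consumer still receives a well-formed JSON event.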
```diff
         return StreamingResponse(
             get_stream(message_stream), media_type="text/event-stream"
         )
     else:
-        # Collect all chunks into a complete response
-        full_response = ""
-        async for chunk in message_stream:
-            full_response += chunk
-        return {"content": full_response}
+        async for chunk in message_stream:
+            return chunk
```

**Fix non-streaming response handling.**

Similar to `post_message`, the non-streaming case only returns the first chunk from the stream, which might miss important data.

Apply this diff to handle the non-streaming case properly:

```diff
-    async for chunk in message_stream:
-        return chunk
+    # For non-streaming case, collect all chunks into a single response
+    chunks = []
+    async for chunk in message_stream:
+        chunks.append(chunk)
+    return chunks[-1] if chunks else None  # Return the last chunk or None if empty
```
```diff
         return StreamingResponse(
             get_stream(message_stream), media_type="text/event-stream"
         )
     else:
-        # Collect all chunks into a complete response
-        full_response = ""
-        async for chunk in message_stream:
-            full_response += chunk
-        return {"content": full_response}
+        # TODO: fix this, add types. In below stream we have only one output.
+        async for chunk in message_stream:
+            return chunk
```

**Fix non-streaming response handling.**

The non-streaming case only returns the first chunk from the stream, which might miss important data. Also, the TODO comment indicates that types need to be fixed.

Apply this diff to handle the non-streaming case properly:

```diff
-    # TODO: fix this, add types. In below stream we have only one output.
-    async for chunk in message_stream:
-        return chunk
+    # For non-streaming case, collect all chunks into a single response
+    chunks = []
+    async for chunk in message_stream:
+        chunks.append(chunk)
+    return chunks[-1] if chunks else None  # Return the last chunk or None if empty
```
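The drain-and-keep-last pattern suggested above can be demonstrated in isolation. Everything below is a hypothetical sketch (the chunk dicts are made-up payloads), showing that only the final chunk survives:

```python
import asyncio

async def collect_last(stream):
    # Drain the async generator; keep only the final chunk (None when empty).
    last = None
    async for chunk in stream:
        last = chunk
    return last

async def chunks():
    yield {"content": "partial thought"}
    yield {"content": "final answer"}

result = asyncio.run(collect_last(chunks()))
```

Note this still discards intermediate chunks by design; if every chunk carries distinct content, concatenation rather than last-wins would be needed.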
```python
def parse_str_to_message(self, chunk: str) -> ChatMessageResponse:
    try:
        data = json.loads(chunk)
    except json.JSONDecodeError as e:
        logger.error(f"Failed to parse chunk as JSON: {e}")
        raise ConversationServiceError("Failed to parse AI response") from e

    # Extract the 'message' and 'citations'
    message: str = data.get("message", "")
    citations: List[str] = data.get("citations", [])

    return ChatMessageResponse(message=message, citations=citations)
```

🛠️ Refactor suggestion

**Add type validation for extracted fields.**

The function doesn't validate the types of the extracted `message` and `citations` fields, which could lead to runtime errors.

Apply this diff to add type validation:

```diff
 def parse_str_to_message(self, chunk: str) -> ChatMessageResponse:
     try:
         data = json.loads(chunk)
     except json.JSONDecodeError as e:
         logger.error(f"Failed to parse chunk as JSON: {e}")
         raise ConversationServiceError("Failed to parse AI response") from e
-    # Extract the 'message' and 'citations'
-    message: str = data.get("message", "")
-    citations: List[str] = data.get("citations", [])
+    # Extract and validate 'message' and 'citations'
+    message = data.get("message", "")
+    citations = data.get("citations", [])
+
+    if not isinstance(message, str):
+        raise ConversationServiceError("Message must be a string")
+    if not isinstance(citations, list) or not all(isinstance(c, str) for c in citations):
+        raise ConversationServiceError("Citations must be a list of strings")
     return ChatMessageResponse(message=message, citations=citations)
```
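A free-standing version of the validated parser behaves as follows. This is a demo sketch, not the PR's code: `ConversationServiceError` is redefined locally, a plain dict stands in for the Pydantic `ChatMessageResponse`, and the sample payloads are invented.

```python
import json

class ConversationServiceError(Exception):
    pass

def parse_str_to_message(chunk: str) -> dict:
    try:
        data = json.loads(chunk)
    except json.JSONDecodeError as e:
        raise ConversationServiceError("Failed to parse AI response") from e
    message = data.get("message", "")
    citations = data.get("citations", [])
    # Reject payloads whose fields exist but carry the wrong types.
    if not isinstance(message, str):
        raise ConversationServiceError("Message must be a string")
    if not isinstance(citations, list) or not all(isinstance(c, str) for c in citations):
        raise ConversationServiceError("Citations must be a list of strings")
    return {"message": message, "citations": citations}

ok = parse_str_to_message('{"message": "done", "citations": ["app/api/router.py"]}')
```

A payload such as `{"message": 42}` is valid JSON but fails the type check, so the caller gets a single, well-defined error type instead of a downstream `TypeError`.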
```diff
     yield self.parse_str_to_message(response)
 else:
     # For other agents that support streaming
     async for chunk in supervisor.process_query(
         query, project_id, conversation.id, user_id, node_ids, agent_id
     ):
-        yield chunk
+        yield self.parse_str_to_message(chunk)
```

🛠️ Refactor suggestion

**Add error handling for `parse_str_to_message` calls.**

The function doesn't handle potential errors from `parse_str_to_message`, which could lead to unhandled exceptions.

Apply this diff to add error handling:

```diff
 if isinstance(agent, CustomAgentsService):
     # Custom agent doesn't support streaming, so we'll yield the entire response at once
     response = await agent.run(
         agent_id, query, project_id, user_id, conversation.id, node_ids
     )
-    yield self.parse_str_to_message(response)
+    try:
+        yield self.parse_str_to_message(response)
+    except ConversationServiceError as e:
+        logger.error(f"Failed to parse custom agent response: {e}")
+        raise
 else:
     # For other agents that support streaming
     async for chunk in supervisor.process_query(
         query, project_id, conversation.id, user_id, node_ids, agent_id
     ):
-        yield self.parse_str_to_message(chunk)
+        try:
+            yield self.parse_str_to_message(chunk)
+        except ConversationServiceError as e:
+            logger.error(f"Failed to parse agent response chunk: {e}")
+            raise
```
## Summary by CodeRabbit

- **New Features**
- **Refactor**