Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
281 changes: 254 additions & 27 deletions core/http/endpoints/anthropic/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/mudler/LocalAI/core/http/middleware"
"github.com/mudler/LocalAI/core/schema"
"github.com/mudler/LocalAI/core/templates"
"github.com/mudler/LocalAI/pkg/functions"
"github.com/mudler/LocalAI/pkg/model"
"github.com/mudler/xlog"
)
Expand Down Expand Up @@ -44,6 +45,9 @@ func MessagesEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, evalu
// Convert Anthropic messages to OpenAI format for internal processing
openAIMessages := convertAnthropicToOpenAIMessages(input)

// Convert Anthropic tools to internal Functions format
funcs, shouldUseFn := convertAnthropicTools(input, cfg)

// Create an OpenAI-compatible request for internal processing
openAIReq := &schema.OpenAIRequest{
PredictionOptions: schema.PredictionOptions{
Expand Down Expand Up @@ -79,19 +83,19 @@ func MessagesEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, evalu
cfg.StopWords = append(cfg.StopWords, input.StopSequences...)
}

// Template the prompt
predInput := evaluator.TemplateMessages(*openAIReq, openAIReq.Messages, cfg, nil, false)
// Template the prompt with tools if available
predInput := evaluator.TemplateMessages(*openAIReq, openAIReq.Messages, cfg, funcs, shouldUseFn)
xlog.Debug("Anthropic Messages - Prompt (after templating)", "prompt", predInput)

if input.Stream {
return handleAnthropicStream(c, id, input, cfg, ml, predInput)
return handleAnthropicStream(c, id, input, cfg, ml, predInput, openAIReq, funcs, shouldUseFn)
}

return handleAnthropicNonStream(c, id, input, cfg, ml, predInput, openAIReq)
return handleAnthropicNonStream(c, id, input, cfg, ml, predInput, openAIReq, funcs, shouldUseFn)
}
}

func handleAnthropicNonStream(c echo.Context, id string, input *schema.AnthropicRequest, cfg *config.ModelConfig, ml *model.ModelLoader, predInput string, openAIReq *schema.OpenAIRequest) error {
func handleAnthropicNonStream(c echo.Context, id string, input *schema.AnthropicRequest, cfg *config.ModelConfig, ml *model.ModelLoader, predInput string, openAIReq *schema.OpenAIRequest, funcs functions.Functions, shouldUseFn bool) error {
images := []string{}
for _, m := range openAIReq.Messages {
images = append(images, m.StringImages...)
Expand All @@ -111,17 +115,53 @@ func handleAnthropicNonStream(c echo.Context, id string, input *schema.Anthropic
}

result := backend.Finetune(*cfg, predInput, prediction.Response)
stopReason := "end_turn"

// Check if the result contains tool calls
toolCalls := functions.ParseFunctionCall(result, cfg.FunctionsConfig)

var contentBlocks []schema.AnthropicContentBlock
var stopReason string

if shouldUseFn && len(toolCalls) > 0 {
// Model wants to use tools
stopReason = "tool_use"
for _, tc := range toolCalls {
// Parse arguments as JSON
var inputArgs map[string]interface{}
if err := json.Unmarshal([]byte(tc.Arguments), &inputArgs); err != nil {
xlog.Warn("Failed to parse tool call arguments as JSON", "error", err, "args", tc.Arguments)
inputArgs = map[string]interface{}{"raw": tc.Arguments}
}

contentBlocks = append(contentBlocks, schema.AnthropicContentBlock{
Type: "tool_use",
ID: fmt.Sprintf("toolu_%s_%d", id, len(contentBlocks)),
Name: tc.Name,
Input: inputArgs,
})
}

// Add any text content before the tool calls
textContent := functions.ParseTextContent(result, cfg.FunctionsConfig)
if textContent != "" {
// Prepend text block
contentBlocks = append([]schema.AnthropicContentBlock{{Type: "text", Text: textContent}}, contentBlocks...)
}
} else {
// Normal text response
stopReason = "end_turn"
contentBlocks = []schema.AnthropicContentBlock{
{Type: "text", Text: result},
}
}

resp := &schema.AnthropicResponse{
ID: fmt.Sprintf("msg_%s", id),
Type: "message",
Role: "assistant",
Model: input.Model,
StopReason: &stopReason,
Content: []schema.AnthropicContentBlock{
{Type: "text", Text: result},
},
Content: contentBlocks,
Usage: schema.AnthropicUsage{
InputTokens: prediction.Usage.Prompt,
OutputTokens: prediction.Usage.Completion,
Expand All @@ -135,13 +175,13 @@ func handleAnthropicNonStream(c echo.Context, id string, input *schema.Anthropic
return c.JSON(200, resp)
}

func handleAnthropicStream(c echo.Context, id string, input *schema.AnthropicRequest, cfg *config.ModelConfig, ml *model.ModelLoader, predInput string) error {
func handleAnthropicStream(c echo.Context, id string, input *schema.AnthropicRequest, cfg *config.ModelConfig, ml *model.ModelLoader, predInput string, openAIReq *schema.OpenAIRequest, funcs functions.Functions, shouldUseFn bool) error {
c.Response().Header().Set("Content-Type", "text/event-stream")
c.Response().Header().Set("Cache-Control", "no-cache")
c.Response().Header().Set("Connection", "keep-alive")

// Create OpenAI messages for inference
openAIMessages := convertAnthropicToOpenAIMessages(input)
openAIMessages := openAIReq.Messages

images := []string{}
for _, m := range openAIMessages {
Expand All @@ -162,25 +202,93 @@ func handleAnthropicStream(c echo.Context, id string, input *schema.AnthropicReq
}
sendAnthropicSSE(c, messageStart)

// Send content_block_start event
// Track accumulated content for tool call detection
accumulatedContent := ""
currentBlockIndex := 0
inToolCall := false
toolCallsEmitted := 0

// Send initial content_block_start event
contentBlockStart := schema.AnthropicStreamEvent{
Type: "content_block_start",
Index: 0,
Index: currentBlockIndex,
ContentBlock: &schema.AnthropicContentBlock{Type: "text", Text: ""},
}
sendAnthropicSSE(c, contentBlockStart)

// Stream content deltas
tokenCallback := func(token string, usage backend.TokenUsage) bool {
delta := schema.AnthropicStreamEvent{
Type: "content_block_delta",
Index: 0,
Delta: &schema.AnthropicStreamDelta{
Type: "text_delta",
Text: token,
},
accumulatedContent += token

// If we're using functions, try to detect tool calls incrementally
if shouldUseFn {
cleanedResult := functions.CleanupLLMResult(accumulatedContent, cfg.FunctionsConfig)

// Try parsing for tool calls
toolCalls := functions.ParseFunctionCall(cleanedResult, cfg.FunctionsConfig)

// If we detected new tool calls and haven't emitted them yet
if len(toolCalls) > toolCallsEmitted {
// Stop the current text block if we were in one
if !inToolCall && currentBlockIndex == 0 {
sendAnthropicSSE(c, schema.AnthropicStreamEvent{
Type: "content_block_stop",
Index: currentBlockIndex,
})
currentBlockIndex++
inToolCall = true
}

// Emit new tool calls
for i := toolCallsEmitted; i < len(toolCalls); i++ {
tc := toolCalls[i]

// Send content_block_start for tool_use
sendAnthropicSSE(c, schema.AnthropicStreamEvent{
Type: "content_block_start",
Index: currentBlockIndex,
ContentBlock: &schema.AnthropicContentBlock{
Type: "tool_use",
ID: fmt.Sprintf("toolu_%s_%d", id, i),
Name: tc.Name,
},
})

// Send input_json_delta with the arguments
sendAnthropicSSE(c, schema.AnthropicStreamEvent{
Type: "content_block_delta",
Index: currentBlockIndex,
Delta: &schema.AnthropicStreamDelta{
Type: "input_json_delta",
PartialJSON: tc.Arguments,
},
})

// Send content_block_stop
sendAnthropicSSE(c, schema.AnthropicStreamEvent{
Type: "content_block_stop",
Index: currentBlockIndex,
})

currentBlockIndex++
}
toolCallsEmitted = len(toolCalls)
return true
}
}

// Send regular text delta if not in tool call mode
if !inToolCall {
delta := schema.AnthropicStreamEvent{
Type: "content_block_delta",
Index: 0,
Delta: &schema.AnthropicStreamDelta{
Type: "text_delta",
Text: token,
},
}
sendAnthropicSSE(c, delta)
}
sendAnthropicSSE(c, delta)
return true
}

Expand All @@ -197,15 +305,22 @@ func handleAnthropicStream(c echo.Context, id string, input *schema.AnthropicReq
return sendAnthropicError(c, 500, "api_error", fmt.Sprintf("prediction failed: %v", err))
}

// Send content_block_stop event
contentBlockStop := schema.AnthropicStreamEvent{
Type: "content_block_stop",
Index: 0,
// Send content_block_stop event for last block if we didn't close it yet
if !inToolCall {
contentBlockStop := schema.AnthropicStreamEvent{
Type: "content_block_stop",
Index: 0,
}
sendAnthropicSSE(c, contentBlockStop)
}
sendAnthropicSSE(c, contentBlockStop)

// Send message_delta event with stop_reason
// Determine stop reason
stopReason := "end_turn"
if toolCallsEmitted > 0 {
stopReason = "tool_use"
}

// Send message_delta event with stop_reason
messageDelta := schema.AnthropicStreamEvent{
Type: "message_delta",
Delta: &schema.AnthropicStreamDelta{
Expand Down Expand Up @@ -274,6 +389,8 @@ func convertAnthropicToOpenAIMessages(input *schema.AnthropicRequest) []schema.M
// Handle array of content blocks
var textContent string
var stringImages []string
var toolCalls []schema.ToolCall
toolCallIndex := 0

for _, block := range content {
if blockMap, ok := block.(map[string]interface{}); ok {
Expand All @@ -295,16 +412,126 @@ func convertAnthropicToOpenAIMessages(input *schema.AnthropicRequest) []schema.M
}
}
}
case "tool_use":
// Convert tool_use to ToolCall format
toolID, _ := blockMap["id"].(string)
toolName, _ := blockMap["name"].(string)
toolInput := blockMap["input"]

// Serialize input to JSON string
inputJSON, err := json.Marshal(toolInput)
if err != nil {
xlog.Warn("Failed to marshal tool input", "error", err)
inputJSON = []byte("{}")
}

toolCalls = append(toolCalls, schema.ToolCall{
Index: toolCallIndex,
ID: toolID,
Type: "function",
FunctionCall: schema.FunctionCall{
Name: toolName,
Arguments: string(inputJSON),
},
})
toolCallIndex++
case "tool_result":
// Convert tool_result to a message with role "tool"
// This is handled by creating a separate message after this block
// For now, we'll add it as text content
toolUseID, _ := blockMap["tool_use_id"].(string)
isError := false
if isErrorPtr, ok := blockMap["is_error"].(*bool); ok && isErrorPtr != nil {
isError = *isErrorPtr
}

var resultText string
if resultContent, ok := blockMap["content"]; ok {
switch rc := resultContent.(type) {
case string:
resultText = rc
case []interface{}:
// Array of content blocks
for _, cb := range rc {
if cbMap, ok := cb.(map[string]interface{}); ok {
if cbMap["type"] == "text" {
if text, ok := cbMap["text"].(string); ok {
resultText += text
}
}
}
}
}
}

// Add tool result as a tool role message
// We need to handle this differently - create a new message
if msg.Role == "user" {
// Store tool result info for creating separate message
prefix := ""
if isError {
prefix = "Error: "
}
textContent += fmt.Sprintf("\n[Tool Result for %s]: %s%s", toolUseID, prefix, resultText)
}
}
}
}
openAIMsg.StringContent = textContent
openAIMsg.Content = textContent
openAIMsg.StringImages = stringImages

// Add tool calls if present
if len(toolCalls) > 0 {
openAIMsg.ToolCalls = toolCalls
}
}

messages = append(messages, openAIMsg)
}

return messages
}

// convertAnthropicTools converts Anthropic tools to internal Functions format
func convertAnthropicTools(input *schema.AnthropicRequest, cfg *config.ModelConfig) (functions.Functions, bool) {
if len(input.Tools) == 0 {
return nil, false
}

var funcs functions.Functions
for _, tool := range input.Tools {
f := functions.Function{
Name: tool.Name,
Description: tool.Description,
Parameters: tool.InputSchema,
}
funcs = append(funcs, f)
}

// Handle tool_choice
if input.ToolChoice != nil {
switch tc := input.ToolChoice.(type) {
case string:
// "auto", "any", or "none"
if tc == "any" {
// Force the model to use one of the tools
cfg.SetFunctionCallString("required")
} else if tc == "none" {
// Don't use tools
return nil, false
}
// "auto" is the default - let model decide
case map[string]interface{}:
// Specific tool selection: {"type": "tool", "name": "tool_name"}
if tcType, ok := tc["type"].(string); ok && tcType == "tool" {
if name, ok := tc["name"].(string); ok {
// Force specific tool
cfg.SetFunctionCallString(name)
}
}
}
}

return funcs, len(funcs) > 0 && cfg.ShouldUseFunctions()
}
Loading
Loading