3
3
import os
4
4
import base64
5
5
import httpx
6
- from tenacity import retry , stop_after_attempt , wait_exponential , retry_if_exception_type
6
+ from tenacity import retry , stop_after_attempt , wait_exponential , \
7
+ retry_if_exception_type
7
8
8
9
MAX_RETRIES = 5
9
10
11
+
10
12
class AnthropicRateLimitError(Exception):
    """Exception raised for rate limit errors.

    Args:
        message: Human-readable description; also what ``str(exc)`` returns.
        retry_after: Value the raiser supplies to say how long to wait
            before trying again.
    """

    def __init__(self, message, retry_after):
        self.message = message
        self.retry_after = retry_after
        # Only the message is forwarded so str(exc) stays the plain message.
        super().__init__(self.message)

    def __reduce__(self):
        # Exceptions are copied/pickled by calling ``cls(*self.args)``.
        # ``args`` holds just the message, so without this override the
        # rebuild fails with a missing ``retry_after`` argument.
        return (type(self), (self.message, self.retry_after), self.__dict__)
16
18
19
+
20
class AnthropicOverloadError(Exception):
    """Exception raised for overloaded errors.

    Args:
        message: Human-readable description; kept on ``self.message`` and
            used as the exception's string form.
    """

    def __init__(self, message):
        super().__init__(message)
        self.message = message
25
+
26
+
17
27
class AnthropicClient :
18
28
def __init__ (self , verbose = False ):
19
29
"""Initialize the Anthropic client with the API key."""
@@ -23,16 +33,28 @@ def __init__(self, verbose=False):
23
33
@retry(
    stop=stop_after_attempt(MAX_RETRIES),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    retry=(retry_if_exception_type(AnthropicRateLimitError) |
           retry_if_exception_type(AnthropicOverloadError)),
    # Surface the last rate-limit/overload error once attempts run out,
    # rather than tenacity's RetryError wrapper, so callers' handlers for
    # AnthropicRateLimitError / AnthropicOverloadError can actually match.
    reraise=True,
)
def _make_api_call(self, api_args):
    """Make an API call with retry mechanism.

    Calls ``self.client.messages.create(**api_args)``. Rate-limit (429) and
    overload (529 / ``overloaded_error``) failures are translated into
    ``AnthropicRateLimitError`` / ``AnthropicOverloadError``, which the
    ``@retry`` decorator retries with exponential backoff up to
    ``MAX_RETRIES`` attempts. Every other error propagates unchanged.

    Args:
        api_args: Keyword arguments forwarded to ``messages.create``.

    Returns:
        Whatever ``self.client.messages.create`` returns.

    Raises:
        AnthropicRateLimitError: On HTTP 429, after retries are exhausted.
        AnthropicOverloadError: On HTTP 529 or an ``overloaded_error``
            payload, after retries are exhausted.
    """
    def retry_after_seconds(response, default=60):
        """Read the ``retry-after`` header as an int, else *default*."""
        try:
            return int(response.headers.get('retry-after', default))
        except (AttributeError, TypeError, ValueError):
            # No response object, or a non-numeric value such as an
            # HTTP-date: fall back instead of masking the API error.
            return default

    try:
        return self.client.messages.create(**api_args)
    except httpx.HTTPStatusError as e:
        status = e.response.status_code
        if status == 429:
            raise AnthropicRateLimitError(
                f"Rate limit exceeded. {str(e)}",
                retry_after_seconds(e.response)) from e
        if status == 529:
            raise AnthropicOverloadError(
                f"Anthropic API overloaded: {str(e)}") from e
        raise
    except anthropic.APIStatusError as e:
        # NOTE(review): in the anthropic SDK, args[0] appears to be the
        # message string and the decoded JSON payload lives on ``.body``;
        # indexing args[0] like a dict raises TypeError. Confirm against
        # the installed SDK version.
        status = getattr(e, 'status_code', None)
        body = getattr(e, 'body', None)
        error = body.get('error') if isinstance(body, dict) else None
        if not isinstance(error, dict):
            error = {}

        if status == 529 or error.get('type') == 'overloaded_error':
            detail = error.get('message') or str(e)
            raise AnthropicOverloadError(
                f"Anthropic API overloaded: {detail}") from e
        if status == 429:
            raise AnthropicRateLimitError(
                f"Rate limit exceeded. {str(e)}",
                retry_after_seconds(getattr(e, 'response', None))) from e
        raise
37
59
38
60
def stream_completion (self , messages , model , ** kwargs ):
@@ -46,46 +68,41 @@ def stream_completion(self, messages, model, **kwargs):
46
68
Yields:
47
69
str: Text generated by the Anthropic API.
48
70
"""
49
- # Extract system message if present, otherwise set to None
50
- system_messages = [ message [ 'content' ] for message in messages if message ['role' ] == 'system' ]
71
+ system_messages = [ msg [ 'content' ] for msg in messages
72
+ if msg ['role' ] == 'system' ]
51
73
system_message = system_messages [0 ] if system_messages else None
52
-
53
- # Filter out system messages from the messages list
54
- messages = [message for message in messages if message ['role' ] != 'system' ]
55
74
56
- # Prepare the arguments for the Anthropic API call
75
+ messages = [msg for msg in messages
76
+ if msg ['role' ] != 'system' ]
77
+
57
78
api_args = {
58
79
"model" : model ,
59
- "max_tokens" : kwargs .get ('max_tokens' , 1000 ), # Default to 1000 if not provided
80
+ "max_tokens" : kwargs .get ('max_tokens' , 1000 ),
60
81
"stream" : True ,
61
82
** kwargs
62
83
}
63
-
64
- # Only include the system parameter if a system message is present
84
+
65
85
if system_message :
66
86
api_args ["system" ] = system_message
67
87
68
88
processed_messages = []
69
89
for message in messages :
70
90
if 'image' in message :
71
- processed_content = [
72
- {
73
- "type" : "image" ,
74
- "source" : {
75
- "type" : "base64" ,
76
- "media_type" : "image/jpeg" ,
77
- "data" : message ['image' ].replace ('\n ' , '' ) # Remove newlines
78
- }
91
+ processed_content = [{
92
+ "type" : "image" ,
93
+ "source" : {
94
+ "type" : "base64" ,
95
+ "media_type" : "image/jpeg" ,
96
+ "data" : message ['image' ].replace ('\n ' , '' )
79
97
}
80
- ]
81
-
82
- # Add original text content if present
98
+ }]
99
+
83
100
if 'content' in message and message ['content' ]:
84
101
processed_content .append ({
85
102
"type" : "text" ,
86
103
"text" : message ['content' ]
87
104
})
88
-
105
+
89
106
processed_messages .append ({
90
107
"role" : message ['role' ],
91
108
"content" : processed_content
@@ -97,7 +114,8 @@ def stream_completion(self, messages, model, **kwargs):
97
114
})
98
115
99
116
if not processed_messages :
100
- raise ValueError (f"No messages to send to the API. Original messages: { messages } " )
117
+ raise ValueError (
118
+ f"No messages to send. Original messages: { messages } " )
101
119
102
120
api_args ["messages" ] = processed_messages
103
121
@@ -110,18 +128,24 @@ def stream_completion(self, messages, model, **kwargs):
110
128
if self .verbose :
111
129
print (f"Rate limit error: { e .message } . Retry after { e .retry_after } seconds." )
112
130
raise
131
+ except AnthropicOverloadError as e :
132
+ if self .verbose :
133
+ print (f"Overload error: { e .message } " )
134
+ raise
113
135
except Exception as e :
114
136
if self .verbose :
115
137
import traceback
116
138
traceback .print_exc ()
117
139
print (f"An error occurred streaming completion from Anthropic API: { e } " )
118
- raise RuntimeError (f"An error occurred streaming completion from Anthropic API: { e } " )
140
+ raise RuntimeError (
141
+ f"An error occurred streaming completion from Anthropic API: { e } " )
142
+
119
143
120
144
# Test the AnthropicClient
121
145
if __name__ == "__main__" :
122
146
client = AnthropicClient (verbose = True )
123
-
124
- #test text only
147
+
148
+ # test text only
125
149
messages = [
126
150
{
127
151
"role" : "system" ,
@@ -138,18 +162,20 @@ def stream_completion(self, messages, model, **kwargs):
138
162
try :
139
163
for chunk in client .stream_completion (messages , model ):
140
164
print (chunk , end = '' , flush = True )
141
- print () # Add a newline at the end
165
+ print ()
142
166
except AnthropicRateLimitError as e :
143
- print (f"\n Rate limit error encountered: { e .message } . Retry after { e .retry_after } seconds." )
167
+ print (f"\n Rate limit error: { e .message } . Retry after { e .retry_after } seconds." )
168
+ except AnthropicOverloadError as e :
169
+ print (f"\n Overload error: { e .message } " )
144
170
except Exception as e :
145
171
print (f"\n An error occurred: { e } " )
146
172
147
- #test multimodal
173
+ # test multimodal
148
174
image_url = "https://upload.wikimedia.org/wikipedia/commons/a/a7/Camponotus_flavomarginatus_ant.jpg"
149
175
image_media_type = "image/jpeg"
150
176
image_data = base64 .b64encode (httpx .get (image_url ).content ).decode ("utf-8" )
151
-
152
- messages = [
177
+
178
+ messages = [
153
179
{
154
180
"role" : "system" ,
155
181
"content" : "Respond only in rhyming couplets."
@@ -172,13 +198,15 @@ def stream_completion(self, messages, model, **kwargs):
172
198
],
173
199
}
174
200
]
175
-
201
+
176
202
print ("\n Multimodal Response:" )
177
203
try :
178
204
for chunk in client .stream_completion (messages , model ):
179
205
print (chunk , end = '' , flush = True )
180
206
print ()
181
207
except AnthropicRateLimitError as e :
182
- print (f"\n Rate limit error encountered: { e .message } . Retry after { e .retry_after } seconds." )
208
+ print (f"\n Rate limit error: { e .message } . Retry after { e .retry_after } seconds." )
209
+ except AnthropicOverloadError as e :
210
+ print (f"\n Overload error: { e .message } " )
183
211
except Exception as e :
184
212
print (f"\n An error occurred: { e } " )
0 commit comments