Skip to content

Commit 971935b

Browse files
committed
feat: add s3 unit test
1 parent f362d98 commit 971935b

File tree

1 file changed

+83
-0
lines changed

1 file changed

+83
-0
lines changed

test_s3_auth.py

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
import boto3
2+
from botocore.exceptions import ClientError, NoCredentialsError
3+
4+
def test_s3_authentication():
5+
"""测试S3认证"""
6+
7+
# 配置信息
8+
endpoint_url = "https://s3.kclab.cloud"
9+
access_key = "B2sE0fKv1Y1lOpZtge5u"
10+
secret_key = "JRx4MbrMbfUfQjIEm7speT52kQgjt0zafvlAuYxW"
11+
bucket_name = "bucket-78134-shared"
12+
region = "us-east-1"
13+
14+
print("=== S3认证测试 ===")
15+
print(f"端点: {endpoint_url}")
16+
print(f"存储桶: {bucket_name}")
17+
print(f"区域: {region}")
18+
print()
19+
20+
try:
21+
# 创建S3客户端
22+
s3_client = boto3.client(
23+
's3',
24+
endpoint_url=endpoint_url,
25+
aws_access_key_id=access_key,
26+
aws_secret_access_key=secret_key,
27+
region_name=region
28+
)
29+
30+
print("1. 测试列出存储桶...")
31+
response = s3_client.list_buckets()
32+
buckets = [b['Name'] for b in response['Buckets']]
33+
print(f" 可用存储桶: {buckets}")
34+
35+
if bucket_name in buckets:
36+
print(f"\n2. 测试访问存储桶: {bucket_name}")
37+
try:
38+
response = s3_client.list_objects_v2(Bucket=bucket_name, MaxKeys=5)
39+
if 'Contents' in response:
40+
print(f" 存储桶内容数量: {len(response['Contents'])}")
41+
for obj in response['Contents'][:3]:
42+
print(f" - {obj['Key']} ({obj['Size']} bytes)")
43+
else:
44+
print(" 存储桶为空")
45+
except ClientError as e:
46+
print(f" 访问存储桶失败: {e}")
47+
48+
print(f"\n3. 测试上传小文件...")
49+
test_content = b"Hello, S3 Test!"
50+
test_key = "test/connectivity_test.txt"
51+
52+
try:
53+
s3_client.put_object(
54+
Bucket=bucket_name,
55+
Key=test_key,
56+
Body=test_content,
57+
ContentType='text/plain'
58+
)
59+
print(" 上传成功!")
60+
61+
# 测试下载
62+
response = s3_client.get_object(Bucket=bucket_name, Key=test_key)
63+
downloaded_content = response['Body'].read()
64+
print(f" 下载成功: {downloaded_content}")
65+
66+
# 清理测试文件
67+
s3_client.delete_object(Bucket=bucket_name, Key=test_key)
68+
print(" 清理成功!")
69+
70+
except ClientError as e:
71+
print(f" 上传/下载失败: {e}")
72+
73+
except NoCredentialsError:
74+
print("错误: 无法找到AWS凭证")
75+
except ClientError as e:
76+
error_code = e.response['Error']['Code']
77+
error_message = e.response['Error']['Message']
78+
print(f"错误: {error_code} - {error_message}")
79+
except Exception as e:
80+
print(f"未知错误: {e}")
81+
82+
if __name__ == "__main__":
83+
test_s3_authentication()

0 commit comments

Comments
 (0)