from baidubce import exception
from baidubce.services import bos
from baidubce.services.bos.bos_client import BosClient
from baidubce.auth.bce_credentials import BceCredentials
# 百度云账户配置
ACCESS_KEY_ID = "您的AccessKeyID" # 替换为您的AK
SECRET_ACCESS_KEY = "您的SecretAccessKey" # 替换为您的SK
BUCKET_NAME = "您的存储桶名称" # 替换为您的存储桶名称
REGION = "bj" # 存储桶所在区域(如北京: bj, 广州: gz, 苏州: su)
# 创建客户端
credentials = BceCredentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY)
bos_client = BosClient(credentials, endpoint=f"https://{REGION}.bcebos.com")
def list_bucket_contents():
"""列出存储桶中的所有对象"""
try:
response = bos_client.list_objects(BUCKET_NAME)
print("\n存储桶内容列表:")
for obj in response.contents:
print(f" - {obj.key} (大小: {obj.size}字节, 最后修改: {obj.last_modified}")
return True
except exception.BceHttpClientError as e:
print(f"错误: {e.message}")
return False
def upload_file(local_path, remote_path):
"""上传本地文件到BOS"""
try:
bos_client.put_object_from_file(
bucket_name=BUCKET_NAME,
key=remote_path, # 远程路径
file_name=local_path # 本地路径
)
print(f"\n文件上传成功: {local_path} -> {remote_path}")
return True
except Exception as e:
print(f"上传失败: {str(e)}")
return False
def download_file(remote_path, local_path):
"""从BOS下载文件到本地"""
try:
bos_client.get_object_to_file(
bucket_name=BUCKET_NAME,
key=remote_path, # 远程路径
file_name=local_path # 本地保存路径
)
print(f"\n文件下载成功: {remote_path} -> {local_path}")
return True
except Exception as e:
print(f"下载失败: {str(e)}")
return False
if __name__ == "__main__":
# 测试连接
print("尝试连接百度云BOS...")
if list_bucket_contents():
print("连接成功!")
# 示例操作
upload_file("localfile.txt", "remote_folder/backup.txt")
download_file("remote_folder/backup.txt", "downloaded_file.txt")
from baidubce.services import bos
from baidubce.services.bos.bos_client import BosClient
from baidubce.auth.bce_credentials import BceCredentials
# 百度云账户配置
ACCESS_KEY_ID = "您的AccessKeyID" # 替换为您的AK
SECRET_ACCESS_KEY = "您的SecretAccessKey" # 替换为您的SK
BUCKET_NAME = "您的存储桶名称" # 替换为您的存储桶名称
REGION = "bj" # 存储桶所在区域(如北京: bj, 广州: gz, 苏州: su)
# 创建客户端
credentials = BceCredentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY)
bos_client = BosClient(credentials, endpoint=f"https://{REGION}.bcebos.com")
def list_bucket_contents():
"""列出存储桶中的所有对象"""
try:
response = bos_client.list_objects(BUCKET_NAME)
print("\n存储桶内容列表:")
for obj in response.contents:
print(f" - {obj.key} (大小: {obj.size}字节, 最后修改: {obj.last_modified}")
return True
except exception.BceHttpClientError as e:
print(f"错误: {e.message}")
return False
def upload_file(local_path, remote_path):
"""上传本地文件到BOS"""
try:
bos_client.put_object_from_file(
bucket_name=BUCKET_NAME,
key=remote_path, # 远程路径
file_name=local_path # 本地路径
)
print(f"\n文件上传成功: {local_path} -> {remote_path}")
return True
except Exception as e:
print(f"上传失败: {str(e)}")
return False
def download_file(remote_path, local_path):
"""从BOS下载文件到本地"""
try:
bos_client.get_object_to_file(
bucket_name=BUCKET_NAME,
key=remote_path, # 远程路径
file_name=local_path # 本地保存路径
)
print(f"\n文件下载成功: {remote_path} -> {local_path}")
return True
except Exception as e:
print(f"下载失败: {str(e)}")
return False
if __name__ == "__main__":
# 测试连接
print("尝试连接百度云BOS...")
if list_bucket_contents():
print("连接成功!")
# 示例操作
upload_file("localfile.txt", "remote_folder/backup.txt")
download_file("remote_folder/backup.txt", "downloaded_file.txt")