是否支持异步上传
Closed this issue · 2 comments
ctykk commented
是否支持异步上传
zhaoyang0618 commented
同求异步版本!
ctykk commented
已解决:先用 SDK 生成预签名 URL,再通过 aiohttp 异步 PUT 上传,代码如下。
from __future__ import annotations
import hashlib
from datetime import datetime as DT, timezone as TZ
from typing import Literal
from aiohttp import ClientSession
from alibabacloud_oss_v2.client import Client
from alibabacloud_oss_v2.credentials import StaticCredentialsProvider
from alibabacloud_oss_v2.config import load_default
from alibabacloud_oss_v2.models import PutObjectRequest
# Salt mixed into the object-key hash.
# NOTE(review): 'xxx' looks like a redacted placeholder — confirm the real value.
SALT = 'xxx'

# Cache of OSS clients so repeated uploads reuse one Client instead of building
# a new one per call. Keyed by (endpoint, region, access_key_id,
# access_key_secret) because all four values are written into the client's
# configuration; keying on the credentials alone would hand back a client
# configured for a different endpoint/region.
CLIENTS: dict[tuple[str, str, str, str], Client] = {}


def _get_client(
    endpoint: str,
    region: str,
    access_key_id: str,
    access_key_secret: str,
) -> Client:
    """Return the cached OSS client for these settings, creating it on first use."""
    cache_key = (endpoint, region, access_key_id, access_key_secret)
    client = CLIENTS.get(cache_key)
    if client is None:
        client_config = load_default()
        client_config.credentials_provider = StaticCredentialsProvider(
            access_key_id=access_key_id, access_key_secret=access_key_secret
        )
        client_config.region = region
        client_config.endpoint = endpoint
        client = Client(client_config)
        CLIENTS[cache_key] = client
    return client


async def upload_image(
    session: ClientSession,
    image: bytes,
    suffix: Literal['jpeg', 'png'],
    endpoint: str,
    region: str,
    bucket: str,
    access_key_id: str,
    access_key_secret: str,
) -> str:
    """Upload an image to Alibaba Cloud OSS with an async HTTP PUT.

    The OSS SDK client is used only to presign a PutObject request; the image
    itself is sent through ``session`` to the presigned URL.

    Args:
        session: aiohttp session used to perform the PUT.
        image: Raw image content to upload.
        suffix: File extension of the object; also used as the ``image/<suffix>``
            content type of the request.
        endpoint: OSS endpoint, with or without a ``scheme://`` prefix.
        region: OSS region passed to the client configuration.
        bucket: Target bucket name.
        access_key_id: Access key id for the static credentials provider.
        access_key_secret: Access key secret for the static credentials provider.

    Returns:
        The ``https://<bucket>.<endpoint>/<object_key>`` URL of the uploaded object.

    Raises:
        RuntimeError: If the PUT response status is not 200.
    """
    # The object is stored at the bucket root under a salted hash of the
    # current UTC timestamp. The image bytes are mixed into the hash as well so
    # that two different images uploaded within the same clock tick do not get
    # the same key and overwrite each other.
    now = DT.now(TZ.utc)
    digest = hashlib.sha256(f'{now.timestamp()}{SALT}'.encode())
    if isinstance(image, (bytes, bytearray, memoryview)):
        digest.update(image)
    object_key = f'{digest.hexdigest()}.{suffix}'

    # Public access URL. Drop any scheme / trailing slash from the endpoint so
    # that an endpoint such as 'https://host' does not produce
    # 'https://bucket.https://host/...'.
    host = endpoint.split('://', 1)[-1].rstrip('/')
    url = f'https://{bucket}.{host}/{object_key}'

    client = _get_client(endpoint, region, access_key_id, access_key_secret)
    request = PutObjectRequest(bucket=bucket, key=object_key, content_type=f'image/{suffix}')
    presigned = client.presign(request)

    # Send the signed headers unchanged along with the body to the presigned URL.
    async with session.put(
        presigned.url,  # type: ignore
        headers=presigned.signed_headers,
        data=image,
    ) as resp:
        status = resp.status

    if status == 200:
        return url
    raise RuntimeError(f'上传失败 status={status}')