aliyun/alibabacloud-oss-python-sdk-v2

是否支持异步上传

Closed this issue · 2 comments

是否支持异步上传

同求异步版本!

已解决:先用 `Client.presign` 生成预签名 URL,再通过 aiohttp 异步 PUT 上传,示例代码如下。

from __future__ import annotations

import hashlib
from datetime import datetime as DT, timezone as TZ
from typing import Literal

from aiohttp import ClientSession
from alibabacloud_oss_v2.client import Client
from alibabacloud_oss_v2.credentials import StaticCredentialsProvider
from alibabacloud_oss_v2.config import load_default
from alibabacloud_oss_v2.models import PutObjectRequest


# Salt mixed into the timestamp hash that names uploaded objects.
SALT = 'xxx'

# Cache of OSS clients so repeated uploads reuse one Client per configuration.
# Keyed by (access_key_id, access_key_secret, region, endpoint): region and
# endpoint are written into the client's config, so they must be part of the
# key or a later call with different values would get a mismatched client.
CLIENTS: dict[tuple[str, str, str, str], Client] = {}


def _get_client(
    endpoint: str,
    region: str,
    access_key_id: str,
    access_key_secret: str,
) -> Client:
    """Return the cached OSS client for this configuration, creating it on first use."""
    cache_key = (access_key_id, access_key_secret, region, endpoint)
    client = CLIENTS.get(cache_key)
    if client is None:
        client_config = load_default()
        client_config.credentials_provider = StaticCredentialsProvider(
            access_key_id=access_key_id, access_key_secret=access_key_secret
        )
        client_config.region = region
        client_config.endpoint = endpoint

        client = Client(client_config)
        CLIENTS[cache_key] = client
    return client


async def upload_image(
    session: ClientSession,
    image: bytes,
    suffix: Literal['jpeg', 'png'],
    endpoint: str,
    region: str,
    bucket: str,
    access_key_id: str,
    access_key_secret: str,
) -> str:
    """Upload an image to Alibaba Cloud OSS asynchronously and return its URL.

    A presigned PutObject URL is generated with the OSS client, and the image
    bytes are then sent with an HTTP PUT on the caller's aiohttp session.

    Args:
        session: aiohttp session used to perform the PUT request.
        image: Raw image bytes to upload.
        suffix: Image type; used as the file extension and in the
            ``image/<suffix>`` content type.
        endpoint: OSS endpoint, with or without a scheme / trailing slash.
        region: OSS region assigned to the client configuration.
        bucket: Target bucket name.
        access_key_id: Access key ID for the static credentials provider.
        access_key_secret: Access key secret for the static credentials provider.

    Returns:
        ``https://<bucket>.<endpoint host>/<object key>``. Whether that URL is
        readable depends on the bucket/object ACL, which is not checked here.

    Raises:
        RuntimeError: If the PUT response status is anything other than 200.
    """
    # The object name is the SHA-256 of the current UTC timestamp plus SALT,
    # with the suffix as extension; it is stored at the bucket root.
    now = DT.now(TZ.utc)
    object_key = hashlib.sha256(f'{now.timestamp()}{SALT}'.encode()).hexdigest() + '.' + suffix

    # Build the returned URL from the bare host so that an endpoint given as
    # "https://host" or "host/" does not yield "https://bucket.https://host/...".
    host = endpoint.split('://', 1)[-1].rstrip('/')
    url = f'https://{bucket}.{host}/{object_key}'

    client = _get_client(endpoint, region, access_key_id, access_key_secret)

    request = PutObjectRequest(bucket=bucket, key=object_key, content_type=f'image/{suffix}')
    presigned = client.presign(request)

    # Send the signed headers unchanged together with the presigned URL.
    async with session.put(
        presigned.url,  # type: ignore
        headers=presigned.signed_headers,
        data=image,
    ) as resp:
        if resp.status != 200:
            raise RuntimeError(f'上传失败 status={resp.status}')
    return url