下载有内存泄露问题
Closed this issue · 0 comments
wqk151 commented
下载大量小文件,大约几十万个,发现使用内存一直涨,最后内存耗尽,被killed, 使用官网的demo也是一样的,并发设置为1也没有效果,有解决方案吗?
部分代码如下:
class BatchDownloader:
"""批量下载器"""
def __init__(self, client: oss.Client, bucket: str, max_workers: int = 5):
self.client = client
self.bucket = bucket
self.max_workers = max_workers
self.stop_event = threading.Event()
def list_objects(
self,
prefix: str = "",
max_keys: int = 1000,
):
"""列举存储空间中指定前缀的所有对象"""
continuation_token = None
print(f"正在扫描存储空间中的文件...")
paginator = self.client.list_objects_v2_paginator()
# 遍历对象列表的每一页
for page in paginator.iter_page(
oss.ListObjectsV2Request(
bucket=self.bucket,
prefix=prefix,
max_keys=max_keys,
)
):
# 遍历每一页中的对象
for obj in page.contents:
if obj.key.endswith("/") and obj.size == 0:
continue
relative_path = obj.key[len(prefix) :] if prefix else obj.key
yield DownloadTask(obj.key, relative_path, obj.size)
def download_file(self, task: DownloadTask, local_dir: str) -> DownloadResult:
"""下载单个文件"""
result = DownloadResult(task.object_key, size=task.size)
try:
# 计算完整的本地文件路径
full_local_path = os.path.join(local_dir, task.local_path)
# 创建本地文件目录
os.makedirs(os.path.dirname(full_local_path), exist_ok=True)
# 检查文件是否已存在且大小一致(断点续传)
if os.path.exists(full_local_path):
local_size = os.path.getsize(full_local_path)
if local_size == task.size:
result.success = True
return result
# 创建下载请求
get_request = oss.GetObjectRequest(bucket=self.bucket, key=task.object_key)
# 执行下载
response = self.client.get_object(get_request)
# 保存文件
with open(full_local_path, "wb") as f:
with response.body as body_stream:
# 分块读取并写入
for chunk in body_stream.iter_bytes(
block_size=1024 * 1024
): # 1MB块
if self.stop_event.is_set():
raise Exception("下载被中断")
f.write(chunk)
result.success = True
except Exception as e:
result.error = str(e)
# 如果下载失败,删除不完整的文件
try:
if os.path.exists(full_local_path):
os.remove(full_local_path)
except:
pass
return result
def batch_download(self, prefix: str, local_dir: str) -> List[DownloadResult]:
"""执行批量下载"""
results = []
completed = 0
print(f"开始下载 文件,使用 {self.max_workers} 个并发...")
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# 提交所有下载任务
future_to_task = {
executor.submit(self.download_file, task, local_dir): task
for task in self.list_objects(prefix)
}
# 处理完成的任务
for future in as_completed(future_to_task):
if self.stop_event.is_set():
break
task = future_to_task[future]
try:
result = future.result()
# results.append(result)
completed += 1
# 显示进度
except Exception as e:
result = DownloadResult(
task.object_key, error=str(e), size=task.size
)
# results.append(result)
completed += 1
# print(f"✗ [{completed}/{total}] {task.object_key} - 异常: {str(e)}")