一个使用aiohttp作为HTTP服务器、asyncio处理异步任务监控、multiprocessing.Process处理CPU密集型工作的多进程服务。每个工作进程启动自己的HTTP服务器,用于接收主进程的控制请求。
- 接收HTTP请求并启动长时间运行的任务
- 每个任务在单独的进程中运行
- 每个工作进程启动自己的HTTP服务器,用于接收控制命令
- 使用asyncio处理主服务器的异步逻辑
- 使用Python multiprocessing进行进程间通信
- 支持查询任务状态、列出所有任务和终止任务
- 支持暂停和恢复任务执行
- 定期(每10秒)在日志中打印所有任务及其状态
- 支持多种任务类型,包括CPU密集型任务
- 使用aiohttp作为异步Web服务器
- 每个工作进程有自己的aiohttp服务器实例,在随机可用端口上运行
- 通过HTTP接口实现进程间通信,提供更丰富的交互功能
- 使用asyncio处理主服务器的事件循环和异步任务
- 使用multiprocessing.Process创建独立工作进程执行CPU密集型任务
- 通过multiprocessing.Queue进行基本进程间通信
- 异步任务状态监控实时更新任务状态
- 支持通过HTTP接口优雅地终止任务
- 优雅的应用关闭,确保所有资源被正确释放
该应用结合了asyncio和multiprocessing的优势:
- 主服务进程(aiohttp):使用asyncio处理所有HTTP请求和内部事件循环
- 异步监控:使用asyncio协程监控所有工作进程的状态
- 工作进程(multiprocessing):使用multiprocessing.Process在独立进程中执行CPU密集型任务
- 工作进程HTTP服务器:每个工作进程启动自己的aiohttp服务器,用于接收控制命令
- 双重进程间通信:
- 通过multiprocessing.Queue进行快速状态更新
- 通过HTTP接口进行丰富的交互控制
# 创建虚拟环境
python3 -m venv venv
# 激活虚拟环境
source venv/bin/activate
# 安装依赖
pip install -r requirements.txt# 从虚拟环境中运行
python run.py应用将在 http://localhost:8000 上启动。
POST /tasks- 创建新任务GET /tasks- 列出所有任务GET /tasks/{task_id}- 获取特定任务的状态GET /tasks/{task_id}?detail=true- 获取任务的详细状态(从工作进程HTTP服务器获取)POST /tasks/{task_id}/control- 控制任务(暂停/恢复等)DELETE /tasks/{task_id}- 立即终止任务(强制)DELETE /tasks/{task_id}?use_http=true- 优雅地终止任务(通过HTTP请求)
process_data- 数据处理任务(IO密集型)generate_report- 报告生成任务(IO密集型)cpu_intensive- CPU密集型计算任务
每个工作进程启动的HTTP服务器提供以下端点:
GET /status- 获取工作进程的详细状态POST /control- 接收控制命令(如暂停/恢复)POST /terminate- 接收终止请求
创建数据处理任务:
curl -X POST http://localhost:8000/tasks \
-H "Content-Type: application/json" \
-d '{"task_type": "process_data", "task_data": {"items": [1, 2, 3, 4, 5]}}'创建报告生成任务:
curl -X POST http://localhost:8000/tasks \
-H "Content-Type: application/json" \
-d '{"task_type": "generate_report", "task_data": {"report_type": "detailed"}}'创建CPU密集型任务:
curl -X POST http://localhost:8000/tasks \
-H "Content-Type: application/json" \
-d '{"task_type": "cpu_intensive", "task_data": {"complexity": 2000000, "batches": 5}}'暂停一个正在运行的任务:
curl -X POST http://localhost:8000/tasks/{task_id}/control \
-H "Content-Type: application/json" \
-d '{"action": "pause"}'恢复一个已暂停的任务:
curl -X POST http://localhost:8000/tasks/{task_id}/control \
-H "Content-Type: application/json" \
-d '{"action": "resume"}'优雅地终止任务:
curl -X DELETE "http://localhost:8000/tasks/{task_id}?use_http=true"