2881099/FreeScheduler

建议:提供独立的调度任务存储器接口

Opened this issue · 7 comments

提供独立的任务存储器接口,用于支持多种存储方式,例如:RedisMongoDBMemcachedMemory

Reids/Memcached/Memory 可以实现,不过 TaskLog 要不要写进 Redis 呢,是否合适?

MongoDB 就比较简单了。

FreeRedis 实现:

using FreeRedis;
using System;
using System.Collections.Generic;

namespace FreeScheduler.TaskHandlers
{
    public class FreeRedisHandler : ITaskHandler
    {
        readonly RedisClient _cli;
        public FreeRedisHandler(RedisClient cli)
        {
            if (cli.Serialize == null || cli.Deserialize == null) throw new Exception("FreeRedis 必须设置了序列化/反序列化 cli.Serialize/Deserialize");
            _cli = cli;
        }

        public IEnumerable<TaskInfo> LoadAll()
        {
            var taskIds = _cli.ZRange($"FreeScheduler_zset_{TaskStatus.Running}", 0, -1);
            if (taskIds.Length == 0) return new TaskInfo[0];
            return _cli.HMGet<TaskInfo>("FreeScheduler_hset", taskIds);
        }
        public TaskInfo Load(string id)
        {
            return _cli.HGet<TaskInfo>("FreeScheduler_hset", id);
        }
        public void OnAdd(TaskInfo task)
        {
            var taskScore = (decimal)task.CreateTime.Subtract(_2020).TotalSeconds;
            using (var pipe = _cli.StartPipe())
            {
                pipe.HSet("FreeScheduler_hset", task.Id, task);
                pipe.ZAdd($"FreeScheduler_zset_{task.Status}", taskScore, task.Id);
                pipe.EndPipe();
            }
        }
        public void OnRemove(TaskInfo task)
        {
            using (var pipe = _cli.StartPipe())
            {
                pipe.HDel("FreeScheduler_hset", task.Id);
                pipe.ZRem($"FreeScheduler_zset_{TaskStatus.Running}", task.Id);
                pipe.ZRem($"FreeScheduler_zset_{TaskStatus.Paused}", task.Id);
                pipe.ZRem($"FreeScheduler_zset_{TaskStatus.Completed}", task.Id);
                pipe.HDel($"FreeScheduler_zset_log_{task.Id}");
                pipe.EndPipe();
            }
        }
        public void OnExecuted(Scheduler scheduler, TaskInfo task, TaskLog result)
        {
            var t = Load(task.Id);
            var status = t.Status;
            t.CurrentRound = task.CurrentRound;
            t.ErrorTimes = task.ErrorTimes;
            t.LastRunTime = task.LastRunTime;
            t.Status = task.Status;
            var taskScore = (decimal)task.CreateTime.Subtract(_2020).TotalSeconds;
            var resultScore = (decimal)result.CreateTime.Subtract(_2020).TotalSeconds;
            var resultMember = _cli.Serialize(result)?.ToString();
            using (var pipe = _cli.StartPipe())
            {
                pipe.HSet("FreeScheduler_hset", task.Id, task);
                if (status != t.Status)
                    pipe.ZRem($"FreeScheduler_zset_{status}", task.Id);
                pipe.ZAdd($"FreeScheduler_zset_{t.Status}", taskScore, task.Id);
                //pipe.ZAdd("FreeScheduler_zset_log", resultScore, resultMember);
                pipe.ZAdd($"FreeScheduler_zset_log_{task.Id}", resultScore, resultMember);
                pipe.EndPipe();
            }
        }
        readonly DateTime _2020 = new DateTime(2020, 1, 1);

        public virtual void OnExecuting(Scheduler scheduler, TaskInfo task)
        {
            Console.WriteLine($"[{DateTime.Now.ToString("HH:mm:ss")}] {task.Topic} 被执行,还剩 {scheduler.QuantityTask} 个循环任务");
        }
    }
}

TaskLog 还是需要写入到Redis当中,如果出错了,更容易死排查错误,同时成功的调度任务需要支持配置多长时间自动清理掉。

按照上面的代码

FreeScheduler_hset 存储所有任务数据
进行中的任务,在 FreeScheduler_zset_Running
暂停的任务,在 FreeScheduler_zset_Paused
成功的任务,会移至 FreeScheduler_zset_Completed

FreeScheduler_zset_log_{task.Id},存储任务对应的日志

查询运行中的任务:

var taskIds = _cli.ZRange($"FreeScheduler_zset_{TaskStatus.Running}", 0, -1);
if (taskIds.Length == 0) return new TaskInfo[0];
return _cli.HMGet<TaskInfo>("FreeScheduler_hset", taskIds);
sgf commented

补充一下,还有方便进行 LiteDB等 NoSql数据库的支持