/mmc-juc

Leveraging AI large models, the multi-threaded library encapsulated based on ForkJoinPool is particularly suitable for single long-task scenarios, such as traversing DB and other retrieval tasks. It also supports batch processing of small tasks, such as batch processing of Kafka messages, and is ready to use out of the box.

Primary LanguageJavaApache License 2.0Apache-2.0

mmc-juc [V1.1]

利用AI大模型,基于ForkJoinPool封装的多线程库,尤其适合单次长任务场景,例如遍历DB等检索任务,也支持批量小任务,例如kafka消息批量处理等,开箱即用。

一、功能特性

  • 支持设置线程池并发度
  • 支持设置任务池和来源
  • 支持控制执行速率
  • 支持设置任务处理器
  • 支持设置任务监听器

1、引入最新依赖包,如果找不到依赖包,请到工程目录mvn clean package install执行一下命令。

<dependency>
    <groupId>io.github.vipjoey</groupId>
    <artifactId>mmc-juc</artifactId>
    <version>1.1</version>
</dependency>

二、快速开始

2、同步执行任务。

    MmcTaskExecutor<Integer, Integer> mmcTaskExecutor = MmcTaskExecutor.<Integer, Integer>builder()
        .taskSource(IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList()))
        .taskProcessor(x -> x.stream().reduce(0, Integer::sum))
        .taskMerger(Integer::sum)
        .threshold(10)
        .taskName("mmcTaskExample")
        .rateLimiter(10, 20)  // 设置速率限制
        .forkJoinPoolConcurrency(4) // 设置ForkJoinPool的并发度为4
        .build();

    System.out.println("result: " + mmcTaskExecutor.execute());

3、异步执行任务。

    MmcTaskExecutor<Integer, Integer> mmcTaskExecutor = MmcTaskExecutor.<Integer, Integer>builder()
        .taskSource(IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList()))
        .taskProcessor(x -> x.stream().reduce(0, Integer::sum))
        .taskMerger(Integer::sum)
        .threshold(10)
        .taskName("mmcTaskExample")
        .rateLimiter(10, 20)  // 设置速率限制
        .forkJoinPoolConcurrency(4) // 设置ForkJoinPool的并发度为4
        .build();

    mmcTaskExecutor.commit((result -> System.out.println(result)));

4、多次提交小任务。

public static void main(String[] args) {

        // 从0加到100任务
        List<Integer> taskSource = new ArrayList<>();
        for (int i = 1; i <= 100; i++) {
            taskSource.add(i);
        }

        // 任务容器
        MmcTaskExecutor<Integer, Integer> mmcTaskExecutor = MmcTaskExecutor.<Integer, Integer>builder()
        .threshold(10)
        .rateLimiter(10, 20)  // 设置速率限制
        .forkJoinPoolConcurrency(4) // 设置ForkJoinPool的并发度为4
        .build();
        
        // 同步提交任务
        Integer r = mmcTaskExecutor.execute(MmcTask.<Integer, Integer>builder()
            .taskSource(taskSource)
            .taskProcessor(x -> x.stream().reduce(0, Integer::sum))
            .taskMerger(Integer::sum)
            .rateLimiter(new TokenBucket(10, 20))
            .taskListener(new DefaultMmcTaskListener())
            .threshold(10)
            .start(0)
            .end(taskSource.size())
            .taskName("taskName")
            .build()
        );
        System.out.println("result: " + r);
}

三、变更记录

  • 20240826 v1.0 初始化

四、参考文章

五、特别说明

  • 欢迎共建
  • 佛系改bug