/java-threads

Primary LanguageJavaApache License 2.0Apache-2.0

并发程序设计

单一核心处理器的计算速度不能满足用户的需求,所以处理器增加了更多的核心,这样的操作系统就可以同时执行多个任务。这就是所谓的并发处理。
  • 基本的并发概念
  • 并发应用程序中可能出现的问题
  • 设计并发算法的方法论
  • Java并发API

基本并发概念

并发与并行

  • 并发:在单个处理器上采用单核执行多个任务
  • 并行:同一时间在不同的计算机、处理器或处理器核心上同时运行多个任务

同步

一种协调两个或更多任务以获得预期结果的机制
  • 控制同步:由任务的数据依赖关系决定的执行顺序控制
  • 数据访问同步:任意时间里,某个共享变量只能被其中的一个任务访问

临界段: 一段代码,由于它可以访问共享资源,因此在任何给定的时间内,只能够被一个任务执行。 粒度:粗粒度(低互通信的大型任务)和细粒度(高互通信的小型任务)

同步机制的几种实现方式:

  • 信号量(semaphore):一种用于控制对一个或多个单位资源进行访问的机制
    • 互斥(资源空闲和资源忙)
  • 监视器(monitor):一种在共享资源上实现互斥的机制
    • 一个互斥
    • 一个条件
    • 两种操作(等待条件和通报条件):一旦通报了该条件,在等待它的任务中只有一个会继续执行

不可变对象

初始化后,不能修改其可视状态(其属性值)。
如果想修改一个不可变对象,就必须创建一个新的对象。

线程安全

例如:String

原子操作和原子变量

在并发应用程序中,可以通过一个临界段来实现原子操作,以便对整个操作采用同步机制。

原子操作:一种发生在瞬间的操作 原子变量:一种通过原子操作来设置和获取其值的变量

共享内存与消息传递

多任务间互相通信的方法
  • 共享内存:通常用于在同一台计算机上运行多个任务的情况。
    • 任务在读取和写入值的时候使用相同的内存区域。
    • 对该共享内存的访问必须在一个由同步机制保护的临界段内完成。
  • 消息传递:通常用于在不同计算机上运行多个任务的情况
    • 遵循预定义的协议,发送或接收消息
    • 同步通信:发送方保持阻塞并等待相应
    • 异步通信:发送方在发送消息后继续执行自己的流程

并发应用程序中可能出现的问题

数据竞争

多个任务在临界段之外对一个共享变量进行写入操作(无任何同步机制),那么程序可能存在数据竞争

例子

死锁

当两个(或多个)任务正在等待必须由另一个线程释放的某个共享资源,
而该线程又正在等待必须由前述任务之一释放的另一个共享资源时,并发应用程序出现了死锁。

死锁出现的条件(Coffman条件):

  • 互斥:涉及的资源必须是不可共享的
  • 占有并等待条件:一个任务需要多个资源,且已占有的资源在等待时不会释放
  • 不可剥夺:资源只能由持有者主动释放
  • 循环等待:形成资源等待的环形关系

如何避免:

  • 忽略不管:最常用的一种机制
  • 检测:系统中有一项专门分析系统状态的任务,可以检测是否发生了死锁
  • 预防:参照Coffman条件
  • 规避:在开始任务前,取得必须的全部资源信息

活锁

如果系统中有两个任务,他们总是因为对方的行为而改变自己的状态,那么就会出现活锁。
最终结果是他们陷入了状态变更的循环而无法继续向下执行。

资源不足

当某个任务在系统中无法获取维持其继续执行所需的资源时,就会出现资源不足。

公平原则:所有等待某一个资源的任务必须在某一给定的时间之内占有该资源。

  • 实现方式:实现一个算法,在选择下一个将占有某一个资源的任务时,对任务以等待 该资源的时间因素加以考虑
  • 缺点:增加额外的开销,可能会降低程序的吞吐量

优先权反转

一个低优先级的任务持有了一个高优先级任务所需的资源

设计并发算法的方法论

Threading Methodology:Principles and Practices
  1. 起点:算法的一个串行版本
  • 可以使用串行算法来测试并发算法是否生成了正确的结果
  • 可以度量二者的吞吐量,以此来衡量并行处理是否真的对系统产生了好的改善
  1. 第一步:分析
  • 寻找串行版本的哪部分代码可以并行执行
  • 特别关注耗时较多的部分,大概率能获取巨量的性能提升
  • 循环排查法:分析每一个代码块
  1. 第二步: 设计

产生的影响

  • 代码的结构
  • 数据结构的组织

方法:

  • 任务分解:将代码划分成多个可以立即执行的独立任务
  • 数据分解:使用多个实例分别对数据集的每个子集进行处理

eg:创建数据库链接

Note:方案的粒度问题

  1. 第三步:实现

编码实现:最好使用语言提供的内置线程库,而非自己从零开始实现

  1. 第四步:测试

  2. 第五步:调整

通过对比串行和并行两种方式的吞吐量来衡量是否达到了预期结果。

可能的影响因素:

  • 任务的粒度问题
  • 代码的实现方式等

衡量指标:

  • 加速比(speedup):Tspeedup = Tsequential ➗ Tconcurrent
  • Amdahl定律:Speedup <= 1 / ((1 - P) + (P / N))
    • P:可以进行并行化处理的代码的百分比
    • N:用于执行计算的CPU核心数
    • 缺陷:错误的假定了增加CPU核的同时,数据量没变
  • Gustafson定律:Speedup = P - N * (P - 1)
    • P:可以进行并行化处理的代码的百分比
    • N:用于执行计算的CPU核心数

举一个例子

需求描述

单词最佳匹配算法:找出和作为参数的字符串最相似的单词

  • 单词列表
  • 用于评估两个单词之间相似度的指标(使用 Levenshtein 距离来度量两个字符序列的差 异)

程序分解

  • 使用Levenshtein距离,返回与某个字符串最相似的单词列表
  • 使用Levenshtein距离,判定我们的字典当中是否存在某个字符序列

代码设计

公共类

  • WordsLoader 类:用于将单词列表加载到字符串对象列表中
  • LevenshteinDistance 类:用于计算两个字符串之间的 Levenshtein 距离
  • BestMatchingData 类:用于存放最佳匹配算法的结果。它存储了单词列表以及这些单词与输入字符串之间的距离

串行版

  • 检查单词拼写是否正确 > + ExistSerialMain:判断目标单词是否存在
    • ExistSerialCalculation:串行算法-单词是否存在
  • 查找近似的单词 > + BestMatchingSerialMain:获取最佳匹配的单词列表
    • BestMatchingSerialCalculation:串行算法-最佳匹配单词

并行版

  • 检查单词拼写是否正确 > + ExistConcurrentCalculation
    • ExistTask
  • 查找近似的单词 > + BestMatchingConcurrentCalculation:
  • BestMatchingTask:最佳匹配任务 > + BestMatchingConcurrentCalculationV2:升级版

计算及调整

加速比

Amdahl定律

Gustafson定律

结论

  • 并非每一个算法都可以并行化处理
  • 对性能良好的串行版本算法实现并行处理,是不可取的
  • 当实现一个并发程序时,必须要考虑一下几点:
    • 效率:并行版必须比串行版,同数据规模耗时必须少或者同时间内处理的数据量必须多
    • 简单:无论哪种实现,设计必须简单,容易实现、测试、调试和维护
    • 可移植性:并行算法应该只需要很少或不需要修改就能在不同的平台上运行
    • 伸缩性:能动态且充分利用CPU资源

Java并发API

基本并发类

  • Thread类:描述了执行并发java应用的所有线程
  • Runnable接口:创建并发的另一种方式
  • ThreadLocal类:存放从属于某一线程的变量
  • ThreadFactory:实现Factory模式的基类,可以用它来创建定制线程

举一个例子

同步机制

作用

  • 定义用于访问某一共享资源的临界段
  • 在某一共同点上同步不同的任务

几种同步机制

  • synchronized关键字
  • Lock接口:比synchronized更灵活的同步操作
    • ReentrantLock
    • ReentrantReadWriteLock
    • StampedLock
  • Semaphore类:通过实现经典的信号量机制来实现同步
  • CountDownLatch类:允许一个任务等待多项操作的结束
  • CyclicBarrier类:允许多线程在某一共同点上进行同步
  • Phaser类:允许控制多段任务的执行

执行器

实现并发任务的创建和管理分割开来的一种机制。
不必担心线程的创建和管理,只需要关心任务的创建并将其发送给执行器。

执行器框架的基本组件

  • Executor接口和ExecutorService接口:包含了所有执行器共有的execute()方法
  • ThreadPoolExecutor类:获取一个含有线程池的执行器
  • ScheduledThreadPoolExecutor类:在某段延迟之后执行任务或周期性的执行任务
  • Executors:执行器工具类
  • Callable接口:Runnable接口的替换品,可返回任务的执行结果
  • Future接口:包含了一些能获取Callable接口返回值并控制其状态的方法

执行器的特征

  • 不需要创建任何Thread对象。如果要执行一个并发任务,只需要创建一个执行该任务(例如一个实现Runnable接口的类)的实例并且将其发送给执行器。执行器会管理执行该任务的线程。
  • 执行器通过重新使用线程来缩减线程创建带来的开销。在内部,执行器管理着一个线程池,其中的线程称为工作线程(worker-thread)。如果向执行器发送任务而且存在某一空闲的工作线程,那么执行器就会使用该线程执行任务。
  • 使用执行器控制资源很容易。可以限制执行器工作线程的最大数目。如果发送的任务数多于 工作线程数,那么执行器就会将任务存入一个队列。当工作线程完成某个任务的执行后,将 从队列中调取另一个任务继续执行。
  • 你必须以显式方式结束执行器的执行,必须告诉执行器完成执行之后终止所创建的线程。如 若不然,执行器则不会结束执行,这样应用程序也不会结束。

举一个例子

Fork/Join框架

采用分治方法进行计算一批任务。

Fork/Join 框架必须用于解决基于分治方法的问题。必须将原始问题划分为较小的问题,直到问题很小,可以直接解决。

Fork/Join的局限性

  • 不再进行细分的基本问题的规模既不能过大也不能过小。按照 Java API 文档的说明,该基本问题的规模应该介于 100 到 10 000 个基本计算步骤之间
  • 数据可用前,不应使用阻塞型 I/O 操作,例如读取用户输入或者来自网络套接字的数据。这样的操作将导致 CPU 核资源空闲,降低并行处理等级,进而使性能无法达到最佳
  • 不能在任务内部抛出校验异常,必须编写代码来处理异常(例如,陷入未经校验的 RuntimeException)。在后面的例子中你将看到,对于未校验异常有一种特殊的处理方式

Fork/Join 框架的组件

  • ForkJoinPool 类:该类实现了 Executor 接口和 ExecutorService 接口,而执行 Fork/Join 任务时将用到 Executor 接口。Java 提供了一个默认的 ForkJoinPool 对象(称作 公用池),但是如果需要,你还可以创建一些构造函数。你可以指定并行处理的等级(运行 并行线程的最大数目)。默认情况下,它将可用处理器的数目作为并发处理等级。
  • ForkJoinTask 类:这是所有 Fork/Join 任务的基本抽象类。该类是一个抽象类,提供了 fork() 方法和 join()方法,以及这些方法的一些变体。该类还实现了 Future 接口,提供了一些方 法来判断任务是否以正常方式结束,它是否被撤销,或者是否抛出了一个未校验异常。 RecursiveTask 类、RecursiveAction 类和 CountedCompleter 类提供了 compute()抽 象方法。为了执行实际的计算任务,该方法应该在子类中实现。
  • RecursiveTask 类:该类扩展了 ForkJoinTask 类。RecursiveTask 也是一个抽象类,而 且应该作为实现返回结果的 Fork/Join 任务的起点。
  • RecursiveAction 类:该类扩展了 ForkJoinTask 类。RecursiveAction 类也是一个抽 象类,而且应该作为实现不返回结果的 Fork/Join 任务的起点。
  • CountedCompleter 类:该类扩展了 ForkJoinTask 类。该类应作为实现任务完成时触发另 一任务的起点。

举一个例子

并行流

  • Stream接口
  • Optional
  • Collectors
  • Lambda表达式

主要特征

  • 流并不存储元素。它们只处理存放在数据源(数据结构、文件等)中的元素
  • 流不可重用
  • 流可对数据进行延迟处理
  • 流操作不能修改流的源
  • 流允许你进行链式操作,因此一项操作的输出是下一项操作的输入

扩展阅读