/hyperion

✨ Clever Tricks Under Concurrency

Primary LanguageJavaApache License 2.0Apache-2.0

多线程的各种玩法

  • 多线程间的协调性,乱序性。
  • 并发(concurrent)与并行(parallel)
  • 临界区
  • 活跃性问题:死锁Deadlock,饥饿Starvation,活锁Livelock
  • 锁竞争带来的系统开销,线程间频繁调度带来的开销。
  • 并发级别:
    • 阻塞
    • 无饥饿(Starvation-Free),公平与非公平锁
    • 无障碍(Obstruction-free),最弱的非阻塞调度,回滚机制,一致性标记
    • 无锁,一个线程可以在有限步完成操作,CAS
    • 无等待,所有线程都可以在有限步完成操作,RCU
  • JMM,数据访问一致性
    • Atomicity,(JVM32bit,Long)
    • Visibility,(cache)
    • Ordering,指令重排(指令流水线,尽量少的中断流水线),
      • Happen-Before规则
        • 保证语义的串行性
  • 由于java单继承,继承本身是很宝贵的资源
  • 给线程取名,利于调试。
  • Thread.stop(),立即终止线程,释放所有锁,可能导致数据不一致。
  • 中断线程(设置中断标志位)
    • void Thread.interrupt()
    • boolean Thread.isInterrupted()
    • static boolean Thread.interrupted(),并清除当前中断状态
    • 线程中需要有处理中断的逻辑,否则中断无效(除了sleep(), wait())
  • sleep()
    • 由于中断抛出InterruptedException异常,会清除中断标记位,所以需要再次设置。
  • wait/notify
    • Object类方法,将对象作为线程间有效通信手段。
    • 需要持有监视器锁,需与synchronized合作使用。
    • wait会释放锁,sleep不释放。
    • notify()唤醒等待队列, 不释放锁,随机不公平唤醒.
    • wait()被唤醒,还需等待重新获得锁,才能执行
  • suspend/resume
    • suspend挂起线程,并不会释放任何锁资源。
    • 被挂起的线程,状态还是Runnable。
    • resume可能意外在suspend前执行。
  • join/yield
    • join调用了线程wait,被等待线程退出前,调用notifyAll()。
    • 不要在Thread对象实例上使用wait或notify,影响系统API。
    • yield使当前线程让出cpu。
  • volatile
    • 保证数据原子性,有序性,可见性。
  • 守护线程daemon,当系统没有用户线程时,则退出jvm。
  • synchronized
    • 指定加锁对象
    • 作用于实例方法
    • 作用于静态方法
    • 非公平锁
  • 重入锁,ReentrantLock,在JDK内部广泛使用,(ArrayBlockingQueue)
    • synchronized的增强版本,JDK6之后,两者性能差距不大。
    • 同一线程,反复加锁,释放次数大于锁定次数,IllegalMonitorStateException.
      • 高级功能
        • 中断响应,lockInterruptibly()
        • 限时等待,tryLock(5, TimeUnit.SECONDS), tryLock()不带参数,若锁被其他线程占用,则立即返回false。
        • 公平锁(必然要维护一个有序队列)
      • 原子状态通过CAS操作,存储当前锁状态。
    • Condition,使用时,要求线程持有相关的重入锁。
      • await,使当前线程等待,并释放锁,
      • awaitUninterruptibly
      • singal/singalAll
  • Semaphore
    • 对锁的扩展,指定同时有多少个线程可以访问某一资源。
    • Semaphore(int permits)/ Semaphore(int permits, boolean fair) 指定信号量的准入数。
    • acquire 尝试获得一个许可,若无法获得,则线程等待。
    • acquireUninterruptibly
    • tryAcquire、tryAcquire( long timeout, TimeUnit unit)
    • release
    • 若发生信号量泄露(申请了但没有释放),则进入临界区的线程越来越少。
  • ReadWriteLock
    • ReentrantReadWriteLock
    • 读读时非阻塞,读操作远大于写操作,发挥最大功效。
  • CountDownLatch
    • 让某一线程等待,直到倒计时结束。e.g 火箭发射倒计时
    • CountDownLatch(int count)
    • countDown()
    • await()
  • CyclicBarrier
    • CyclicBarrier(int parties, Runnable barrierAction), barrierAction当计数器一次计数完成后,执行动作额e.g 士兵集合。
    • 计数器可以反复使用。
    • await()
    • InterruptedException, BrokenBarrierException表示当前的CyclicBarrier已经破损,避免线程进行永久无谓等待(其中一个线程已被中断,等待没有结果)。
  • LockSupport
    • static park, 线程WAITING状态。
    • static unpark(Thread t)
    • 方便实用的线程阻塞工具,弥补resume发生在suspend之前的问题,类似信号量机制,每个线程一个许可。
    • 支持中断影响,不抛出InterruptedException,可使用Thread.interrupted()获得标记。
    • 不要求持有监视器锁。
  • Phaser
  • Exchanger
  • 线程池
    • 线程池中,总有那么几个活跃线程,完成工作时,并不急着关闭线程,而是将线程退回池中。
    • 使用线程池后,创建线程变成了从线程池中获得空闲线程,关闭线程变成了向池子归还线程。
    • JDK提供一套Executor框架,Executors扮演线程池工厂角色。
      • newFixedThreadPool(int nThreads)
        • 若没有空闲线程,则新任务被暂存在一个任务队列中。
        • new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
      • newSingleThreadExecutor()
        • 若没有空闲线程,则新任务被暂存在一个任务队列中。
        • new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
      • newCachedThreadPool()
        • 返回一个可根据实际情况调整线程数量的池。
        • new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>())
      • newSingleThreadScheduledExecutor()
      • newScheduledThreadPool(int corePoolSize)
        • schedule()
        • scheduleAtFixedRate(),以上次任务开始执行时间为起点。如果任务执行时间超过调度时间,不会堆叠任务,任务会在上个任务结束后,立即被调用。
        • scheduleWithFixedDelay(),以上次任务结束后为起点。
        • 如果任务本身抛出异常,后续的所有执行都会被中断,因此需要做好异常处理。
    • ThreadPoolExecutor
      • ThreadPoolExecutor(int corePoolSize, int maxinumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler
      • keepAliveTime: 线程池中线程数量超过corePoolSize时,多余空闲线程的存活时间。
      • workQueue
        • 直接提交队列,SynchronousQueue,没有容量,插入操作等待删除操作,没有空闲线程,则尝试创建新的进程,线程数达到最大值,则执行拒绝策略。
        • 有界队列,ArrayBlockingQueue(int capacity), 当线程数小于core,则创建新线程,等于core时,加入队列中,当队列满时,提升线程数量,如果大于max,则执行拒绝策略。除非系统非常繁忙,否则确保核心线程数维持在corePoolSize。
        • 无界队列,LinkedBlockingQueue,线程数小于core,则创建新线程,达到core后,加入队列等待,直到耗尽系统内存。
        • 优先队列,PriorityBlockingQueue,特殊的无界队列,可以根据任务自身的优先级顺序先后执行。
      • corePoolSize??
    • 拒绝策略,实现RejectedExecutionHandler
      • 任务数量超过系统实际承载能力
      • AbortPolicy, 直接抛出异常,阻止系统工作。
      • CallerRunsPolicy,在调用者线程中,运行当前被丢弃任务。
      • DiscardOldestPolicy,丢弃最老的请求(即将被执行的任务)。
      • DiscardPolicy,丢弃无法处理任务。
    • ThreadFactory
      • 创建线程,Thread newThread(Runnable r)
    • 扩展线程池, ThreadPoolExecutor提供三个接口
      • beforeExecute(Thread t, Runnable r)
      • afterExecute(Thread t, Runnable r)
      • terminated()
    • shutdown() 并不会立即暴力终止所有任务,只是发送关闭信号,线程池不再接受其他新的任务。
    • shutdownNow()
    • 线程数量: Nthreads = Ncpu * Ucpu * (1 + W/C), Ncpu:可用cpu数量,Ucpu:cpu使用率, W/C:等待时间/计算时间
    • 线程池会吃掉抛出的异常。
      • 放弃submit,改用execute或者使用future.get(), 可以得到部分堆栈信息。
  • Fork/join
    • 由于线程池的优化,提交任务与线程数量并不是一对一的,当线程空闲,试图帮助别人时,从任务队列的底部拿数据。
    • ForkJoinPool
      • RecursiveTask
  • JDK并发容器
    • 线程安全的HashMap
      • Collections.synchronizedMap(new HashMap()), 使用委托,如果并发级别不高,够用。
      • ConcurrentHashMap
    • 有关List的线程安全
      • Collections.synchronizedList()
      • ConcurrentLinkedQueue, 堆node操作,使用CAS
      • CopyOnWriteArrayList,只有写写时,才发生阻塞,在写入时,进行自我复制,将修改内容写入副本,何时写入主存??
      • BlockingQueue,适合线程间数据共享,意见箱的例子,当服务线程的队列为空,则进行等待,新消息进入队列,则自动唤醒线程。
        • ArrayBlockingQueue
        • ListedBlockingQueue
        • put()/take() 是Blocking的关键。offer/poll并不阻塞
          • offer,队列满了,则立即返回false
          • put, 队列满,则进行等待
        • 内部使用Condition,while(count == 0) notEmpty.await();
        • 通信两端更换线程,可以平滑过渡
      • SkipList
        • ConcurrentSkipListMap
        • 用空间换时间。
        • 维护多个有序链表,并且链表是分层,每上面一层链表是下层的子集。
  • 提高锁性能
    • 减小锁持有时间
    • 减小锁粒度,ConcurrentHashMap
    • 读写分离锁替换独占锁
    • 锁分离,LinkedBlockingQueue
    • 锁粗化,对同一个锁不停地进行请求,释放,会消耗宝贵资源。
  • JDK内部的锁优化策略
    • 尽量避免线程在操作系统层面真实挂起。
    • 偏向锁,连续多次由同一线程请求相同锁,无须再做同步操作, -XX:+UseBiasedLocking
    • 轻量级锁,如果偏向锁失败,jvm并不会立即挂起线程,将对象头部作为指针,指向持有锁的堆栈内部,判断线程是否持有对象锁,如果加锁失败,就会膨胀为重量级锁。??
    • 自旋锁,锁膨胀后,jvm为了避免在操作系统层面真实挂起,系统进行一次赌注,让当前线程做几个空循环,如果还得不到锁,则真实地将线程在操作层面挂起。
    • 锁消除,逃逸分析,-XX:+DoEscapeAnalysis, -XX:+EliminateLocks
  • ThreadLocal
    • 维持着一个类似ThreadLocal变量的集合ThreadLocalMap,key为ThreadLocal的当前对象。
    • 只要线程不退出,对象引用将一直存在。
    • 如果使用线程池,线程总是存在,则有可能出现内存泄露。
    • remove(),防止内存泄露。
    • ThreadLocalMap的实现使用了弱引用,gc时,发现弱引用,就会立即回收。

  • CAS,额外给出一个期望值E
    • CAS(V, E, N), V:要更新的变量 E: 预期值 N:新值, V==E时,V=N;否则返回V;
    • 不足,如果对象连续被修改两次,
    • atomic包内,实现使用CAS操作。
    • AtomicInteger
      • value保存实际取值
      • long valueOffset 偏移量
      • Unsafe,指针,getUnsafe()如果这个类的ClassLoader不为null,则会抛异常,是JDK内部专属类。
    • AtomicReference
      • compareAndSet(E, N)
      • 优惠券重复赠予的例子
    • AtomicStampedReference
      • 带时间戳的对象引用
      • 使用时间戳(int)记录状态变换
    • AtomicIntegerArray
    • AtomicIntegerFieldUpdater
      • Updater只修改范围内可见变量
      • volatile类型
      • 不支持static
    • CAS通过对象实例中的偏移量直接进行赋值??
  • 死锁:哲学家就餐问题,cpu占用率为0.
  • 用jps得到java进程ID,jstack得到线程堆栈
  • 多线程调试,为断点设置额外属性
  • 单例,private static 为了保证instance的安全。
  • 不变模式final
    • 里氏替换原则,子类可以完成替代父类
    • 不变模式通过回避问题的态度,解决并发访问控制。
    • 基本数据类型和String类型,使用不变模式,因此所有实例的方法均不需要进行同步操作。?
  • 生产者-消费者模式
    • 不直接通信,通过共享内存缓冲区(1.数据在多线程间共享,2.缓解性能差)
  • CPU Cache优化,解决伪共享问题。cpu高速缓存,读写数据最小单位是cache line,可能多个变量位于同一cache line,导致更新其中一个变量,导致其他cpu上缓存失效,无法命中,使系统吞吐量急剧下降,可使用padding填充变量前后。
  • future模式
    • 无法立即给出需要数据,但会返回一个契约,将来,可凭借这个契约重新获取需要的信息。
    • JDK的future模式
      • Callable
      • FutureTask
  • 并行流水线
    • 执行过程中有数据相关性的运算都是无法完美并行化的。
  • 并行搜索
  • Java8
    • 函数式编程
      • 函数作为一等公民
        • 将函数作为参数传递给另一个函数
        • 函数可以作为另一个函数的返回值
        • 无副作用(显式函数,与外界交换数据的唯一方式,就是参数和返回值)
        • 申明式的(所有的细节被程序库所封装),而非命令式
        • 不变的对象
        • 易于并行,完全没必要进行任何同步操作。
        • 更少代码
      • FunctionalInterface
        • 只定义了单一抽象方法的接口(不是只能有一个方法,任何被Object实现的方法,都不能视为抽象方法)
        • 函数式接口的实例可由方法引用或lambda进行构造
      • 接口默认方法
        • 这一改进使java8拥有类似于多继承的能力
        • 同时也引入了钻石问题
      • lambda
      • 方法引用
    • StampedLock
      • 通过引入乐观锁,增加读写锁的并行度
      • writeLock、readLock,悲观锁
      • tryOptimisticRead尝试一次乐观锁,validate()判断是否发生修改。
      • 内部实现,使用Unsafe.park(),遇到线程中断时,会直接返回,如果退出CAS循环条件得不到满足,则可能会再次进入循环。
      • 基于CLH锁,自旋锁,维护等待线程队列,每个节点保存当前线程是否释放锁的标记位。
    • CompletableFuture
      • 超大型工具类,接口定义约40种方法,为了流式调用准备的。
      • 可以手动设置完成状态complete()
      • supplyAsync(Supplier<U> supplier, Executor executor)/runAsync(),可接收Executor参数,若不指定,则默认在ForkJoinPool.common线程池中执行(所有线程都是Daemon)
      • 流式调用thenApply()
      • 异常处理exceptionally()
      • 组合多个CompletableFuture,thenCompose()/thenCombine()
    • LongAdder
      • 使用热点分离,类似ConcurrentHashMap,热点数据value被分离成多个单元。
      • 开始不会动用数据分离,先将数据记录在base变量,如果修改时发生冲突,则扩展为cell数据。
      • 避免伪共享,不是使用padding,而是用@sum.misc.Contended,自己应用这个注解时,加上-XX:-RestrictContended
    • LongAccumulator
      • 扩展LongAdder、可以实现任意函数操作LongAccumulator(LongBinaryOperator, long)

文档入口