- 多线程间的协调性,乱序性。
- 并发(concurrent)与并行(parallel)
- 临界区
- 活跃性问题:死锁Deadlock,饥饿Starvation,活锁Livelock
- 锁竞争带来的系统开销,线程间频繁调度带来的开销。
- 并发级别:
- 阻塞
- 无饥饿(Starvation-Free),公平与非公平锁
- 无障碍(Obstruction-free),最弱的非阻塞调度,回滚机制,一致性标记
- 无锁,一个线程可以在有限步完成操作,CAS
- 无等待,所有线程都可以在有限步完成操作,RCU
- JMM,数据访问一致性
- Atomicity,(JVM32bit,Long)
- Visibility,(cache)
- Ordering,指令重排(指令流水线,尽量少的中断流水线),
- Happen-Before规则
- 保证语义的串行性
- Happen-Before规则
- 由于java单继承,继承本身是很宝贵的资源
- 给线程取名,利于调试。
- Thread.stop(),立即终止线程,释放所有锁,可能导致数据不一致。
- 中断线程(设置中断标志位)
- void Thread.interrupt()
- boolean Thread.isInterrupted()
- static boolean Thread.interrupted(),并清除当前中断状态
- 线程中需要有处理中断的逻辑,否则中断无效(除了sleep(), wait())
- sleep()
- 由于中断抛出InterruptedException异常,会清除中断标记位,所以需要再次设置。
- wait/notify
- Object类方法,将对象作为线程间有效通信手段。
- 需要持有监视器锁,需与synchronized合作使用。
- wait会释放锁,sleep不释放。
- notify()唤醒等待队列, 不释放锁,随机不公平唤醒.
- wait()被唤醒,还需等待重新获得锁,才能执行
- suspend/resume
- suspend挂起线程,并不会释放任何锁资源。
- 被挂起的线程,状态还是Runnable。
- resume可能意外在suspend前执行。
- join/yield
- join调用了线程wait,被等待线程退出前,调用notifyAll()。
- 不要在Thread对象实例上使用wait或notify,影响系统API。
- yield使当前线程让出cpu。
- volatile
- 保证数据原子性,有序性,可见性。
- 守护线程daemon,当系统没有用户线程时,则退出jvm。
- synchronized
- 指定加锁对象
- 作用于实例方法
- 作用于静态方法
- 非公平锁
- 重入锁,ReentrantLock,在JDK内部广泛使用,(ArrayBlockingQueue)
- synchronized的增强版本,JDK6之后,两者性能差距不大。
- 同一线程,反复加锁,释放次数大于锁定次数,IllegalMonitorStateException.
- 高级功能
- 中断响应,lockInterruptibly()
- 限时等待,tryLock(5, TimeUnit.SECONDS), tryLock()不带参数,若锁被其他线程占用,则立即返回false。
- 公平锁(必然要维护一个有序队列)
- 原子状态通过CAS操作,存储当前锁状态。
- 高级功能
- Condition,使用时,要求线程持有相关的重入锁。
- await,使当前线程等待,并释放锁,
- awaitUninterruptibly
- singal/singalAll
- Semaphore
- 对锁的扩展,指定同时有多少个线程可以访问某一资源。
- Semaphore(int permits)/ Semaphore(int permits, boolean fair) 指定信号量的准入数。
- acquire 尝试获得一个许可,若无法获得,则线程等待。
- acquireUninterruptibly
- tryAcquire、tryAcquire( long timeout, TimeUnit unit)
- release
- 若发生信号量泄露(申请了但没有释放),则进入临界区的线程越来越少。
- ReadWriteLock
- ReentrantReadWriteLock
- 读读时非阻塞,读操作远大于写操作,发挥最大功效。
- CountDownLatch
- 让某一线程等待,直到倒计时结束。e.g 火箭发射倒计时
- CountDownLatch(int count)
- countDown()
- await()
- CyclicBarrier
- CyclicBarrier(int parties, Runnable barrierAction), barrierAction当计数器一次计数完成后,执行动作额e.g 士兵集合。
- 计数器可以反复使用。
- await()
- InterruptedException, BrokenBarrierException表示当前的CyclicBarrier已经破损,避免线程进行永久无谓等待(其中一个线程已被中断,等待没有结果)。
- LockSupport
- static park, 线程WAITING状态。
- static unpark(Thread t)
- 方便实用的线程阻塞工具,弥补resume发生在suspend之前的问题,类似信号量机制,每个线程一个许可。
- 支持中断影响,不抛出InterruptedException,可使用Thread.interrupted()获得标记。
- 不要求持有监视器锁。
- Phaser
- Exchanger
- 线程池
- 线程池中,总有那么几个活跃线程,完成工作时,并不急着关闭线程,而是将线程退回池中。
- 使用线程池后,创建线程变成了从线程池中获得空闲线程,关闭线程变成了向池子归还线程。
- JDK提供一套Executor框架,Executors扮演线程池工厂角色。
- newFixedThreadPool(int nThreads)
- 若没有空闲线程,则新任务被暂存在一个任务队列中。
- new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
- newSingleThreadExecutor()
- 若没有空闲线程,则新任务被暂存在一个任务队列中。
- new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
- newCachedThreadPool()
- 返回一个可根据实际情况调整线程数量的池。
- new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>())
- newSingleThreadScheduledExecutor()
- newScheduledThreadPool(int corePoolSize)
- schedule()
- scheduleAtFixedRate(),以上次任务开始执行时间为起点。如果任务执行时间超过调度时间,不会堆叠任务,任务会在上个任务结束后,立即被调用。
- scheduleWithFixedDelay(),以上次任务结束后为起点。
- 如果任务本身抛出异常,后续的所有执行都会被中断,因此需要做好异常处理。
- newFixedThreadPool(int nThreads)
- ThreadPoolExecutor
- ThreadPoolExecutor(int corePoolSize, int maxinumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler
- keepAliveTime: 线程池中线程数量超过corePoolSize时,多余空闲线程的存活时间。
- workQueue
- 直接提交队列,SynchronousQueue,没有容量,插入操作等待删除操作,没有空闲线程,则尝试创建新的进程,线程数达到最大值,则执行拒绝策略。
- 有界队列,ArrayBlockingQueue(int capacity), 当线程数小于core,则创建新线程,等于core时,加入队列中,当队列满时,提升线程数量,如果大于max,则执行拒绝策略。除非系统非常繁忙,否则确保核心线程数维持在corePoolSize。
- 无界队列,LinkedBlockingQueue,线程数小于core,则创建新线程,达到core后,加入队列等待,直到耗尽系统内存。
- 优先队列,PriorityBlockingQueue,特殊的无界队列,可以根据任务自身的优先级顺序先后执行。
- corePoolSize??
- 拒绝策略,实现RejectedExecutionHandler
- 任务数量超过系统实际承载能力
- AbortPolicy, 直接抛出异常,阻止系统工作。
- CallerRunsPolicy,在调用者线程中,运行当前被丢弃任务。
- DiscardOldestPolicy,丢弃最老的请求(即将被执行的任务)。
- DiscardPolicy,丢弃无法处理任务。
- ThreadFactory
- 创建线程,Thread newThread(Runnable r)
- 扩展线程池, ThreadPoolExecutor提供三个接口
- beforeExecute(Thread t, Runnable r)
- afterExecute(Thread t, Runnable r)
- terminated()
- shutdown() 并不会立即暴力终止所有任务,只是发送关闭信号,线程池不再接受其他新的任务。
- shutdownNow()
- 线程数量: Nthreads = Ncpu * Ucpu * (1 + W/C), Ncpu:可用cpu数量,Ucpu:cpu使用率, W/C:等待时间/计算时间
- 线程池会吃掉抛出的异常。
- 放弃submit,改用execute或者使用future.get(), 可以得到部分堆栈信息。
- Fork/join
- 由于线程池的优化,提交任务与线程数量并不是一对一的,当线程空闲,试图帮助别人时,从任务队列的底部拿数据。
- ForkJoinPool
- RecursiveTask
- JDK并发容器
- 线程安全的HashMap
- Collections.synchronizedMap(new HashMap()), 使用委托,如果并发级别不高,够用。
- ConcurrentHashMap
- 有关List的线程安全
- Collections.synchronizedList()
- ConcurrentLinkedQueue, 堆node操作,使用CAS
- CopyOnWriteArrayList,只有写写时,才发生阻塞,在写入时,进行自我复制,将修改内容写入副本,何时写入主存??
- BlockingQueue,适合线程间数据共享,意见箱的例子,当服务线程的队列为空,则进行等待,新消息进入队列,则自动唤醒线程。
- ArrayBlockingQueue
- ListedBlockingQueue
- put()/take() 是Blocking的关键。offer/poll并不阻塞
- offer,队列满了,则立即返回false
- put, 队列满,则进行等待
- 内部使用Condition,while(count == 0) notEmpty.await();
- 通信两端更换线程,可以平滑过渡
- SkipList
- ConcurrentSkipListMap
- 用空间换时间。
- 维护多个有序链表,并且链表是分层,每上面一层链表是下层的子集。
- 线程安全的HashMap
- 提高锁性能
- 减小锁持有时间
- 减小锁粒度,ConcurrentHashMap
- 读写分离锁替换独占锁
- 锁分离,LinkedBlockingQueue
- 锁粗化,对同一个锁不停地进行请求,释放,会消耗宝贵资源。
- JDK内部的锁优化策略
- 尽量避免线程在操作系统层面真实挂起。
- 偏向锁,连续多次由同一线程请求相同锁,无须再做同步操作, -XX:+UseBiasedLocking
- 轻量级锁,如果偏向锁失败,jvm并不会立即挂起线程,将对象头部作为指针,指向持有锁的堆栈内部,判断线程是否持有对象锁,如果加锁失败,就会膨胀为重量级锁。??
- 自旋锁,锁膨胀后,jvm为了避免在操作系统层面真实挂起,系统进行一次赌注,让当前线程做几个空循环,如果还得不到锁,则真实地将线程在操作层面挂起。
- 锁消除,逃逸分析,-XX:+DoEscapeAnalysis, -XX:+EliminateLocks
- ThreadLocal
- 维持着一个类似ThreadLocal变量的集合ThreadLocalMap,key为ThreadLocal的当前对象。
- 只要线程不退出,对象引用将一直存在。
- 如果使用线程池,线程总是存在,则有可能出现内存泄露。
- remove(),防止内存泄露。
- ThreadLocalMap的实现使用了弱引用,gc时,发现弱引用,就会立即回收。
- ?
- CAS,额外给出一个期望值E
- CAS(V, E, N), V:要更新的变量 E: 预期值 N:新值, V==E时,V=N;否则返回V;
- 不足,如果对象连续被修改两次,
- atomic包内,实现使用CAS操作。
- AtomicInteger
- value保存实际取值
- long valueOffset 偏移量
- Unsafe,指针,getUnsafe()如果这个类的ClassLoader不为null,则会抛异常,是JDK内部专属类。
- AtomicReference
- compareAndSet(E, N)
- 优惠券重复赠予的例子
- AtomicStampedReference
- 带时间戳的对象引用
- 使用时间戳(int)记录状态变换
- AtomicIntegerArray
- AtomicIntegerFieldUpdater
- Updater只修改范围内可见变量
- volatile类型
- 不支持static
- CAS通过对象实例中的偏移量直接进行赋值??
- 死锁:哲学家就餐问题,cpu占用率为0.
- 用jps得到java进程ID,jstack得到线程堆栈
- 多线程调试,为断点设置额外属性
- 单例,private static 为了保证instance的安全。
- 不变模式final
- 里氏替换原则,子类可以完成替代父类
- 不变模式通过回避问题的态度,解决并发访问控制。
- 基本数据类型和String类型,使用不变模式,因此所有实例的方法均不需要进行同步操作。?
- 生产者-消费者模式
- 不直接通信,通过共享内存缓冲区(1.数据在多线程间共享,2.缓解性能差)
- CPU Cache优化,解决伪共享问题。cpu高速缓存,读写数据最小单位是cache line,可能多个变量位于同一cache line,导致更新其中一个变量,导致其他cpu上缓存失效,无法命中,使系统吞吐量急剧下降,可使用padding填充变量前后。
- future模式
- 无法立即给出需要数据,但会返回一个契约,将来,可凭借这个契约重新获取需要的信息。
- JDK的future模式
- Callable
- FutureTask
- 并行流水线
- 执行过程中有数据相关性的运算都是无法完美并行化的。
- 并行搜索
- Java8
- 函数式编程
- 函数作为一等公民
- 将函数作为参数传递给另一个函数
- 函数可以作为另一个函数的返回值
- 无副作用(显式函数,与外界交换数据的唯一方式,就是参数和返回值)
- 申明式的(所有的细节被程序库所封装),而非命令式
- 不变的对象
- 易于并行,完全没必要进行任何同步操作。
- 更少代码
- FunctionalInterface
- 只定义了单一抽象方法的接口(不是只能有一个方法,任何被Object实现的方法,都不能视为抽象方法)
- 函数式接口的实例可由方法引用或lambda进行构造
- 接口默认方法
- 这一改进使java8拥有类似于多继承的能力
- 同时也引入了钻石问题
- lambda
- 方法引用
- 函数作为一等公民
- StampedLock
- 通过引入乐观锁,增加读写锁的并行度
- writeLock、readLock,悲观锁
- tryOptimisticRead尝试一次乐观锁,validate()判断是否发生修改。
- 内部实现,使用Unsafe.park(),遇到线程中断时,会直接返回,如果退出CAS循环条件得不到满足,则可能会再次进入循环。
- 基于CLH锁,自旋锁,维护等待线程队列,每个节点保存当前线程是否释放锁的标记位。
- CompletableFuture
- 超大型工具类,接口定义约40种方法,为了流式调用准备的。
- 可以手动设置完成状态complete()
- supplyAsync(Supplier<U> supplier, Executor executor)/runAsync(),可接收Executor参数,若不指定,则默认在ForkJoinPool.common线程池中执行(所有线程都是Daemon)
- 流式调用thenApply()
- 异常处理exceptionally()
- 组合多个CompletableFuture,thenCompose()/thenCombine()
- LongAdder
- 使用热点分离,类似ConcurrentHashMap,热点数据value被分离成多个单元。
- 开始不会动用数据分离,先将数据记录在base变量,如果修改时发生冲突,则扩展为cell数据。
- 避免伪共享,不是使用padding,而是用@sum.misc.Contended,自己应用这个注解时,加上-XX:-RestrictContended
- LongAccumulator
- 扩展LongAdder、可以实现任意函数操作LongAccumulator(LongBinaryOperator, long)
- 函数式编程