文章目录
19 | CountDownLatch和CyclicBarrier:如何让多线程步调一致?
CountDownLatch 主要用来解决一个线程等待多个线程的场景,可以类比旅游团团长要等待所有的游客到齐才能去下一个景点;而 CyclicBarrier 是一组线程之间互相等待,更像是几个驴友之间不离不弃。除此之外 CountDownLatch 的计数器是不能循环利用的,也就是说一旦计数器减到 0,再有线程调用 await(),该线程会直接通过。但 CyclicBarrier 的计数器是可以循环利用的,而且具备自动重置的功能,一旦计数器减到 0 会自动重置到你设置的初始值。除此之外,CyclicBarrier 还可以设置回调函数,可以说是功能丰富。
20 | 并发容器-选对容器,才是最关键的
1. List----CopyOnWriteArrayList
写时复制,读操作完全无锁。
CopyOnWriteArrayList 内部维护了一个数组,成员变量 array 就指向这个内部数组,所有的读操作都是基于 array 进行的,如下图所示,迭代器 Iterator 遍历的就是 array 数组,需要注意的是 它的迭代器是只读的,不支持增删改。因为迭代器遍历的仅仅是一个快照,而对快照进行增删改是没有意义的。
如果在遍历 array 的同时,还有一个写操作,例如增加或者修改元素,CopyOnWriteArrayList 是如何处理的呢?
CopyOnWriteArrayList 会将 array 复制一份,然后在新复制处理的数组上执行增加元素的操作,执行完之后再将 array 指向这个新的数组。通过下图你可以看到,读写是可以并行的,遍历操作一直都是基于原 array 执行,而写操作则是基于新 array 进行。
坑点:
- 仅仅适用于写操作场景比较少的情况。
- 能够容忍读写的短暂不一致。例如上面的例子中,写入的新元素并不能立刻被遍历到
2. Map----ConcurrentHashMap 和 ConcurrentSkipListMap
- ConcurrentHashMap 的 key 是无序的,而 ConcurrentSkipListMap 的 key 是有序的。
- 使用上面两种 Map时,它们的 key 和 value 都不能为空,否则会抛出NullPointerException这个运行时异常。对五种 Hash 类型的数据总结如下:
本质上两者的实现就是 散列表 和 跳表。跳表见:跳表
3. Set----CopyOnWriteArraySet 和 ConcurrentSkipListSet
Set 接口的两个实现是 CopyOnWriteArraySet 和 ConcurrentSkipListSet,使用场景同上。
4. Queue
阻塞与非阻塞
所谓阻塞指的是当队列已满时,入队操作阻塞;当队列已空时,出队操作阻塞。
(1)单端阻塞队列
成员有:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、LinkedTransferQueue、PriorityBlockingQueue 和 DelayQueue。
LinkedTransferQueue 融合 LinkedBlockingQueue 和 SynchronousQueue 的功能,性能比 LinkedBlockingQueue 更好;PriorityBlockingQueue 支持按照优先级出队;DelayQueue 支持延时出队。
(2)双端阻塞队列:只有 LinkedBlockingDeque。
(3)单端非阻塞队列:只有 ConcurrentLinkedQueue
(4)双端非阻塞队列:只有 ConcurrentLinkedDeque
在使用队列时,需要格外注意队列是否支持有界(所谓有界指的是内部的队列是否有容量限制)。实际工作中,一般都不建议使用无界的队列,因为数据量大了之后很容易导致 OOM。上面我们提到的这些 Queue 中,只有 ArrayBlockingQueue 和 LinkedBlockingQueue 是支持有界的,所以在使用其他无界队列时,一定要充分考虑是否存在导致 OOM 的隐患。
21 | 原子类:无锁工具类的典范
无锁的实现原理:通过硬件CPU实现CAS指令去支持。CAS的实现大体是:CAS 指令包含 3 个参数:
共享变量的内存地址 A、用于比较的值 B 和共享变量的新值 C;并且只有当内存中地址 A 处的值等于 B 时,才能将内存中地址 A 处的值更新为新值 C。
使用模拟代码来解释就是:
class SimulatedCAS{
int count;
synchronized int cas(
int expect, int newValue){
// 读目前count的值
int curValue = count;
// 比较目前count值是否==期望值
if(curValue == expect){
// 如果是,则更新count的值
count = newValue;
}
// 返回写入前的值
return curValue;
}
}
使用 CAS,一般都会伴随着自旋,即循环尝试。
ABA问题:
class SimulatedCAS{
volatile int count;
// 实现count+=1
addOne(){
do {
newValue = count+1; //①
}while(count !=
cas(count,newValue) //②
}
// 模拟实现CAS,仅用来帮助理解
synchronized int cas(
int expect, int newValue){
// 读目前count的值
int curValue = count;
// 比较目前count值是否==期望值
if(curValue == expect){
// 如果是,则更新count的值
count= newValue;
}
// 返回写入前的值
return curValue;
}
}
假设 count 原本是 A,线程 T1 在执行完代码①处之后,执行代码②处之前,有可能 count 被线程 T2 更新成了 B,之后又被 T3 更新回了 A,这样线程 T1 虽然看到的一直是 A,但是其实已经被其他线程更新过了,这就是 ABA 问题。请注意:这里虽然两个A是一样的,但是第二个A的某些属性可能发生了变化,与之前有所不一致了
Java中实现CAS的经典示例–getAndIncrement()方法
public final int getAndIncrement() {
return U.getAndAddInt(this, VALUE, 1);
}
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!weakCompareAndSetInt(o, offset, v, v + delta));
return v;
}
即:
do {
// 获取当前值
oldV = xxxx;
// 根据当前值计算新值
newV = ...oldV...
}while(!compareAndSet(oldV,newV);
原子类概述
主要分为五类:
原子化的基本数据类型、原子化的对象引用类型、原子化数组、原子化对象属性更新器和原子化的累加器。
- 原子化的基本数据类型
主要有:AtomicBoolean、AtomicInteger 和 AtomicLong 三种,常用方法有:
getAndIncrement() //原子化i++
getAndDecrement() //原子化的i--
incrementAndGet() //原子化的++i
decrementAndGet() //原子化的--i
//当前值+=delta,返回+=前的值
getAndAdd(delta)
//当前值+=delta,返回+=后的值
addAndGet(delta)
//CAS操作,返回是否成功
compareAndSet(expect, update)
//以下四个方法
//新值可以通过传入func函数来计算
getAndUpdate(func)
updateAndGet(func)
getAndAccumulate(x,func)
accumulateAndGet(x,func)
- 原子化的对象引用类型
相关实现有 AtomicReference、AtomicStampedReference 和 AtomicMarkableReference,不过需要注意的是,对象引用的更新需要重点关注 ABA 问题,AtomicStampedReference 和 AtomicMarkableReference 这两个原子类可以解决 ABA 问题。 - 原子化数组
相关实现有 AtomicIntegerArray、AtomicLongArray 和 AtomicReferenceArray,利用这些原子类,我们可以原子化地更新数组里面的每一个元素。 - 原子化对象属性更新器
相关实现有 AtomicIntegerFieldUpdater、AtomicLongFieldUpdater 和 AtomicReferenceFieldUpdater,利用它们可以原子化地更新对象的属性 - 原子化的累加器
DoubleAccumulator、DoubleAdder、LongAccumulator 和 LongAdder,这四个类仅仅用来执行累加操作,相比原子化的基本数据类型,速度更快,但是不支持 compareAndSet() 方法。如果你仅仅需要累加操作,使用原子化的累加器性能会更好。
所有原子类的方法都是针对一个共享变量的,如果你需要解决多个变量的原子性问题,建议还是使用互斥锁
线程池—最核心的是 ThreadPoolExecutor
Java 并发包里还提供了一个线程池的静态工厂类 Executors,利用 Executors 你可以快速创建线程池。不过目前大厂的编码规范中基本上都不建议使用 Executors 了,主要是因为:Executors 提供的很多方法默认使用的都是无界的 LinkedBlockingQueue,高负载情境下,无界队列很容易导致 OOM,而 OOM 会导致所有请求都无法处理,这是致命问题。所以强烈建议使用有界队列。
Future 获取线程执行结果
- ThreadPoolExecutor 提供的 3 个 submit() 方法
// 提交Runnable任务
Future<?>
submit(Runnable task);
// 提交Callable任务
<T> Future<T>
submit(Callable<T> task);
// 提交Runnable任务及结果引用
<T> Future<T>
submit(Runnable task, T result);
- Future 接口有 5 个方法。需要注意的是:get 方法是阻塞的吆,如果调用的线程,跑到了future.get() 方法,而任务还没有执行完成,那么调用线程就会阻塞哦
// 取消任务
boolean cancel(
boolean mayInterruptIfRunning);
// 判断任务是否已取消
boolean isCancelled();
// 判断任务是否已结束
boolean isDone();
// 获得任务执行结果
get();
// 获得任务执行结果,支持超时
get(long timeout, TimeUnit unit);
CompletableFuture 异步编程
Java 在 1.8 版本提供了 CompletableFuture 来支持异步编程。可阅读 RxJava 源码来学习异步编程。
CompletionService 批量执行异步任务
CompletionService 的实现原理也是内部维护了一个阻塞队列,当任务执行结束就把任务的执行结果加入到阻塞队列中,不同的是 CompletionService 是把任务执行结果的 Future 对象加入到阻塞队列中,而上面的示例代码是把任务最终的执行结果放入了阻塞队列中。
// 创建线程池
ExecutorService executor =
Executors.newFixedThreadPool(3);
// 创建CompletionService
CompletionService<Integer> cs = new
ExecutorCompletionService<>(executor);
// 异步向电商S1询价
cs.submit(()->getPriceByS1());
// 异步向电商S2询价
cs.submit(()->getPriceByS2());
// 异步向电商S3询价
cs.submit(()->getPriceByS3());
// 将询价结果异步保存到数据库
for (int i=0; i<3; i++) {
Integer r = cs.take().get();
executor.execute(()->save(r));
}
如果没有指定 completionQueue,因此默认使用无界的 LinkedBlockingQueue。之后通过 CompletionService 接口提供的 submit() 方法提交了三个询价操作,这三个询价操作将会被 CompletionService 异步执行。最后,我们通过 CompletionService 接口提供的 take() 方法获取一个 Future 对象(前面我们提到过,加入到阻塞队列中的是任务执行结果的 Future 对象),调用 Future 对象的 get() 方法就能返回询价操作的执行结果了。
Fork/Join:单机版的MapReduce
对于简单的并行任务,你可以通过“线程池 +Future”的方案来解决;如果任务之间有聚合关系,无论是 AND 聚合还是 OR 聚合,都可以通过 CompletableFuture 来解决;而批量的并行任务,则可以通过 CompletionService 来解决。
看看如何用 Fork/Join 这个并行计算框架计算斐波那契数列(下面的代码源自 Java 官方示例)。首先我们需要创建一个分治任务线程池以及计算斐波那契数列的分治任务,之后通过调用分治任务线程池的 invoke() 方法来启动分治任务。由于计算斐波那契数列需要有返回值,所以 Fibonacci 继承自 RecursiveTask。分治任务 Fibonacci 需要实现 compute() 方法,这个方法里面的逻辑和普通计算斐波那契数列非常类似,区别之处在于计算 Fibonacci(n - 1) 使用了异步子任务,这是通过 f1.fork() 这条语句实现的。
static void main(String[] args){
//创建分治任务线程池
ForkJoinPool fjp =
new ForkJoinPool(4);
//创建分治任务
Fibonacci fib =
new Fibonacci(30);
//启动分治任务
Integer result =
fjp.invoke(fib);
//输出结果
System.out.println(result);
}
//递归任务
static class Fibonacci extends
RecursiveTask<Integer>{
final int n;
Fibonacci(int n){
this.n = n;}
protected Integer compute(){
if (n <= 1)
return n;
Fibonacci f1 =
new Fibonacci(n - 1);
//创建子任务
f1.fork();
Fibonacci f2 =
new Fibonacci(n - 2);
//等待子任务结果,并合并结果
return f2.compute() + f1.join();
}
}