线程通用
线程中断
早期版本中线程提供stop方法来终止线程,但该方法已经被弃用
取而代之的是外部通知该线程应该结束了,但是具体什么时候结束交由线程自己控制
1 | Thread thread = new Thread(() -> { |
注意一:如果在Thread.sleep()
等会抛出InterruptedException
受查异常等内置函数后面调用Thread.currentThread().isInterrupted()
返回false
注意二:Thread.interrupted()
线程静态函数会将中断状态清空,与Thread.currentThread().isInterrupted(true)
效果一致
锁对象
1 | ReentrantLock mLock = new ReentrantLock();// 可重入锁对象 |
注意一:ReentrantLock 可多次调用lock()函数,但必须与unlock()函数成对出现
注意二:条件对象await必须是已经获得锁后调用,不然会爆出IllegalMonitorStateException异常
1 | public void method() { |
该方式与锁对象方式一模一样
值得注意的是,条件对象调用的是**await()方法,同步方法调用的是wait()**方法,别用错了
volatile域
作用:内存可见性、禁止指令重排序
JVM要求实现volatile必须要实现内存屏障,最终调用汇编指令为:lock addl
,由于lock
指令后面必须跟一条指令,但是Intel限制了lock
指令不能跟nop(空指令)
,所以后面添加了一个addl(加0指令)
LOCK 用于多处理器中执行指令时对共享内存的独占使用
他的作用时能够将当前处理器对应缓存的内容刷新到内存,并使其他处理器缓存失效
另外还提供了有序的指令无法越过内存屏障的作用,即lock指令前面的指令都无法越过这条指令
1 | if (os::is_mp()) {// 是否是多核cpu,可见volatile在单核cpu里并没有使用lock指令 |
内存屏障(JVM层面)
写:StoreStore
- volatile写
- StoreLoad
读:volatile读
- LoadStore
- LoadLoad
final变量
除了使用锁或volatitle
修饰符,还有一种情况可以安全地访问一个共享域,即将该域声明为final:
final Map<String, String> map = new HashMap<>();
其它线程会在构造函数完成构造后才看到这个map变量
CAS
Unsafe内部最终调用汇编语句:多核:lock cmpxchg
单核:cmpxchg
cmpxchg
:比较并交换操作数
原子性
java.util.concurrent.atomic
包中有很多类使用高效的机器级指令来保证操作的原子性。如:
AtomicInteger
的getAndIncrement
和incrementAndGet
方法
1 | AtomicInteger integer = new AtomicInteger(); |
注意一:如果大量线程要访问相同的类名开头为Atomic的原子值,性能会大幅下降,因为采用CAS乐观更新需要太多的循环重试操作。可通过类LongAddr
、LongAccumulator
、DoubleAdder
和DoubleAccumulator
来解决。
1 | // 原理:采用多个变量(加数),其总和为当前值。可以有多个线程同时更新不同的加数,线程增加会自动提供新的加数。 |
注意二:上述提到CAS操作更新会存在ABA的异常,可通过AtomicStampedReference
带版本号的原子类进行修复
让自己的类具有原子性:AtomicReference
和AtomicReferenceFieldUpdater
1 | // AtomicReference |
1 | // AtomicReferenceFieldUpdater |
AtomicReference与AtomicReferenceFieldUpdater的区别:
两者的作用差不多,
AtomicReference
对字段进行包裹,AtomicReferenceFieldUpdater
则为静态扩展字段功能
AtomicReference
和AtomicReferenceFIeldUpdater
比起来,要多创建一个对象
对于 32 位的机器,这个对象的头占 12 个字节,它的成员占 4 个字节,也就是多出来 16 个字节
对于 64 位的机器,如果启动了指针压缩,那这个对象占用的也是 16 个字节
对于 64 位的机器,如果没启动指针压缩,那么这个对象就会占 24 个字节,其中对象头占 16 个字节,成员占 8 个字节当要使用
AtomicReference
创建成千上万个对象时,这个开销就会变得很大因为开销的原因,一般在实例较少的情况下如单例才会选择
AtomicReference
,不然推荐使用AtomicReferenceFieldUpdater
线程局部变量
线程间共享变量存在不同步异常问题,有些情况并不推荐使用共享变量,如:SimpleDateFormat
类不是线程安全的。
假设有静态变量:
public static final SimpleDateFormat dateFormat = new SimpleDateForniat("yyyy-MM-dd");
如果有两个线程同时执行以下操作
String dateStamp = dateFormat.format(new DateO);
结果可能会混乱,因为dateFormat
内部数据可能会被并发的访问所破会。当然可以在该函数使用同步锁,但开销会很大,或者再使用时构造一个局部SimpleDateForniat
对象,不过会有点浪费。
推荐为每个线程构造实例:
1 | public static final ThreadLocal<SimpleDateFormat> initial = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));// 在使用时才会初始化SimpleDateFormat |
在多个线程中生成随机数也存在类似的问题。Random类是线程安全的,但是如果多个线程需要等待一个共享的随机数生成器会很低效。
可以通过:
1 | ThreadLocalRandom.current().nextInt() |
来解决此问题
锁超时
如果得不到锁可以执行其它逻辑,可以通过tryLock的方式获取锁
1 | if(mlock.tryLock()) { |
也可为tryLock添加超时逻辑,mlock.tryLock(100,TimeUnit.MILLISECONDS)
lockInterruptibly
相当于无限时长的tryLock
同样锁条件也可在等待时添加超时,condition.await(100,TimeUnit.MILLISECONDS)
awaitUninterruptibly
相当于无限时长的await
,但是不会抛出异常
读写锁
在频繁读取少变更的情况下,可以使用读写锁来提高性能
1 | ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); |
线程池
通过
Executors
生成线程池方法 描述 newCachedThreadPool 必要时创建新线程,空闲线程会被保留60 秒 newFixedThreadPool 该池包含固定数量的线程,空闲线程会一直被保留 newSingleThreadExecutor 只有一个线程的“ 池”, 该线程顺序执行每一个提交的任务 newScheduledThreadPool 用于预定执行而构建的固定线程池 newSingleThreadScheduledExecutor 用于预定执行而构建的单线程“ 池”
相关
Scheduled
线程池的用法:
schedule
:预定在指定的时间之后执行任务
scheduleAtFixedRate
:预定在初始的延迟结束后, 周期性地运行给定的任务, 周期长度是period
scheduleWithFixedDelay
:预定在初始的延迟结束后周期性地运行给定的任务, 在一次调用完成和下一次调用开
始之间有长度为delay 的延迟控制任务组
并行处理任务
1
2
3
4
5List<Callable<T>> tasks = ....;
List<Future<T>> futures = executor.invokeAll(tasks);
for(Future<T> result:results) {
result.get();
}以上方法有个缺点,必要要所有任务执行完毕了,
get
才会开始真正执行可通过
ExecutorCompletionService
类解决,使先完成的先返回1
2
3
4
5ExecutorCompletionService<T> service = new ExecutorCompletionService(executor);
for (Callable<T> task : tasks)
service,submit(task);
for (int i = 0; i < tasks.sizeO;i ++)
processFurther(service.take().get());Fork-Join框架
在未满足最小计算单位时拆分成更多小的任务进行并行操作,以提高效率
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27public Task<T> extends RecursiveTask<T> {
private static final int THRESHOLD = 10;
private final int from;
private final int to;
public Task(int from, int to) {
this.from = from;
this.to = to;
}
protected T compute() {
if (to - from < THRESHOLD) {
// do something
return T;
} else {
int mid = (from + to)/2;
Task first = new Task(from, mid);
Task second = new Task(mid,to);
invokeAll(first, second);
return first.join() + second.join();
}
}
}
Task task = new Task(0,100);
ForkJoinPool pool = new ForkJoinPool();
pool.invoke(task)
task.join();可完成Future【CompletableFuture】
由于单纯使用Future会造成线程阻塞,java8引用了
CompletableFuture
来对Future
进行改进,实现异步调用的逻辑1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25public static void main(String[] args) throws Exception {
// 创建异步执行任务:
CompletableFuture<Double> cf = CompletableFuture.supplyAsync(Main::fetchPrice);
// 如果执行成功:
cf.thenAccept((result) -> {
System.out.println("price: " + result);
});
// 如果执行异常:
cf.exceptionally((e) -> {
System.out.println("error: " + e.getMessage());
return null;
});
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
Thread.sleep(200);
}
static Double fetchPrice() {
try {
Thread.sleep(100);
} catch (InterruptedException e) {}
if (Math.random() < 0.3) {
throw new RuntimeException("fetch price failed!");
}
return 5 + Math.random() * 20;
}同步器
类 能做什么 说明 CyclicBarrier 允许线程集等待直至其中预定数目的线程到达一个公共障栅( barrier),然后可以选择执行一个处理障栅的动作 当大量的线程需要在它们的结果可用之前完成时 Phaser 类似于循环障栅, 不过有一个可变的计数 Java SE 7 中引人 CountDownLatch 允许线程集等待直到计数器减为0 当一个或多个线程需要等待直到指定数目的事件发生 Exchanger 允许两个线程在要交换的对象准备好时交换对象 当两个线程工作在同一数据结构的两个实例上的时候, 一个向实例添加数据而另一个从实例清除数据 Samaphore 允许线程集等待直到被允许继续运行为止 限制访问资源的线程总数。如果许可数是1,常常阻塞线程直到另一个线程给出许可为止 SynchronousQueue 允许一个线程把对象交给另一个线程 在没有显式同步的情况下, 当两个线程准备好将一个对象从一个线程传递到另一个时