线程通用

线程通用

线程中断

早期版本中线程提供stop方法来终止线程,但该方法已经被弃用

取而代之的是外部通知该线程应该结束了,但是具体什么时候结束交由线程自己控制

1
2
3
4
5
6
7
8
9
Thread thread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
// do something
}
// 结束线程
});
thread.start();
// 外部通知线程中断
thread.interrupt();

注意一:如果在Thread.sleep()等会抛出InterruptedException受查异常等内置函数后面调用Thread.currentThread().isInterrupted()返回false

注意二:Thread.interrupted()线程静态函数会将中断状态清空,与Thread.currentThread().isInterrupted(true)效果一致

锁对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ReentrantLock mLock = new ReentrantLock();// 可重入锁对象
Condition mCondition = mLock.newCondition(); // 锁对象创建条件对象

public void method() {
mLokc.lock();
try{
while(//未达到条件) {
mCondition.await(); // 临时释放锁并进入阻塞状态
}
// do somethind
mCondition.signalAll(); //通知释放阻塞状态的地方允许重新获得锁
} catch (InterruptedException e) {

} finally {
mLock.unlock();
}
}

注意一:ReentrantLock 可多次调用lock()函数,但必须与unlock()函数成对出现

注意二:条件对象await必须是已经获得锁后调用,不然会爆出IllegalMonitorStateException异常

1
2
3
4
5
6
7
8
9
public void method() {
synchronized (this) {
while(//未达到条件) {
wait(); // 临时释放锁并进入阻塞状态
}
// do somethind
notifyAll(); // 通知释放阻塞状态的地方允许重新获得锁
}
}

该方式与锁对象方式一模一样

值得注意的是,条件对象调用的是**await()方法,同步方法调用的是wait()**方法,别用错了

volatile域

作用:内存可见性、禁止指令重排序

JVM要求实现volatile必须要实现内存屏障,最终调用汇编指令为:lock addl,由于lock指令后面必须跟一条指令,但是Intel限制了lock指令不能跟nop(空指令),所以后面添加了一个addl(加0指令)

LOCK 用于多处理器中执行指令时对共享内存的独占使用

他的作用时能够将当前处理器对应缓存的内容刷新到内存,并使其他处理器缓存失效

另外还提供了有序的指令无法越过内存屏障的作用,即lock指令前面的指令都无法越过这条指令

1
2
3
if (os::is_mp()) {// 是否是多核cpu,可见volatile在单核cpu里并没有使用lock指令
// 使用lock指令
}

内存屏障(JVM层面)

写:StoreStore - volatile写 - StoreLoad

读:volatile读 - LoadStore - LoadLoad

final变量

除了使用锁或volatitle修饰符,还有一种情况可以安全地访问一个共享域,即将该域声明为final:

final Map<String, String> map = new HashMap<>();

其它线程会在构造函数完成构造后才看到这个map变量

CAS

Unsafe内部最终调用汇编语句:多核:lock cmpxchg 单核:cmpxchg

cmpxchg:比较并交换操作数

原子性

java.util.concurrent.atomic包中有很多类使用高效的机器级指令来保证操作的原子性。

如:AtomicIntegergetAndIncrementincrementAndGet方法

1
2
3
4
5
6
7
8
AtomicInteger integer = new AtomicInteger();
integer.set(Math.max(integer.get(), number));//该操作不具有原子性,无法保证值的正常更新

do {
oldValue = integer.get();
newValue = Math.max(oldValue, number)
} while(!integer.compareAndSet(oldValue, newValue)); // 需要通过比对再设值的方式保证原子性,CAS
// java8 可以通过内置函数免写以上循环操作,如updateAndGet、accumulateAndGet等函数

注意一:如果大量线程要访问相同的类名开头为Atomic的原子值,性能会大幅下降,因为采用CAS乐观更新需要太多的循环重试操作。可通过类LongAddrLongAccumulatorDoubleAdderDoubleAccumulator来解决。

1
2
3
4
5
6
7
8
// 原理:采用多个变量(加数),其总和为当前值。可以有多个线程同时更新不同的加数,线程增加会自动提供新的加数。
// 只有当所有工作都完成之后才需要总和的值,这个情况下该类操作会相当高效
LongAdder adder
while(...) {
adder.increment();// 不会返回值,这样做回消除求和分解到多个加数所带来的性能提升
// long total = adder.sum();// 不推荐这么操作,会失去性能的提升,如必须该操作,则使用Atomic会更好
}
long total = adder.sum();

注意二:上述提到CAS操作更新会存在ABA的异常,可通过AtomicStampedReference带版本号的原子类进行修复

让自己的类具有原子性:AtomicReferenceAtomicReferenceFieldUpdater

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// AtomicReference
class MyAtomicClass {
// 使用AtomicReference对字符串进行封装
AtomicReference<String> atomicValue = new AtomicReference<>("HelloAtomic");
}

// 使用
public void execute() {
MyAtomicClass holder = new MyAtomicClass();
holder.atomicValue.compareAndSet("HelloAtomic", "World");
System.out.println(holder.atomicValue.get());
String value = holder.atomicValue.updateAndGet(new UnaryOperator<String>() {
@Override
public String apply(String s) {
return "HelloWorld";
}
});
System.out.println(value);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// AtomicReferenceFieldUpdater
class MyAtomicClass {
// 为value字段添加原子操作
public static final AtomicReferenceFieldUpdater<SimpleValueHolder, String> valueUpdater
= AtomicReferenceFieldUpdater.newUpdater(SimpleValueHolder.class, String.class, "value");
// 使用volatile声明
volatile String value = "HelloAtomic";
}

// 使用
public void execute() {
MyAtomicClass holder = new MyAtomicClass();
holder.valueUpdater.compareAndSet(holder, "HelloAtomic", "World");
System.out.println(holder.valueUpdater.get(holder));
String value = holder.valueUpdater.updateAndGet(holder, new UnaryOperator<String>() {
@Override
public String apply(String s) {
return "HelloWorld";
}
});
System.out.println(value);
}

AtomicReferenceAtomicReferenceFieldUpdater的区别:

两者的作用差不多,AtomicReference对字段进行包裹,AtomicReferenceFieldUpdater则为静态扩展字段功能

AtomicReferenceAtomicReferenceFIeldUpdater比起来,要多创建一个对象
对于 32 位的机器,这个对象的头占 12 个字节,它的成员占 4 个字节,也就是多出来 16 个字节
对于 64 位的机器,如果启动了指针压缩,那这个对象占用的也是 16 个字节
对于 64 位的机器,如果没启动指针压缩,那么这个对象就会占 24 个字节,其中对象头占 16 个字节,成员占 8 个字节

当要使用AtomicReference创建成千上万个对象时,这个开销就会变得很大

因为开销的原因,一般在实例较少的情况下如单例才会选择AtomicReference,不然推荐使用AtomicReferenceFieldUpdater

线程局部变量

线程间共享变量存在不同步异常问题,有些情况并不推荐使用共享变量,如:SimpleDateFormat类不是线程安全的。

假设有静态变量:

public static final SimpleDateFormat dateFormat = new SimpleDateForniat("yyyy-MM-dd");

如果有两个线程同时执行以下操作

String dateStamp = dateFormat.format(new DateO);

结果可能会混乱,因为dateFormat内部数据可能会被并发的访问所破会。当然可以在该函数使用同步锁,但开销会很大,或者再使用时构造一个局部SimpleDateForniat对象,不过会有点浪费。

推荐为每个线程构造实例:

1
2
public static final ThreadLocal<SimpleDateFormat> initial = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));// 在使用时才会初始化SimpleDateFormat
SimpleDateFormat simpleDateFormat = initial.get();

在多个线程中生成随机数也存在类似的问题。Random类是线程安全的,但是如果多个线程需要等待一个共享的随机数生成器会很低效。

可以通过:

1
ThreadLocalRandom.current().nextInt()

来解决此问题

锁超时

如果得不到锁可以执行其它逻辑,可以通过tryLock的方式获取锁

1
2
3
4
5
6
7
8
9
if(mlock.tryLock()) {
try {
// do something
} finally {
mlock.unlock();
}
} else {
// do something else
}

也可为tryLock添加超时逻辑,mlock.tryLock(100,TimeUnit.MILLISECONDS)

lockInterruptibly相当于无限时长的tryLock

同样锁条件也可在等待时添加超时,condition.await(100,TimeUnit.MILLISECONDS)

awaitUninterruptibly相当于无限时长的await,但是不会抛出异常

读写锁

在频繁读取少变更的情况下,可以使用读写锁来提高性能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();

public double getData() {
readLock.lock();
try{
// ...
} finally {
readLock.unlock();
}
}

public void setData(double data) {
writeLock.lock();
try{
// ...
} finally {
writeLock.unlock();
}
}
线程池
  1. 通过Executors生成线程池

    方法 描述
    newCachedThreadPool 必要时创建新线程,空闲线程会被保留60 秒
    newFixedThreadPool 该池包含固定数量的线程,空闲线程会一直被保留
    newSingleThreadExecutor 只有一个线程的“ 池”, 该线程顺序执行每一个提交的任务
    newScheduledThreadPool 用于预定执行而构建的固定线程池
    newSingleThreadScheduledExecutor 用于预定执行而构建的单线程“ 池”
  1. 相关Scheduled线程池的用法:
    schedule:预定在指定的时间之后执行任务
    scheduleAtFixedRate:预定在初始的延迟结束后, 周期性地运行给定的任务, 周期长度是period
    scheduleWithFixedDelay:预定在初始的延迟结束后周期性地运行给定的任务, 在一次调用完成和下一次调用开
    始之间有长度为delay 的延迟

  2. 控制任务组

    并行处理任务

    1
    2
    3
    4
    5
    List<Callable<T>> tasks = ....;
    List<Future<T>> futures = executor.invokeAll(tasks);
    for(Future<T> result:results) {
    result.get();
    }

    以上方法有个缺点,必要要所有任务执行完毕了,get才会开始真正执行

    可通过ExecutorCompletionService类解决,使先完成的先返回

    1
    2
    3
    4
    5
    ExecutorCompletionService<T> service = new ExecutorCompletionService(executor);
    for (Callable<T> task : tasks)
    service,submit(task);
    for (int i = 0; i < tasks.sizeO;i ++)
    processFurther(service.take().get());
  3. Fork-Join框架

    在未满足最小计算单位时拆分成更多小的任务进行并行操作,以提高效率

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    public Task<T> extends RecursiveTask<T> {
    private static final int THRESHOLD = 10;
    private final int from;
    private final int to;
    public Task(int from, int to) {
    this.from = from;
    this.to = to;
    }

    protected T compute() {
    if (to - from < THRESHOLD) {
    // do something
    return T;
    } else {
    int mid = (from + to)/2;
    Task first = new Task(from, mid);
    Task second = new Task(mid,to);
    invokeAll(first, second);
    return first.join() + second.join();
    }
    }
    }

    Task task = new Task(0,100);
    ForkJoinPool pool = new ForkJoinPool();
    pool.invoke(task)
    task.join();
  4. 可完成Future【CompletableFuture】

    由于单纯使用Future会造成线程阻塞,java8引用了CompletableFuture来对Future进行改进,实现异步调用的逻辑

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public static void main(String[] args) throws Exception {
    // 创建异步执行任务:
    CompletableFuture<Double> cf = CompletableFuture.supplyAsync(Main::fetchPrice);
    // 如果执行成功:
    cf.thenAccept((result) -> {
    System.out.println("price: " + result);
    });
    // 如果执行异常:
    cf.exceptionally((e) -> {
    System.out.println("error: " + e.getMessage());
    return null;
    });
    // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
    Thread.sleep(200);
    }

    static Double fetchPrice() {
    try {
    Thread.sleep(100);
    } catch (InterruptedException e) {}
    if (Math.random() < 0.3) {
    throw new RuntimeException("fetch price failed!");
    }
    return 5 + Math.random() * 20;
    }
  5. 同步器

    能做什么 说明
    CyclicBarrier 允许线程集等待直至其中预定数目的线程到达一个公共障栅( barrier),然后可以选择执行一个处理障栅的动作 当大量的线程需要在它们的结果可用之前完成时
    Phaser 类似于循环障栅, 不过有一个可变的计数 Java SE 7 中引人
    CountDownLatch 允许线程集等待直到计数器减为0 当一个或多个线程需要等待直到指定数目的事件发生
    Exchanger 允许两个线程在要交换的对象准备好时交换对象 当两个线程工作在同一数据结构的两个实例上的时候, 一个向实例添加数据而另一个从实例清除数据
    Samaphore 允许线程集等待直到被允许继续运行为止 限制访问资源的线程总数。如果许可数是1,常常阻塞线程直到另一个线程给出许可为止
    SynchronousQueue 允许一个线程把对象交给另一个线程 在没有显式同步的情况下, 当两个线程准备好将一个对象从一个线程传递到另一个时