Java并发
并发编程
多线程在多核心机器中能减少资源浪费,充分利用多核性能。在单核心机器中能提高吞吐率。
可见性
和指令重排
是编发编程中出现问题的原因所在。Java内存模型分为主内存
和工作内存
两部分。JMM规定,线程写值时只能写到工作内存
,不能直接写到主内存
。JVM定期(内存屏障)将工作内存
的值刷会主内存。同样,读取共享变量
的值时只能从工作内存
中读取,工作内存
不能直接读取主内存
。
1 | public class NoViisibility { |
内存屏障
Java有工作内存
和主内存
,工作内存要定期将内容刷回主内存才能保证可见。那什么时候刷回去呢?在Java里面遇到内存屏障指令时就会将工作内存和主内存进行同步。例如,我们使用synchronized
可以保证可见性,这是因为在synchronized
代码块的第一行和最后一行会插入内存屏障,进入synchronized
代码块时会从主内存获取一遍变量,退出synchronized
代码块时会将工作内存刷回主存。
synchronized与volatile
synchronized
能保证可见性和原子性,volatile
只能保证可见性。看下面的例子:
1 |
|
上例不是并发安全的。虽然我们能看到count
的新值,但是count++
是一个符合操作。要将incr
变为原子操作只能使用synchronized
关键字。
1 |
|
volatile
一般应用在标志位变量1
2
3
4
5
6
7
8
9
10
11
12
13public class Loop {
private volatile boolean shutdown;
public void start() {
while(!shutdown) {
...
}
}
public void shutdown() {
shutdown = true;
}
}
并发与同步
并发程序可以并行执行任务也可以串行来执行。并发是去同时应对多个任务,并行是同时去做多种任务。
并发在同时应对多种任务时,需要去处理同步的问题。
并发编程中的问题
死锁
根本原因是要获取多把锁,但是不同线程加锁顺序不同。
饥饿
一直等待某种状态。
活锁
是指线程1可以使用资源,但它很礼貌,让其他线程先使用资源。线程2也可以使用资源,但它很绅士,也让其他线程先使用资源。这样你让我,我让你,最后两个线程都无法使用资源。
线程
start
启动一个线程。
run
直接在当前线程内运行。可用于线程封装,比如线程池内运行线程可以直接在线程池线程中调用被传入线程的
run
方法。sleep
线程休眠,会释放CPU资源但是不会释放锁。
yield
短暂让出CPU资源,不像sleep时间结束后进入
RUNNABLE
状态,它会立即进入RUNNABLE
状态等待CPU资源。它同样不会释放锁。join
用于线程协调。调用
t.join()
当前线程会等待线程t
执行完再继续执行。interrupt
中断,类似一种信号,是一种协商机制。具体查看下面的中断机制。
wait、notify/notifyAll
这两者属于Object类上的方法,用于线程通讯(其实没有讯息,类似一个通知机制)。具体查看下面的内置条件队列。
线程状态
线程中断
interrupted
是一种协商机制,中断机制是一种协作机制,也就是说通过中断并不能直接终止另一个线程,而需要被中断的线程自己处理中断。可以理解为进程通信中的信号,例如kill -n pid
。interrupt
信号是通知线程应该中断了,具体到底中断还是继续运行,应该由被通知的线程自己处理。
public void interrupt()
- 将调用者线程的中断状态设为true。
被通知中断的线程在阻塞时,会抛出
InterruptedException
异常,同时将中断状态修改为false。如果线程被中断
interrupt()
后,我们并不想立即中断它那么我们需要重置中断状态为未中断
。不然很多依赖中断状态isInterrupted()
的方法会出问题。1
2
3
4
5
6try {
...
} catch (InterruptedException ignored) {
// 重置中断状态为 true
Thread.currentThread().interrupt();
}
public boolean isInterrupted()
判断调用者线程的中断状态。
1
2
3public boolean isInterrupted() {
return isInterrupted(false);
}
public static boolean interrupted()
- 返回当前线程的中断状态
将当前线程的中断状态设为false
1
2
3
4
5
6
7
8
9
10
11
12/**
* Tests if some Thread has been interrupted. The interrupted state
* is reset or not based on the value of ClearInterrupted that is
* passed.
* @param ClearInterrupted 是否清除中断状态
* 清除中断状态
*/
private native boolean isInterrupted(boolean ClearInterrupted);
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}
1 | public class Interrupted { |
如何中断任务?
可中断任务1
2
3
4
5
6try {
Thread.sleep(1000);
} catch (InterruptedException ignore) {
// 恢复中断状态
Thread.currentThread.interrupt();
}
不可中断非阻塞任务1
2
3while (!Thread.currentThread().isInterrupted()) {
System.out.println(".");
}
不可中断阻塞任务1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38public class SocketThread extends Thread {
private Socket socket;
public SocketThread(Socket socket) {
this.socket = socket;
}
/**
* 处理中断
* 关闭socket触发IOException
*/
public void interrupt() {
try {
socket.close();
} catch (IOException ignore) {
} finally {
super.interrupt();
}
}
public void run() {
try (ReadableByteChannel channel = Channels.newChannel(socket.getInputStream())) {
ByteBuffer buffer = ByteBuffer.allocateDirect(1024 * 10);
// 这里会阻塞
while (-1 != channel.read(buffer)) {
buffer.flip();
// write
buffer.clear();
}
} catch (IOException ignored) {
// socket.close() 触发 IOException 线程退出
}
}
}
Shutdown Hook
JVM在退出前会首先调用所有注册的关闭钩子。JVM不保证调用顺序(可以注册多个钩子),钩子必须是线程安全的。我们在钩子里面可以做清理工作。钩子应当尽快退出,它会延迟JVM结束时间。1
2
3
4Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// 清理工作
System.out.println("exit");
}));
内置条件队列
这里的条件是一种状态,当达到某种状态后我们可以继续后面的操作。例如,队列空
、队列满
都是一种状态,当队列为空时我们不能take
操作,当队列满时我们不能push
操作。当遇到上述状态时,我们可以可能需要等待,等待状态变更我们能进一步执行。等待的方式有多种,我们可以自旋
也可以休眠
。如果等待时间过久,自旋
会浪费大量的CPU资源。如果休眠
我们系统将不够灵敏。如果能有一种通知机制
,当状态变更时通知我们的线程醒来再次检查状态。
Java里面wait/notify就是这种机制。当我们在一个把锁上wait
时,线程将释放锁并且CPU资源处于休眠状态。当在一把锁上调用notify/notifyAll
时将唤醒在同一把锁上等待的线程。这既避免了自旋
过久对于CPU的浪费,也解决了休眠
时不能立即唤醒的问题。
我们可以看做每一把锁上面都有一个队列,队列里面存放的是这把锁上wait
的线程,当在这把锁上调用notify/notifyAll
时会唤醒队列里面的线程。
1 | public class Queue<T> { |
自Java 1.5开始,Java并发包提供了新的选择,我们可以使用Condition
来实现条件队列。它与传统的不同之处在于,在同一把锁上,我们可以创建多个条件队列,不同的条件可以放到不同的队列中。上例中有个弊端,当我们notifyAll
时isEmpty
和isFull
条件等待的线程都会被唤醒。另外一个弊端是所有线程都会被唤醒,但是不是所有的线程获取锁再继续执行时就能满足条件,有可能会再次进入等待。例如对于生产者消费者模型,有多个消费者等待而此时只有一个生产者只生产了一个,那么所有消费者都被唤醒时,只有一个会成功抢到任务。其它会在唤醒后再次等待。还有一种情况是由于所有条件都在一个队列中,这个队列中可能既队列空
又有队列满
两种等待条件。我们一个notifyAll
会唤醒两者,而实际情况是我们只想唤醒一种条件。例如,push
时只想唤醒队列空
条件的等待,而take
时只想唤醒队列满
条件的等待。这种无差别的唤醒同样会造成一种浪费。
1 | public class Queue<T> { |
同步工具
同步容器
Vector
最初版本的同步集合,所有方法均使用synchronized加锁同步。
HashTable
最初版本的同步哈希表,所有方法均加锁同步。
Collections.synchronized(Collection|List|Map|Set|SortMap|SortSet)
以上同步容器使用装饰器模式实现,将一个线程不安全的List/Map封闭在容器内部,通过同一把锁(同步容器本身)保护对对象的所有操作(和Vector类似)。这种方式最重要的一点是防止被封闭的对象逸出。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34static class SynchronizedList<E>
extends SynchronizedCollection<E>
implements List<E> {
private static final long serialVersionUID = -7754090372962971524L;
final List<E> list;
SynchronizedList(List<E> list) {
super(list);
this.list = list;
}
SynchronizedList(List<E> list, Object mutex) {
super(list, mutex);
this.list = list;
}
...
public E get(int index) {
// 所有操作上都使用锁同步
synchronized (mutex) {return list.get(index);}
}
public E set(int index, E element) {
synchronized (mutex) {return list.set(index, element);}
}
public void add(int index, E element) {
synchronized (mutex) {list.add(index, element);}
}
public E remove(int index) {
synchronized (mutex) {return list.remove(index);}
}
...
}
并发容器
CopyOnWriteArrayList
写入时复制的思想,每次更新时都会重新copy一份新的数据。由于每次修改都会复制底层数组,当容器规模较大时将会产生较大的开销。对于容器修改尽量调用批量操作的API,减少容器数据复制操作。这种容器主要用在读多写少的场景中。
多线程可以同时对容器进行迭代,不会彼此干扰或与修改容器的线程相互干扰。写入时复制不会抛出
ConcurrentModificationException
异常,迭代的是创建迭代器时的元素,修改操作不会对迭代的数据有影响(修改后被迭代的数组和容器此时的数组已经不是同一个了,因此也不可能读到最新容器的变化)。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27// CopyOnWriteArrayList.add
public boolean add(E e) {
// 排它锁保证了并发的安全性(可见性/原子性),所有的更新操作使用同一把锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 容器底层数组
Object[] elements = getArray();
int len = elements.length;
// !!!重点
// 复制原数组到新数组中
Object[] newElements = Arrays.copyOf(elements, len + 1);
// 将元素添加到新数组中
newElements[len] = e;
// 重置容器底层数组
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
// 遍历操作
// 遍历的是创建迭代器时的数组,后面再修改已经不是修改的此数组了。
public ListIterator<E> listIterator() {
return new COWIterator<E>(getArray(), 0);
}
ConcurrentHashMap
ConcurrentHashMap
使用锁分段
的思想,将原来的一个锁应用在整个容器上拆分为多个锁每个锁锁定容器一部分数据的形式。这能减少锁竞争,提高程序的可伸缩性。
队列
Queue、Deque
- | - | - |
---|---|---|
add | 增加一个元索 | 如果队列已满,则抛出一个IIIegaISlabEepeplian异常 |
remove | 移除并返回队列头部的元素 | 如果队列为空,则抛出一个NoSuchElementException异常 |
element | 返回队列头部的元素 | 如果队列为空,则抛出一个NoSuchElementException异常 |
offer | 添加一个元素并返回true | 如果队列已满,则返回false |
poll | 移除并返问队列头部的元素 | 如果队列为空,则返回null |
peek | 返回队列头部的元素 | 如果队列为空,则返回null |
put | 添加一个元素 | 如果队列满,则阻塞 |
take | 移除并返回队列头部的元素 | 如果队列为空,则阻塞 |
LinkedBlockingQueue
底层有链表实现。
ArrayBlockingQueue
底层有数组实现。
DelayQueue
延迟队列,只有延迟期满时才能从中提取元素。
SynchronousQueue
SynchronousQueue
并不是一个真正的队列,它没有存储功能,不缓存元素。它维护的是是一组工作线程,任务直接从生产者到消费者手中,中间没有延迟。newCachedThreadPool
使用的正是这种队列。
同步工具类
CountDownLatch
- 确保某个计算在其需要的所有资源都被初始化之后才继续执行。
- 确保某个服务在其所依赖的其它所有服务都启动后才启动。
等待知道某个动作的所有参与者都就绪再继续执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33/**
* 模拟田径比赛
*/
public class Track {
public static void main(String[] args) throws InterruptedException {
// 发令枪
CountDownLatch startingGun = new CountDownLatch(1);
Random random = new Random();
for (int i = 0; i < 5; i++) {
// runner code
final int number = i;
// runner
new Thread(() -> {
long start = 0;
System.out.printf("Runner-%d 准备好了\n", number);
try {
startingGun.await(); // 所有线程到这里开始等待一起执行
start = System.currentTimeMillis();
// 随机跑步时间
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException ignore) { }
System.out.printf("Runner-%d 用时:%d ms\n", number, System.currentTimeMillis() - start);
}).start();
}
Thread.sleep(1000);
startingGun.countDown(); // startingGun
}
}
FutureTask
多个耗时的任务可以异步执行,通过
get
拿到执行后的结果。任务只会执行一次,最终状态不会改变(类似Promise)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35public class FutureTaskTest {
public static void main(String[] args) {
Random random = new Random();
Callable<Integer> callable = () -> {
// 模拟计算耗时
Thread.sleep(random.nextInt(1000));
// 返回结果
return random.nextInt(10);
};
FutureTask<Integer> queryOne = new FutureTask<>(callable);
FutureTask<Integer> queryTwo = new FutureTask<>(callable);
try {
new Thread(queryOne).start();
new Thread(queryTwo).start();
// 未执行完的情况下,get时会阻塞。
// 最终执行时间取决于执行时间最长的任务。
int revOne = queryOne.get();
int revTwo = queryTwo.get();
System.out.printf("%d + %d = %d\n", revOne, revTwo, revOne + revTwo);
System.out.println(queryOne.get() + queryTwo.get());
} catch (InterruptedException ignore) {
// e.printStackTrace();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
//if (cause instanceof ) {
//
//} else {
//
//}
}
}
}Semaphore
Semaphore中管理者一组虚拟的许可证,许可证的数量通过构造函数指定。获得操作前只有获得许可证后才能继续执行,执行结束后释放许可证。常用于连接池等类似的场景。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25// 模拟银行窗口
public class BankWindow {
public static void main(String[] args) {
Random random = new Random();
// 共六个窗口工作
Semaphore window = new Semaphore(6);
// 共20个顾客在排队办理
for (int i = 0; i < 20; i++) {
final int number = i;
new Thread(() -> {
try {
System.out.printf("顾客%d等待中..\n", number);
window.acquire(); // 获得一个牌
System.out.printf("开始为顾客%d办理业务\n", number);
Thread.sleep(random.nextInt(1000));
System.out.printf("顾客%d办理完成\n", number);
} catch (InterruptedException ignore) {
} finally {
window.release(); // 归还牌
}
}).start();
}
}
}Barrier
同
CountDownLatch
不同的地方是CyclicBarrier
是先干后集合,而CountDownLatch
一般是先集合后开始干。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32// 模拟银行金库。
// 金库大门共三把锁,分别有三个经理拿着钥匙。
// 只有三个经理人都开锁时,金库大门才会开。
public class BankVault {
public static void main(String[] args) {
Random random = new Random();
int lockNumber = 3;
// 保险库大门有三把锁
CyclicBarrier bankVaultDoor = new CyclicBarrier(lockNumber, () -> {
System.out.println("金库已开");
});
for (int i = 0; i < lockNumber; i++) {
final int number = i;
// 经理人
new Thread(() -> {
try {
System.out.printf("经理%d开始开锁\n", number);
// 模拟每个经理人的开锁时间
Thread.sleep(random.nextInt(1000));
System.out.printf("经理%d开始开锁完成\n", number);
// 开锁
bankVaultDoor.await();
} catch (InterruptedException ignore) {
} catch (BrokenBarrierException e) {
//await中断
}
}).start();
}
}
}ForkJoin
ForkJoin采用分治算法,将大任务拆分成小任务来处理。通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下两个子类:
- RecursiveAction:用于没有返回结果的任务。
RecursiveTask :用于有返回结果的任务。
常用模式(分治):
1
2
3
4
5
6
7
8
9
10
11
12Result solve(Problem problem) {
if (problem is small) {
// 直接解决
directly solve problem
} else {
// 拆分问题
split problem into independent parts
fork new subtasks to solve each part
join all subtasks
compose result from subresults
}
}Fork/Join求和
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46public class ForkJoinSum extends RecursiveTask<Long> {
private int[] nums;
private int low;
private int high;
private int THRESHOLD = 10;
public ForkJoinSum(int[] nums) {
this(nums, 0, nums.length);
}
public ForkJoinSum(int[] nums, int low, int high) {
this.nums = nums;
this.low = low;
this.high = high;
}
protected Long compute() {
// 小任务直接处理
if (high - low < THRESHOLD) {
return Arrays.stream(Arrays.copyOfRange(nums, low, high)).asLongStream().sum();
} else {
// 大人物先拆分再合并结果
ForkJoinSum left = new ForkJoinSum(nums, low, low + (high - low) / 2);
ForkJoinSum right = new ForkJoinSum(nums, low + (high - low) / 2, high);
// 注意:不要两个线程都fork,不然当前线程将无任务可做。
// invokeAll其中left会在当前线程内执行,right会fork在线程池中另一个线程中执行。
invokeAll(left, right);
return left.join() + right.join();
}
}
public static void main(String[] args) {
int count = 100;
int[] nums = new int[count];
Random random = new Random();
for (int i = 0; i < count; i++) nums[i] = random.nextInt(count);
ForkJoinPool pool = new ForkJoinPool(4); // 最大并发数4
ForkJoinSum task = new ForkJoinSum(nums);
long ret = pool.invoke(task);
System.out.println(ret);
}
}
锁
内置锁
内置锁能同时保证
原子性
和可见性
。内置锁是独占锁,会增加串行代码比例,降低程序的可伸缩性。内置锁是一种可重入锁,如果已经获得了锁,后面在遇到相同的锁时可以直接进入。
是用不用的锁组合时一定要注意加锁的顺序,避免交叉造成死锁的问题。
1
2
3
4
5
6
7
8// 锁为实例(this)对象
synchronized (this) {
...
}
public synchronized void fun() {
...
}1
2
3
4// 特定的锁对象,可以将锁对象封闭在程序内部。
synchronized (lock) {
...
}1
2
3
4// 锁为class对象
public static synchronized fun() {
...
}ReentrantLock
ReentrantLock
提供了和synchronized
一样的互斥性及内存可见性。在大多数场景下使用synchronized
就够了,但是内置锁有一些局限性。例如,我们无法中断一个等待获取锁的线程。但是使用ReentrantLock
也有缺点,我们必须手动释放锁。ReentrantLock
和内置锁性能相当,仅当内置锁不能满足需求时才考虑使用ReentrantLock
。就性能而言,我们也应当首选内置锁,因为内置锁的JVM的内置属性,它能执行一些优化。ReentrantLock
可以提供公平锁,线程按照它们发出请求的顺序来获得锁。非公平锁允许插队
行为,当请求一把非公平锁时,如果在发出请求时同时改锁状态变为可用,那么这个线程将跳过所有等待线程而获得这个锁。由于公平锁所有线程在等待锁时都要排队,这将会增加线程切换降低性能,而插队
将减少一次上下文切换。当持有锁的时间较长时,应该使用公平锁。ReentrantLock
实现了Lock
接口。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public interface Lock {
// 获得一把锁
void lock();
// 获得一把锁,锁获取时可以中断
void lockInterruptibly() throws InterruptedException;
// 获取一把锁,获得返回true
boolean tryLock();
// 尝试获得一把锁,等待一定的时间(获取锁时可中断)。获得返回true
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 释放锁
void unlock();
// 条件队列
Condition newCondition();
}组合运用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43public class ConcurrentIncrease {
private int total = 0;
private ReentrantLock lock = new ReentrantLock();
public int incr() {
lock.lock();
try {
++total;
// 内部输出后会发生IO阻塞,会放弃CPU资源
// System.out.printf("total: %d\n", total);
return total;
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
int count = 100;
final ConcurrentIncrease increase = new ConcurrentIncrease();
CountDownLatch start = new CountDownLatch(count);
CyclicBarrier done = new CyclicBarrier(count, () -> {
System.out.printf("total: %d\n", increase.total);
});
for (int i = 0; i < count; i++) {
new Thread(() -> {
try {
start.await();
} catch (InterruptedException ignored) { }
System.out.println(increase.incr());
try {
done.await();
} catch (InterruptedException | BrokenBarrierException ignored) { }
}).start();
start.countDown();
}
}
}ReadWriteLock
ReentrantLock
和内置锁都是互斥锁,每次只能有一个线程持有锁。互斥锁是一种保守的加锁策略,它可以避免写\写
和读\写
的冲突,但它同时也限制了读\读
。读写锁正是来解决这一问题的,只要能保证每个线程都能读到最新的数据,并且读数据时不会有其它线程来修改数据,那就不会发生问题。使用了读写锁后,一个资源可以同时被多个读操作或者一个写操作,但二者不能同时进行。读写锁实现方式
- 释放优先。当写入线程释放写锁时,如果正在排队的同时存在读和写线程,改优先选择读线程还是写线程。
- 读线程插队。如果锁有读线程获取,但是有写线程等待,那么新到的读线程是否能立即获得锁,还是排队在写线程后面。
- 重入性。读取锁和写入锁是否可重入。
- 降级。如果一个线程持有写入锁,它是否能再不释放该锁的情况下获取读锁。这个线程将同时拥有读写锁,同时将阻止其它线程对被保护资源的读写。
升级。读锁能否由于其它正在等待的读线程和写线程升级为一个写入锁。(这协商不好将发生死锁的情况,如果两个读锁同时要升级为写锁)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31public class ReadWriteMap <K,V> {
private final Map<K, V> map;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock r = lock.readLock();
private final Lock w = lock.writeLock();
public ReadWriteMap(Map<K, V> map) {
this.map = map;
}
public V put(K key, V value) {
w.lock();
try {
return map.put(key, value);
} finally {
w.unlock();
}
}
public V get(Object key) {
r.lock();
try {
return map.get(key);
} finally {
r.unlock();
}
}
...
// remove/putAll/clear/size/isEmpty/containsKey/containsValue
}
原子变量
在java.util.concurrent.atomic
包中提供了大量原子变量类,原子类底层采用CAS(Compare-and-Swap)方式。CAS在竞争失败时会再试而基于锁保护的操作在获取锁失败时线程将会被挂起,在竞争不激烈环境下失败重试能避免线程挂起而获得不错的性能,在竞争激烈时CAS操作会频繁失败重试。
CAS:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66/**
* CAS 的典型使用模式是:首先从 V 中读取值 A ,并根据 A 计算新值 B ,
* 然后再通过 CAS 以原子方式将 V 中的值由 A 变成 B (只要在这期间没有任何线程将 V 的值修改为其他值)。
* 由于 CAS 能检测到来自其他线程的干扰,因此即使不使用锁也能够实现原子的读一改一写操作序列。
*/
public class CAS<T> {
private T value;
public CAS(T value) {
this.value = value;
}
public synchronized T get() {
return value;
}
/**
* 比较成功则换值
*
* 对于 i = 1; i = i + 1
* 我们的预期是1,操作后应该结果是2;
* 如果此时i确实是1,那就说明没人修改,那就直接将预期结果2赋值给它。
* @param expectedValue 期望值,即我们看到的值
* @param newValue 新值
* @return
*/
public synchronized T compareAndSwap(T expectedValue, T newValue) {
T oldValue = value;
if (value == expectedValue)
value = newValue;
return oldValue;
}
/**
* {@link #compareAndSwap(int, int)} 返回的是旧值,也就是我们期望的那个值的。
* <br>
* 如果它的返回值和我们的期望值一样,说明此时交换成功了。
* @param expectedValue 期望值
* @param newValue 新值
* @return
*/
public synchronized boolean compareAndSet(T expectedValue, T newValue) {
return (expectedValue == compareAndSwap(expectedValue, newValue));
}
}
class CASCounter {
private CAS<Integer> value;
public CASCounter(int value) {
this.value = new CAS<>(value);
}
public int getValue() {
return value.get();
}
public int increment() {
int v;
do {
v = value.get();
// 相等表示替换成功,失败则重试
} while (v == value.compareAndSwap(v, v + 1));
return v + 1;
}
}
线程池
ThreadPoolExecutor
- corePoolSize 初始线程池大小
- maximumPoolSize 最大线程池大小
- keepAliveTime 不活动线程存活时间
- workQueue 任务缓冲队列
threadFactory 创建工作线程的工厂,特殊情况下我们需要定制我们的工作线程。
工作线程满了后新任务可以放到队列中,队列分为有界队列和无界。使用无界队列在工作线程处理不及时时可能会出现排队任务太多,内存溢出情况。
使用有界队列时,如果队列满了以后有不同的
饱和策略
来处理。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 3, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(5));
// 饱和策略
//executorService.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
//executorService.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
//executorService.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
executorService.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 15; i++) {
Thread.sleep(10);
final int task = i;
System.out.printf("put task %d\n", task);
try {
executorService.submit(() -> {
try {
Thread.sleep(2000);
System.out.printf("run task: %d; thread: %s \n", task, Thread.currentThread().getName());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
} catch (Exception e) {
System.out.println(e.getClass());
System.out.printf("put fail %d [%s]\n", task, e.getMessage());
}
}
newFixedThreadPool
大小固定的线程池。注意
LinkedBlockingQueue
是无界的。1
2
3
4
5public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}newCachedThreadPool
可缓存的线程池,会回收空闲的线程。注意:这种方式创建的线程池
maximumPoolSize
为Integer.MAX_VALUE
。1
2
3
4
5public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}newSingleThreadExecutor
创建一个单线程线程
1
2
3
4
5
6public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}newScheduledThreadPool
创建一个可定时及周期性任务执行任务的线程池。注意:线程池大小为
Integer.MAX_VALUE
。newWorkStealingPool
这个线程池是1.8引入的。该线程池使用
ForkJoinPool
实现,参数parallelism
为线程的并行数量其继承自AbstractExecutorService
。ForkJoinPool
实现了一个工作窃取
算法,使得空闲线程能够窃取别的线程分解出来的子任务,从而让所有的线程都尽可能处于满负荷,提高执行效率。有一个无参数版本,其将parallelism
设为Runtime.getRuntime().availableProcessors()
,即处理器核数。ForkJoinPool
TODO
饱和策略
Abort
默认策略直接拒绝新任务,抛出
RejectedExecutionException
异常。Discard
悄悄的丢弃最新的任务,不会有任何异常抛出。
DiscardOld
同
Discard
恰恰相反,悄悄的抛弃最旧的待执行任务。CallerRuns
及不抛弃,也不会抛异常。线程池和队列都满时会在调用了
execute
的线程中执行,这种情况主线程可能会被阻塞。
线程池停止
shutdown
非阻塞方法。停止线程,会等待所有工作及排队线程都执行完毕。
shutdownNow
非阻塞。先停止(
interrupt
)正在执行的线程,然后返回正在等待的任务。通过检测Thread.currentThread().isInterrupted()
适时结束正在执行的线程。awaitTermination(timeout, unit)
阻塞等待最长
timeout
等待线程池结束。
Amdahl定律
在有些问题中,如果可用资源越多那么解决问题的速度就越快。可用资源与速度是一个什么关系呢?我们增加了10倍机器能获得性几倍的提升呢?Amdahl定律描述的正是这个问题。
在增加计算资源的情况下,程序理论上能实现的最高加速比,这个值取决于程序中科并行组件与串行组件所占的比重。假设F是必须被串行执行的部分,那么根据Amdahl定律,在包含N个处理器的机器中,最高的加速比为:
当N趋近无穷大时,最大的加速比趋近1/F。因此如果程序中如果有50%的计算需要串行执行,那么最高的加速比只能是2(不管有多少个处理器多少个线程可用)。如果程序中有10%的计算需要串行,那么最高的加速比将接近10。
Amdahl定律的应用场景:
在一个订单处理系统中,我们应用服务器可以很方便的扩展很多倍。那么在数据库中并发扣减库存这个点就是整个系统的串行点,这个点直接决定了整个系统最终的吞吐量。遇到这种问题,我们盲目堆机器初期能提高吞吐量,增加到一定程度性能将不再提升,而且还会因为线程竞争激烈造成性能下降。
常见异常
java.util.ConcurrentModificationException
对容器迭代的时候如果同时对其进行修就会抛出
ConcurrentModificationException
。这类似一种预警机制,它将计数器与容器变化关联。如果迭代期间计数器被修改那么hasNext
或者next
将抛出异常。在迭代期间迭代器可能并没有意识到容器已经修改了,这是一种权衡机制来尽量避免并发修改操作对程序的影响。modCount
是List的一个成员变量,表示容器修改(add/remove)次数。
expectedModCount
是Iterator
内部变量,这个值的初始值就是modCount
的值。如果迭代过程中修改了容器,modCount
就会改变,而此时expectedModCount
还是modCount
的旧值。不管是简单的for,还是增强for循环编译后都是Iterator迭代。
直接调用Iterator.remove
来删除元素不会出现ConcurrentModificationException
异常,其方法内部会重新修正modCount
和expectedModCount
的值。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79public Iterator<E> iterator() {
return new Itr();
}
/**
* An optimized version of AbstractList.Itr
* 内部类,可以直接访问宿主类属性。用于容器迭代。
*/
private class Itr implements Iterator<E> {
int cursor; // index of next element to return
int lastRet = -1; // index of last element returned; -1 if no such
// 保留原始的modCount值,可以看做是oldModCount
int expectedModCount = modCount;
public boolean hasNext() {
return cursor != size;
}
"unchecked") (
public E next() {
// 每次迭代是都检查
checkForComodification();
int i = cursor;
if (i >= size)
throw new NoSuchElementException();
Object[] elementData = ArrayList.this.elementData;
if (i >= elementData.length)
throw new ConcurrentModificationException();
cursor = i + 1;
return (E) elementData[lastRet = i];
}
public void remove() {
if (lastRet < 0)
throw new IllegalStateException();
checkForComodification();
try {
ArrayList.this.remove(lastRet);
cursor = lastRet;
lastRet = -1;
// 直接调用it.remove()后不会触发ConcurrentModificationException异常
// 这里会重置expectedModCount的值
expectedModCount = modCount;
} catch (IndexOutOfBoundsException ex) {
throw new ConcurrentModificationException();
}
}
"unchecked") (
public void forEachRemaining(Consumer<? super E> consumer) {
Objects.requireNonNull(consumer);
final int size = ArrayList.this.size;
int i = cursor;
if (i >= size) {
return;
}
final Object[] elementData = ArrayList.this.elementData;
if (i >= elementData.length) {
throw new ConcurrentModificationException();
}
while (i != size && modCount == expectedModCount) {
consumer.accept((E) elementData[i++]);
}
// update once at end of iteration to reduce heap write traffic
cursor = i;
lastRet = i - 1;
checkForComodification();
}
final void checkForComodification() {
// modCount 为宿主类属性,实时的
// expectedModCount 为创建迭代器时的 modCount
if (modCount != expectedModCount)
throw new ConcurrentModificationException();
}
}1
2
3
4
5
6// 如果这里没有即时抛出`ConcurrentModificationException`异常,
// 因为我们在循环开始前已经取过size值,那么后面肯定会因为删除元素抛出`ArrayIndexOutOfBoundsException`异常。
// 这种机制被称为及时失败(fail-fast),及早发现问题抛出问题避免无意义的操作,因为最终迭代过程可能出现异常。
for (int i = 0; i < strs.size(); i++) {
if ("pill".equals(strs.get(i))) strs.remove(strs.get(i));
}解决方法使用
CopyOnWriteArrayList
替代ArrayList
。java.lang.UnsupportedOperationException
容器逸出时,为了避免容器被修改可以使用
Collections.unmodifiable(Map|List)
等静态方法包装容器来保护原始容器不被修改。调用这些容器的修改操作(add/remove)时会抛出UnsupportedOperationException异常。
另外:
Arrays.asList
方法返回的是一个fixed-size
list,我们调用它的修改成操作时也会抛出UnsupportedOperationException
异常。对其重新封装new ArrayList<>(Arrays.asList("a", "b", "c"))
后就不会出现此问题。1
2List<String> strs = Collections.unmodifiableList(new ArrayList<>(Arrays.asList("a", "b", "c")));
strs.remove("b");java.lang.IllegalMonitorStateException
这在我们调用对象的
wait
或者notify/notifyAll
时会抛出,主要原因是我们没有获取被被调用对象的锁。错误示例:
1
2
3
4Object lock = new Object();
lock.wait();
doSomething();
lock.notifyAll();正确示例:
1
2
3
4
5
6Object lock = new Object();
synchronized (lock) {
lock.wait();
doSomething()
lock.notifyAll();
}