- 同步容器类实现线程安全的方式是:
将他们的状态封装起来,并对每个公有方法都进行同步,使得每次只有一个线程能访问容器的状态.
-
同步容器类都是
线程安全
的,但在某些情况下可能需要额外的客户端加锁来保护复合操作. -
由于同步容器类要遵守
同步策略
(支持客户端加锁),因此可能会创建一些新的操作,只要我们知道应该使用哪一个锁,那么这些新操作就与容器的其他操作一样是原子操作
.
-
在设计同步容器类的迭代器时并没有考虑到并发修改的问题,它们表现出的行为是
"及时失败"(fail-fast)
的
这意味着,当发现容器在迭代过程中被修改时,就会抛出迭代器与ConcurrentModificationException
-
避免出现
迭代器与ConcurrentModificationException
的方法:- 迭代过程持有容器的锁
- "克隆"容器,并在副本上迭代(克隆过程仍需要对容器加锁)
-
标准容器的
toString
方法将隐式
迭代容器,并在每个元素上调用toString
来生成容器内容的格式化表示 -
容器的
containsAll
,hashCode
和equals
等方法也会间接地执行迭代操作.
正如封装对象的状态有助于维持
不变性条件
一样,封装对象的同步机制
(同步代码)同样有助于确保实施同步策略
(eg:用synchronizedSet
包装HashSet
)
-
同步容器将所有对容器的状态访问都串行化,严重降低并发性,当多个线程访问锁时,吞吐量将严重降低.
-
同步容器是
synchronizedXXX
这类,仅仅给容器提供同步,效率很低
并发容器是针对多个线程并发访问设计的,ConcurrentMap
,CopyOnWriteList
(遍历操作较多的情况下代理同步的List
),ConcurrentQueue
(传统先进先出队列)BlockingQueue
(扩展Queue
,提供可阻塞操作),BlockingDeque
(可阻塞双端队列)..
用并发容器来代替同步容器,可以极大地提高伸缩性并降低风险
-
ConcurrentHashMap
使用一种粒度更细的加锁机制
来实现更大程度的共享,这种机制称为分段锁(Lock Strip)
任意数量的读取线程可以并发地访问Map,并且一定数量的写入线程可以并发地修改Map
ConcurrentHashMap
带来的结果是,在并发访问的环境下实现更高的吞吐量
,而在单线程环境中值损失非常小的性能 -
ConcurrentHashMap
返回的迭代器具有弱一致性(Weakly Consistent) -
用
ConcurrentHashMap
代替同步Map能进一步提高代码的可伸缩性
,只有当应用程序需要加锁Map进行独占访问
时,才应放弃使用ConcurrentHashMap
- "若没有则添加","若相等则移除"和"若相等则替换"等,都已经实现为
原子操作
并且在ConcurrentMap
的接口中声明
-
CopyOnWriteArrayList
用于替代同步List
在某些情况下它提供了更好的并发性能,并且在迭代期间不需要对容器进行加锁或复制 -
在每次修改时,它都会创建并重新发布一个
新的容器副本
,从而实现可变性
-
仅当迭代操作远远多于修改操作时,才应该使用"写入时复制"容器
-
阻塞队列提供了可阻塞的
put
和take
方法,以及支持定时的offer
和poll
方法 -
"生产者"和"消费者"角色是相对的,某种环境中的消费者在另一种不同的环境中可能成为生产者
在构建高可靠的应用程序时,有界队列是一种强大的资源管理工具:
它们能抑制并防止产生过多的工作项,使应用程序在负荷过载的情况下变得更加强壮
-
类库中包含了
BlockingQueue
(并发队列)的多种实现:-
LinkedBlockingQueue
和ArrayBlockingQueue
是FIFO队列,比同步List拥有更好的并发性能 -
PriorityBlockingQueue
是按优先级排序的队列 -
SynchronousQueue
不是一个真正的队列,不会为元素维护储存空间,它只维护一组等待着把元素加入或移出队列的线程
实现了直接交付,从而降低了从生产者
移动到消费者
的延迟,当交付被接受时,它就知道消费者已经得到了任务
仅当有足够多的消费者,并且总有一个消费者准备好获取交付的工作时,才适合使用同步队列
-
-
生产者——消费者模式能带来许多性能优势:
生产者
和消费者
可以并发地执行.
如果一个是I/O密集型
,另一个是CPU密集型
,那么并发执行
的吞吐率
要高于串行执行
的吞吐率.
如果生产者
和消费者
的并行度不同,将它们紧密耦合
在一起会把整体并行度降低为二者中最下的并行度
可变对象
,生产者——消费者
与阻塞队列
一起,促进了串行线程封闭,将对象所有权从生产者
交付给消费者
封闭对象
只能由单个线程拥有,但可以通过安全地发布对象来"转移"所有权,在转移后,另一个线程独占该对象访问权限
-
Deque
是一个双端队列
,实现了在队列头和队列尾的高效插入和移除
具体实现包括ArrayDeque
和LinkedBlockingDeque
-
双端队列适用与
工作密取模式(Work Stealing)
在工作密取模式
中,每个消费者
都有各自的双端队列
.如果一个消费者
完成了自己双端队列
中的全部工作,那它就可以从其他消费者
的双端队列
末尾秘密地获取工作 -
密取模式比传统的
生产者——消费者
模式具有更高的可伸缩性
,因为工作者线程不会在单个共享的任务队列上发生竞争
-
线程可能会阻塞或暂停执行的原因: 等待I/O操作结束,等待获得一个锁,等待从
Thread.sleep
方法中醒来,或是等待另一个线程的计算结果
当线程阻塞时,它通常被挂起,并处于某种阻塞状态(BLOCKED,WAITING或TIMED_WAITING)
阻塞操作与执行时间长的差别在于,被阻塞的线程必须等待某个不受它控制的事件发生后才能继续执行(eg:等待I/O操作完成,等待某个锁变可用,等待外部计算结果) -
当某方法抛出
InterruptedException
时,表示该方法是阻塞方法,如果这个方法被中断,那么它将女里提前结束阻塞状态 -
中断是一种
协作机制
,一个线程不能强制其他线程停止正在执行的操作而去执行其他的操作
当线程A中断线程B时,A仅仅要求B在执行到某个可以暂停的地方停止正在执行的操作,前提是B愿意停下来 -
处理对中断的响应,有两种选择:
- 传递
InterruptedException
- 恢复中断
- 传递
-
在出现
InterruptException
时最不应该做的是,捕获它但不作出任何响应
-
同步工具类可以是任何一个对象,只要它根据自身的状态来协调线程的控制流
阻塞队列
可以作为同步工具类,其他类型的同步工具还有信号量(Semaphore)
,栅栏(Barrier)
以及闭锁(Latch)
-
所有同步工具类都包含一些特定的结构化属性:
- 封装了一些状态(决定线程是继续执行还是等待)
- 提供一些方法对状态进行操作
- 另一些方法用于高效地等待同步工具进入到预期状态
-
闭锁相当于一扇门: 在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门就会打开并允许所有线程通过
当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态
闭锁可以用来确保某些活动直到其他活动都完成后才能执行 -
闭锁状态包括:
- 一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量
countDown
方法递减计数器,表示有一个事件已经发生了await
方法等待计数器达到零,表示所有需要等待的事件都已经发生
- 如果计数器的值非零(或者等待的线程中断/超时),那么
await
会一直阻塞到计数器为零
-
FutureTask
表示的计算是通过Callable
来实现的,相当于一中可生成结果的Runnable
有以下三种状态:等待运行(Waiting to Run
,正在运行(Running)
和运行完成(Completed)
-
通过提前启动计算,可以减少在等待结果时需要的时间
-
在
get
方法抛出ExecutionException
时,可能是以下三种情况之一:
Callable
抛出的受检查异常,RuntimeException
, 以及Error
(必须对每种情况进行单独处理)
-
计数信号量(Counting Semaphore)
用来控制同时访问某个特点资源的操作数量,或者同时执行某个指定操作的数量. -
Semaphore
中管理着一组虚拟的许可,许可的初始数量可通过构造函数来指定
在执行操作时可以先获得许可,并在使用后release
释放许可给信号量
如果没有许可,那么acquire
将阻塞直到有许可 -
Sequence
可以用于实现资源池,也可以将任何一种容器变成有界阻塞容器
-
栅栏
与闭锁
的关键区别在于,所有线程必须都到达栅栏位置才能继续执行闭锁
用于等待事件,而栅栏
用于等待其他线程
-
如果对
await
的调用超时,或者await
阻塞的线程被中断,那么栅栏就被认为是打破了,所有阻塞的await
调用都终止并抛出BrokenBarrierException
-
在模拟程序中通常需要使用栅栏(eg.等待所有玩家加载完毕开启游戏)
-
另一种形式的栅栏是
Exchanger
,它是一种两方(Two-Party)栅栏
,各方在栅栏位置上交换数据
当双方执行不对称时,Exchanger
非常有用
-
public interface Computable<A, V>{ V compute(A arg) throws InterruptedException; } public class ExpensiveFunction implements Computable<String, BigInteger>{ public BigInteger compute(String arg){ //长时间计算 return new BigInteger(arg); } } public class Memoizerl<A, V> implements Computable<A, V>{ private final Map<A, V> cache = new HashMap<A, V>(); private final Computable<A, V> c; public Memoizer1(Computable<A, V> c){ this.c = c; } public synchronized V compute(A arg) throws InterruptedException{ V result = cache.get(arg); if(result == null){ result = c.compute(arg); cache.put(arg, result); } return result; } }
- 对整个方法进行同步,能保证线程安全性.但会带来一个明显的
可伸缩性
问题:每次只有一个线程能执行该方法
如果有多个线程在排队等待还未计算出的结果,那么该方法的计算时间可能比没有缓存操作的计算时间还长
-
public class Memoizer2<A, V> implements Computable<A, V>{ private final Map<A, V> cache = new ConcurrentHashMap<A, V>(); private final Computable<A, V> c; public Memoizer2(Computable<A, V> c){ this.c = c; } public V compute(A arg) throws InterruptedException{ V result = cache.get(arg); if(result == null){ // 线程B进入(在这之前A已经进入) result = c.compute(arg); // 线程A在计算 cache.put(arg, result); } return result; } }
- 代码二的问题在于:
如果某个线程启动了一个开销很大的计算,而其他线程并不知道这个计算正在进行,那么会产生重复计算.
-
public class Memoizer3<A, V> implements Computable<A, V>{ private final Map<A, Future<V>> cache = new ConcurrentHashMap<>(); private final Computable<A, V> c; public Memoizer3(Computable<A, V> c){ this.c = c; } public V compute(final A arg) throws InterruptedException{ Future<V> f = cache.get(arg); if(f == null){ // 非原子,多个线程可进入 Callable<v> eval = new Callable<v>(){ public V call() throws InterruptedException{ return c.compute(arg); } }; FutureTask<V> ft = new FutureTask<v>(eval); f = ft; cache.put(arg, ft); ft.run(); } try{ return f.get(); }cache(ExcutionException e){ throw launderThrowable(e.getCause()); } } }
compute
方法中的if代码块是非原子
的"先检查再执行"操作,因此两个线程仍有可能在同一时间内调用compute
来计算相同的值
-
public class Memoizer3<A, V> implements Computable<A, V>{ private final Map<A, Future<V>> cache = new ConcurrentHashMap<>(); private final Computable<A, V> c; public Memoizer3(Computable<A, V> c){ this.c = c; } public V compute(final A arg) throws InterruptedException{ while(true){ Future<V> f = cache.get(arg); if(f == null){ // 非原子,多个线程可进入 Callable<v> eval = new Callable<v>(){ public V call() throws InterruptedException{ return c.compute(arg); } }; FutureTask<V> ft = new FutureTask<v>(eval); f = cache.putIfAbsent(arg, ft); //原子操作,在此限制其他线程 if(f == null){ f = ft; ft.run(); } } try{ return f.get(); }cache(CancellationException e){ cache.remove(arg, f); //计算取消,移除Future }cache(ExcutionException e){ throw launderThrowable(e.getCause()); } } } }
- 可变状态是至关重要的
所有的并发问题都可以归结为如何协调对并发状态的访问.可变状态越少,就越容易确保线程安全性- 尽量将域声明为
final
型,除非需要它们是不可变的- 不可变对象一定是线程安全的
不可变对象能极大地降低并发编程的复杂性.它们更为简单而且安全,可以任意共享而无须使用加锁或保护性复制等机制- 封装有助于管理复杂性
在编写线程安全的程序时,虽然可以将所有数据都保存在全局变量中.
但将数据封装在对象中,更易于维持不变性条件: 将同步机制封装在对象中,更易于遵循同步策略- 用锁来保护每个可变变量
- 当保护同一个不变性条件中的所有变量时,要使用同一个锁
- 在执行复操作期间,要持有锁
- 如果从多个线程中访问同一个变量时没有同步机制,那么程序会出现问题
- 不要故作聪明地推断出不需要使用同步
- 在设计过程中考虑线程安全,或者在文档中指出它不是线程安全的
- 将同步策略文档化
~~done! 洗澡睡觉