上一篇文章中,我们用串行的方法解决了非并发的问题,利用互斥锁锁住整个类或者this对象将关联和不关联的问题去解决,但是如果在并发的环境下,还用串行代码设计的方法去写产品,势必会让整个项目的执行效率就降低,所以为了这种情况下还能让项目高效的去运行,就需要进行性能方面的提升。
仿照现实情况
因为现实中的银行去和客户做业务的时候也是多个窗口并行处理。我们先不说代码怎么写,就说现实情况,如果没有计算机技术的话。
账户的存在形式真的就是一个账本,而且每个账户都有一个账本,这些账本 都统一存放在文件架上。银行柜员在给我们做转账时,要去文件架上把转出账本和转入账本都拿到手,然后 做转账。这个柜员在拿账本的时候可能遇到以下三种情况:
- 文件架上恰好有转出账本和转入账本,那就同时拿走;
- 如果文件架上只有转出账本和转入账本之一,那这个柜员就先把文件架上有的账本拿到手,同时等着其 他柜员把另外一个账本送回来
- 转出账本和转入账本都没有,那这个柜员就等着两个账本都被送回来。
而如果用代码来说的话,意思就是,拿转出账本这件事要加一把锁,拿转入账本这件事要加一把锁。
然后在交易的方法内部,我们要实现的时候,先拿到转出的账本,然后再拿到转入钱的账本。这个时候完成转账的操作。
然后按照这样的思路去设计代码,完成多个用户之间的转账操作,就可以实现了。
class Account {
private int balance;
//转账
void transfer(Account target,int amt){
//锁定转入账户
synchronized(this){
//锁定转入账户
synchronized(target){
if(this.balance>amt){
this.balance-=amt;
target.balance += amt;
}
}
}
}
}
这样设计的代价
上面的代码,从锁住一个类,到锁住一个this对象,上次我们说过了,这种锁叫做细粒度锁,使用细粒度锁,可以提升并行度,是性能优化的一个重要手段。
但是,使用细粒度锁是有代价的,这个代价就是会导致死锁。
我们还是用一种现实中的情况来解释一下死锁的概念:
A转给B 100块钱和B转给A 100块钱,这两个操作同时执行,假如一个线程先获取到了账本A,另一个线程拿到了账本B,这时他们就要wait彼此使用完资源,然后才能去做自己的操作,这就会死锁了,因为只有两个账本都到手才能执行操作。
死锁 死锁的一个比较专业的定义是:一组互相竞争资源的线程因互相竞争资源的线程因互相等待,导致“永久”阻塞的现象相等待,导致“永久”阻塞的现象。
以防读者被带着思路走,那我们就来看看上面的代码为什么会死锁吧。
class Account {
private int balance;
//转账
void transfer(Account target,int amt){
//锁定转入账户
synchronized(this){ ①
//锁定转入账户
synchronized(target){ ②
if(this.balance>amt){
this.balance-=amt;
target.balance += amt;
}
}
}
}
}
我们假设这样的流程:
- 线程T1执行 A转账B的操作,也就是 A.transfer(B)
- 同时,线程T2执行 B转账A的操作 也就是B.transfer(A)
- 当T1 T2同时执行完①的代码,T1获得A的this锁,T2获得B的this锁
- 然后代码再跑到②的时候,T1试图获取B,T2试图获取A,就死锁了。
如何预防死锁
一般情况下,只能重启业务才能解决死锁的问题,因此,解决死锁最好的方法就是,避免死锁。
当以下四种条件都发生的时候就会出现死锁了:
- 互斥,共享资源X和Y只能被一个线程占用
- 占有且等待,就是wait别人持有且不撒手的资源
- 不可抢占,申请获取一个不可能获取到的资源
- 循环等待,互相wait对方持有的资源
反过来说,只要有一点没触发,就不会死锁了。
- 对于占用且等待这个条件,我们可以一次性申请所有资源,这样就不需要等待。
- 对于不可抢占这个条件,我们可以设置一个超时检测,如果申请不到资源就释放或者提交
- 对于循环等待这个条件,可以按照顺序去申请资源。
然后接下来说一下代码实现:
- 破坏占用且等待条件
从理论上说,就是申请所有资源。就用上面转账的例子来说,它需要两个资源,一个是转出账户,一个是转入账户。
我们可以增加一个账本管理员,然后只允许账本管理员去取账本,然后同时给申请者,这样也就完成了一次性申请所有资源。
从代码的角度说,我们也需要一个角色来管理临界区,我们定义这个角色为Allocator,这个角色以类的身份出现,并且为单例。创建一个申请资源的方法apply(),一个释放资源的方法free()。
class Allocator{
private List<Object> als = new ArrayList();
//一次性申请所有资源
synchronized boolean apply(Object from , Object to){
if(als.contains(from)||als.contains(to)){
return false;
}else {
als.add(from);
als.add(to);
}
return true;
}
//归还资源
synchronized void free(Object from , Object to){
als.remove(from);
als.remove(to);
}
}
class Account{
//actr应该为单例
private Allocator actr;
private int balance;
//转账
void transfer(Account target,int amt){
//一次性申请转储账户和转入账户,知道成功
while(!actr.apply(this,target));
try{
//锁定转出账户
synchronized(this){
//锁定转入账户
synchronized(target){
if(this.balance > amt){
this.balance -=amt;
target.balance += amt;
}
}
}
}finally{
actr.free(this,target);
}
}
}
- 破坏不可抢占条件
核心在于主动释放占有的资源,这一点使用sync是不行的,因为如果sync申请不到资源,就进入阻塞了,而阻塞状态程序放空,什么措施也做不了。
所以就要使用Lock了,Lock下周详细说。
- 破坏循环等待
首先我们要对资源进行排序,然后排序,按照顺序申请资源。然后按照序号从小到大来锁定账户,就不存在循环等待了。
class Account{
private int id;
private int balance;
//转账
void transfer(Account target,int amt){
Account left = this;
Account right = target;
if(this.id>target.id){
left = target;
right = this;
}
//锁定序号小的账户
synchronized(left){
//锁定序号大的账户
synchronized(right){
if(this.balance>amt){
this.balance -=amt;
target.balance +=amt;
}
}
}
}
}