软件事务内存导论(五)创建嵌套事务

1.1    创建嵌套事务

在之前的示例中,每个用到事务的方法都是各自在其内部单独创建事务,并且事务所涉及的变动也都是各自独立提交的。但如果我们想要将多个方法里的事务调整成一个统一的原子操作的时候,上述做法就无能为力了,所以我们需要使用嵌套事务来实现这一目标。

通过使用嵌套事务,所有被主控函数调用的那些函数所创建的事务都会默认被整合到主控函数的事务中。除此之外,Akka/Multiverse还提供了很多其他配置选项,如新隔离事务(new isolated transactions)等。总之,使用了嵌套事务之后,只有位于最外层的主控函数事务提交时,其内部所做的变更才会被提交。在具体使用时,为了保证所有嵌套事务能够作为一个整体成功完成,我们需要保证所有函数都必须在一个可配置的超时范围内做完。

我们在4.6节中通过加锁方式实现的AccountService的transfer()函数将会受益于嵌套事务。因为这个版本的transfer()函数需要按自然顺序对所有账户排序并显式地对锁进行管理。STM将为我们消除所有这些负担。下面我们会首先在Java中用嵌套事务重新实现这一示例,然后再来看一下该示例在Scala中是如何实现的。

在Java中使用嵌套事务

现在让我们开始对Account类进行事务化的改造吧。首先我们需要把保存账户余额的变量balance改成托管引用,下面我们就来定义这个字段以及该字段的getter函数。


1 public class Account {
2 final private Ref<Integer> balance = new Ref<Integer>();
3 public Account(int initialBalance) { balance.swap(initialBalance); }
4 public int getBalance() { return balance.get(); }

在构造函数中,我们用Ref的swap()函数将给定的数量设置成balance的初始值。由于swap()函数运行在自己独立的事务中,所以我们就无需再创建额外的事务了(同时我们假设调用者也不会为这个操作创建额外的事务)。getBalance()函数的情况与之类似,就不再赘述了。

由于deposit()函数需要对balance进行先读后写的操作,所以该函数内的所有操作需要整体封装到一个事务里运行。下面的代码为我们展示了如何将这两个操作封装到一个独立事务中的方法。


01 public void deposit(final int amount) {
02 new Atomic<Boolean>() {
03 public Boolean atomically() {
04 System.out.println("Deposit " + amount);
05 if (amount > 0) {
06 balance.swap(balance.get() + amount);
07 return true;
08 }
09 throw new AccountOperationFailedException();
10 }
11 }.

基于同样的理由,我们需要把withdraw()函数里的所有操作也封装到一个独立的事务中。


01 public void withdraw(final int amount) {
02 new Atomic<Boolean>() {
03 public Boolean atomically() {
04 int currentBalance = balance.get();
05 112 • Chapter 6. Introduction to Software Transactional Memory
06 if (amount > 0 && currentBalance >= amount) {
07 balance.swap(currentBalance - amount);
08 return true;
09 }
10 throw new AccountOperationFailedException();
11 }
12 }.execute();
13 }
14 }

如果运行过程中有异常抛出,则事务将会强制失败。所以当账户内余额不足或存款/取款操作输入了非法参数时,我们就可以利用这一点来表示操作失败。相当简单,是吧?从此我们就可以不用再担心同步、加锁、死锁等令人烦恼的问题了。

现在到了该浏览一下执行转账操作的AccountService类的时候了,让我们首先来看一下其中的transfer()函数(校注:java中应该叫方法)


01 public class AccountService {
02 public void transfer(
03 final Account from, final Account to, final int amount) {
04 new Atomic<Boolean>() {
05 public Boolean atomically() {
06 System.out.println("Attempting transfer...");
07 to.deposit(amount);
08 System.out.println("Simulating a delay in transfer...");
09 try { Thread.sleep(5000); } catch(Exception ex) {}
10 System.out.println("Uncommitted balance after deposit $" +
11 to.getBalance());
12 from.withdraw(amount);
13 return true;
14 }
15 }.execute();
16 }

在这个示例中,我们会将多个事务置于相互冲突的环境中,以此来演示嵌套事务的行为并帮助你加深对嵌套事务的理解。Transfer()函数中的所有操作都是在同一个事务中完成的。作为转账过程的一部分,我们首先将钱存到目标账户中。紧接着,在经过一个为引入事务冲突而专门设置的延时之后,我们将钱从源账户中划走。我们希望当且仅当从源帐户划款成功之后,向目标账户存款的操作才能够成功,这也是我们这个事务所要完成的目标。

我们可以通过打印balance的值来观察转账操作是否成功。如果有一个方便的函数来调用transfer()函数,处理下异常,并在最后打印一下balance的值就更好了,下面就让我们动手写一个吧:


01 public static void transferAndPrintBalance(
02 final Account from, final Account to, final int amount) {
03 boolean result = true;
04 try {
05 new AccountService().transfer(from, to, amount);
06 } catch(AccountOperationFailedException ex) {
07 result = false;
08 }
09 System.out.println("Result of transfer is " + (result ? "Pass" : "Fail"));
10 System.out.println("From account has $" + from.getBalance());
11 System.out.println("To account has $" + to.getBalance());
12 }

最后我们还需要一个main()函数来让整个示例运转起来。


01 public static void main(final String[] args) throws Exception {
02 final Account account1 = new Account(2000);
03 final Account account2 = new Account(100);
04 final ExecutorService service = Executors.newSingleThreadExecutor();
05 service.submit(new Runnable() {
06 public void run() {
07 try { Thread.sleep(1000); } catch(Exception ex) {}
08 account2.deposit(20);
09 }
10 });
11 service.shutdown();
12 transferAndPrintBalance(account1, account2, 500);
13 System.out.println("Making large transfer...");
14 transferAndPrintBalance(account1, account2, 5000);
15 }
16 }

在main函数中,我们创建了两个账户,并在一个单独的线程中从第二个账户里取走$20。与此同时,我们还启动了一个在账户之间转账的事务。由于这些操作都会影响到公共实例(即两个账户——译者注),所以这种做法将导致两个事务(存$20的事务和转账$500的事务——译者注)产生冲突。于是只有一个事务能够顺利完成,而另一个将会重做。最后,我们会启动一个超出源账户余额的转账操作,以此来演示存款和取款这两个相互关联的事务通过嵌套事务的方式在转账过程中实现了原子性的操作。下面让我们通过输出结果来观察事务的行为:


01 Attempting transfer...
02 Deposit 500
03 Attempting transfer...
04 Deposit 500
05 Simulating a delay in transfer...
06 Deposit 20
07 Uncommitted balance after deposit $600
08 Attempting transfer...
09 Deposit 500
10 Simulating a delay in transfer...
11 Uncommitted balance after deposit $620
12 Result of transfer is Pass
13 From account has $1500
14 To account has $620
15 Making large transfer...
16 Attempting transfer...
17 Deposit 5000
18 Simulating a delay in transfer...
19 Uncommitted balance after deposit $5620
20 Result of transfer is Fail
21 From account has $1500
22 To account has $620

输出结果起始处的重试操作让人看起来有些摸不着头脑。这个非预期的重试是由Multiverse对于单个对象上的只读事务的默认优化造成的。虽然有两种方法可以重新配置这一行为,但修改了之后可能会对性能造成影响。请参阅Akka/Multiverse文档来进一步了解变更这一配置所造成的影响。

在本例中,向帐户2存$20的操作会先完成。而与此同时,从账户1向账户2的转账事务则处于模拟的延迟当中。当转账事务重新恢复运行并察觉到其涉及的对象发生了变化时,该事务将悄悄地回滚并重做。如果事务在运行过程中一直出现内部数据有变化的情况,则该事务会不断重做直至成功或超时退出为止。本例中的转账事务是最终成功了的,帐户余额的变化充分地反映了这一结果——账户1转出了$500,而账户2则从并发的存款和转账操作中总共获取了$520。

本例的最后一个操作是从账户1向账户2转$5000。在这个事务中,存款操作顺利完成了,但事务能否最终成功还是要看取款操作的结果。不出所料,取款动作由于账户余额不足而失败并抛了异常。随后,之前的存款动作被回滚,系统最终保证了账户余额数据不受事务失败的影响。

再次声明,在事务中打印信息和插入延时都不是好习惯,我在本例中这样用是为了使你能够更好地观察事务的运行顺序和重做行为,在实际工作中请最好不要在事务代码里打印消息或打日志。请记住,事务是不应该有任何副作用的。如果事务中确实需要包含有副作用的操作,我们可以将这些代码放到后面将会提到的后置提交(post-commit)handler里面去。

我可以拍胸脯向你保证,使用事务绝对可以替你分担大部分并发编程方面的烦恼。下面就让我们通过一组对比来看看事务到底效用几何。让我们回顾一下4.6节中我们用加锁方式实现的转账函数transfer(),为方便起见我将代码列在下面:


01 public boolean transfer(
02 final Account from, final Account to, final int amount)
03 throws LockException, InterruptedException {
04 final Account[] accounts = new Account[] {from, to};
05 Arrays.sort(accounts);
06 if(accounts[0].monitor.tryLock(1, TimeUnit.SECONDS)) {
07 try {
08 if (accounts[1].monitor.tryLock(1, TimeUnit.SECONDS)) {
09 try {
10 if(from.withdraw(amount)) {
11 to.deposit(amount);
12 return true;
13 } else {
14 return false;
15 }
16 } finally {
17 accounts[1].monitor.unlock();
18 }
19 }
20 } finally {
21 accounts[0].monitor.unlock();
22 }
23 }
24 throw new LockException("Unable to acquire locks on the accounts");
25 }

你可以将上述代码与去掉了延时和log输出的事务版本进行比较:


01 public void transfer(
02 final Account from, final Account to, final int amount) {
03 new Atomic<Boolean>() {
04 public Boolean atomically() {
05 to.deposit(amount);
06 from.withdraw(amount);
07 return true;
08 }
09 }.execute();
10 }

旧版本的代码既要考虑加锁的问题又要顾及加锁的顺序,所以很容易出错。代码越多越容易出问题,这是显而易见的道理。在新版本中,我们显著地降低了代码量和复杂度。这让我想起了C.A.R.Hoare的名言:“这世界上有两种构建软件设计的方法。一种方法是使其足够简单以至于不存在明显的缺陷。而另一种方法是使其足够复杂以至于无法看出有什么毛病” 。只有让代码更少、结构更简单,我们才能将更多的时间投入到程序逻辑的设计开发中去。

在Scala中使用嵌套事务

从上例中我们可以看到,使用了嵌套事务的Java版转账函数是非常简洁的。然而,虽然事务的使用让我们得以去除Java中那些用于同步的冗余代码,但还是会有一些由于Java语法需要而存在的一些额外代码。正如我们下面所看到的那样,Scala的优雅和强大的表达能力使其在代码清晰简洁方面更胜一筹。下面就是Scala版的Account类:


01 class Account(val initialBalance : Int) {
02 val balance = Ref(initialBalance)
03 def getBalance() = balance.get()
04 def deposit(amount : Int) = {
05 atomic {
06 println("Deposit " + amount)
07 if(amount > 0)
08 balance.swap(balance.get() + amount)
09 else
10 throw new AccountOperationFailedException()
11 }
12 }
13 def withdraw(amount : Int) = {
14 atomic {
15 val currentBalance = balance.get()
16 if(amount > 0 && currentBalance >= amount)
17 balance.swap(currentBalance - amount)
18 else
19 throw new AccountOperationFailedException()
20 }
21 }
22 }

Scala版本的Account是逻辑直接从Java版本翻译过来的、但代码风格又带有Scala和Akka简洁优雅特征的一种实现。在Scala版本的AccountService中我们也可以看到同样的优点


01 object AccountService {
02 def transfer(from : Account, to : Account, amount : Int) = {
03 atomic {
04 println("Attempting transfer...")
05 to.deposit(amount)
06 println("Simulating a delay in transfer...")
07 Thread.sleep(5000)
08 println("Uncommitted balance after deposit $" + to.getBalance())
09 from.withdraw(amount)
10 }
11 }
12 def transferAndPrintBalance(
13 from : Account, to : Account, amount : Int) = {
14 var result = "Pass"
15 try {
16 AccountService.transfer(from, to, amount)
17 } catch {
18 case ex => result = "Fail"
19 }
20 println("Result of transfer is " + result)
21 println("From account has $" + from.getBalance())
22 println("To account has $" + to.getBalance())
23 }
24 def main(args : Array[String]) = {
25 val account1 = new Account(2000)
26 val account2 = new Account(100)
27 actor {
28 Thread.sleep(1000)
29 account2.deposit(20)
30 }
31 transferAndPrintBalance(account1, account2, 500)
32 println("Making large transfer...")
33 transferAndPrintBalance(account1, account2, 5000)
34 }
35 }

与Java版本一样,Scala版本的AccountService同样会将事务置于相互冲突的环境之下。所以毫无悬念,其输出结果也与Java版本完全相同:


01 Attempting transfer...
02 Deposit 500
03 Attempting transfer...
04 Deposit 500
05 Simulating a delay in transfer...
06 118 • Chapter 6. Introduction to Software Transactional Memory
07 Deposit 20
08 Uncommitted balance after deposit $600
09 Attempting transfer...
10 Deposit 500
11 Simulating a delay in transfer...
12 Uncommitted balance after deposit $620
13 Result of transfer is Pass
14 From account has $1500
15 To account has $620
16 Making large transfer...
17 Attempting transfer...
18 Deposit 5000
19 Simulating a delay in transfer...
20 Uncommitted balance after deposit $5620
21 Result of transfer is Fail
22 From account has $1500
23 To account has $620

前面我们已经比较过用Java实现的加锁同步版本和嵌套事务版本(如下所示)的转账函数


01 public void transfer(
02 final Account from, final Account to, final int amount) {
03 new Atomic<Boolean>() {
04 public Boolean atomically() {
05 to.deposit(amount);
06 from.withdraw(amount);
07 return true;
08 }
09 }.execute();
10 }

现在让我们将之与Scala版本进行一下比较:


1 def transfer(from : Account, to : Account, amount : Int) = {
2 atomic {
3 to.deposit(amount)
4 from.withdraw(amount)
5 }
6 }

从上面的对比中我们可以清晰地看到,Scala版本的代码除了核心逻辑之外没有任何冗余。这又让我想起了Alan Perlis的名言:“如果用某种编程语言写代码时还需要注意一些与核心逻辑无关的东西,那么这个语言就是低级语言。”

截至目前,我们已经学习了如何用Akka创建事务以及如何组合嵌套事务,但我们才刚上路呢。下面我们将一起了解一下在Akka中如何对事务进行配置。 

上一篇:php获取用户当前所使用浏览器和操作系统


下一篇:动软中,在连接服务器时,出现“添加服务器配置失败,请检查是否有写入权限或文件是否存在“错误