软件事务内存导论(七)阻塞事务

阻塞事务——有意识地等待

我们经常会遇到这样一种情况,即某事务T能否成功完成依赖于某个变量是否发生了变化,并且由于这种原因所引起的事务运行失败也可能只是暂时性的。作为对这种暂时性失败的响应,我们可能会返回一个错误码并告诉事务T等待一段时间之后再重试。然而在事务T等待期间,即使其他任务已经更改了事务T所依赖的数据,事务T也没法立即感知到并重试了。为了解决这一问题,Akka为我们提供了一个简单的工具——retry(),该函数可以先将事务进行回滚,并将事务置为阻塞状态直到该事物所依赖的引用对象发生变化或事务阻塞的时间超过了之前配置的阻塞超时为止。我本人更愿意将这一过程称为“有意识地等待”,因为这种说法听起来比“阻塞”更合适一些。下面让我们将阻塞(或有意识地等待)用于下面的两个例子当中。


在Java中阻塞事务

程序员一般都会对咖啡因上瘾,所以加班的时候任何主动要去拿些咖啡回来喝的人都知道不能空手而归。但是这个拿咖啡的人很聪明,他没有忙等(busy wait)至咖啡罐被重新填满,而是在Akka的帮助下给自己设置了一个消息提醒,一旦咖啡罐有变化他就能收到这个通知。下面让我们用retry()来实现这个可以有意识等待的fillCup()函数。


01 public  class  CoffeePot  {
02     private  static  final  long  start  =  System.nanoTime();
03     private  static  final  Ref<Integer>  cups  =  new  Ref<Integer>(24);
04     private  static  void  fillCup(final  int  numberOfCups)  {
05         final  TransactionFactory  factory  =
06             new  TransactionFactoryBuilder()
07             .setBlockingAllowed(true)
08             .setTimeout(new  DurationInt(6).seconds())
09             .build();
10         new  Atomic<Object>(factory)  {
11             public  Object  atomically()  {
12                 if(cups.get()  <  numberOfCups)  {
13                     System.out.println("retry........  at  "  +
14                         (System.nanoTime()  -  start)/1.0e9);
15                     retry();
16                 }
17                 cups.swap(cups.get()  -  numberOfCups);
18                 System.out.println("filled  up...."  +  numberOfCups);
19                 System.out.println("........  at  "  +
20                     (System.nanoTime()  -  start)/1.0e9);
21                 return  null;
22             }
23         }.execute();
24     }

在fillCup()函数中,我们将事务配置成blockingAllowed,并将事务完成的超时时间设为6秒。当发现当前没有足够数量的咖啡时,fillCups()函数没有简单地返回一个错误码,而是调用了StmUtil的retry()函数进行有意识地等待。这将使得当前事务进入阻塞状态,直到与之相关的cups引用发生变化为止。一旦有任何相关的引用发生改变,系统将启动一个新事务将之前包含retry的原子性代码进行重做。

下面让我们通过调用fillCup()函数来观察retry()的实际效果:


01     public  static  void  main(final  String[]  args)  {
02         final  Timer  timer  =  new  Timer(true);
03         timer.schedule(new  TimerTask()  {
04             public  void  run()  {
05                 System.out.println("Refilling....  at  "  +
06                     (System.nanoTime()  -  start)/1.0e9);
07                 cups.swap(24);
08             }
09         },  5000);
10         fillCup(20);
11         fillCup(10);
12         try  {
13             fillCup(22);
14         catch(Exception  ex)  {
15             System.out.println("Failed:  "  +  ex.getMessage());
16         }
17     }
18 }

在main()函数中,我们启动了一个每隔大约5秒就往咖啡壶重新装填咖啡的定时任务。随后,第一个跑去拿咖啡的同事A立即取走了20杯咖啡。紧接着,当我们这边自告奋勇去取咖啡的同事B想再取走10杯咖啡时,他的动作将被阻塞直至重新装填任务完成为止,而这种等待要比不断重试的方案高效得多。重新装填的事务完成之后,同事B的请求将被自动重试,而这一次他的请求成功完成了。如果重新装填任务没有在我们设定的超时时间内发生,则请求咖啡的事务将会失败,在上例的try代码块中的那个请求就属于这种情况。我们可以通过输出日志来观察到这一行为,同时也可以更深入地体会到retry()为我们带来的便利:


filled  up....20
........  at  0.423589
retry........  at  0.425385
retry........  at  0.427569
Refilling....  at  5.130381
filled  up....10
........  at  5.131149
retry........  at  5.131357
retry........  at  5.131521
Failed:  Transaction  DefaultTransaction  has  timed  with  a
total  timeout  of  6000000000  ns


从上述输出结果中我们可以看到,第一个倒20杯咖啡的请求是在程序开始运行之后0.4秒左右完成的。而第一个请求完成之后,咖啡壶里就仅剩余4杯咖啡了,所以第二个倒10杯咖啡的请求就只能被阻塞,直到程序运行至5秒左右时重新装填的任务完成为止。在重新装填的任务完成之后,倒10杯咖啡的事务被重新启动,并在程序运行到5秒多一点的时候成功完成。最后一个倒22杯咖啡的任务则由于在规定的超时时间内没有再次发生重新装填而以失败告终。

其实我们在日常工作中并不会经常用到retry(),只有当程序逻辑需要执行某些操作、而这些操作又依赖于某些相关数据发生变化的情况下,我们才能受益于这个监控数据变化的特性。

在Scala中阻塞事务

在上面Java版的示例中,我们使用了一个提供了很多STM相关的便利接口的StmUtils对象。而在Scala中,我们可以直接使用StmUtils里提供的各种特性(trait)。此外,我们在Scala中同样可以用工厂方法来创建TransactionFactory。


01 object  CoffeePot  {
02     val  start  =  System.nanoTime()
03     val  cups  =  Ref(24)
04     def  fillCup(numberOfCups  :  Int)  =  {
05         val  factory  =  TransactionFactory(blockingAllowed  =  true,
06             timeout  =  6  seconds)
07         atomic(factory)  {
08             if(cups.get()  <  numberOfCups)  {
09                 println("retry........  at  "  +  (System.nanoTime()  -  start)/1.0e9)
10                 retry()
11             }
12             cups.swap(cups.get()  -  numberOfCups)
13             println("filled  up...."  +  numberOfCups)
14             println("........  at  "  +  (System.nanoTime()  -  start)/1.0e9)
15         }
16     }
17     def  main(args  :  Array[String])  :  Unit  =  {
18         val  timer  =  new  Timer(true)
19         timer.schedule(new  TimerTask()  {
20             def  run()  {
21                 println("Refilling....  at  "  +  (System.nanoTime()  -  start)/1.0e9)
22                 cups.swap(24)
23             }
24         },  5000)
25         fillCup(20)
26         fillCup(10)
27         try  {
28             fillCup(22)
29         catch  {
30             case  ex  =>  println("Failed:  "  +  ex.getMessage())
31         }
32     }
33 }

在创建TransactionFactory对象时,我们并没有直接使用DurationInt来配置事务的超时时间,而是用intToDurationInt()函数来完成从int到DurationInt的隐式转换。通过Scala隐式转换所带来的语法上的便利,我们在初始化TransactionFactory对象时只需简单调用6 seconds即可。示例代码中余下的部分就只是从Java到Scala的一个简单的翻译而已,其最终的结果输出如下所示:


filled  up....20
........  at  0.325964
retry........  at  0.327425
retry........  at  0.329587
Refilling....  at  5.105191
filled  up....10
........  at  5.106074
retry........  at  5.106296
retry........  at  5.106466
Failed:  Transaction  DefaultTransaction  has  timed  with  a
total  timeout  of  6000000000  ns


上一篇:PostgreSQL 10.1 手册_部分 IV. 客户端接口_第 33 章 libpq - C 库_33.11. 杂项函数


下一篇:分布式事务中间件 Fescar - 全局写排它锁解读