`

Future 模式实现java 多线程异步调用

 
阅读更多

在多线程交互的中2,经常有一个线程需要得到另个一线程的计算结果,我们常用的是Future异步模式来加以解决。
Future顾名思意,有点像期货市场的“期权”,是“对未来的一种凭证”,例如当我们买了某个房地产开发商的期房,交钱之后,开发商会给我们一个凭证 (期权),这个凭证告诉我们等明年某个时候拿这个凭证就可以拿到我们所需要的房子,但是现在房子还没建好。市场上之所以有“期货”,也正由于有这种需求, 才有这种供给。

 

这种应用在GUI上用的比较多,在设计模式中一般称为“虚拟代理模式”。

 

例如:现在有个这样的需求,Client向Server提交一个Request(int count,char c),希望获取一个由count个字符c构造出来的字符串。比如发送Request(10,'K'),那么反馈字符串“KKKKKKKKKK”,但是我们 假设这个生成字符串的过程很费时间。

 

于是,为了获取比较好的交互性,我们的Server收到请求后,先构造一个FutureData,并把这个所谓的“期权(未来凭证)”反馈给 Client;于此同时,通过另一个并发线程去构造一个真正的字符串RealData,并在构造完毕后,RealData给FutureData报告一个 消息,说数据(期房)已经准备好了,此时Client可以通过期权拿到期房,但是假如我们的Client比较着急,还没等房子假好的时,就想要房子,怎么 办呢?这个时候我们可以阻塞Client所在的线程,让Client等待,直到最后RealData通知FutureData说房子好了,才返回。

这里的要点:

(1)Server先给Client一个“期权”,同时开一个线程去干活建房子(未来的“现房”);

(2)当“现房”RealData准备好了的时候,如何告诉FutureData说已经准备好了。(本处采用“回调过程”(借用观察者模式,来实现回调))

(3)如果客户比较着急,现房还没准备好的时候,就要取房,怎么办?  本处采用“阻塞”。

 

Data(公共数据接口)

 

Java代码  收藏代码
  1. package  com.umpay.future;  
  2.   
  3. public   interface  Data {  
  4.     public   abstract  String getContent();  
  5. }  

 

FutureData(期权)

 

Java代码  收藏代码
  1. package  com.umpay.future.extend;  
  2.   
  3. import  java.util.Observable;  
  4. import  java.util.Observer;  
  5.   
  6. import  com.umpay.future.Data;  
  7.   
  8. public   class  FutureData2  implements  Data,Observer {  
  9.   
  10.     /**   
  11.      * 存放真实数据,并且标志真正的数据是否已经准备完毕  
  12.      * 被多线程享受  
  13.      * 如果realData2==null,表示数据还准备好  
  14.      * */   
  15.     private   volatile  RealData2 realData2 =  null ;  
  16.     /**  
  17.      * 查看真正的数据是否准备完毕  
  18.      * */   
  19.     public   boolean  isFinished() {  
  20.         return  realData2 !=  null ;  
  21.     }  
  22.       
  23.     /**  
  24.      * 如果数据已经准备好,则返回真正的数据;  
  25.      * 否则,阻塞调用线程,直到数据准备完毕后,才返回真实数据;  
  26.      * */   
  27.     public  String getContent() {  
  28.         synchronized  (mutex) {  
  29.             while (!isFinished()) { //只要数据没有准备完毕,就阻塞调用线程   
  30.                 try  {  
  31.                     mutex.wait();  
  32.                 } catch  (InterruptedException e) {  
  33.                     e.printStackTrace();  
  34.                 }  
  35.             }  
  36.             return  realData2.getContent();  
  37.         }  
  38.     }  
  39.   
  40.     /**  
  41.      *  当 RealData2 准备完数据后,RealData2 应该通知 FutureData2 数据准备完毕。  
  42.      *  并在输入参数 realData 传入真实数据,在参数 event 传入事件(比如数据如期准备好了,或出了什么异常)  
  43.      *  
  44.      *  @param  realData    真实的数据  
  45.      *  @param  event       事件类型  
  46.      * */   
  47.     public   void  update(Observable realData, Object event) {  
  48.         System.out.println("通知...." +event);  
  49.         if (!(realData  instanceof  RealData2)) {  
  50.             throw   new  IllegalArgumentException( "主题的数据类型必须是RealData2" );  
  51.         }  
  52.         if (!(event  instanceof  String)) {  
  53.             throw   new  IllegalArgumentException( "事件的数据类型必须是String" );  
  54.         }  
  55.         synchronized  (mutex) {  
  56.             if (isFinished()) {  
  57.                 mutex.notifyAll();  
  58.                 return ; //如果数据已经准备好了,直接返回.   
  59.             }  
  60.             if ( "Finished" .equals(event)) {  
  61.                 realData2 = (RealData2)realData;//数据准备好了的时候,便可以通知数据准备好了   
  62.                 mutex.notifyAll();//唤醒被阻塞的线程   
  63.             }   
  64.         }  
  65.     }  
  66.   
  67.     private  Object mutex =  new  Object();  
  68. }  

 

RealData(实际数据)

 

Java代码  收藏代码
  1. package  com.umpay.future.extend;  
  2.   
  3. import  java.util.Observable;  
  4.   
  5. import  com.umpay.future.Data;  
  6.   
  7. public   class  RealData2  extends  Observable  implements  Data {  
  8.   
  9.     private  String content;  
  10.   
  11.     public  RealData2() {  
  12.           
  13.     }  
  14.       
  15.     public   void  createRealData2( int  count,  char  c) {  
  16.         System.out.println("        making RealData("  + count +  ", "  + c  
  17.                 + ") BEGIN" );  
  18.         char [] buffer =  new   char [count];  
  19.         for  ( int  i =  0 ; i < count; i++) {  
  20.             buffer[i] = c;  
  21.             try  {  
  22.                 Thread.sleep(100 );  
  23.             } catch  (InterruptedException e) {  
  24.             }  
  25.         }  
  26.         System.out.println("        making RealData("  + count +  ", "  + c  
  27.                 + ") END" );  
  28.         this .content =  new  String(buffer);  
  29.           
  30.         //真实数据准备完毕了,通知FutureData2说数据已经准备好了.   
  31.         setChanged();//必须先设置本对象的状态发生了变化,并且通知所有的观察者   
  32.         notifyObservers("Finished" );  
  33.     }  
  34.       
  35.   
  36.     public  String getContent() {  
  37.         return  content;  
  38.     }  
  39. }  

 

 

服务端代码:

 

Java代码  收藏代码
  1. package  com.umpay.future.extend;  
  2.   
  3. import  com.umpay.future.Data;  
  4.   
  5. public   class  HostServer2 {  
  6.   
  7.     public  Data request( final   int  count,  final   char  c) {  
  8.         System.out.println("    request("  + count +  ", "  + c +  ") BEGIN" );  
  9.   
  10.         // (1) 建立FutureData的实体   
  11.         final  FutureData2 future2 =  new  FutureData2();  
  12.   
  13.         // (2) 为了建立RealData的实体,启动新的线程   
  14.         new  Thread() {  
  15.             public   void  run() {  
  16.                 RealData2 realdata2 = new  RealData2();  
  17.                 realdata2.addObserver(future2);//以便当RealData2把数据准备完毕后,通过该回调口子,通知FutureData2表示数据已经贮备好了   
  18.                 realdata2.createRealData2(count, c);  
  19.             }  
  20.         }.start();  
  21.   
  22.         System.out.println("    request("  + count +  ", "  + c +  ") END" );  
  23.   
  24.         // (3) 取回FutureData实体,作为传回值   
  25.         return  future2;  
  26.     }  
  27.   
  28. }  

 

客户端代码:

 

Java代码  收藏代码
  1. package  com.umpay.future;  
  2.   
  3. import  com.umpay.future.extend.HostServer2;  
  4.   
  5. public   class  MainClient {  
  6.     public   static   void  main(String[] args) {  
  7. //      testHostServer();   
  8.         testHostServer2();  
  9.     }  
  10.       
  11.     static   void  testHostServer() {  
  12.         System.out.println("main BEGIN" );  
  13.         HostServer hostServer = new  HostServer();  
  14.         Data data1 = hostServer.request(10 'A' );  
  15.         Data data2 = hostServer.request(20 'B' );  
  16.         Data data3 = hostServer.request(30 'C' );  
  17.   
  18.         System.out.println("main otherJob BEGIN" );  
  19. //        try {   
  20. //            Thread.sleep(2000);   
  21. //        } catch (InterruptedException e) {   
  22. //        }   
  23.         System.out.println("main otherJob END" );  
  24.   
  25.         System.out.println("data1 = "  + data1.getContent());  
  26.         System.out.println("data2 = "  + data2.getContent());  
  27.         System.out.println("data3 = "  + data3.getContent());  
  28.         System.out.println("main END" );  
  29.   
  30.     }  
  31.   
  32.     static   void  testHostServer2() {  
  33.         System.out.println("main BEGIN" );  
  34.         HostServer2 hostServer2 = new  HostServer2();  
  35.         Data data1 = hostServer2.request(10 'A' );  
  36.         Data data2 = hostServer2.request(20 'B' );  
  37.         Data data3 = hostServer2.request(30 'C' );  
  38.   
  39.         System.out.println("main otherJob BEGIN" );  
  40. //        try {   
  41. //            Thread.sleep(2000);   
  42. //        } catch (InterruptedException e) {   
  43. //        }   
  44.         System.out.println("main otherJob END" );  
  45.   
  46.         System.out.println("data1 = "  + data1.getContent());  
  47.         System.out.println("data2 = "  + data2.getContent());  
  48.         System.out.println("data3 = "  + data3.getContent());  
  49.         System.out.println("main END" );  
  50.   
  51.     }  
  52. }  

 

分享到:
评论

相关推荐

    C++11中多线程编程-std::async的深入讲解

    1、std::async会自动创建线程去调用线程函数,相对于低层次的std::thread,使用起来非常方便; 2、std::async返回std::future对象,通过返回的std::future对象我们可以非常方便的获取到线程函数的返回结果; 3、std:...

    汪文君高并发编程实战视频资源全集

    │ 高并发编程第二阶段38讲、多线程Active Objects设计模式(接受异步消息的主动对象)-上.mp4 │ 高并发编程第二阶段39讲、多线程Active Objects设计模式(接受异步消息的主动对象)-中.mp4 │ 高并发编程第二阶段40...

    汪文君高并发编程实战视频资源下载.txt

    │ 高并发编程第二阶段38讲、多线程Active Objects设计模式(接受异步消息的主动对象)-上.mp4 │ 高并发编程第二阶段39讲、多线程Active Objects设计模式(接受异步消息的主动对象)-中.mp4 │ 高并发编程第二阶段40...

    精通并发与 netty 视频教程(2018)视频教程

    Channel选择器工厂与轮询算法及注册底层实现 72_Netty线程模型深度解读与架构设计原则 73_Netty底层架构系统总结与应用实践 74_Netty对于异步读写操作的架构思想与观察者模式的重要应用 75_适配器模式与模板方法模式...

    使用JavaFX并发实现多人聊天室.txt

    为了实现这个功能,我们使用了JavaFX的`ExecutorService`和`Future`类来异步地发送消息,并使用了一个独立的线程来监听用户输入的消息。 具体来说,在`ChatApplication`类的`start()`方法中,我们首先创建了一个...

    Java并发编程实战

    本书深入浅出地介绍了Java线程和并发,是一本完美的Java并发参考手册。书中从并发性和线程安全性的基本概念出发,介绍了如何使用类库提供的基本并发构建块,用于避免并发危险、构造线程安全的类及验证线程安全的规则...

    精通并发与netty视频教程(2018)视频教程

    20_通过Apache Thrift实现Java与Python的RPC调用 21_gRPC深入详解 22_gRPC实践 23_Gradle Wrapper在Gradle项目构建中的最佳实践 24_gRPC整合Gradle与代码生成 25_gRPC通信示例与JVM回调钩子 26_gRPC服务器流式调用...

    精通并发与netty 无加密视频

    第20讲:通过Apache Thrift实现Java与Python的RPC调用 第21讲:gRPC深入详解 第22讲:gRPC实践 第23讲:Gradle Wrapper在Gradle项目构建中的最佳实践 第24讲:gRPC整合Gradle与代码生成 第25讲:gRPC通信示例与...

    简单介绍Python的Tornado框架中的协程异步实现原理

    Tornado 4.0 已经发布了很长一段时间了, 新版本广泛的应用了协程(Future)特性. 我们目前已经将 Tornado 升级到最新版本, 而且也大量的使用协程特性. 很长时间没有更新博客, 今天就简单介绍下 Tornado 协程实现原理, ...

    java核心知识点整理.pdf

    25 JAVA8 与元数据.................................................................................................................................25 2.4. 垃圾回收与算法 .................................

    JAVA核心知识点整理(有效)

    25 JAVA8 与元数据.................................................................................................................................25 2.4. 垃圾回收与算法 .................................

    开涛高可用高并发-亿级流量核心技术

    13.2 异步Future 252 13.3 异步Callback 253 13.4 异步编排CompletableFuture 254 13.5 异步Web服务实现 257 13.6 请求缓存 259 13.7 请求合并 261 14 如何扩容 266 14.1 单体应用垂直扩容 267 14.2 单体应用水平扩...

Global site tag (gtag.js) - Google Analytics