`
gh_aiyz
  • 浏览: 39450 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

Missian指南六:异步客户端使用指南

阅读更多

重要:Missian刚刚更新到0.31,新增了Future风格的回调方式。

 

Missian没有绑定spring,但是强烈推荐配合spring一起使用。异步客户端由于需要调用BeanLocator去寻找回调的 Bean,如果配合Spring使用,可以直接使用SpringLocator(BeanLocator的唯一实现),否则需要自己实现。

 

使用异步客户端需要注意一点:由于是异步调用,所以一个远程方法的返回值永远是null(如果不是void的话)或者是原生数据类型的默认值。一段时间后(比如100毫秒)后客户端收到这个返回值,会去找到相应的回调对象进行调用。

 

异步的优势是:在调用的期间我们不需要像同步调用一样有一个线程一直在等着它的返回值,而是调用完即可返回释放线程,当客户端接受到返回值后会进行 回调,业务流程可以继续往下执行。不要小看这个等待的时间,假如A服务调用了一个跨机房的服务或者一个重型的服务B,那么B的响应时间可能是100毫秒甚 至更多,那么可以想象在高并发的情况下,可能A服务的全部线程都耗死在无穷的等待上了。

 

我们还是先看看如何配合Spring来使用Missian异步客户端。

 

步骤一:给Hello.hello(String, int)创建一个回调类

注意和0.2x相比,这里有比较大的不同:

public class HelloCallback {
	public void hello(String returnValue) {
		System.out.println(returnValue);
	}
}

这个类的方法要和Hello接口的方法一一对应,Hello中所有方法(除了返回值为void的方法)都应该有一个回调方法,回调方法名和Hello接口中对应的方法名一样,而且只接受一个参数,参数类型和对应方法的返回值一致。

 

例如,Hello有一个hello(String, int)方法的返回值是String类型,那么要求HelloCallback必须有一个hello(String)的方法。

 

 

步骤二:修改Hello接口,用注解的方法声明回调Bean

这里和0.21前的版本也有所不同,以前这个注解是用在方法上的,现在直接用在接口上,所以一个接口只需要注解一次了。

@CallbackTarget("helloCallback")
public interface Hello {
	public String hello(String name, int age);
}

 

步骤三:在Spring配置文件中配置这个回调Bean

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
	<!-- your callback bean, missian client will invoke its execute() method when received the returned object -->
	<bean id="helloCallback" class="com.missian.example.bean.HelloCallback">
	</bean>
</beans>

 

步骤四:在Spring中创建AsyncMissianProxyFactory

	<bean id="asyncMissianProxyFactory" class="com.missian.client.async.AsyncMissianProxyFactory" init-method="init" destroy-method="destroy">
		<constructor-arg >
			<bean class="com.missian.common.beanlocate.SpringLocator"/>
		</constructor-arg>
	</bean>

这里我们使用的是AsyncMissianProxyFactory的最简单的构造函数,只接受一个BeanLocator。这时候默认创建一个4 个线程的线程池用来处理回调逻辑,1个线程用来处理IO,需要指定线程数,或者将一个已经存在的线程池传入,可以参考其它几个构造函数:

public AsyncMissianProxyFactory(BeanLocator callbackLoacator, ExecutorService threadPool,  int callbackIoProcesses, boolean logBeforeCodec, boolean logAfterCodec, NetworkConfig networkConfig) {}
public AsyncMissianProxyFactory(BeanLocator callbackLoacator, ExecutorService threadPool,  int callbackIoProcesses, boolean logBeforeCodec, boolean logAfterCodec){}
public AsyncMissianProxyFactory(BeanLocator callbackLoacator, ExecutorService threadPool) {}
public AsyncMissianProxyFactory(BeanLocator callbackLoacator, int threadPoolSize, int callbackIoProcesses, boolean logBeforeCodec, boolean logAfterCodec) {}
public AsyncMissianProxyFactory(BeanLocator callbackLoacator, ExecutorService threadPool, NetworkConfig networkConfig) {}
public AsyncMissianProxyFactory(BeanLocator callbackLoacator, int threadPoolSize, int callbackIoProcesses, boolean logBeforeCodec, boolean logAfterCodec, NetworkConfig networkConfig) {}
public AsyncMissianProxyFactory(BeanLocator callbackLoacator, int threadPoolSize){}

假如在服务器里使用Missian客户端,可以考虑将服务器主线程池传入给AsyncMissianProxyFactory,共享线程池。 

 

步骤五:实现异步调用

 

	public static void main(String[] args) throws IOException {
		ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("com/missian/example/client/async/withspring/applicationContext-*.xml");
		//actually you can inject AsyncMissianProxyFactory into any other beans to use it.
		//we just show how AsyncMissianProxyFactory works here.
		AsyncMissianProxyFactory asyncMissianProxyFactory = (AsyncMissianProxyFactory)context.getBean("asyncMissianProxyFactory");
		Hello hello = (Hello)asyncMissianProxyFactory.create(Hello.class, "tcp://localhost:1235/hello");
		long time = System.currentTimeMillis();
		for(int i=0; i<10000; i++) {
			hello.hello("gg", 25);	
		}
		System.out.println(System.currentTimeMillis()-time);
	}

 你可以清楚地看到,所有的请求都发送出去之后,返回值陆续返回并回掉了HelloCallback。

和同步的客户端一样,可以使用http协议发送数据:

Hello hello = (Hello)asyncMissianProxyFactory.create(Hello.class, "http://localhost:1235/hello");

但目前比较遗憾的是,还不能够支持异步调用Hessian服务。  

另外需要说明的是,这个直接从Context里面取出AsyncMissianProxyFactory只是用来演示异步调用的用法;正常的做法应该是将AsyncMissianProxyFactory注入到我们需要使用它的Bean。


===============0.31.新增功能分割线===================

 

如何为重载方法都实现回调?

比如以下两个方法都需要回调:

public interface Hello {
    String hello(String name, int age, String country);
    String hello(String name, int age);
}

 按照上面所说的,他们的回调方法都映射到:

void hello(String);

 这样会造成回调错误,因此需要使用一个注解来说明回调方法名:

@CallbackTarget("helloCallback")
public interface Hello {
	@CallbackTargetMethod("hello0")
	public String hello(String name, int age, String country);

	@CallbackTargetMethod("hello1")
	public String hello(String name, int age);
}

 对应的,回调类的实现:

public class HelloCallback {
	public void hello0(String returnValue) {
		System.out.println(returnValue);
	}
	public void hello1(String returnValue) {
		System.out.println("hello1:"+returnValue);
	}
	
}

 注意如果不使用注解,系统寻找默认的方法。注解同样也可以用于非重载的方法。

 

另外一种回调的实现

如果不希望使用注解,那么还有另外一种方式可供选择:

如果服务器端的方法是:

String hello(String name, int age);

 那么客户端的接口可以写成(注意,Missian不要求服务器端和客户端使用同一个接口类,甚至接口名都可以不同,而只要求方法名及参数必须匹配):

public interface Hello {
	public String hello(String name, int age, Callback cb);
}

 调用时:

Hello hello = (Hello)factory.create(Hello.class, "http://localhost:1235/hello");
Callback cb = ......
hello.hello("name", 80, cb);

 即可以异步调用成功。

 

Future风格的异步实现

我个人非常喜欢Future这种方法,在Mina中就有大量的使用。同样Missian也提供了这样一个能力。提供了一个AysncFuture,即可以通过get()变成同步,也可以通过addListner()来监听,一旦返回值到达,就会出发监听器。

 

如果服务器端的方法是:

String hello(String name, int age);

 那么客户端的接口可以写成(注意,Missian不要求服务器端和客户端使用同一个接口类,甚至接口名都可以不同,而只要求方法名及参数必须匹配):

public interface Hello {
	public AysncFuture<String> hello(String name, int age, Class<String> returnType);
}

 调用时:

Hello hello = (Hello)factory.create(Hello.class, "http://localhost:1235/hello");
Async<String> future = hello.hello("name", 80, String.class);

 如果想阻塞直到数据返回,那么:

String value = future.get();
System.out.println(value);

 如果想通过监听器实现事件驱动:

AsyncListener listener = ....
future.addListener(listener);
 

 

 

 

 

 

 

分享到:
评论
8 楼 sooxin 2012-07-23  
沉年老东西,问题一堆,实在不敢用。
放入tomcat
警告: EXCEPTION :
java.nio.BufferUnderflowException
at java.nio.Buffer.nextGetIndex(Buffer.java:474)
at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:117)
at org.apache.mina.core.buffer.AbstractIoBuffer.get(AbstractIoBuffer.java:492)
at com.missian.common.io.IoBufferInputStream.read(IoBufferInputStream.java:44)
at com.caucho.hessian.io.HessianInputFactory.readHeader(HessianInputFactory.java:74)
at com.missian.server.handler.MissianSkeleton.invoke(MissianSkeleton.java:142)
at com.missian.server.handler.MissianHandler.messageReceived(MissianHandler.java:61)
7 楼 shuibingfy 2011-01-17  
shuibingfy 写道
请教下楼主,如果用Future方式,用get同步调用,如何设置超时时间呢?
如果Future方式,又在哪里设置是否长连接呢?

哦,明白了
用get(long timeout, TimeUnit unit)就行了吧
至于异步的长连接问题,是不是只要不调用AsyncMissianProxyFactory.destroy(),就可以认为是长连接?
6 楼 shuibingfy 2011-01-14  
请教下楼主,如果用Future方式,用get同步调用,如何设置超时时间呢?
如果Future方式,又在哪里设置是否长连接呢?
5 楼 zhangcheng 2011-01-04  
楼主搞的这个东西,我最近也有这个打算。我们的一个项目中使用了hessian,但是同步的http请求,效率实在是不怎么样。所以打算用mina作为服务器端,但是又不想改变客户端的调用方式,用stub的rpc。所以楼主将这两个好项目结合在一起,很不错。决定试用一下。
4 楼 gh_aiyz 2010-12-15  
yanwt 写道
我不是这个意思,我是说我一次发了50个请求,如:
  for (int i = 0; i < 50; i++) {
            AsyncFuture<String> future = async.asyncHello(time + "gg", i, String.class);
            resultList.add(future);
            System.out.println("testasync:"+System.currentTimeMillis());
        }
后台HelloImpl中的
public String asyncHello(String name, int age) {
        try {
            Thread.sleep(5000);
            return "hi, " + name + ", " + age;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }
还是一个一个同步执行的。这样耗时是50*5000,使用异步调用应该是大大减小这个时间才对
如果我使用spring的@Async注解,又不能使用hessian接口,FutureTask不能被序列化,比较郁闷啊。

为什么啊要Sleep5秒钟呢?后台应该是用一个线程池来处理这些请求,对于单个请求是同步的,但是这一批请求应该是并发处理的。后台你用的是Missian的Server吗?有需要可以加我MSN,在线讨论一下,效率高些。gh_aiyz#hotmail.com
3 楼 yanwt 2010-12-15  
我不是这个意思,我是说我一次发了50个请求,如:
  for (int i = 0; i < 50; i++) {
            AsyncFuture<String> future = async.asyncHello(time + "gg", i, String.class);
            resultList.add(future);
            System.out.println("testasync:"+System.currentTimeMillis());
        }
后台HelloImpl中的
public String asyncHello(String name, int age) {
        try {
            Thread.sleep(5000);
            return "hi, " + name + ", " + age;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }
还是一个一个同步执行的。这样耗时是50*5000,使用异步调用应该是大大减小这个时间才对
如果我使用spring的@Async注解,又不能使用hessian接口,FutureTask不能被序列化,比较郁闷啊。
2 楼 gh_aiyz 2010-12-15  
fs.get();这种方式是同步的,文档中已有说明,异步可以用以下方式:
1、用annotation标注回调类及回调方法
2、在客户端方法最后一个参数传入Callback对象
3、使用AsyncFuture.addLisnter();

fs.get()是同步的,这符合所有Future操作的原义。
1 楼 yanwt 2010-12-15  
刚试用了一下,感觉请求是异步发送的,但服务还是同步调用的,一个处理完了才会处理下一个。
测试代码如下:
HelloAsync async = (HelloAsync) asyncMissianProxyFactory.create(HelloAsync.class, "tcp://localhost:1235/hello");
        long time = System.currentTimeMillis();
        List<AsyncFuture<String>> resultList = new ArrayList<AsyncFuture<String>>();

        for (int i = 0; i < 50; i++) {
            AsyncFuture<String> future = async.asyncHello(time + "gg", i, String.class);
            resultList.add(future);
            System.out.println("testasync:"+System.currentTimeMillis());
        }
        for (AsyncFuture<String> fs : resultList) {
                System.out.println(fs.get());     //打印各个线程(任务)执行的结果
        }

相关推荐

    missian:一个java RPC框架,无模式风格

    弥赛亚 一个java RPC框架,无模式风格

    Spring集成ActiveMQ配置

    Spring 集 成ActiveMQ 配置 异步RPC框架 Missian ActiveMq-JMS简单实例使用tomcat

    Toxi / Oxy Pro 便携式气体检测仪参考手册 使用说明书

    Toxi Oxy Pro 便携式气体检测仪参考手册 使用说明书

    科傻模拟网优化操作-教程书

    官方的的说明书资料,部分视频说明在这里: https://www.bilibili.com/video/BV1Fz4y1d7rn/?spm_id_from=333.999.0.0&vd_source=13dc65dbb4ac9127d9af36e7b281220e

    node-v8.14.0-x64.msi

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

    2023商业银行数据资产体系白皮书,主要介绍了“三位一体”数据资产体系的构成与工作机制,以及商业银行数据资产体系建设实践

    2023商业银行数据资产体系白皮书 目录 第 1 章 数据资产化与数据要素市场化相辅相成,相互促进 第 2 章 数据资产化是企业数据治理向上演进的必经之路 第 3 章 数据资产体系发展概述 第 4 章 “三位一体”数据资产体系的构思 4.1“三位一体”数据资产体系的构成与工作机制 数据资产管理 数据资产运营 数据资产评价 数据资产体系工作机制 4.2“三位一体”数据资产体系的相互作用关系 4.3“三位一体”数据资产体系的构建 4.4“三位一体”数据资产体系的优势 第 5 章 商业银行数据资产体系建设实践 5.1商业银行开展数据资产体系建设的背景和目标 5.2商业银行数据资产体系建设的工作步骤 5.3上海银行数据资产体系建设实践的主要成果 第 6 章 数据要素流通市场赋能企业数据资产化 6.1全国多层次数据要素市场的建设 6.2上海数据交易所赋能企业数据资产化 6.3数据要素流通交易市场赋能企业数据资产化的展望 第 7 章 未来演进与展望

    基于微信小程序的助农扶贫小程序

    大学生毕业设计、大学生课程设计作业

    车辆销售数据Python爬取并做数据分析,项目源码注解清晰一看就懂.zip

    车辆销售数据Python爬取并做数据分析,项目源码注解清晰一看就懂

    毕业设计:基于SSM的mysql-学生社团管理系统(源码 + 数据库 + 说明文档)

    毕业设计:基于SSM的mysql_学生社团管理系统(源码 + 数据库 + 说明文档) 第2章 主要技术和工具介绍 1 2.1 JSP语言 1 2.2 MySQL数据库 1 2.3 jsp技术 2 2.4ssm简介 3 第3章 系统分析 1 3.1可行性分析 1 3.1.1经济可行性 1 3.1.2技术可行性 1 3.1.3操作可行性 1 3.2需求分析 1 3.3业务流程分析 2 3.4数据流程分析 3 第4章 系统设计 5 4.1系统结构设计 5 4.2功能模块设计 5 4.3数据库设计 6 4.3.1数据库设计概述 6 4.3.1概念设计 6 4.3.2表设计 7 第5章 系统实现 15 5.1基本任务 15 5.2登录模块的实现 15 5.2.1首页实现 15 5.2.2管理员后台登录 16 5.3用户模块的实现 19 5.3.1注册模块及登录的实现 19 5.2.2入团模块的实现 21 5.2.3场地预约模块的实现 22 5.4管理员模块的实现 24 5.4.1系统用户管理模块的实现 24 5.4.2活动公告管理模块的实现 26 5.5社团模块的实现 28 5.5.1活动信息

    大健康零售业务O2O数字化战略规划方案.pptx

    大健康零售业务O2O数字化战略规划方案.pptx

    数据中台项目主要岗位及其职责和任务

    数据中台项目主要岗位及其职责和任务

    node-v8.0.0-linux-armv7l.tar.gz

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

    流程制造行业数字化智能工厂总体规划建设方案.pptx

    流程制造行业数字化智能工厂总体规划建设方案.pptx

    c语言学生成绩管理系统源码.zip

    c语言学生成绩管理系统源码.zip

    DEV-C++-5.11下载链接

    DEV-C++-5.11下载链接

    电器租赁小程序.zip

    电器租赁小程序.zip

    学生成绩管理系统 数据结构与算法课程设计 C++.zip

    学生成绩管理系统 数据结构与算法课程设计 C++

    知乎小程序算法.zip

    知乎小程序算法.zip

    基于R语言SIR传染病传播的SIR模型,很全,可直接应用仿真模拟.rar

    基于R语言SIR传染病传播的SIR模型,很全,可直接应用仿真模拟.rar

    node-v6.13.0.tar.xz

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

Global site tag (gtag.js) - Google Analytics