# paradise **Repository Path**: flyme2020/paradise ## Basic Information - **Project Name**: paradise - **Description**: thrift showcase - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2016-02-29 - **Last Updated**: 2022-05-27 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README #Thrift Showcase ### 1.Thrift服务注册和服务发现 一般来说,一个端口上会启动一个服务。启动服务的同时会在zookeeper集群中注册服务,服务的路径为: > /root/namespace/serviceName/serverIP:port:weight 解释说明: - root : 表示在zk 上注册的服务的根路径,这里默认为rpc。 - namespace :表示服务注册方的应用名称 - serviceName : 表示注册的服务名字,一般为服务接口的类的全限定名 - serverIP:服务所在服务器的ip - port :提供服务的端口 - weight :服务的权重,用来负载均衡 比如这样的一个服务: > /rpc/com.usoft.thrift.example/com.usoft.thrift.example.GreetServiceRpc/1.0.0/10.255.23.246:9000:1 服务启动并注册服务后,服务消费方需要发现服务,根据什么来发现服务呢?根据服务名称serviceName,zk的客户端获取该节点下的服务注册信息。 获取到服务注册信息后(地址和端口),就会构造Thrift的客户端请求服务。 - [服务注册代码](http://git.oschina.net/xinxingegeya/Paradise/blob/master/thrift-server/src/main/java/com/usoft/thrift/server/ThriftServerAddressRegisterZookeeper.java?dir=0&filepath=thrift-server%2Fsrc%2Fmain%2Fjava%2Fcom%2Fusoft%2Fthrift%2Fserver%2FThriftServerAddressRegisterZookeeper.java&oid=afffebadf1e82d7c88b999a2e08f1e0182244c04&sha=5a71ff22540bb6085b5afa4cd47177e6ae17991e) - [服务发现代码](http://git.oschina.net/xinxingegeya/Paradise/blob/master/thrift-client/src/main/java/com/usoft/thrift/client/provider/ThriftServerAddressProviderZookeeper.java?dir=0&filepath=thrift-client%2Fsrc%2Fmain%2Fjava%2Fcom%2Fusoft%2Fthrift%2Fclient%2Fprovider%2FThriftServerAddressProviderZookeeper.java&oid=ad6a662781d0db308b640e5ac2873511df2043a5&sha=5a71ff22540bb6085b5afa4cd47177e6ae17991e) 当zk集群中注册的服务信息发生变化时,会触发服务发现的zk客户端重新缓存并构建服务的地址、端口、权重等信息。 ### 2.Thrift客户端的服务化改造 当我们知道了服务的地址和端口时,就可以这样来构造一个thrift客户端来请求服务,如下代码, ``` public class ThriftClient { public static void main(String[] args) throws Exception { TTransport transport = new TSocket("localhost", 8111); TProtocol protocol = new TCompactProtocol(transport); // ContactService.Client client = new ContactService.Client.Factory() // .getClient(protocol); ContactService.Client client = new ContactService.Client(protocol); transport.open(); List list = client.getAll(); System.out.println(list); client.save( new Contact(1, "zhangpu", Calendar.getInstance().getTimeInMillis(), "1389612222", "192.168.2.1", null)); transport.close(); } } ``` 一般来说构造thrift客户端的步骤大致分这几步: 1. 定义Transport,这里使用 TSocket,服务的地址和端口分别为localhost 和 8111 1. 定义 Protocol,这里使用 TCompactProtocol 1. 定义服务的客户端 ContactService.Client 1. 打开transport ,并请求服务。 这几步大致是固定的,我们可以把这几个步骤抽象一下,使用动态代理的方式请求服务,从而简化服务调用方的代码。 有下面这样一个 IDL 文件, ``` namespace java com.usoft.thrift.example service GreetServiceRpc{ string greet(string msg); } ``` 生成的service接口类 GreetServiceRpc - Iface(内部类) --------> Client (内部类,实现Iface接口) Iface 接口是GreetServiceRpc的内部类,Client类就是我们上边刚讲的thrift的客户端的实现,同时实现Iface接口。 那么现在我们不使用Client来调用远程服务,而是使用Iface接口直接来调用本地的服务方法,从而屏蔽和远程服务的通信细节,实现透明化的rpc调用。 实现原理就是当使用 Iface 接口直接调用本地服务方法时,使用动态代理来请求远程服务。动态代理的关键代码如下, ``` proxyClient = Proxy.newProxyInstance(classLoader, new Class[] { objectClass }, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // 获取池化的client对象 TServiceClient client = pool.borrowObject(); boolean flag = true; try { return method.invoke(client, args); } catch (Exception e) { flag = false; throw e; } finally { if (flag) { pool.returnObject(client); } else { pool.invalidateObject(client); } } } }); ``` 通过动态代理,我们请求一个远程服务时,就像请求本地服务一样, ``` GreetServiceRpc.Iface greetService = (GreetServiceRpc.Iface) context .getBean("greetService"); String resp = greetService.greet("hello world"); ``` 就这样简化了远程服务的调用过程。 ### 3.Thrift服务器端服务的启动 我们大多使用的Spring IoC容器。为了简化服务的启动,通过IoC 实例化bean的时候启动服务,让我们的bean实现 InitializingBean 接口,在 afterPropertiesSet 方法中实现启动服务和注册服务的逻辑。 首先来看一下thrift 服务器端启动的一个大致的固定的过程, ``` TServerSocket serverSocket = new TServerSocket(8111); ContactService.Processor processor = new ContactService.Processor( new ContactServiceImpl()); TCompactProtocol.Factory factory = new TCompactProtocol.Factory(); Args ag = new Args(serverSocket); ag.protocolFactory(factory); ag.processor(processor); TServer server = new TSimpleServer(ag); System.out.println("start server..."); server.serve(); ``` 总结来说以下几个步骤: 1. 定义 服务器端 transport ,这里是TServerSocket 1. 定义 服务器端的Processor 1. 定义 服务器端的Protocol(和客户端的protocol一致) 1. 使用服务器参数,实例化TServer,并启动服务。 可以看出来,上面启动服务器的过程,只有其中的 步骤2是有差别的。可以根据反射的原理来动态定义相应服务的Processor,从而实现一套代码可以启动或者发布所有的服务接口。 具体的代码实现,https://gitee.com/flyme2020/paradise/blob/master/thrift-server/src/main/java/com/usoft/thrift/server/ThriftServiceServerFactory.java 那么当我们要启动一个服务或者发布一个服务时,就可以在Spring IoC容器中配置一个bean, ``` ``` 配置服务发布的参数, - serviceImpl —— 服务器端服务实现类 - port —— 启动端口 - version —— 服务的版本 - weight —— 服务权重 - thriftServerAddressRegister —— 服务的注册器(注册服务到zk 集群) ### 4.客户端透明化的调用远程服务 在标题2中,实现了Thrift Client的服务化改造,那么如何使用? 在这里,我们也是使用Spring IoC容器来实例化Thrift Client的bean,当需要调用服务时,只需自动装配拿到client bean,就可以调用服务。 ``` ``` 配置参数: - maxTotal : 客户端连接池最大连接数量 - idleTime:空闲时间 - serverAddressProvider :服务的地址发现provider。其中参数service,表示客户端要请求的服务名称,version表示要请求的服务版本,zkClient表示zk的客户端。 配置client的bean,调用服务。 ``` GreetServiceRpc.Iface greetService = (GreetServiceRpc.Iface) context .getBean("greetService"); String resp = greetService.greet("hello world"); ``` 实例代码,http://git.oschina.net/xinxingegeya/Paradise/tree/master/thrift-showcase?dir=1&filepath=thrift-showcase&oid=834d37de4e4336a51be4bb24a011a06e45fec473&sha=5a71ff22540bb6085b5afa4cd47177e6ae17991e ### 5.实现Thrift 客户端的连接池 使用common pool2 实现了客户端的连接池,直接封装了Thrift客户端的构造过程,具体看代码, ``` package com.usoft.thrift.client.pool; import com.usoft.thrift.client.provider.ThriftServerAddressProvider; import com.usoft.thrift.exception.ThriftException; import org.apache.commons.pool2.BasePooledObjectFactory; import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.impl.DefaultPooledObject; import org.apache.thrift.TServiceClient; import org.apache.thrift.TServiceClientFactory; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; /** * BasePooledObjectFactory * 连接池,thrift-client for spring */ public class ThriftClientPoolFactory extends BasePooledObjectFactory { private Logger logger = LoggerFactory.getLogger(getClass()); private final ThriftServerAddressProvider serverAddressProvider; private final TServiceClientFactory clientFactory; private PoolOperationCallBack callback; protected ThriftClientPoolFactory( ThriftServerAddressProvider addressProvider, TServiceClientFactory clientFactory) throws Exception { this.serverAddressProvider = addressProvider; this.clientFactory = clientFactory; } public ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory clientFactory, PoolOperationCallBack callback) throws Exception { this.serverAddressProvider = addressProvider; this.clientFactory = clientFactory; this.callback = callback; } @Override public TServiceClient create() throws Exception { InetSocketAddress address = serverAddressProvider.selector(); if (address == null) { new ThriftException("No provider available for remote service"); } TSocket tsocket = new TSocket(address.getHostName(), address.getPort()); TTransport transport = new TFramedTransport(tsocket); TProtocol protocol = new TBinaryProtocol(transport); TServiceClient client = this.clientFactory.getClient(protocol); transport.open(); if (callback != null) { try { callback.create(client); } catch (Exception e) { logger.warn("create object:{}", e); } } return client; } @Override public PooledObject wrap(TServiceClient tServiceClient) { return new DefaultPooledObject(tServiceClient); } @Override public void destroyObject(PooledObject p) throws Exception { TServiceClient client = p.getObject(); if (callback != null) { try { callback.destroy(client); } catch (Exception e) { logger.warn("destroyObject:{}", e); } } logger.info("destroyObject:{}", client); TTransport pin = client.getInputProtocol().getTransport(); pin.close(); TTransport pout = client.getOutputProtocol().getTransport(); pout.close(); } @Override public boolean validateObject(PooledObject p) { TServiceClient client = p.getObject(); TTransport pin = client.getInputProtocol().getTransport(); logger.info("validateObject input:{}", pin.isOpen()); TTransport pout = client.getOutputProtocol().getTransport(); logger.info("validateObject output:{}", pout.isOpen()); return pin.isOpen() && pout.isOpen(); } } ``` ### 6.客户端的软负载均衡 主要思路就是根据weight来实现软负载均衡,代码实现, ``` private List transfer(String address) { String[] hostname = address.split(":"); Integer weight = DEFAULT_WEIGHT; if (hostname.length == 3) { weight = Integer.valueOf(hostname[2]); } String ip = hostname[0]; Integer port = Integer.valueOf(hostname[1]); List result = new ArrayList(); //根据优先级,将ip:port添加多次到地址集中,然后随机取地址实现负载 //如果weight取8,那么就会实例化8个InetSocketAddress for (int i = 0; i < weight; i++) { result.add(new InetSocketAddress(ip, port)); } return result; } ``` 如果一个服务的weight是100,那么就会构造100个InetSocketAddress,而另一个服务器上相同的服务的weight是200,那么就构造200个InetSocketAddress,那么就有 300 个InetSocketAddress 可以供客户端来调用服务。最后把这300个InetSocketAddress放入集合中,打乱其顺序,然后依序取用,从而实际上实现了负载均衡。当调用多次后,两个服务调用的比例大致上为1:2; ### 7.服务器端的监控 总之一句话,动态代理实现服务器端服务调用成功次数、失败次数,调用成功所花费的最小时间、最大时间、平均时间等的统计。 代码:http://git.oschina.net/xinxingegeya/Paradise/tree/master/thrift-server/src/main/java/com/usoft/thrift/monitor?dir=1&filepath=thrift-server%2Fsrc%2Fmain%2Fjava%2Fcom%2Fusoft%2Fthrift%2Fmonitor&oid=c91b44d979f7aca38d74d7c5a873a330d76cd48a&sha=5a71ff22540bb6085b5afa4cd47177e6ae17991e ### 8.总结 && To Do List 总结: - 很大程度上简化了原生Thrift客户端调用远程服务的代码量 - 远程服务调用的透明化 - 发布服务简单,只需配置一个服务bean - 客户端的服务调用简单,自动装配服务bean - 客户端连接池的实现 - 不需要知道服务发布的地址和端口,明确指定服务名称就可以 To Do List: - 以一种更好的方式暴露服务器端的监控数据 - 服务治理中心 - 服务降级或服务的开关 - 动态调整服务权重 - ....... 还有不足的地方欢迎大家指出~ 注:实现的效果在thrift-showcase中演示,可以和原生的服务调用过程在thrift-introduce中作对比。 The End ~