什么是RPC
RPC (Remote Procedure Call Protocol), 远程过程调用,通俗的解释就是:客户端在不知道调用细节的情况下,调用存在于远程计算机上的某个对象,就像调用本地应用程序中的对象一样,不需要了解底层网络技术的协议。
简单的整体工作流程
请求端发送一个调用的数据包,该包中包含有调用标识,参数等协议要求的参数。当响应端接收到这个数据包,对应的程序被调起,然后返回结果数据包,返回的数据包含了和请求的数据包中同样的请求标识,结果等。
性能影响因素
- 利用的网络协议。可以使用应用层协议,例如HTTP或者HTTP/2协议;也可以利用传输层协议,例如TCP协议,但是主流的RPC还没有采用UDP传输协议。
- 消息封装格式。选择或设计一种协议来封装信息进行组装发送。比如,dubbo中消息体数据包含dubbo版本号、接口名称、接口版本、方法名称、参数类型列表、参数、附加信息等。
- 序列化。信息在网络传输中要以二进制格式进行传输。序列化和反序列化,是对象到而二进制数据的转换。常见的序列化方法有JSON、Hessian、Protostuff等。
- 网络IO模型。可以采用非阻塞式同步IO,也可以在服务器上实现对多路IO模型的支持。
- 线程管理方式。在高并发请求下,可以使用单个线程运行服务的具体实现,但是会出现请求阻塞等待现象。也可以为每一个RPC具体服务的实现开启一个独立的线程运行,最大线程数有限制,可以使用线程池来管理多个线程的分配和调度。
第一版RPC
第一个版本简单实现了RPC的最基本功能,即服务信息的发送与接收、序列化方式和动态代理等。
项目利用Springboot来实现依赖注入与参数配置,使用netty实现NIO方式的数据传输,使用Hessian来实现对象序列化。
动态代理
这里要提到代理模式,它的特征是代理类与委托类有同样的接口,代理类主要负责为委托类预处理消息、过滤消息、把消息转发给委托类,以及事后处理消息等。代理类与委托类之间通常会存在关联关系。
根据创建代理类的时间点,又可以分为静态代理和动态代理。
在以往的静态代理中需要手动为每一个目标编写对应的代理类。如果系统已经有了成百上千个类,工作量太大了。
静态代理由程序员创建或特定工具自动生成源代码,也就是在编译时就已经将接口与被代理类,代理类等确定下来。在程序运行之前,代理类的.class文件就已经生成。
代理类在程序运行时创建的代理方式被称为代理模式。在静态代理中,代理类是自己定义好的,在运行之前就已经编译完成了。而在动态代理中,可以很方便地对代理类的函数进行统一的处理,而不用修改每个代理类中的方法。可以通过InvocationHandler
接口来实现。
客户端的动态代理
1 2 3 4 5
| public class ProxyFactory { public static <T> T create(Class<T> interfaceClass) throws Exception { return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, new LwRpcClientDynamicProxy<T>(interfaceClass)); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @Slf4j public class LwRpcClientDynamicProxy<T> implements InvocationHandler { private Class<T> clazz; public LwRpcClientDynamicProxy(Class<T> clazz) throws Exception { this.clazz = clazz; }
@Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { LwRequest lwRequest = new LwRequest(); String requestId = UUID.randomUUID().toString(); String className = method.getDeclaringClass().getName(); String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes();
lwRequest.setRequestId(requestId); lwRequest.setClassName(className); lwRequest.setMethodName(methodName); lwRequest.setParameterTypes(parameterTypes); lwRequest.setParameters(args); NettyClient nettyClient = new NettyClient("127.0.0.1", 8888); log.info("开始连接服务器端:{}", new Date()); LwResponse send = nettyClient.send(lwRequest); log.info("请求后返回的结果:{}", send.getResult()); return send.getResult(); } }
|
在服务端会利用在客户端获取到的类名。参数等信息利用反射机制进行调用。
1 2 3 4 5 6
| Class<?>[] parameterTypes = request.getParameterTypes(); Object[] paramethers = request.getParameters(); FastClass fastClass = FastClass.create(serviceClass); FastMethod fastMethod = fastClass.getMethod(methodName, parameterTypes); return fastMethod.invoke(serviceBean, paramethers);
|
Netty客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| @Slf4j public class NettyClient { private String host; private Integer port; private LwResponse response; private EventLoopGroup group; private ChannelFuture future = null; private Object obj = new Object(); private NettyClientHandler nettyClientHandler; public NettyClient(String host, Integer port) { this.host = host; this.port = port; }
public LwResponse send(LwRequest request) throws Exception{ nettyClientHandler = new NettyClientHandler(request); group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 4)); pipeline.addLast(new LwRpcEncoder(LwRequest.class, new HessianSerializer())); pipeline.addLast(new LwRpcDecoder(LwResponse.class, new HessianSerializer())); pipeline.addLast(nettyClientHandler); } }); System.out.println("host:" + host); future = bootstrap.connect(host, port).sync(); nettyClientHandler.getCountDownLatch().await(); this.response = nettyClientHandler.getLwResponse(); return this.response; }
@PreDestroy public void close() { group.shutdownGracefully(); future.channel().closeFuture().syncUninterruptibly(); }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| @Slf4j public class NettyClientHandler extends ChannelInboundHandlerAdapter { private final CountDownLatch countDownLatch = new CountDownLatch(1); private LwResponse response = null; private LwRequest request;
public NettyClientHandler(LwRequest request) { this.request = request; }
public CountDownLatch getCountDownLatch() { return countDownLatch; }
public LwResponse getLwResponse() { return this.response; } @Override public void channelActive(ChannelHandlerContext ctx) { log.info("客户端向客户端发送消息"); ctx.writeAndFlush(request); log.info("客户端请求成功"); }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { LwResponse lwResponse = (LwResponse) msg; log.info("收到服务端的信息:{}", lwResponse.getResult()); this.response = lwResponse; this.countDownLatch.countDown(); }
@Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.close(); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); }
}
|
在客户端发送服务信息时,用LwQuest类进行封装,返回的结果用LwResponse进行封装,当客户端读取到服务器端返回的响应时,在NettyClientHandler
中进行处理,并利用CountDownLatch
进行线程的阻塞和运行。
Netty服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| @Component @Slf4j public class NettyServer { private EventLoopGroup boss = null; private EventLoopGroup worker = null; @Autowired private ServerHandler serverHandler; @Value("${server.address}") private String address; public void start() throws Exception { log.info("成功"); boss = new NioEventLoopGroup(); worker = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(boss, worker) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 4)); pipeline.addLast(new LwRpcEncoder(LwResponse.class, new HessianSerializer())); pipeline.addLast(new LwRpcDecoder(LwRequest.class, new HessianSerializer())); pipeline.addLast(serverHandler); } }); String[] strs = address.split(":"); String addr = strs[0]; int port = Integer.valueOf(strs[1]); ChannelFuture future = serverBootstrap.bind(addr, port).sync(); future.channel().closeFuture().sync(); } finally { worker.shutdownGracefully(); boss.shutdownGracefully(); } }
@PreDestroy public void destory() throws InterruptedException { boss.shutdownGracefully().sync(); worker.shutdownGracefully().sync(); log.info("关闭netty"); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| @Component @Slf4j @ChannelHandler.Sharable public class ServerHandler extends SimpleChannelInboundHandler<LwRequest> implements ApplicationContextAware { private ApplicationContext applicationContext;
@Override public void setApplicationContext(ApplicationContext applicationContext) { this.applicationContext = applicationContext; }
@Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, LwRequest msg) throws Exception { LwResponse lwResponse = new LwResponse(); lwResponse.setRequestId(msg.getRequestId()); log.info("从客户端接收到请求信息:{}", msg); try { Object result = handler(msg); lwResponse.setResult(result); } catch (Throwable throwable) { lwResponse.setCause(throwable); throwable.printStackTrace();
} channelHandlerContext.writeAndFlush(lwResponse); }
private Object handler(LwRequest request) throws ClassNotFoundException, InvocationTargetException {
Class<?> clazz = Class.forName(request.getClassName()); Object serviceBean = applicationContext.getBean(clazz); Class<?> serviceClass = serviceBean.getClass(); String methodName = request.getMethodName(); log.info("获取到的服务类:{}", serviceBean); Class<?>[] parameterTypes = request.getParameterTypes(); Object[] paramethers = request.getParameters(); FastClass fastClass = FastClass.create(serviceClass); FastMethod fastMethod = fastClass.getMethod(methodName, parameterTypes); return fastMethod.invoke(serviceBean, paramethers); } }
|
在Netty服务端中,会利用``serverHandler`来处理从客户端中接收的信息,并利用反射的思想调用本地的方法,并将处理的结构封装在LwResponse中。
LwRequest
和LwRespnse
要想在网络中进行传输,需要转化为二进制转换。具体方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public class HessianSerializer implements Serializer { @Override public byte[] serialize(Object object) throws IOException { ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); Hessian2Output output = new Hessian2Output(byteArrayOutputStream); output.writeObject(object); output.flush(); return byteArrayOutputStream.toByteArray(); }
public <T> T deserialize(Class<T> clazz, byte[] bytes) throws IOException { Hessian2Input input = new Hessian2Input(new ByteArrayInputStream(bytes)); return (T) input.readObject(clazz); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public class LwRpcDecoder extends ByteToMessageDecoder {
private Class<?> clazz; private Serializer serializer;
public LwRpcDecoder(Class<?> clazz, Serializer serializer) { this.clazz = clazz; this.serializer = serializer; }
@Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { if (byteBuf.readableBytes() < 4) return; byteBuf.markReaderIndex(); int dataLength = byteBuf.readInt(); if (dataLength < 0) { channelHandlerContext.close(); } if (byteBuf.readableBytes() < dataLength) { byteBuf.resetReaderIndex(); } byte[] data = new byte[dataLength]; byteBuf.readBytes(data);
Object obj = serializer.deserialize(clazz, data); list.add(obj); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public class LwRpcEncoder extends MessageToByteEncoder<Object> { private Class<?> clazz; private Serializer serializer;
public LwRpcEncoder(Class<?> clazz, Serializer serializer) { this.clazz = clazz; this.serializer = serializer; }
@Override protected void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception { if (clazz.isInstance(in)) { byte[] data = serializer.serialize(in); out.writeInt(data.length); out.writeBytes(data); }
}
}
|