在前两个版本中,每次发起请求一次就新建一个netty的channel连接,如果在高并发情况下就会造成资源的浪费,这时实现异步请求 就十分重要,当有多个请求线程时,需要设计一个线程池 来进行管理。除此之外,当前方法过于依赖注册中心,在高并发情况下对注册中心造成了压力;另外如果注册中心出现宕机等情况,那么整合系统就崩溃了,为了解决这个问题,添加了一个适合高并发的服务缓存机制 。以上为该版本的新增内容。
异步请求和线程池 这里就不具体介绍异步请求的概念了。用一个通俗的例子解释,如你在饭店点餐,当你点好餐后,会得到一个点餐号,但是饭菜并不会立即做好送过,需要你等待一段时间,在这个时间段中,你可以做其他的事情,当饭菜做好后,会根据点餐号进行广播,通知你去拿饭菜。这就是一个典型的异步处理。 在项目中涉及到异步的主要有三个自定义类,即ChannelHolder
,LwRequestPool
和LwRequestManager
。 在ChannelHolder
中定义的变量:
1 2 3 4 5 6 7 8 @Data @Builder @NoArgsConstructor @AllArgsConstructor public class ChannelHolder { private Channel channel; private EventLoopGroup eventLoopGroup; }
在LwRequestManager
中的变量:
1 2 3 4 5 6 private static final ConcurrentHashMap<String, ChannelHolder> channelHolderMap = new ConcurrentHashMap<>(); private static ExecutorService requestExecutor = new ThreadPoolExecutor(30 , 100 , 0 , TimeUnit.SECONDS, new ArrayBlockingQueue<>(30 ), new BasicThreadFactory.Builder().namingPattern("request-service-connector-%d" ).build()); private static LwRequestPool requestPool = SpringBeanFactory.getBean(LwRequestPool.class);
在LwRequestPool
中定义的变量:
1 private final ConcurrentHashMap<String, Promise<LwResponse>> requestPool = new ConcurrentHashMap<>();
刚开始在动态代理中会调用send()
方法,开始了有关异步调用的内容。通过requestId来确定是哪个请求,利用线程池执行netty客户端的运行,并利用CountDownLatch
来先暂停下面代码的运行,如果latch执行了countDown()方法,会再返回这里执行下面的步骤。
1 2 3 4 5 6 7 8 9 public static void send (LwRequest request, URL url) throws Exception { String requestId = request.getRequestId(); CountDownLatch latch = new CountDownLatch(1 ); requestExecutor.execute(new NettyClient(requestId, url, latch)); latch.await(); ChannelHolder channelHolder = channelHolderMap.get(requestId); channelHolder.getChannel().writeAndFlush(request); log.info("客户端发送消息:{}" , channelHolder); }
之后运行Netty客户端中的run()方法,如果与服务端连接成功,将该请求id和对应的channel注册到channelHolderMap
变量中,并执行submitRequest
方法,将请求id和eventLoop注册到变量requestPool
中。最后执行了countDown()
方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Override public void run () { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true ) .option(ChannelOption.TCP_NODELAY, true ) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000 ) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel (SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new LengthFieldBasedFrameDecoder(65535 , 0 , 4 )); pipeline.addLast(new LwRpcEncoder(LwRequest.class, new HessianSerializer())); pipeline.addLast(new LwRpcDecoder(LwResponse.class, new HessianSerializer())); pipeline.addLast(clientHandler); } }); try { ChannelFuture future = bootstrap.connect(url.getHostname(), url.getPort()).sync(); if (future.isSuccess()) { ChannelHolder channelHolder = ChannelHolder.builder() .channel(future.channel()) .eventLoopGroup(group).build(); LwRequestManager.registerChannelHolder(requestId, channelHolder); latch.countDown(); } } catch (InterruptedException e) { e.printStackTrace(); } }
1 2 3 4 5 6 requestPool.submitRequest(requestId, channelHolder.getChannel().eventLoop()); public void submitRequest (String requestId, EventExecutor executor) { requestPool.put(requestId, new DefaultPromise<>(executor)); }
当执行了countDown()
方法,会跳转到原来最初的地方,执行剩下的代码部分,进行请求发送。等待服务端的响应。
1 2 ChannelHolder channelHolder = channelHolderMap.get(requestId); channelHolder.getChannel().writeAndFlush(request);
当客户端接收到服务端发回的结果信息时,会执行notifyRequest
方法。
1 2 3 4 @Override protected void channelRead0 (ChannelHandlerContext channelHandlerContext, LwResponse response) throws Exception { lwRequestPool.notifyRequest(response.getRequestId(), response); }
在notifyRequest
方法中,会从变量requestPool
中获取到返回的LwResponse
变量,并封装在Promise
中,最后调用setsuccess()方法。
1 2 3 4 5 6 public void notifyRequest (String requestId, LwResponse response) { Promise<LwResponse> promise = requestPool.get(requestId); if (promise != null ) { promise.setSuccess(response); } }
setsuccess()
方法是netty的Promise中的方法。它会通知所有的监听器。在官方解释如下:Marks this future as a success and notifies all
此时就可以通过fetchResponse
根据请求id获取到了服务端发送过来的消息,此时已经执行完毕,需要从requestpool
中删除该请求信息。
1 2 3 4 5 6 7 8 9 10 11 12 LwResponse response = lwRequestPool.fetchResponse(requestId); public LwResponse fetchResponse (String requestId) throws Exception { Promise<LwResponse> promise = requestPool.get(requestId); if (promise == null ) return null ; LwResponse response = promise.get(10 , TimeUnit.SECONDS); requestPool.remove(requestId); LwRequestManager.destroyChannelHolder(requestId); return response; }
高并发下的缓存机制 在原来的版本中,每次请求远程服务时,都需要从注册中心获取服务地址,在高并发情况下,会对注册中心造成一定的影响;或者如果注册中心突然宕机,那么就无法获取待服务地址,整个系统就崩溃了。所以设计一个缓存机制,将请求到的服务地址持久化到本地,当下次请求时,就无须再需要注册中心了,直接从持久化文件中获取,减轻了注册中心的压力。
在进行本地缓存时,会先调用saveServices
方法,将URL数组信息保存到Properties
中,并获取当前version
版本号,然后执行doSaveProperties
方法来保存到本地。这个步骤支持同步和异步两种方式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public void saveServices (String serviceName, List<URL> urlList) { if (file == null ) return ; try { StringBuilder buf = new StringBuilder(); for (URL url : urlList) { if (buf.length() > 0 ) { buf.append(";" ); } buf.append(url.getAllInformation()); } properties.setProperty(serviceName, buf.toString()); long version = lastCacheChanged.incrementAndGet(); if (syncSaveFile) { doSaveProperties(version); } else { registerCacheExecutor.execute(new SaveProperties(version)); } } catch (Throwable t) { log.warn(t.getMessage(), t); } }
在doSaveProperties
方法中,如果传入的版本号不是最新的版本号,说明其他线程已经修改了,内容发生了变化,直接退出。在写入到文件时会添加锁,进一步保证信息的准确性。如果添加失败,会进行重试操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 private void doSaveProperties (long version) { if (version < lastCacheChanged.get()) return ; if (file == null ) return ; try { File lockfile = new File(file.getAbsolutePath() + ".lock" ); if (!lockfile.exists()) { lockfile.createNewFile(); } try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw" ); FileChannel channel = raf.getChannel();) { FileLock lock = channel.tryLock(); if (lock == null ) { throw new IOException("不能锁住注册的缓存文件" ); } try { if (!file.exists()) { file.createNewFile(); } try (FileOutputStream outputFile = new FileOutputStream(file)) { properties.store(outputFile, "RPC Server Cache" ); } } finally { lock.release(); } } }catch (Throwable e) { savePropertiesRetryTimes.incrementAndGet(); if (savePropertiesRetryTimes.get() > SAVE_MAX_RETRY) { log.warn("超过最大重试次数,缓存失败!" ); savePropertiesRetryTimes.set(0 ); return ; } if (version < lastCacheChanged.get()) { savePropertiesRetryTimes.set(0 ); return ; } e.printStackTrace(); } }
具体详细代码可以到我的项目中进行查看轻量级RPC第三版