RocketMQ具有高可用性,消息消费到达主服务器Master后需要将消息同步到消息从服务器Slave,如果主服务器Master宕机后,消息消费者可以向Slave拉取消息。
整体流程 主服务器Master启动,监听从服务器Slave的监听;从服务器Slave启动,主动向主服务器建立Tcp长连接,然后获取从服务器Slave的commitlog的最大偏移量,以此偏移量向主服务器Master主动拉取消息(间隔5s发送一次),主服务器根据偏移量,与自身commitlog文件的最大偏移量进行比较,如果大于Slave的最大偏移量,主服务器Master将向从服务器Slave返回一定数量的消息,将消息写入到Slave的commitlog文件中,该过程循环进行,如果从服务器Slave的最大偏移量大于等于主服务器Master的最大偏移量,说明主从服务器数据同步完成。
相关类介绍 HAservice: 主从同步核心实现类 AcceptSocketService: HA Master端监听客户端连接实现类 GroupTransferService: 主从同步通知实现类 HAClient: HA Client端实现类(从服务器) HAConnection: HA Master服务端HA连接对象的封装,与Broker从服务器的网络读写实现类 ReadSocketService: HA Master网络读实现类 WriterSocketService: HA Master网络写实现类
主从同步Slave端实现 相关类是HAClient
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 class HAClient extends ServiceThread { private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4 ; private final AtomicReference<String> masterAddress = new AtomicReference<>(); private final ByteBuffer reportOffset = ByteBuffer.allocate(8 ); private SocketChannel socketChannel; private Selector selector; private long lastWriteTimestamp = System.currentTimeMillis(); private long currentReportedOffset = 0 ; private int dispatchPostion = 0 ; private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE); private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private boolean connectMaster () throws ClosedChannelException { if (null == socketChannel) { String addr = this .masterAddress.get(); if (addr != null ) { SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr); if (socketAddress != null ) { this .socketChannel = RemotingUtil.connect(socketAddress); if (this .socketChannel != null ) { this .socketChannel.register(this .selector, SelectionKey.OP_READ); } } } this .currentReportedOffset = HAService.this .defaultMessageStore.getMaxPhyOffset(); this .lastWriteTimestamp = System.currentTimeMillis(); } return this .socketChannel != null ; }
Slave服务器连接Master服务器,获取当前文件的最大偏移量。
1 2 3 4 5 6 7 8 private boolean isTimeToReportOffset () { long interval = HAService.this .defaultMessageStore.getSystemClock().now() - this .lastWriteTimestamp; boolean needHeart = interval > HAService.this .defaultMessageStore.getMessageStoreConfig() .getHaSendHeartbeatInterval(); return needHeart; }
判断是否向Master反馈当前待拉取偏移量,默认间隔5s发送一次。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private boolean reportSlaveMaxOffset (final long maxOffset) { this .reportOffset.position(0 ); this .reportOffset.limit(8 ); this .reportOffset.putLong(maxOffset); this .reportOffset.position(0 );this .reportOffset.limit(8 );for (int i = 0 ; i < 3 && this .reportOffset.hasRemaining(); i++) {try {this .socketChannel.write(this .reportOffset);} catch (IOException e) { log.error(this .getServiceName() + "reportSlaveMaxOffset this.socketChannel.write exception" , e); return false ;} } return !this .reportOffset.hasRemaining();}
向Master服务器反馈拉取偏移量,如果发送失败,会再进行尝试,总共三次。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 private boolean processReadEvent () { int readSizeZeroTimes = 0 ; while (this .byteBufferRead.hasRemaining()) { try { int readSize = this .socketChannel.read(this .byteBufferRead); if (readSize > 0 ) { lastWriteTimestamp = HAService.this .defaultMessageStore.getSystemClock().now(); readSizeZeroTimes = 0 ; boolean result = this .dispatchReadRequest(); if (!result) { log.error("HAClient, dispatchReadRequest error" ); return false ; } } else if (readSize == 0 ) { if (++readSizeZeroTimes >= 3 ) { break ; } } else { log.info("HAClient, processReadEvent read socket < 0" ); return false ; } } catch (IOException e) { log.info("HAClient, processReadEvent read socket exception" , e); return false ; } } return true ; }
处理网络请求,即处理从Master服务器传回的消息数据。循环判断readByteBuffer是否有剩余空间,如果存在剩余空间,则将通道里的数据读入到读缓存区中。 1) 如果读取到的字节数大于0,重置读取到0字节的次数,并更新最后一次写入时间戳,然后将读取到的所有消息全部追加到消息内存映射文件中,然后再次反馈拉取进度给服务器。 2) 如果连续3次从网络通道里读取到0个字节,则结束本次读,并返回true。 3) 如果读取到的字节数小于0或发生IO异常,则返回false。
主服务器Master处理请求 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class HAConnection { private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); private final HAService haService; private final SocketChannel socketChannel; private final String clientAddr; private WriteSocketService writeSocketService; private ReadSocketService readSocketService; private volatile long slaveRequestOffset = -1 ; private volatile long slaveAckOffset = -1 ;
Master服务器每隔1s处理一次slave发起的拉取请求。 首先会调用proccessReadEvent方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 private boolean processReadEvent () { int readSizeZeroTimes = 0 ; if (!this .byteBufferRead.hasRemaining()) { this .byteBufferRead.flip(); this .processPostion = 0 ; } while (this .byteBufferRead.hasRemaining()) { try { int readSize = this .socketChannel.read(this .byteBufferRead); if (readSize > 0 ) { readSizeZeroTimes = 0 ; this .lastReadTimestamp = HAConnection.this .haService.getDefaultMessageStore().getSystemClock().now(); if ((this .byteBufferRead.position() - this .processPostion) >= 8 ) { int pos = this .byteBufferRead.position() - (this .byteBufferRead.position() % 8 ); long readOffset = this .byteBufferRead.getLong(pos - 8 ); this .processPostion = pos; HAConnection.this .slaveAckOffset = readOffset; if (HAConnection.this .slaveRequestOffset < 0 ) { HAConnection.this .slaveRequestOffset = readOffset; log.info("slave[" + HAConnection.this .clientAddr + "] request offset " + readOffset); } HAConnection.this .haService.notifyTransferSome(HAConnection.this .slaveAckOffset); } } else if (readSize == 0 ) { if (++readSizeZeroTimes >= 3 ) { break ; } } else { log.error("read socket[" + HAConnection.this .clientAddr + "] < 0" ); return false ; } } catch (IOException e) { log.error("processReadEvent exception" , e); return false ; } } return true ; }
1) 如果byteBufferRead没有剩余空间,则调用byteBufferRead.flip()清空,并设置processPostion为0,表示从头开始处理。 2) 当byteBuffer有剩余空间时,先预设readSizeZeroTimes为0,如果读取的字节大于0并且本次读取到的内容大于8,则表明收到了从服务器Slave一条拉取消息的请求,并调用notifyTransferSome 方法。 3) 如果读取到的字节数等于0,则判断readSizeZeroTimes,当小于3时需要再进行循环处理,如果大于3,说明该批次请求已成功处理。
1 2 3 4 5 6 7 8 9 10 11 public void notifyTransferSome (final long offset) { for (long value = this .push2SlaveMaxOffset.get(); offset > value; ) { boolean ok = this .push2SlaveMaxOffset.compareAndSet(value, offset); if (ok) { this .groupTransferService.notifyTransferSome(); break ; } else { value = this .push2SlaveMaxOffset.get(); } } }
该方法的参数offset记录的是从服务器slave反馈的已拉取完成的数据偏移量, push2SlaveMaxOffset记录的是写入到Slave的最大偏移量。 如果从服务器slave确认的偏移量大于push2SlaveMaxOffset,说明该批次主从同步成功,并更新push2SlaveMaxOffset, 则唤醒GroupTransferService线程,各消息发送者线程再次判断自己
然后介绍WriteSocketService线程
1 2 3 4 if (-1 == HAConnection.this .slaveRequestOffset) { Thread.sleep(10 ); continue ; }
如果slaveRequestOffset 等于-1,说明Master还未收到从服务器的拉取请求,放弃本事件的处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 if (-1 == this .nextTransferFromWhere) { if (0 == HAConnection.this .slaveRequestOffset) { long masterOffset = HAConnection.this .haService.getDefaultMessageStore().getCommitLog().getMaxOffset(); masterOffset = masterOffset - (masterOffset % HAConnection.this .haService.getDefaultMessageStore().getMessageStoreConfig() .getMapedFileSizeCommitLog()); if (masterOffset < 0 ) { masterOffset = 0 ; } this .nextTransferFromWhere = masterOffset; } else { this .nextTransferFromWhere = HAConnection.this .slaveRequestOffset; } log.info("master transfer data from " + this .nextTransferFromWhere + " to slave[" + HAConnection.this .clientAddr + "], and slave request " + HAConnection.this .slaveRequestOffset); }
如果nextTransferFromWhere 等于-1,表示初次进行数据传输,计算待传输的物理偏移量,如果slaveRequestOffset为0,则从当前commitlog文件最大偏移量开始传输,否则根据从服务器的拉取请求偏移量开始传输。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 if (this .lastWriteOver) { long interval = HAConnection.this .haService.getDefaultMessageStore().getSystemClock().now() - this .lastWriteTimestamp; if (interval > HAConnection.this .haService.getDefaultMessageStore().getMessageStoreConfig() .getHaSendHeartbeatInterval()) { this .byteBufferHeader.position(0 ); this .byteBufferHeader.limit(headerSize); this .byteBufferHeader.putLong(this .nextTransferFromWhere); this .byteBufferHeader.putInt(0 ); this .byteBufferHeader.flip(); this .lastWriteOver = this .transferData(); if (!this .lastWriteOver) continue ; } } else { this .lastWriteOver = this .transferData(); if (!this .lastWriteOver) continue ; }
判断上次写事件是否已将消息全部写入客户端,如果已全部写入,并且当前系统时间与上次最后写入的时间间隔大于HA心跳检测时间,则发送一个心跳包,避免长连接由于空闲被关闭;如果上次数据未写完,则先传输上一次的数据,如果消息还是未全部传输,则结束此次事件处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 SelectMappedBufferResult selectResult = HAConnection.this .haService.getDefaultMessageStore().getCommitLogData(this .nextTransferFromWhere); if (selectResult != null ) { int size = selectResult.getSize(); if (size > HAConnection.this .haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) { size = HAConnection.this .haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize(); } long thisOffset = this .nextTransferFromWhere; this .nextTransferFromWhere += size; selectResult.getByteBuffer().limit(size); this .selectMappedBufferResult = selectResult; this .byteBufferHeader.position(0 ); this .byteBufferHeader.limit(headerSize); this .byteBufferHeader.putLong(thisOffset); this .byteBufferHeader.putInt(size); this .byteBufferHeader.flip(); this .lastWriteOver = this .transferData(); } else { HAConnection.this .haService.getWaitNotifyObject().allWaitForRunning(100 ); }
传输消息到从服务器slave 1) 根据消息从服务器slave请求的待拉取偏移量,查找该偏移量之后的所有的可读消息,如果未查到匹配的消息,通知所有等待线程继续等待100ms。 2) 如果匹配到消息,且查找到的消息总长度大于配置的一次传输最大字节数,则会进行截取,可能会包含不完整的消息。
主服务器Master会一直执行该线程,每次事件处理完成后等待1s。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private void doWaitTransfer () { synchronized (this .requestsRead) { if (!this .requestsRead.isEmpty()) { for (CommitLog.GroupCommitRequest req : this .requestsRead) { boolean transferOK = HAService.this .push2SlaveMaxOffset.get() >= req.getNextOffset(); for (int i = 0 ; !transferOK && i < 5 ; i++) { this .notifyTransferObject.waitForRunning(1000 ); transferOK = HAService.this .push2SlaveMaxOffset.get() >= req.getNextOffset(); } if (!transferOK) { log.warn("transfer messsage to slave timeout, " + req.getNextOffset()); } req.wakeupCustomer(transferOK); } this .requestsRead.clear(); } } }
GroupTransferService#doWaitTransfer()方法 该类是主从同步复制的实现类
在进行主从同步时,有同步主从模式和异步主从模式,当主服务器Master发送消息给从服务器Slave时,需要先判断上一次同步主从复制的结果,如果Slave中已成功复制的最大偏移量是否大于Master发送给Slave消息后返回的下一条消息的起始偏移量,如果大于,则说明上一个主从同步复制已经完成,则会唤醒阻塞等待的消息发送到Slave的命令。
RocketMq读写分离与其他中间件的实现方式完全不同,RoketMq是消息者首先向主服务器Master发起拉取消息请求,然后主服务器返回一批消息,然后会根据主服务器负载压力与主从同步情况,建议下次消息拉取是从主服务器还是从从服务器拉取。
决定消费者是否向从服务器拉取消息消费的值存在 GetMessageResult 类中:
1 private boolean suggestPullingFromSlave = false ;
其默认值为 false,即默认消费者不会消费从服务器
1 2 3 4 long diff = maxOffsetPy - maxPhyOffsetPulling;long memory = (long ) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this .messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0 )); getResult.setSuggestPullingFromSlave(diff > memory);
其中 maxOffsetPy 为当前最大物理偏移量,maxPhyOffsetPulling 为本次消息拉取最大物理偏移量,他们的差即可表示消息堆积量,TOTAL_PHYSICAL_MEMORY_SIZE 表示当前系统物理内存,accessMessageInMemoryMaxRatio 的默认值为 40,以上逻辑即可算出当前消息堆积量是否大于物理内存的 40 %,如果大于则将 suggestPullingFromSlave 设置为 true。