博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
聊聊rocketmq的SequenceProducerImpl
阅读量:6819 次
发布时间:2019-06-26

本文共 2403 字,大约阅读时间需要 8 分钟。

  hot3.png

本文主要研究一下rocketmq的SequenceProducerImpl

SequenceProducerImpl

io/openmessaging/rocketmq/producer/SequenceProducerImpl.java

public class SequenceProducerImpl extends AbstractOMSProducer implements SequenceProducer {    private BlockingQueue
msgCacheQueue; public SequenceProducerImpl(final KeyValue properties) { super(properties); this.msgCacheQueue = new LinkedBlockingQueue<>(); } @Override public KeyValue properties() { return properties; } @Override public void send(final Message message) { checkMessageType(message); org.apache.rocketmq.common.message.Message rmqMessage = OMSUtil.msgConvert((BytesMessage) message); try { Validators.checkMessage(rmqMessage, this.rocketmqProducer); } catch (MQClientException e) { throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e); } msgCacheQueue.add(message); } @Override public void send(final Message message, final KeyValue properties) { send(message); } @Override public synchronized void commit() { List
messages = new ArrayList<>(); msgCacheQueue.drainTo(messages); List
rmqMessages = new ArrayList<>(); for (Message message : messages) { rmqMessages.add(OMSUtil.msgConvert((BytesMessage) message)); } if (rmqMessages.size() == 0) { return; } try { SendResult sendResult = this.rocketmqProducer.send(rmqMessages); String[] msgIdArray = sendResult.getMsgId().split(","); for (int i = 0; i < messages.size(); i++) { Message message = messages.get(i); message.headers().put(MessageHeader.MESSAGE_ID, msgIdArray[i]); } } catch (Exception e) { throw checkProducerException("", "", e); } } @Override public synchronized void rollback() { msgCacheQueue.clear(); }}
  • 采用的是LinkedBlockingQueue,send方法实际调用的是添加到队列
  • 另外提供了commit以及rollback方法,都加了synchronized保证对LinkedBlockingQueue操作的线程安全
  • commit的时候,将queue的数据drainTo到list,然后批量发送;rollback的时候清空整个LinkedBlockingQueue

小结

rocketmq的SequenceProducerImpl在send方法的时候不是真正方法,而是添加到队列,只有在commit的时候才批量发送,rollback的时候清空队列。这里的send方法语义不是太好,可以改为pending之类的名称。

doc

转载于:https://my.oschina.net/go4it/blog/1919697

你可能感兴趣的文章
安装ffmpeg过程中可能会遇到的问题详解
查看>>
CISCO和华为设备配置跨域MPLS×××的区别
查看>>
深入了解css的行高Line Height属性
查看>>
我的友情链接
查看>>
我的友情链接
查看>>
linux常用命令笔记---磁盘阵列
查看>>
我的友情链接
查看>>
PHP常用函数
查看>>
特大型网站技术架构脑图
查看>>
数据挖掘 pandas基础入门之创建对象
查看>>
[问题解决] sudo apt-get install *** 出现“E: 无法定位软件包 ***”
查看>>
SD卡tune clk
查看>>
Struts2用户输入验证(4)
查看>>
Centos7安装Redis3.0.5过程记录
查看>>
千方百计隔绝病毒
查看>>
git使用技巧总结
查看>>
robot+selenium编写web UI自动化用例
查看>>
JAVA中Date转换成Strign
查看>>
电脑内存条松动导致的故障现象及解决方法
查看>>
最小化安装的Centos7安装图形化界面
查看>>