RocketMQ

简介

消息中间件是什么?

中间件:顾名思义 介于两者之间的一个技术

消息中间件:消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。

RocketMQ是什么?

RocketMQ是阿里巴巴开源的一个消息中间件,是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点。目前已贡献给apache

参考阅读链接:[RocketMQ的前世今生](https://yq.aliyun.com/articles/66129)

功能

异步化

将一些可以进行异步化的操作通过发送消息来进行异步化,提高效率

具体场景:用户为了使用某个应用,进行注册,系统需要发送注册邮件并验证短信。对这两个操作的处理方式有两种:串行及并行。

  1. 串行方式:新注册信息生成后,先发送注册邮件,再发送验证短信

在这种方式下,需要最终发送验证短信后再返回给客户端。

  1. 并行处理:新注册信息写入后,发短信和发邮件并行处理

在这种方式下,发短信和发邮件 需处理完成后再返回给客户端。

假设以上三个子系统处理的时间均为50ms,且不考虑网络延迟,则总的处理时间:

串行:50 + 50 + 50 = 150ms	并行:50 + 50 = 100ms
  1. 使用消息队列

并在写入消息队列后立即返回成功给客户端,则总的响应时间依赖于写入消息队列的时间,而写入消息队列的时间本身是可以很快的,基本可以忽略不计,因此总的处理时间相比串行提高了2倍,相比并行提高了一倍;

限流削峰

在高并发场景下把请求存入消息队列,利用排队思想降低系统瞬间峰值

具体场景:购物网站开展秒杀活动,一般由于瞬时访问量过大,服务器接收过大,会导致流量暴增,相关系统无法处理请求甚至崩溃。而加入消息队列后,系统可以从消息队列中取数据,相当于消息队列做了一次缓冲。

优点

  1. 请求先入消息队列,而不是由业务处理系统直接处理,做了一次缓冲,极大地减少了业务处理系统的压力;
  2. 事实上,秒杀时,后入队列的用户无法秒杀到商品,这些请求可以直接被抛弃,返回活动已结束或商品已售完信息;

对比

消息中间件不仅仅只有RocketMQ,市面上还有很多其他的消息中间件,这里列举几个常见的和RocketMQ作为一个对比

ActiveMQ:ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,是使用Java语言编写的。

JMS: 全称是Java Message Service,即消息服务应用程序接口,是一个Java面向消息中间件平台的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信

RabbitMQ:AMQP协议的领导实现,支持多种场景。淘宝的MySQL集群内部有使用它进行通讯,OpenStack开源云平台的通信组件,最先在金融行业得到运用,使用Erlang语言编写的。

AMQP: 即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计

Kafka: Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。

模型(RocketMQ)

概念模型

  • Producer: 消息生产者,负责消息的产生,由业务系统负责产生
  • Consumer:消息消费者,负责消息消费,由后台业务系统负责异步消费
  • Topic:消息的逻辑管理单位(消息的一个属性,并且每个消息都一定有这个属性)

这三者是RocketMq中最最基本的概念。Producer是消息的生产者。Consumer是消息的消费者。消息通过Topic进行传递。Topic存放的是消息的逻辑地址。

具体来说是Producer将消息发往具体的Topic。Consumer订阅Topic,主动拉取或被动接受消息,如果Consumer消费消息失败则默认会重试16次

  • Broker: 消息的中转角色,负责存储消息,转发消息,一般也称为server,可以理解为一个存放消息的服务,里面可以有多个Topic
  • MessageQueue: 消息的物理管理单位,一个Topic下有多个Queue,默认一个Topic创建时会创建四个MessageQueue
  • ConsumerGroup: 具有同样消费逻辑消费同样消息的Consumer,可以归并为一个group
  • ProducerGroup: 具有同样属性的一些Producer可以归并为同一个Group

同样属性是指:发送同样Topic类型的消息

  • Nameserver 注册中心

作用:

  • 每个Broker启动的时候会向namesrv注册
  • Producer发送消息的时候根据Topic获取路由到Broker里面Broker的信息
  • Consumer根据Topic到Namesrv 获取topic的路由到Broker的信息

部署模型

  1. 注册中心Nameserver启动
  2. 消息中转服务Broker启动
  • 启动的时候会去创建Topic并创建对应的MessageQueue
  • 启动的时候会去注册中心注册,把自己的地址以及负责的Topic告诉注册中心
  • Broker和Nameserver之间通过心跳机制来检测对方是否存活
     连接: 单个broker和所有nameserver保持长连接 
     心跳: 
     	心跳间隔:每隔30秒(此时间无法更改)向所有nameserver发送心跳,心跳包含了自身的topic配置信息。
       心跳超时:nameserver每隔10秒钟(此时间无法更改),扫描所有还存活的broker连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则断开连接。
  1. 消息生产者Produer启动

启动时:

  • 单个生产者者和一台nameserver保持长连接,定时查询topic配置信息
  • 单个生产者和该生产者关联的所有broker保持长连接。

运行时:

  • 默认情况下,生产者每隔30秒从nameserver获取所有topic的最新队列情况
  • 发送消息时,根据从nameserver获取的路由信息,根据发送消息的Topic和目标Broker建立连接
  • 默认情况下,生产者每隔30秒向所有broker发送心跳,该时间由DefaultMQProducer的heartbeatBrokerInterval参数决定,可手动配置。broker每隔10秒钟(此时间无法更改),扫描所有还存活的连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则关闭连接
  1. 消息消费者Consumer启动

启动时:

  • 单个消费者和一台nameserver保持长连接,定时查询topic配置信息
  • 单个消费者和该消费者关联的所有broker保持长连接。

运行时:

  • 默认情况下,消费者每隔30秒从nameserver获取所有topic的最新队列情况
  • 默认情况下,消费者每隔30秒向所有broker发送心跳,该时间由DefaultMQPushConsumer的heartbeatBrokerInterval参数决定,可手动配置。broker每隔10秒钟(此时间无法更改),扫描所有还存活的连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则关闭连接

注意事项

  • 同步刷盘与异步刷盘

​ RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复,又可以让存储的消息量超出内存的限制。

RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时候,有两种写磁盘方式:

  • 异步刷盘:在返回写成功状态时,消息可能只是被写入了内存中,写操作的返回快,

吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入

  • 同步刷盘:在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。

同步刷盘还是异步刷盘,是通过Broker配置文件里的flushDiskType参数设置的,这个参数被设置成SYNC_FLUSH、ASYNC_FLUSH

中的一个

  • 同步复制与异步复制

​ 如果一个broker组有Master和Slave,消息需要从Master复制到Slave上,有同步和异步两种复制方式。

同步复制是等Master和Slave均写成功后才反馈给客户端写成功状态;异步复制方式是只要Master写成功即可反馈给客户端写成功状态

同步复制和异步复制是通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置成

ASYNC_MASTER、SYNC_MASTER、SLAVE三个值中的一个。

整合

导包

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.9.4</version>
        </dependency>

普通消息

  • 消息生产者
public static void main(String[] args){
  	// 新增消息生产者
  	DefaultMQProucer producer = new DefaultMQProucer("producer_group");
  	// 配置注册中心
  	producer.setNamesrvAddr("localhost:9876");
  	// 启动
  	producer.start();
  	// 新建消息对象
  	Message message = new Message("topicA","message".context.getBytes(Charset.forName("utf-8")));
  	// 发送消息
  	producer.send(message);
}
  • 消息消费者
public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer mqConsumer = new DefaultMQPushConsumer("consumer_group");
        mqConsumer.setNamesrvAddr("localhost:9876");
        mqConsumer.subscribe("topicA", "*");
				// 设置消息监听器
        mqConsumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                MessageExt message = msgs.get(0);
              //获取消息内容
                byte[] body = message.getBody();
                
        });

        mqConsumer.start();

延迟消息

  • 消息生产者
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
    // 1. 创建Producer对象
        DefaultMQProducer produce = new DefaultMQProducer("delay_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        // 准备消息
        Message message = new Message();
        message.setTopic("test_delay");
        message.setBody("hello,delay".getBytes("utf-8"));
        // 非常简单  延迟级别
        //  1s   5s  10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 32m 1h 2h
        //  1    2    3  4   5  6  7  8  9 10  11 12 13 14 15  16   17 18
        message.setDelayTimeLevel(2);
        // 发送
        SendResult send = producer.send(message);
        System.out.println(send);
    }
  • 消息消费者(和普通的消息消费者没有区别)

服务中的使用

如果我们想在服务中发送消息可以这样做:

package com.cskaoyan.duolai.clean.rocketmq.client;

@Component
@Slf4j
public class RocketMQProducerConfig {

    @Value("${rocketmq.namesrv.address}")
    String namesrvAddr;


    @Bean
    public DefaultMQProducer producer(){

        // 创建对象
        mqProducer = new DefaultMQProducer(生产者组);

        // 设置nameserver
        mqProducer.setNamesrvAddr(namesrvAddr);

        try {
            // 启动producer
            mqProducer.start();
            log.info("mqProducer inited successed...namesrcAddr:{}, producerGroup:{}", namesrvAddr,producerGroup);
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
   
}

如果我们想在服务中消费消息,可以这样写代码:

@Slf4j
@Component
public class RocketConsumer {

    @Value("${rocketmq.namesrv.address}")
    String namesrvAddr;


    private DefaultMQPushConsumer consumer;


    @PostConstruct
    public void init() {
        log.info("RocketConsumer init ...");
        // 创建consumer对象
        consumer = new DefaultMQPushConsumer(消费者组);

        // 设置nameserver地址
        consumer.setNamesrvAddr(namesrvAddr);

        // 设置消息监听器
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

                MessageExt messageExt = msgs.get(0);
                log.info("接收消息 ({})-> {}",  new String(messageExt.getBody(), StandardCharsets.UTF_8));
                try {
                   // 消息处理逻辑
                } catch (Exception e) {
                    log.error("parseMsg error,msg={}", new String(messageExt.getBody(), StandardCharsets.UTF_8), e);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        try {
            // 订阅主题
            consumer.subscribe(主题名称, "*");

            // 启动consumer
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }


    }
}
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇