Springboot+Redis+RabbitMQ实战教程(二)RabbitMQ

2020-08-13 13:39:52  晓掌柜  版权声明:本文为站长原创文章,转载请写明出处


一、背景

    承接上文Springboot+Redis+RabbitMQ实战教程(一)Redis,本次主要对RabbitMQ的集成及相关知识点做一些总结。

二、概述

    2.1、什么是消息队列

        消息总线(Message Queue)是一种跨进程、一步的通信机制。主要用于上下游的消息传递。

    2.2、消息队列有什么用,什么场景下适合

        ① 消息队列的作用

            消息队列主要的作用就是应用解耦、异步操作、流量削峰、日志记录等。

        ② 典型的适用场景

            应用解耦 -- 库存和订单的分离

            异步操作 -- 用户注册后发送邮件

            并行操作 -- 用户注册发送邮件的同时也发送一条短信

            流量削峰 -- 在类似秒杀活动中把请求放置到队列中控制数量(达到数量即停止)

                       根据服务器性能慢慢消费(每次消费XX条)

            日志收集 -- 比如和logback的集成

    2.3、注意事项

        2.3.1、消息堆积(生产者 > 消费者、失败的消息无法ACK、消费者瓶颈或挂掉)

            ① 排查消费者效率及性能

            ② 增加消费者或消费者线程

        2.3.2、消息丢失

            ① 生产者丢失(断网、宕机):生产者在消息投递时开启确认机制

            ② 队列中丢失(队列服务器宕机):持久化处理

            ③ 消费者中丢失:手动ACK

        2.3.3、重复消费(没有ACK,手动ACK时失败)

            ① 消息中加入标识作为检测依据

            ② 消费后变更消息标识

            ③ 如遇到再次消费是检测消息标识,检测到未消费时继续进行

三、使用教程

    3.1、简明安装说明

        3.1.1、安装Erlang

             https://www.erlang.org/downloads

        3.1.2、安装RabbitMQ

             http://www.rabbitmq.com/download.html

        3.1.3、注意事项

            详细的安装操作可以在网络上搜索,这里提示一下(rabbitMQ解压完成后一定要安装插件)

            cmd进入sbin目录下执行 rabbitmq-plugins enable rabbitmq_management

        3.1.3、默认登录密码及端口

             guest、guest (可修改);登录的端口号是15672

        3.1.4、Host和User分配

            添加Host并和user进行关联分配

        注意事项:

            ① 安装erlang后需配置系统环境变量,变量名为 ERLANG_HOME,路径为安装根目录

               

            ② 修改Path用户环境变量配置,在原有上加上 %ERLANG_HOME%\bin

                                  

    3.2、相关配置

        3.2.1、maven依赖

        <!-- rabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

       

         3.2.1、相关yml配置           
        
        # RabbitMQ基础配置

  rabbitmq:
host: 127.0.0.1
port: 5672
username: xa
password: guangmuhua
listener:
simple:
retry:
# 开启消费者(程序出现异常的情况下会)进行重试
enabled: true
# 最大重试次数
max-attempts: 5
# 重试间隔次数
initial-interval: 3000
# 开启手动ack
acknowledge-mode: manual
# 指定最小的消费者数量
concurrency: 1
# 指定最大的消费者数量
max-concurrency: 1
# 指定一个请求能处理多少的消息,如果有事务的话,必须大于等于transaction的数量
prefetch: 1


四、核心代码

    4.1、rabbitConfig

        声明队列和交换机并绑定。

        
        /**
* @author XA
* date 2021/1/25 15:36
* description: 队列和交换机绑定
*/
@Configuration
public class RabbitConfig {

@Value("${rabbitmqConf.article.exchange}")
private String articleExchange;

@Value("${rabbitmqConf.article.query}")
private String articleQuery;

@Value("${rabbitmqConf.article.routing}")
private String articleRouting;

@Value("${rabbitmqConf.mail.exchange}")
private String mailExchange;

@Value("${rabbitmqConf.mail.query}")
private String mailQuery;

@Value("${rabbitmqConf.mail.routing}")
private String mailRouting;


/* 文章队列和交换机绑定 */
@Bean
public Queue articleQuery() {
/* durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 */
/* exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable */
/* autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 */

/* 一般设置一下队列的持久化就好,其余两个就是默认false */
return new Queue(articleQuery,true,false,false);
}

@Bean
DirectExchange articleExchange() {
return new DirectExchange(articleExchange,true,false);
}

/* 绑定将队列和交换机绑定, 并设置用于匹配键:articleRouting */
@Bean
Binding bindingOrder() {
return BindingBuilder.bind(articleQuery()).to(articleExchange()).with(articleRouting);
}


/* 邮件发送交换机和队列绑定 */
@Bean
public Queue mailQuery() {
return new Queue(mailQuery, true, false, false);
}

@Bean
DirectExchange mailExchacge() {
return new DirectExchange(mailExchange, true, false);
}

@Bean
Binding bindingMail() {
return BindingBuilder.bind(mailQuery()).to(mailExchacge()).with(mailRouting);
}
}


    4.2、队列生产者       

        
        private void sendMailMq(MailMqDTO mailMqDTO){
String str = JSONObject.toJSONString(mailMqDTO);
Message message = MessageBuilder.withBody(str.getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setContentEncoding("utf-8")
.setMessageId(UUID.randomUUID()+"")
.build();

       rabbitTemplate.convertAndSend(articleExchange,articleRouting, message);

}

   

     4.3、队列消费者       

        
        /**
* 功能描述: 消息监听 (注意:这里的RabbitListener注解放置在类上会报错)
* * 没有队列时创建 -- queuesToDeclare
* * 持久化 -- durable
* * 是否自动删除 -- autoDelete
*
* Param: [message]
* Return: void
*/
@RabbitHandler
@RabbitListener(queues="mail")
@Transactional(rollbackFor = Exception.class)
public void process(Message message, Channel channel) throws IOException {
try {
MailMqDTO mailMqDTO = JSON.parseObject(new String(message.getBody()), MailMqDTO.class);
assert mailMqDTO != null;
mailUtils.sendMail(mailMqDTO);
/* 消费成功之后进行手动签收 */

         /* todo 注意:MQ在ack时是线程不安全的,channel的publicer可能已经失效 */

channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}catch (Exception e){
/* 消费失败,将消息重新放置到队列中 */
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
e.printStackTrace();
}
}


五、后记

    这里仅做初步的项目集成,更多高级用法会在后续陆续放出,敬请期待。

    也欢迎大家沟通交流,共同进步!        




最新评论: