用rabbitmq实现消息重发的功能
csdh11 2025-03-12 13:40 2 浏览
前言:
在开发工作中,有很多时候会遇到要把数据同步给其他部门或三方的场景,这个时候光写一个同步接口是不太稳定的,因为有很多因素会导致同步接口运行失败或未运行,比如调接口之前的代码出现了bug,异常被throws或被catch,没有往下走。再比如对方接收代码出现问题,或者网络问题,接口没通,同步失败。
遇到上面同步失败的情况,就会影响到业务的正常使用了,本文只讨论第二种调用失败的情况(第一种情况可以把同步代码封装起来,提供一个接口出来用于手动调用hhhh,很笨但是很救命的办法),所以必须要加入重发机制,来让程序更加的健壮。
因为项目中也用到了rabbitmq,所以第一时间就想到了死信队列,之前的一种实现方式是使用死信队列的超时时间特性,第一次失败后,把参数放入死信队列,但是参数的bean中要有一个记录次数的值,第一次放的时候set为1。在消息超时后放入死信队列,被监听到时,再去调用接口,如果失败了,就按照次数去计算下一次执行的时间,然后重新放入到参数的bean中,再把bean重新放入到正常消费队列中,直到下一次消息超时被死信队列接收。不过这样的是要在代码中设置一个最大循环次数的,否则调用不通的情况下,会一直循环。如果成功了,那么我们就手动调用下 channel.basicAck 去手动签收一下(这里是要在配置中把自动签收改成手动签收)。
spring.rabbitmq.listener.simple.acknowledge-mode=manual 手动签收
这种方式虽然实现了功能,但是确增大了代码量,尤其是需要增添2个队列(一个正常消费队列,一个消费队列绑定的死信队列),不太方便扩展,而且也增加了复杂度,所以不太推荐这样的写法(代码就不贴了,如果有人感兴趣,可以给我留言,到时候我再更新)。
为了优化掉现有代码,于是我就又重新研究了一下,发现不就是重发么,rabbitmq早就给我们准备好了,-_-|| 自己这又是死信又是手动签收的,,,一顿操作,确实浪费了不少功夫。
实现过程:
那下面我们来看一下是如何实现的。
首先写贴下配置
##rabbit地址
spring.rabbitmq.addresses=amqp://guest:guest@localhost:5672
# 开启重发
spring.rabbitmq.listener.simple.retry.enabled=true
# 最大重发次数
spring.rabbitmq.listener.simple.retry.max-attempts=10
# 重试间隔时间 单位毫秒
spring.rabbitmq.listener.simple.retry.initial-interval=3000ms
# 重试最大间隔时间 单位毫秒
spring.rabbitmq.listener.simple.retry.max-interval=86400000ms
# 重发间隔因子 间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间
spring.rabbitmq.listener.simple.retry.multiplier=2
开启rabbitmq的重发机制,并且设置好重试间隔时间(这个间隔时间应该是第一次到第二次的间隔时间,往后的间隔时间是通过间隔因子算的),以及最大间隔时间(避免出现无限重试的问题),还有重要的间隔因子,这样保证了每次的间隔时间是成比例增长的。
配置好后,接下来就要声明我们所用到的队列
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* rabbitmq重发配置
*/
@Configuration
public class RepeatSendRabbitmqConfig {
/**
* 正常队列
*/
public final static String REPEAT_QUEUE = "repeat_queue";
/**
* 交换机
*/
public final static String REPEAT_EXCHANGE = "repeat_exchange";
/**
* 路由键
*/
public final static String REPEAT_ROUTING_KEY = "repeat_routing_key";
/**
* 声明队列
*/
@Bean
Queue repeatQueue() {
return QueueBuilder.durable(REPEAT_QUEUE).build();
}
/**
* 声明交换机
* @return
*/
@Bean
DirectExchange repeatExchange() {
return new DirectExchange(REPEAT_EXCHANGE);
}
/**
* 将队列和交换机进行绑定
* @param repeatQueue
* @param repeatExchange
* @return
*/
@Bean
Binding dlxBinding(Queue repeatQueue, DirectExchange repeatExchange) {
return BindingBuilder.bind(repeatQueue).to(repeatExchange).with(REPEAT_ROUTING_KEY);
}
}
我们把队列,交换机,路由键都声明好后,下一步就要写接收代码了。
首先我们要写好生产者,也就是把消息放入到消息队列中的那步。
import com.google.gson.Gson;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* 生产者,将消息放入队列
*/
@Component
public class MqSender {
Gson gson = new Gson();
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 将消息放入队列
* @param request
*/
public void repeatSend(RequestBean request){
log.err("repeat request " + gson.toJson(request),"接收时间:" + LocalDateTime.now());
rabbitTemplate.convertAndSend(RepeatSendRabbitmqConfig.REPEAT_EXCHANGE,
RepeatSendRabbitmqConfig.REPEAT_ROUTING_KEY,request);
}
}
注意了,repeatSend这个方法,只运行一次,这里并没有指定超时时间,仅仅是传入了exchange和routing key,通过之前的绑定,就能定位到是哪个队列,然后把参数放到队列中。
(这里有一个小坑,之前是使用的 AmqpTemplate amqpTemplate 这个接口来做数据存放,但是用了其中的方法,在测试的时候偶尔不会消费,,不知道什么原因,为了赶工,就改成使用 RabbitTemplate rabbitTemplate 了,有知道的小伙伴可以留言讨论一下)。
前期准备工作好后,最后一步就是拿来消费了。
import com.google.gson.Gson;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.time.LocalDateTime;
/**
* 接收者,消费者
*/
@Component
public class MqReceiver {
Gson gson = new Gson();
@Autowired
private RemoteService remoteService;
/**
* 同步接口重发队列实现
* @param request
*/
@RabbitListener(queues = RepeatSendRabbitmqConfig.REPEAT_QUEUE)
public void ListenRepeatSend(RequestBean request){
log.err(" ListenRepeatSend request " + gson.toJson(request),
"同步时间 " + LocalDateTime.now());
BaseResp response = remoteService.send(request);
// 重新同步,失败后抛异常重试
if(!"SUCCESS".equals(response.getCode())){
throw new RuntimeException();
}
}
}
使用 @RabbitListener 指定监听队列,那么这个队列就会被这个消费者所监听了。把参数传入我们自己的同步方法中,如果失败了,我们就抛异常出去,不用做其他的任何操作,只需要抛出去,rabbitmq就会按照配置的时间,以及间隔,来重新执行方法了,直到不抛异常,或者超过了配置中的最大时间,就停止重复执行了。
最后,使用生产者时,直接在代码中直接调用一下就好。
相关推荐
- linux下的定时或计时操作(gettimeofday等的用法秒\微秒\纳秒
-
一、用select()函数实现非阻塞时的等待时间,用到结构体structtimeval{},这里就不多说了。二、用gettimeofday()可获得...
- guava限流器RateLimiter使用简介(Springboot实现)
-
在大型分布式系统中,限流是一种重要的防护机制,可以帮助我们控制流量并减轻系统的负担。Google的Guava库提供了一种方便的限流器实现,可以帮助我们轻松地实现限流功能。本文将介绍Guava中限流器的...
- Mybatis配置文件XML全貌详解,再不懂我也没招了
-
一、为什么要使用配置文件试想,如果没有配置文件,我们的应用程序将只能沿着固定的姿态运行,几乎不能做任何动态的调整,那么这不是一套完美的设计,因为我们希望拥有更宽更灵活的操作空间和更多的兼容度,同时也能...
- 五分钟搞懂分布式流控算法原理和滑动窗口设计实现
-
流控的使用场景保护系统稳定性:...
- 实时数据处理的性能瓶颈:如何用Flink实现毫秒级延迟?
-
在大数据实时处理领域,ApacheFlink凭借其低延迟、高吞吐的特性,成为了许多企业的首选框架。然而,在实际生产环境中,实现毫秒级延迟仍然是一个巨大的挑战。本文将从性能优化的角度出发,详细讲解如...
- Java时间类介绍:Date的使用
-
在Java中用来表示日期的类有很多,最早使用的应该是Date类。Date类大概有两个,分别是:java.util.Date和java.sql.Date。前者对应我们日常生活中常用的日期类,而后者则表示...
- 不会用Redis做分布式流水号?
-
引言最近做项目,需要做单据编号,格式固定为:单据类型固定前缀+年月日时间戳+4位流水号,要求是每个单据类型的流水号唯一,方便后续业务使用。之前项目中使用的是UUID作为其他业务的单据编号,和组长沟通了...
- 用rabbitmq实现消息重发的功能
-
前言:在开发工作中,有很多时候会遇到要把数据同步给其他部门或三方的场景,这个时候光写一个同步接口是不太稳定的,因为有很多因素会导致同步接口运行失败或未运行,比如调接口之前的代码出现了bug,异常被th...
- 如何实现延迟队列(JDK/mysql/redis/Rabbit)
-
何为延迟队列队列,即先进先出的数据结构,就和食堂打饭一样,排在最前面的先打饭,打完饭就走;延迟队列即队列中的元素相比以往多了一个属性特征:延迟...
- 面试突击40:线程休眠的方法有几种?
-
在Java中,让线程休眠的方法有很多,这些方法大致可以分为两类,一类是设置时间,在一段时间后自动唤醒,而另一个类是提供了一对休眠和唤醒的方法,在线程休眠之后,可以在任意时间对线程进行唤醒。PS:休...
- 让java日期和时间处理变得更简单
-
时间和日期处理是Java编程中不可或缺的一部分。Java提供了许多内置的日期和时间类,可以用来处理日期和时间数据。但是,由于时间和日期的复杂性,处理时间和日期数据可能会变得非常棘手。在本文中,我...
- Java如何获取当前日期?
-
Talkischeap,Showmethecode.--by:LinusTorvalds使用获取当时日期的方式如下:System.currentTimeMillis();获取当时...
- 流水号设计及Leaf的升级使用
-
一、需求背景由于业务需要对数据和消息等进行唯一的标识。需要进行唯一流水号的设计。...
- 糟糕,被SimpleDateFormat坑到啦!
-
1.问题背景问题的背景是这样的,在最近需求开发中遇到需要将给定目标数据通过某一固定的计量规则进行过滤并打标生成明细数据...
- JDK8新特性解析:深入比较LocalDateTime和Date之间的区别
-
JDK8引入了java.time包,其中包含了LocalDateTime类,它与旧的Date类在表示日期和时间方面有着显著的区别。下面是对这两者之间区别的详细解释:...
- 一周热门
- 最近发表
- 标签列表
-
- mydisktest_v298 (34)
- document.appendchild (35)
- 头像打包下载 (61)
- acmecadconverter_8.52绿色版 (39)
- word文档批量处理大师破解版 (36)
- server2016安装密钥 (33)
- mysql 昨天的日期 (37)
- parsevideo (33)
- 个人网站源码 (37)
- centos7.4下载 (33)
- mysql 查询今天的数据 (34)
- intouch2014r2sp1永久授权 (36)
- 先锋影音源资2019 (35)
- jdk1.8.0_191下载 (33)
- axure9注册码 (33)
- pts/1 (33)
- spire.pdf 破解版 (35)
- shiro jwt (35)
- sklearn中文手册pdf (35)
- itextsharp使用手册 (33)
- 凯立德2012夏季版懒人包 (34)
- 反恐24小时电话铃声 (33)
- 冒险岛代码查询器 (34)
- 128*128png图片 (34)
- jdk1.8.0_131下载 (34)