百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

Redis应用—7.大Value处理方案

csdh11 2025-02-06 14:31 17 浏览

大纲

1.?案设计

2.安装与配置环境


1.?案设计

步骤一:首先需要配置一个crontab定时调度shell脚本,然后该脚本每天凌晨会通过rdbtools?具解析Redis的RDB?件,接着对解析出的内容进行过滤,把RDB?件中的?key导出到CSV?件


步骤二:使?SQL导?CSV?件到MySQL数据库中,同时使?Canal监听MySQL的binlog?志。


步骤三:Canal会发送增量的大key数据消息到RocketMQ,RocketMQ的消费者系统会对增量的大key数据消息进?消费,消息中便会包含?key的详情信息。这样消费者就可以将?key的信息通过邮件等?式,通知开发?员


为什么要把?key的CSV?件导?到MySQL存储?为什么不直接监听?key的CSV?件进?通知?


原因一:如果不导?MySQL,那么就?法使?Canal来监听。这样就要开发?个程序,定时去扫描Redis节点下解析出来的CSV?件。如果Redis集群中有多个节点,那么每?个节点都要去扫描。?将CSV导?到MySQL后,只需要使?Canal去监听MySQL表的binlog,就可以把增量数据同步到RocketMQ中,由消费者统?进?处理。


原因二:解析CSV?件?直接从MySQL中查询复杂很多,尤其是需要进行信息过滤。导?到MySQL后可以通过SQL轻松的对?key的记录进?条件筛选,并且可以对每天产?的?key数据进?存储分析。


RDB解析?成的CSV?件结构如下:

database,type,key,size_in_bytes,encoding,num_elements,len_largest_element, expiry 
0,string,key1-string,20536,string,17280,17280, 
0,list,key1-list,4006,quicklist,24,1530,


2.安装与配置环境

(1)依赖环境

(2)安装Python3 & pip3

(3)安装rdb-tools

(4)安装RocketMQ

(5)安装Canal

(6)rdbtools扫描RDB?件

(7)将CSV?件导?MySQL


(1)依赖环境

Python3、pip3、rdb-tools、Redis、MySQL、JDK、RocketMQ、Canal。


rdb-tools是开源的?个python项?,它可以?来解析Redis的RDB?件,但是要先安装Python环境。

连接地址:
https://github.com/sripathikrishnan/redis-rdb-tools

pip是Python的包管理?具,安装Python后,这个?具就会配套安装好。


(2)安装Python3 & pip3

# 安装编译?具
$ yum -y groupinstall "Development tools"
$ yum -y install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gdbm-devel db4-devel libpcap-devel xz-devel 
$ yum install libffi-devel -y
# 下载python3.7.0
$ cd /usr/local
$ wget https://www.python.org/ftp/python/3.7.0/Python-3.7.0.tar.xz
$ tar -xvJf Python-3.7.0.tar.xz
# 编译
$ mkdir /usr/local/python3
$ cd Python-3.7.0
$ ./configure --prefix=/usr/local/python3
$ make && make install

(3)安装rdb-tools

# 使?pip包管理程序安装rdb-tools
$ pip3 install rdbtools python-lzf
# 配置环境变量
$ vim /etc/profile
# 在?件底部最末尾,追加如下两?内容
PATH=/usr/local/python3/bin:$PATH
export PATH

验证安装:

# 验证rdbtools是否安装成功
$ rdb -h

(4)安装RocketMQ

一.下载安装包

# 下载安装包,要注意版本与项?中依赖的RocketMQ版本兼容
$ wget https://archive.apache.org/dist/rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip

二.修改默认配置

# 解压
$ unzip rocketmq-all-4.7.1-bin-release.zip
# 切换?录
$ cd /usr/local/rocketmq-all-4.7.1-bin-release
# 修改nameserver默认堆栈??
$ vim ./bin/runserver.sh
# 修改brokerserver默认堆栈??
$ vim bin/runbroker.sh

NameServer默认配置如下:

 -server -Xms4g -Xmx4g -Xmn2g 
 -XX:MetaspacesSize=128m -XX:MaxMetaspaceSize=320m

BrokerServer默认配置如下:

 -server -Xms8g -Xmx8g -Xmn4g

三.修改BrokerServer的IP地址

$ vim /usr/local/rocketmq-all-4.7.1-bin-release/conf/broker.conf

在broker.conf?件中追加如下内容:

# brokerserver所在机器的公?IP地址 
brokerIP1=192.168.95.129

四.启动RocketMQ

# 启动nameserver
$ nohup sh ./bin/mqnamesrv &
# 查看nameserver启动?志
$ tailf ~/logs/rocketmqlogs/namesrv.log
# 启动brokerserver
$ nohup sh bin/mqbroker -n 127.0.0.1:9876 -c conf/broker.conf &
# 查看brokerserver启动?志
$ tailf ~/logs/rocketmqlogs/broker.log

五.启动RocketMQ控制台

github地址:https://github.com/apache/rocketmq-externals/tree/release-rocketmq-console-1.0.0

可以从github上clone下载,然后使?maven命令打包,然后如下启动:

$ nohup java -jar -server -Xms256m -Xmx256m \
-Drocketmq.config.namesrvAddr=127.0.0.1:9876 -Dserver.port=8080 \
/usr/local/rocketmq-console-ng-1.0.1.jar &

(5)安装Canal

一.下载安装包

# 下载canal-admin
$ wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.admin-1.1.5.tar.gz
# 下载canal-deployer
$ wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz

二.安装canal-admin

# 创建解压?录
$ mkdir /usr/local/canal-admin
# 解压
$ tar -zxvf canal.admin-1.1.5.tar.gz -c /usr/local/canal-admin

目录结构如下:

三.初始化Canal数据库

执?conf?录下的canal_manager.sql?件

# 执?conf?录下的canal_manager.sql?件
$ cd /usr/local/canal-admin/conf
$ mysql -u?户名 -p密码 -hIP地址 -P端?号 < canal_manager.sql

canal_manger.sql?件内容如下:

CREATE DATABASE /*!32312 IF NOT EXISTS*/ `canal_manager` /*!40100 DEFAULT
CHARACTER SET utf8 COLLATE utf8_bin */;
USE `canal_manager`;
SET NAMES utf8;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for canal_adapter_config
-- ----------------------------
DROP TABLE IF EXISTS `canal_adapter_config`;
CREATE TABLE `canal_adapter_config` (
    `id` bigint(20) NOT NULL AUTO_INCREMENT,
    `category` varchar(45) NOT NULL,
    `name` varchar(45) NOT NULL,
    `status` varchar(45) DEFAULT NULL,
    `content` text NOT NULL,
    `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
    CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for canal_cluster
-- ----------------------------
DROP TABLE IF EXISTS `canal_cluster`;
CREATE TABLE `canal_cluster` (
    `id` bigint(20) NOT NULL AUTO_INCREMENT,
    `name` varchar(63) NOT NULL,
    `zk_hosts` varchar(255) NOT NULL,
    `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
    CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for canal_config
-- ----------------------------
DROP TABLE IF EXISTS `canal_config`;
CREATE TABLE `canal_config` (
    `id` bigint(20) NOT NULL AUTO_INCREMENT,
    `cluster_id` bigint(20) DEFAULT NULL,
    `server_id` bigint(20) DEFAULT NULL,
    `name` varchar(45) NOT NULL,
    `status` varchar(45) DEFAULT NULL,
    `content` text NOT NULL,
    `content_md5` varchar(128) NOT NULL,
    `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
    CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`),
    UNIQUE KEY `sid_UNIQUE` (`server_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for canal_instance_config
-- ----------------------------
DROP TABLE IF EXISTS `canal_instance_config`;
CREATE TABLE `canal_instance_config` (
    `id` bigint(20) NOT NULL AUTO_INCREMENT,
    `cluster_id` bigint(20) DEFAULT NULL,
    `server_id` bigint(20) DEFAULT NULL,
    `name` varchar(45) NOT NULL,
    `status` varchar(45) DEFAULT NULL,
    `content` text NOT NULL,
    `content_md5` varchar(128) DEFAULT NULL,
    `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
    CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`),
    UNIQUE KEY `name_UNIQUE` (`name`) 
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for canal_node_server
-- ----------------------------
DROP TABLE IF EXISTS `canal_node_server`;
CREATE TABLE `canal_node_server` (
    `id` bigint(20) NOT NULL AUTO_INCREMENT,
    `cluster_id` bigint(20) DEFAULT NULL,
    `name` varchar(63) NOT NULL,
    `ip` varchar(63) NOT NULL,
    `admin_port` int(11) DEFAULT NULL,
    `tcp_port` int(11) DEFAULT NULL,
    `metric_port` int(11) DEFAULT NULL,
    `status` varchar(45) DEFAULT NULL,
    `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
    CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for canal_user
-- ----------------------------
DROP TABLE IF EXISTS `canal_user`;
CREATE TABLE `canal_user` (
    `id` bigint(20) NOT NULL AUTO_INCREMENT,
    `username` varchar(31) NOT NULL,
    `password` varchar(128) NOT NULL,
    `name` varchar(31) NOT NULL,
    `roles` varchar(31) NOT NULL,
    `introduction` varchar(255) DEFAULT NULL,
    `avatar` varchar(255) DEFAULT NULL,
    `creation_date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
    CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`) 
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
SET FOREIGN_KEY_CHECKS = 1;
-- ----------------------------
-- Records of canal_user
-- ----------------------------
BEGIN;
INSERT INTO `canal_user` VALUES (1, 'admin', '6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9', 'Canal Manager', 'admin', NULL, NULL, '2019-07-14 00:05:28');
COMMIT;
SET FOREIGN_KEY_CHECKS = 1;     

四.修改conf?录下的application.yml?件

# 修改conf?录下的application.yml?件
$ vim /usr/local/canal-admin/conf/application.yml

application.yml?件内容如下:

server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: canal
  password: canal
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin

五.启动canal-admin

$ cd /usr/local/canal-admin 
$ bin/startup.sh

然后访问Canal管理控制台:http://ip:8089,?户名和密码分别是:admin | 123456。


此时登录进?后,会发现?前什么数据都没有。但这没有关系,接着会启动canal-server,因为要?canal-admin来管理每个canal-server的实例


采?canal-admin来管理canal-server:当canal-server启动时,canal-server是会?动注册到canal-admin上的


六.关闭canal-admin

$ cd /usr/local/canal.admin-1.1.5
$ bin/stop.sh

注意:关闭时,请不要使?kill进程号的?式来关闭,而使?执?脚本的?式关闭。因为如果kill进程后,下次再次执?启动脚本时,会出现found admin.pid , Please run stop.sh first ,then startup.sh的提示。当然,出现这种情况的时候,可以到bin?录下将admin.pid删除掉。


七.安装canal-server

# 创建解压?录
$ mkdir /usr/local/canal-server
# 解压
$ tar -zxvf canal.deployer-1.1.5.tar.gz -c /usr/local/canal-server

目录结构如下:

八.修改conf?录下的canal_local.properties

# 修改conf?录下的canal_local.properties?件
$ vim /usr/local/canal-server/conf/canal_local.properties

canal_local.properties?件内容如下:

# register ip 这?的ip选择您本机的ip(也就是启动canal-server机器的所在ip地址) 
canal.register.ip = 192.168.95.129 

# canal admin config 这?是部署canal-admin的所在机器的ip,当然也可以把canal-admin和canal-server部署到?台机器 
canal.admin.manager = 192.168.95.129:8089 
canal.admin.port = 11110 
canal.admin.user = admin 
canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9 
# admin auto register 这??定得是true,否则?法在启动canal-server时候注册到canal-admin上 
canal.admin.register.auto = true 
canal.admin.register.cluster = 
# canal-server注册到canal-admin控制台的名称,这??canal-server的ip地址,注意改ip 
canal.admin.register.name = 192.168.95.129

九.启动canal-server

$ cd /usr/local/canal-server
# 切记后?加?local的参数
$ bin/startup.sh local

这时可以到canal-admin的界面中查看canal-server是否已经注册成功。


十.在canal-admin中配置Canal Instance来监听数据库

?先点击界面右侧的Instance管理,再点击新建Instance来创建实例。

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false 

# position info
# 需要修改
# 需要监听数据库的ip加端?
canal.instance.master.address=192.168.95.129:3306

# mysql主库连接时起始的binlog?件,这?可以不写,默认为mysql-bin
canal.instance.master.journal.name=

# binlog?志的位置
canal.instance.master.position=

# 开始同步binlog?志的时间戳,也就是从哪个时间点开始同步binlog?志,13位时间戳格式
canal.instance.master.timestamp=

# 如果数据库开启了gtid模式,这?填写master节点的gtid我们这?不写也是可以的,如果要开启,记得将canal.instance.gtidon改为true 
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true

#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# 需要修改
# username/password这?填写要连接数据库的?户名和密码
canal.instance.dbUsername=root

# 需要修改
# canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8

# enable druid Decrypt database password
canal.instance.enableDruid=false

#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# 需要修改
# table regex,这?填写订阅的数据库的库与表的相关正则表达式
canal.instance.filter.regex=careerplan_eshop_redis.redis_large_key_log

# table black regex
canal.instance.filter.black.regex=

# table field filter(format:
schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)

#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch

# table field black filter(format:schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# 需要修改 
# MQ Config,这?填写实例名称即可
# 如果canal.serverMode选择的不是tcp模式,这?填写相关的topic的名称,kafka和rocketmq默认的主题为example
# 同时需要注意,如果期望使?的canal-server的?作模式是MQ的?式来运?,那么需要修改canal.properties的配置
canal.mq.topic=binlog_monitor_large_key_topic

# dynamic topic route by schema or table regex
canal.mq.partition=0

# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

接着点击保存,就会跳转到列表页,然后在新创建的canal-instance后?点击启动,接着点击操作?志去查看相关?志。?此,?个canal-instance就启动成功了。


十一.关闭canal-server

$ cd /usr/local/canal-server
$ bin/stop.sh

(6)rdbtools扫描RDB?件

# 切换?录
[root@localhost bin]# cd /usr/local/redis/bin

# 在这个?录下存放着Redis的rdb?件
[root@localhost bin]# ls
dump.rdb redis-benchmark redis-check-aof redis-check-rdb redis-cli 
redis-sentinel redis-server

# 使?rdbtools?具过滤dump.rdb?件中的?key,?成dump.csv?件
[root@localhost bin]# rdb -c memory dump.rdb --bytes 10240 -f dump.csv
# 可以看到?成的dump.csv?件

[root@localhost bin]# ls
dump.csv dump.rdb redis-benchmark redis-check-aof redis-check-rdb
redis-cli redis-sentinel redis-server

# 查看dump.csv?件的内容
[root@localhost bin]# vim dump.csv

rdb参数说明:

rdb -c memory dump.rdb --bytes 10240 -f dump.csv

dump.rdb是指定Redis的rdb?件的路径,--bytes 10240表示过滤出key值??超过10240B的key,也就是10K。


dump.csv?件内容如下:

database,type,key,size_in_bytes,encoding,num_elements,len_largest_element,expiry 
0,string,key1-string,20536,string,17280,17280,

(7)将CSV?件导?MySQL

一.先查看secure_file_priv属性是否开启

secure_file_priv属性指定导??件的位置,只有在该属性指定的?录下的?件才可以导?MySQL。

mysql> show variables like '%secure%'; 
+--------------------------+------------------------------------------------+ 
| Variable_name            | Value                                          |
+--------------------------+------------------------------------------------+ 
| require_secure_transport | OFF                                            |
| secure_auth              | ON                                             | 
| secure_file_priv         | ...                                            | 
+--------------------------+------------------------------------------------+

二.修改MySQL配置?件?件

# 找到mysqld?件的位置
[root@localhost bin]# find / -name "mysqld"
/run/mysqld
/usr/sbin/mysqld

# 找到mysql的默认配置?件位置
[root@localhost bin]# /usr/sbin/mysqld --verbose --help |grep -A 1
'Default options'
Default options are read from the following files in the given order:
/etc/my.cnf /etc/mysql/my.cnf /usr/etc/my.cnf ~/.my.cn

# 修改mysql的默认配置?件
[root@localhost etc]# vim /etc/my.cnf

在[mysqld]模块下,如果存在下列属性就修改,如果不存在就追加:

[mysqld]
# 关闭安全?件导?路径
secure-file-priv=""
# 开启binlog
log-bin=mysql-bin
binlog-format=ROW
server_id=1 

三.重启MySQL服务

# 检查mysql服务运?状态
$ service mysqld status
# 重启服务
$ service mysqld restart

重启服务后重新连接MySQL,查看secure_file_priv和log_bin的值:

mysql> show variables like '%secure%';
+--------------------------+-------+ 
| Variable_name            | Value |
+--------------------------+-------+
| require_secure_transport | OFF   |
| secure_auth              | ON    |
| secure_file_priv         |       |
+--------------------------+-------+

mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+

四.创建redis_large_key_log表

create table redis_large_key_log (
    `id` bigint primary key auto_increment,
    `database` tinyint comment 'Redis数据库索引',
    `type` varchar(20) comment 'Redis数据类型',
    `key` varchar(256) comment 'Redis key',
    `size_in_bytes` int comment 'value对于的bytes',
    `encoding` varchar(30) comment '编码',
    `num_elements` int comment '元素数量',
    `len_largest_element` int comment '元素?度',
    `expiry` varchar(30) comment '过期时间',
    `create_time` datetime DEFAULT CURRENT_TIMESTAMP,
    `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
    CURRENT_TIMESTAMP
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

五.编写导?csv?件的SQL脚本

$ cd /usr/local/redis/bin/
# 创建SQL脚本?件 
$ touch csv-transfer-db.sql
# 编辑SQL脚本
vim csv-transfer-db.sql

csv-transfer-db.sql?件内容如下:

# 指定数据库
USE `careerplan_eshop_redis`;
load data infile '/usr/local/redis/bin/dump.csv' into table redis_large_key_log fields terminated by',' lines terminated by'\n' ignore 1 lines 
(`database`,`type`,`key`,size_in_bytes,encoding,num_elements,len_largest_element,expiry);

六.编写定时任务脚本

脚本职能:

调?rdbtools?具,扫描?key,dump出csv?件。

调?SQL脚本,将csv?件导?数据库。

$ touch monitor-large-key-to-db.sh 
$ vim monitor-large-key-to-db.sh


monitor-large-key-to-db.sh?件内容如下:

# crontab没有环境变量给你运?,所以要在shell开头?动添加环境
source /etc/profile
. ~/.bash_profile
#!/bin/bash
echo "开始执?monitor-large-key-to-db.sh脚本" >> /usr/local/redis/monitor-large-key-log.txt
rdb -c memory /usr/local/redis/bin/dump.rdb --bytes 102400 -f /usr/local/redis/bin/dump.csv
echo "扫描redis过滤出?key,?key数据保存到/usr/local/redis/bin/dump.csv?件" >> /usr/local/redis/monitor-large-key-log.txt
mysql -u?户名 -p密码 -hIP地址 -P端?号 < /usr/local/redis/bin/csv-transfer-db.sql
echo "csv?件数据已导?mysql" >> /usr/local/redis/monitor-large-key-log.txt

七.创建调度任务

$ crontab -e
# 每天凌晨3点进??次调度,将会扫描rdb?件,将?key存储到MySQL
0 3 * * * sh /usr/local/redis/bin/monitor-large-key-to-db.sh

(8)binlog数据消费者

一.接?说明

消费redis_large_key_log表的binlog数据,该数据包含Redis的?key信息。


二.代码位置

com.demo.eshop.monitor.mq.consumer.ConsumerBeanConfig#receiveLargeKeyMonitorConsumer

具体实现如下:

@Configuration
public class ConsumerBeanConfig {
    //配置内容对象
    @Autowired
    private RocketMQProperties rocketMQProperties;

    //Redis大key binlog 消费者
    @Bean("cookbookLargeKeyMonitorTopic")
    public DefaultMQPushConsumer receiveLargeKeyMonitorConsumer(CookbookLargeKeyMonitorListener cookbookLargeKeyMonitorListener) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConstant.BINLOG_MONITOR_LARGE_KEY_GROUP);
        consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
        consumer.subscribe(RocketMqConstant.BINLOG_MONITOR_LARGE_KEY_TOPIC, "*");
        consumer.registerMessageListener(cookbookLargeKeyMonitorListener);
        consumer.start();
        return consumer;
    }
    ...
}

三.参数说明


CookbookLargeKeyMonitorListener表示针对
BINLOG_MONITOR_LARGE_KEY_GROUP的Listener,它会监听Canal推送的
BINLOG_MONITOR_LARGE_KEY_TOPIC消息,然后对消息解析,通过邮件、钉钉等推送给开发?员。具体实现如下:

@Component
public class CookbookLargeKeyMonitorListener implements MessageListenerConcurrently {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        try {
            for (MessageExt messageExt : list) {
                String msg = new String(messageExt.getBody());
                // 解析binlog数据模型
                BinlogDataDTO binlogData = BinlogUtils.getBinlogData(msg);
                log.info("消费到binlog消息, binlogData: {}", binlogData);
                // 推送通知
                informByPush(binlogData);
            }
        } catch (Exception e) {
            // 本次消费失败,下次重新消费
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    //第三方平台推送消息到app
    private void informByPush(BinlogDataDTO binlogData) {
        log.info("消息推送中:消息内容:{}", binlogData);
    }
}

相关推荐

探索Java项目中日志系统最佳实践:从入门到精通

探索Java项目中日志系统最佳实践:从入门到精通在现代软件开发中,日志系统如同一位默默无闻却至关重要的管家,它记录了程序运行中的各种事件,为我们排查问题、监控性能和优化系统提供了宝贵的依据。在Java...

用了这么多年的java日志框架,你真的弄懂了吗?

在项目开发过程中,有一个必不可少的环节就是记录日志,相信只要是个程序员都用过,可是咱们自问下,用了这么多年的日志框架,你确定自己真弄懂了日志框架的来龙去脉嘛?下面笔者就详细聊聊java中常用日志框架的...

物理老师教你学Java语言(中篇)(物理专业学编程)

第四章物质的基本结构——类与对象...

一文搞定!Spring Boot3 定时任务操作全攻略

各位互联网大厂的后端开发小伙伴们,在使用SpringBoot3开发项目时,你是否遇到过定时任务实现的难题呢?比如任务调度时间不准确,代码报错却找不到方向,是不是特别头疼?如今,随着互联网业务规模...

你还不懂java的日志系统吗 ?(java的日志类)

一、背景在java的开发中,使用最多也绕不过去的一个话题就是日志,在程序中除了业务代码外,使用最多的就是打印日志。经常听到的这样一句话就是“打个日志调试下”,没错在日常的开发、调试过程中打印日志是常干...

谈谈枚举的新用法--java(java枚举的作用与好处)

问题的由来前段时间改游戏buff功能,干了一件愚蠢的事情,那就是把枚举和运算集合在一起,然后运行一段时间后buff就出现各种问题,我当时懵逼了!事情是这样的,做过游戏的都知道,buff,需要分类型,且...

你还不懂java的日志系统吗(javaw 日志)

一、背景在java的开发中,使用最多也绕不过去的一个话题就是日志,在程序中除了业务代码外,使用最多的就是打印日志。经常听到的这样一句话就是“打个日志调试下”,没错在日常的开发、调试过程中打印日志是常干...

Java 8之后的那些新特性(三):Java System Logger

去年12月份log4j日志框架的一个漏洞,给Java整个行业造成了非常大的影响。这个事情也顺带把log4j这个日志框架推到了争议的最前线。在Java领域,log4j可能相对比较流行。而在log4j之外...

Java开发中的日志管理:让程序“开口说话”

Java开发中的日志管理:让程序“开口说话”日志是程序员的朋友,也是程序的“嘴巴”。它能让程序在运行过程中“开口说话”,告诉我们它的状态、行为以及遇到的问题。在Java开发中,良好的日志管理不仅能帮助...

吊打面试官(十二)--Java语言中ArrayList类一文全掌握

导读...

OS X 效率启动器 Alfred 详解与使用技巧

问:为什么要在Mac上使用效率启动器类应用?答:在非特殊专业用户的环境下,(每天)用户一般可以在系统中进行上百次操作,可以是点击,也可以是拖拽,但这些只是过程,而我们的真正目的是想获得结果,也就是...

Java中 高级的异常处理(java中异常处理的两种方式)

介绍异常处理是软件开发的一个关键方面,尤其是在Java中,这种语言以其稳健性和平台独立性而闻名。正确的异常处理不仅可以防止应用程序崩溃,还有助于调试并向用户提供有意义的反馈。...

【性能调优】全方位教你定位慢SQL,方法介绍下!

1.使用数据库自带工具...

全面了解mysql锁机制(InnoDB)与问题排查

MySQL/InnoDB的加锁,一直是一个常见的话题。例如,数据库如果有高并发请求,如何保证数据完整性?产生死锁问题如何排查并解决?下面是不同锁等级的区别表级锁:开销小,加锁快;不会出现死锁;锁定粒度...

看懂这篇文章,你就懂了数据库死锁产生的场景和解决方法

一、什么是死锁加锁(Locking)是数据库在并发访问时保证数据一致性和完整性的主要机制。任何事务都需要获得相应对象上的锁才能访问数据,读取数据的事务通常只需要获得读锁(共享锁),修改数据的事务需要获...