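This walkthrough uses canal, Alibaba's open-source tool, for data synchronization.
Architecture diagram (image: 方案架构.jpg)
Broadly, there are two approaches.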
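Approach | Implementation | Pros | Cons |
---|---|---|---|
Approach 1 | canal + mq | 1. With an mq (rocketmq or kafka), clients written in many languages can connect. 2. The mq can be used to implement re-consumption after a failure. | 1. Depends on mq middleware. 2. Binlog ordering when consuming through an mq is less flexible than with the canal client; see the discussion of mq ordering in the binlog ordering section. |
Approach 2 | canal + canal client | 1. No additional middleware is needed. 2. canal's ack mechanism can be used to re-consume after a failure. | 1. The client pulls data, so latency may be higher than with an mq. 2. Likely needs somewhat more code than the mq approach. |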
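This article implements both. If your company has a unified platform for consuming binlogs, canal + mq is probably the better option because it decouples producers from consumers.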
Preparation
Enable binlog writing and set binlog-format to ROW in the my.cnf file:
log-bin=mysql-bin # enable binlog (on by default in MySQL 8)
binlog-format=ROW # use ROW mode (appears to be the default in MySQL 8)
server_id=1 # required for MySQL replication; must not equal canal's slaveId
Check whether binlog is enabled:
show binary logs;
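Grant the MySQL account that canal connects with the privileges of a MySQL slave. If the account already exists, you can run the GRANT directly.
Mind the password strength: a password that is too weak for the server's policy is rejected with an error.
mysql_native_password is specified because on MySQL 8.0 the connection may otherwise fail with "caching_sha2_password Auth failed".
CREATE USER canal IDENTIFIED WITH mysql_native_password BY 'canal1234';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;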
Starting canal
Download canal
canal releases page: https://github.com/alibaba/canal/releases
Go into the canal.deployer directory.
Edit the configuration file conf/example/instance.properties:
## mysql serverId; must not duplicate the ID of any MySQL slave
canal.instance.mysql.slaveId = 1234
# position info; change to your own database
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
# username/password; change to your own database
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal1234
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
# table regex (the backslash is doubled because this is a .properties file)
canal.instance.filter.regex = .*\\..*
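1. slaveId must differ from the server_id configured in my.cnf above, because canal works by presenting itself to MySQL as a slave.
The main settings to change are address, dbUsername, and dbPassword.
2. canal.instance.connectionCharset is the database character encoding expressed as a Java charset name, for example UTF-8, GBK, or ISO-8859-1.
3. If the machine has a single CPU, set canal.instance.parser.parallel to false.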
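The slaveId rule in item 1 is easy to get wrong when the two files are edited separately. The following is a minimal sketch of a pre-flight check, assuming both files can be read as simple key=value text; the class name SlaveIdCheck and its methods are hypothetical and are not part of canal.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of canal: compares MySQL's server_id with canal's slaveId.
public class SlaveIdCheck {

    // Drops a trailing "# comment" and surrounding whitespace from a raw value.
    private static String clean(String raw) {
        if (raw == null) {
            return "";
        }
        int hash = raw.indexOf('#');
        return (hash >= 0 ? raw.substring(0, hash) : raw).trim();
    }

    private static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns true only when both ids are present and different.
    public static boolean idsDiffer(String myCnfText, String instancePropertiesText) {
        String serverId = clean(parse(myCnfText).getProperty("server_id"));
        String slaveId = clean(parse(instancePropertiesText)
                .getProperty("canal.instance.mysql.slaveId"));
        return !serverId.isEmpty() && !slaveId.isEmpty() && !serverId.equals(slaveId);
    }

    public static void main(String[] args) {
        String myCnf = "log-bin=mysql-bin\nbinlog-format=ROW\nserver_id=1\n";
        String instance = "canal.instance.mysql.slaveId = 1234\n";
        System.out.println("ids differ: " + idsDiffer(myCnf, instance));
    }
}
```

In practice the two strings would be read from my.cnf and conf/example/instance.properties. The sketch ignores option-file sections such as [mysqld], which is enough for this one key.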
Start
sh bin/startup.sh
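If startup reports "Could not find first log file name in binary log index file", run rm -rf conf/example/meta.dat and start again.
The startup output shows that the configuration file in use is canal.properties, which sets the destination to example.
View the instance log: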
tail -100f logs/example/example.log
View the server log:
tail -100f logs/canal/canal.log
Stop
sh bin/stop.sh
Using the canal client
Official client demo: https://github.com/alibaba/canal/wiki/ClientExample
pom dependencies
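<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.sample</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>canal.sample</name>
    <url>http://maven.apache.org</url>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.0</version>
        </dependency>
    </dependencies>
</project>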
CanalClientMysql2Redis
package com.alibaba.otter.simple;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Pulls data from canal and syncs it to redis.
 *
 * @author tangzihao
 * @date 2021/4/26 9:08 AM
 */
public class CanalClientMysql2Redis {
    private volatile boolean running = true;
    private final ExecutorService executor = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors() * 2,
            60, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(500),
            new NamedThreadFactory("canal-redis-thread"),
            new ThreadPoolExecutor.CallerRunsPolicy());
    // A single Jedis instance is not thread-safe, so keep the pool
    // and borrow one connection per task instead of sharing one.
    private final JedisPool jedisPool;

    public CanalClientMysql2Redis() {
        JedisPoolConfig config = new JedisPoolConfig();
        // Max idle connections; size it yourself, below the Redis instance's connection limit
        config.setMaxIdle(200);
        // Max total connections; size it yourself, below the Redis instance's connection limit
        config.setMaxTotal(300);
        config.setTestOnBorrow(false);
        config.setTestOnReturn(false);
        String host = "127.0.0.1";
        jedisPool = new JedisPool(config, host, 6379, 3000);
        try (Jedis jedis = jedisPool.getResource()) {
            if (!"PONG".equals(jedis.ping())) {
                throw new RuntimeException("failed to connect to redis");
            }
        }
    }

    public void clientStart() {
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", "");
        int batchSize = 1000;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            while (running) {
                final Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(1000L);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                } else {
                    // Writes to redis asynchronously, which is faster than writing synchronously.
                    // The drawback: after a failure we can no longer retry via connector.rollback,
                    // and batches handled on different threads may be applied out of order.
                    executor.submit(() -> handleBinlog(message.getEntries()));
                }
                connector.ack(batchId);
            }
        } finally {
            connector.disconnect();
        }
    }

    private void handleBinlog(List<CanalEntry.Entry> entries) {
        try (Jedis jedis = jedisPool.getResource()) {
            for (CanalEntry.Entry entry : entries) {
                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                        || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                    continue;
                }
                CanalEntry.RowChange rowChange;
                try {
                    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry, e);
                }
                CanalEntry.EventType eventType = rowChange.getEventType();
                // The header carries the binlog file name and offset, plus the schema and table names
                System.out.printf("================%s binlog[%s:%s] , name[%s,%s] , eventType : %s%n================",
                        Thread.currentThread().getName(),
                        entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                        entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType);
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    if (eventType == CanalEntry.EventType.DELETE) {
                        Map<String, String> columnValueMap = getColumnValue(rowData.getBeforeColumnsList());
                        String id = columnValueMap.get("id");
                        jedis.del(id);
                    } else if (eventType == CanalEntry.EventType.INSERT) {
                        Map<String, String> columnValueMap = getColumnValue(rowData.getAfterColumnsList());
                        String id = columnValueMap.get("id");
                        columnValueMap.remove("id");
                        jedis.hmset(id, columnValueMap);
                    } else {
                        // For updates, take only the id and the changed columns to avoid a needless full rewrite
                        Map<String, String> columnValueMap = getColumnValueOnlyUpdate(rowData.getAfterColumnsList());
                        String id = columnValueMap.get("id");
                        columnValueMap.remove("id");
                        jedis.hmset(id, columnValueMap);
                    }
                }
            }
        }
    }

    private Map<String, String> getColumnValue(List<CanalEntry.Column> columns) {
        Map<String, String> paramMap = new HashMap<>(columns.size());
        for (CanalEntry.Column column : columns) {
            paramMap.put(column.getName(), column.getValue());
        }
        return paramMap;
    }

    private Map<String, String> getColumnValueOnlyUpdate(List<CanalEntry.Column> columns) {
        Map<String, String> paramMap = new HashMap<>(columns.size());
        for (CanalEntry.Column column : columns) {
            if ("id".equals(column.getName()) || column.getUpdated()) {
                paramMap.put(column.getName(), column.getValue());
            }
        }
        return paramMap;
    }

    public void stopCanalClient() {
        this.running = false;
    }

    public static void main(String[] args) {
        CanalClientMysql2Redis canalClientMysql2Redis = new CanalClientMysql2Redis();
        canalClientMysql2Redis.clientStart();
    }
}
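The comment in the client above notes that writing asynchronously gives up the ability to retry through connector.rollback. For comparison, here is a minimal sketch of the synchronous pattern, where a batch is acknowledged only after it has been handled and rolled back otherwise. To stay runnable without a canal server it uses a hypothetical BatchSource interface and an in-memory FakeSource; these stand in for the getWithoutAck, ack, and rollback calls used above and are not canal classes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

public class AckRollbackSketch {

    // Hypothetical stand-in for the three connector calls used by the client above.
    interface BatchSource {
        Batch getWithoutAck(int batchSize);
        void ack(long batchId);
        void rollback(long batchId);
    }

    static final class Batch {
        final long id;
        final List<String> entries;
        Batch(long id, List<String> entries) {
            this.id = id;
            this.entries = entries;
        }
    }

    // In-memory fake: a batch that is rolled back is delivered again on the next get.
    static final class FakeSource implements BatchSource {
        private final Deque<String> pending = new ArrayDeque<>();
        private List<String> inFlight;
        private long nextId = 1;

        FakeSource(List<String> entries) {
            pending.addAll(entries);
        }

        public Batch getWithoutAck(int batchSize) {
            if (pending.isEmpty()) {
                return new Batch(-1, new ArrayList<>());
            }
            List<String> out = new ArrayList<>();
            while (out.size() < batchSize && !pending.isEmpty()) {
                out.add(pending.poll());
            }
            inFlight = out;
            return new Batch(nextId++, out);
        }

        public void ack(long batchId) {
            inFlight = null;
        }

        public void rollback(long batchId) {
            if (inFlight != null) {
                // Put the entries back at the front, preserving their order.
                for (int i = inFlight.size() - 1; i >= 0; i--) {
                    pending.addFirst(inFlight.get(i));
                }
                inFlight = null;
            }
        }
    }

    // Handles batches one at a time; returns the number of acknowledged batches.
    static int drain(BatchSource source, int batchSize, int maxAttempts,
                     Consumer<List<String>> handler) {
        int acked = 0;
        int attempts = 0;
        while (true) {
            Batch batch = source.getWithoutAck(batchSize);
            if (batch.id == -1 || batch.entries.isEmpty()) {
                return acked; // a real client would sleep and poll again
            }
            try {
                handler.accept(batch.entries);
                source.ack(batch.id);   // acknowledge only after success
                acked++;
                attempts = 0;
            } catch (RuntimeException e) {
                source.rollback(batch.id); // the same entries come back next time
                if (++attempts >= maxAttempts) {
                    throw e;
                }
            }
        }
    }

    // Runs the loop with a handler that fails on its first call only.
    public static List<String> demo() {
        List<String> processed = new ArrayList<>();
        int[] calls = {0};
        List<String> input = new ArrayList<>();
        input.add("a");
        input.add("b");
        input.add("c");
        drain(new FakeSource(input), 2, 3, entries -> {
            if (calls[0]++ == 0) {
                throw new IllegalStateException("simulated redis failure");
            }
            processed.addAll(entries);
        });
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The trade-off is the one the article already describes: handling batches on the polling thread keeps order and makes rollback usable, at the cost of the throughput that the thread pool provides.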
Writing to redis through rocketmq
Modify the configuration files
Modify instance.properties
## mysql serverId
canal.instance.mysql.slaveId = 1234
# position info; change to your own database
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
# username/password; change to your own database
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal1234
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
# table regex
canal.instance.filter.regex = .*\\..*
# canal dynamic topic
canal.mq.dynamicTopic=.*\\..*
canal.mq.partition=0
Modify canal.properties
canal.serverMode = RocketMQ
canal.mq.servers = 127.0.0.1:9876
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.producerGroup = test
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
# aliyun mq namespace
#canal.mq.namespace =
Start rocketmq
# start the nameserver
/Users/zihao/Documents/servers/rocketmq-all-4.3.1-bin-release/bin/mqnamesrv &
# start the broker
/Users/zihao/Documents/servers/rocketmq-all-4.3.1-bin-release/bin/mqbroker -c /Users/zihao/Documents/servers/rocketmq-all-4.3.1-bin-release/conf/broker.conf &
# start the rocketmq console
nohup java -jar /Users/zihao/Documents/servers/rocketmq-externals-master/rocketmq-console/target/rocketmq-console-ng-1.0.1.jar &
Check that the integration works
Insert a row into the test.company table, then look at the resulting message in the rocketmq console.
Screenshot: 截屏2021-04-26 下午10.13.59.png
Listen to the topic and write to redis
RocketmqMysql2Redis
package com.alibaba.otter.simple;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.otter.model.BinlogModel;
import com.alibaba.otter.model.Company;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/**
 * @author tangzihao
 * @date 2021/4/26 10:19 PM
 */
public class RocketmqMysql2Redis {
    private static final Integer MAX_RETRIES = 3;

    public static void main(String[] args) throws MQClientException {
        // The listener runs on several consumer threads and a Jedis instance is not
        // thread-safe, so borrow a connection from a pool for each message.
        JedisPool jedisPool = new JedisPool("127.0.0.1", 6379);
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_company-consumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.subscribe("test_company", "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            // Only the first message is read; this relies on the consumer delivering one message per call
            MessageExt msg = msgs.get(0);
            String bodyString = new String(msg.getBody(), StandardCharsets.UTF_8);
            try (Jedis jedis = jedisPool.getResource()) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), bodyString);
                handleBinlog(bodyString, jedis);
                System.out.println("topic:" + msg.getTopic());
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (Exception e) {
                e.printStackTrace();
                if (msg.getReconsumeTimes() >= MAX_RETRIES) {
                    // Record the failure and compensate yourself; stop rocketmq from retrying
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                System.out.println("retry #" + msg.getReconsumeTimes());
            }
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

    private static void handleBinlog(String bodyString, Jedis jedis) {
        BinlogModel<Company> binlogModel = JSON.parseObject(bodyString, new TypeReference<BinlogModel<Company>>() {
        });
        String eventType = binlogModel.getType();
        // Only the first row of the message is handled
        Company company = binlogModel.getData().get(0);
        String id = String.valueOf(company.getId());
        if ("DELETE".equals(eventType)) {
            jedis.del(id);
        } else {
            Map<String, String> paramMap = new HashMap<>();
            paramMap.put("name", company.getName());
            paramMap.put("city", company.getCity());
            paramMap.put("domain", company.getDomain());
            paramMap.put("email", company.getEmail());
            paramMap.put("sdate", company.getSdate());
            jedis.hmset(id, paramMap);
        }
    }
}
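The consumer imports BinlogModel and Company from com.alibaba.otter.model, but the article does not show them. Below is a minimal sketch of what they might look like, inferred only from the getters the consumer calls. The field types are assumptions (id as Long, everything else as String), as are the database and table fields. The two classes are nested in a hypothetical BinlogModelSketch wrapper only to keep the example in one file.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-file wrapper; in the article's project these are two top-level classes.
public class BinlogModelSketch {

    // Subset of a flat message: only the fields the consumer reads, plus schema and table.
    public static class BinlogModel<T> {
        private String type;      // event type, e.g. INSERT, UPDATE, DELETE
        private String database;  // assumed field
        private String table;     // assumed field
        private List<T> data;     // changed rows

        public String getType() { return type; }
        public void setType(String type) { this.type = type; }
        public String getDatabase() { return database; }
        public void setDatabase(String database) { this.database = database; }
        public String getTable() { return table; }
        public void setTable(String table) { this.table = table; }
        public List<T> getData() { return data; }
        public void setData(List<T> data) { this.data = data; }
    }

    // One row of test.company; column names are taken from the getters used by the consumer.
    public static class Company {
        private Long id;          // assumed numeric primary key
        private String name;
        private String city;
        private String domain;
        private String email;
        private String sdate;     // kept as String because it is put into a Map<String, String>

        public Long getId() { return id; }
        public void setId(Long id) { this.id = id; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public String getCity() { return city; }
        public void setCity(String city) { this.city = city; }
        public String getDomain() { return domain; }
        public void setDomain(String domain) { this.domain = domain; }
        public String getEmail() { return email; }
        public void setEmail(String email) { this.email = email; }
        public String getSdate() { return sdate; }
        public void setSdate(String sdate) { this.sdate = sdate; }
    }

    // Builds a sample INSERT event by hand, the way a JSON library would populate it.
    public static BinlogModel<Company> sample() {
        Company company = new Company();
        company.setId(1L);
        company.setName("example-name");
        company.setCity("example-city");
        List<Company> rows = new ArrayList<>();
        rows.add(company);
        BinlogModel<Company> model = new BinlogModel<>();
        model.setType("INSERT");
        model.setDatabase("test");
        model.setTable("company");
        model.setData(rows);
        return model;
    }

    public static void main(String[] args) {
        BinlogModel<Company> model = sample();
        System.out.println(model.getType() + " " + model.getDatabase() + "." + model.getTable()
                + " id=" + model.getData().get(0).getId());
    }
}
```

Plain getters and setters with a no-argument constructor are used because JSON libraries typically populate beans that way.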
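Binlog ordering
The official documentation explains the ordering guarantees: https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart
In the configuration above I set canal.mq.partition=0, which means a single partition (in rocketmq this corresponds to a queue). The console shows that every message is sent to the queue with queue=0.
Screenshot: 截屏2021-04-26 下午11.12.08.png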
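A single queue keeps every change in order, but it also funnels all traffic through one queue. A common alternative is to route each row to a queue chosen by hashing its table and primary key, so that changes to the same row stay in order while different rows spread out. The sketch below only illustrates the idea; partitionFor is a hypothetical helper, and the hash used here is an assumption, not canal's actual routing function.

```java
public class PartitionHashSketch {

    // Picks a queue index in [0, partitions) from the table name and primary key.
    // The same (table, key) pair always maps to the same queue, which is what
    // preserves per-row ordering.
    public static int partitionFor(String schemaAndTable, String primaryKey, int partitions) {
        if (partitions <= 0) {
            throw new IllegalArgumentException("partitions must be positive");
        }
        String routingKey = schemaAndTable + ":" + primaryKey;
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(routingKey.hashCode(), partitions);
    }

    public static void main(String[] args) {
        int partitions = 3;
        for (String id : new String[] {"1", "2", "3", "1"}) {
            System.out.println("test.company id=" + id + " -> queue "
                    + partitionFor("test.company", id, partitions));
        }
    }
}
```

Ordering across different rows is lost with this scheme, so it fits cases like the redis sync here, where each key is written independently.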