admin 管理员组文章数量: 887021
2024年2月26日发(作者:vb语言知识点)
495657585966676869767778798687888996979899108109
82936373839package
import
import
import
/** *
监听配置信息 * * @author zrj * @since 2021/7/27 **/@Data@ComponentData;;Value;Component;public class BinLogConstants { @Value("${}") private String host; @Value("${}") private int port; @Value("${me}") private String username; @Value("${}") private String passwd; @Value("${}") private String db; @Value("${}") private String table; public static final int consumerThreads = 5; public static final long queueSleep = 1000;}Colum
1718192627Conf17181920package
import Data;;/** *
字段属性对象 * * @author zrj * @since 2021/7/27 **/@Datapublic class Colum { public int inx; public String colName; //
列名 public String dataType; //
类型 public String schema; //
数据库 public String table; //
表 public Colum(String schema, String table, int idx, String colName, String dataType) { = schema; = table; e = colName; pe = dataType; = idx; }}package
import
import
;AllArgsConstructor;Data;/** *
数据库配置 * * @author zrj * @since 2021/7/27 **/@Data@AllArgsConstructorpublic class Conf { private String host; private int port; private String username; private String passwd;}BinLogItem111213package
import
import
import
import
import
import
import static
import static
Data;Serializable;HashMap;Map;te;e;Maps;;EventType;
86373839464748495657585966676869767778/** * binlog对象 * * @author zrj * @since 2021/7/26 **/@Datapublic class BinLogItem implements Serializable { private static final long serialVersionUID = 5521290L; private String dbTable; private EventType eventType; private Long timestamp = null; private Long serverId = null; //
存储字段-之前的值之后的值 private Map
存储字段--类型 private Map
新增或者删除操作数据格式化 */ public static BinLogItem itemFromInsertOrDeleted(Serializable[] row, Map
初始化Item BinLogItem item = new BinLogItem(); ype = eventType; = columMap; = hMap(); = hMap(); Map
写操作放after,删操作放before if (isWrite(eventType)) { = beOrAf; } if (isDelete(eventType)) { = beOrAf; } return item; } /** *
更新操作数据格式化 */ public static BinLogItem itemFromUpdate(
初始化Item BinLogItem item = new BinLogItem(); ype = eventType;
75975975166167 if (y(table)) { return null; } return ByTab(table); } /** *
根据DBTable获取table * * @param dbTable * @return */ public static String getTable(String dbTable) { if (y(dbTable)) { return ""; } String[] split = ("-"); if ( == 2) { return split[1]; } return ""; } /** *
将逗号拼接字符串转List * * @param str * @return */ public static List
根据操作类型获取对应集合 * * @param binLogItem * @return */ public static Map
获取操作类型 EventType eventType = ntType(); if (isWrite(eventType) || isUpdate(eventType)) { return er(); } if (isDelete(eventType)) { return ore(); } return null; } /** *
获取操作类型 * * @param binLogItem * @return */ public static Integer getOptType(BinLogItem binLogItem) { //
获取操作类型 EventType eventType = ntType(); if (isWrite(eventType)) { return 1; } if (isUpdate(eventType)) { return 2;
4864842219 return 2; } if (isDelete(eventType)) { return 3; } return null; } /** *
根据storeId获取imgUrl */ public static String getImgUrl(Long storeId) { if (storeId == null) { return ""; } //获取url SearchStoreLogo searchStoreLogo = new SearchStoreLogo(); reId(storeId); List
格式化date * * @param date * @return */ public static Date getDateFormat(Date date) { if (date == null) { return null; } String dateFormat = "yyyy-MM-dd HH:mm:ss"; String strDate = (date, dateFormat); if (y(strDate)) { return null; } Date formatDate = (strDate, dateFormat); return formatDate; }}BinLogListener
11121314package
/** * BinLogListener监听器 * * @author zrj * @since 2021/7/26 **/@FunctionalInterface;public interface BinLogListener { void onEvent(BinLogItem item);}MysqlBinLogListener82936373839464748package
import
import
import
import
import
import
import
import
import
import
import
import
import static
import static
import static
/** *
数据库监听器 * * @author zrj * @since 2021/7/26 **/@Slf4jpublic class MysqlBinLogListener implements istener { @Option(name = "-binlog-consume_threads", usage = "the thread num of consumer") private int consumerThreads = erThreads; private BinaryLogClient parseClient; private BlockingQueue
存放每张数据表对应的listener private Multimap
监听器初始化 * * @param conf */ public MysqlBinLogListener(Conf conf) {Slf4j;Option;Resource;IOException;Serializable;Map;*;EventType.*;Map;able;ArrayListMultimap;Multimap;;BinaryLogClient;*;EventDeserializer;
495657585966676869767778798687888996979899112113 public MysqlBinLogListener(Conf conf) { BinaryLogClient client = new BinaryLogClient(t(), t(), rname(), swd()); EventDeserializer eventDeserializer = new EventDeserializer(); //patibilityMode(//序列化 // _AND_TIME_AS_LONG, // _AND_BINARY_AS_BYTE_ARRAY //); ntDeserializer(eventDeserializer); lient = client; = new ArrayBlockingQueue<>(1024); = conf; ers = (); eCols = new ConcurrentHashMap<>(); er = edThreadPool(consumerThreads); } /** *
监听处理 * * @param event */ @Override public void onEvent(Event event) { EventType eventType = der().getEventType(); if (eventType == _MAP) { TableMapEventData tableData = a(); String db = abase(); String table = le(); dbTable = getdbTable(db, table); } //
只处理添加删除更新三种操作 if (isWrite(eventType) || isUpdate(eventType) || isDelete(eventType)) { if (isWrite(eventType)) { WriteRowsEventData data = a(); for (Serializable[] row : s()) { if (nsKey(dbTable)) { BinLogItem item = omInsertOrDeleted(row, (dbTable), eventType); able(dbTable); (item); } } } if (isUpdate(eventType)) { UpdateRowsEventData data = a(); for (
75975971162 } /** *
注册监听 * * @param db
数据库 * @param table
操作表 * @param listener
监听器 * @throws Exception */ public void regListener(String db, String table, BinLogListener listener) throws Exception { String dbTable = getdbTable(db, table); //
获取字段集合 Map
保存字段信息 (dbTable, cols); //
保存当前注册的listener (dbTable, listener); } /** *
开启多线程消费 * * @throws IOException */ public void parse() throws IOException { erEventListener(this); for (int i = 0; i < consumerThreads; i++) { (() -> { while (true) { if (() > 0) { try { BinLogItem item = (); String dbtable = able(); (dbtable).forEach(binLogListener -> t(item)); } catch (InterruptedException e) { tackTrace(); } } (leep); } }); } t(); }}TourBinLogListener
版权声明:本文标题:Java监听mysql的binlog详解(mysql-binlog-connector) 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.freenas.com.cn/jishu/1708949201h535091.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论