admin 管理员组

文章数量: 887021


2024年2月26日发(作者:vb语言知识点)

495657585966676869767778798687888996979899108109 hutool-all 5.5.4 compile args4j args4j 2.33 guava 28.1-jre s commons-lang3 3.9 mysql-binlog-connector-java 0.17.0 gson 2.8.5 spring-boot-maven-plugin tlombok lombok

82936373839package

import

import

import

/** *

监听配置信息 * * @author zrj * @since 2021/7/27 **/@Data@ComponentData;;Value;Component;public class BinLogConstants { @Value("${}") private String host; @Value("${}") private int port; @Value("${me}") private String username; @Value("${}") private String passwd; @Value("${}") private String db; @Value("${}") private String table; public static final int consumerThreads = 5; public static final long queueSleep = 1000;}Colum

1718192627Conf17181920package

import Data;;/** *

字段属性对象 * * @author zrj * @since 2021/7/27 **/@Datapublic class Colum { public int inx; public String colName; //

列名 public String dataType; //

类型 public String schema; //

数据库 public String table; //

表 public Colum(String schema, String table, int idx, String colName, String dataType) { = schema; = table; e = colName; pe = dataType; = idx; }}package

import

import

;AllArgsConstructor;Data;/** *

数据库配置 * * @author zrj * @since 2021/7/27 **/@Data@AllArgsConstructorpublic class Conf { private String host; private int port; private String username; private String passwd;}BinLogItem111213package

import

import

import

import

import

import

import static

import static

Data;Serializable;HashMap;Map;te;e;Maps;;EventType;

86373839464748495657585966676869767778/** * binlog对象 * * @author zrj * @since 2021/7/26 **/@Datapublic class BinLogItem implements Serializable { private static final long serialVersionUID = 5521290L; private String dbTable; private EventType eventType; private Long timestamp = null; private Long serverId = null; //

存储字段-之前的值之后的值 private Map before = null; private Map after = null; //

存储字段--类型 private Map colums = null; /** *

新增或者删除操作数据格式化 */ public static BinLogItem itemFromInsertOrDeleted(Serializable[] row, Map columMap, EventType eventType) { if (null == row || null == columMap) { return null; } if ( != ()) { return null; } //

初始化Item BinLogItem item = new BinLogItem(); ype = eventType; = columMap; = hMap(); = hMap(); Map beOrAf = hMap(); et().forEach(entry -> { String key = (); Colum colum = ue(); (key, row[]); }); //

写操作放after,删操作放before if (isWrite(eventType)) { = beOrAf; } if (isDelete(eventType)) { = beOrAf; } return item; } /** *

更新操作数据格式化 */ public static BinLogItem itemFromUpdate( mapEntry, Map columMap, EventType eventType if (null == mapEntry || null == columMap) { return null; } //

初始化Item BinLogItem item = new BinLogItem(); ype = eventType;

75975975166167 if (y(table)) { return null; } return ByTab(table); } /** *

根据DBTable获取table * * @param dbTable * @return */ public static String getTable(String dbTable) { if (y(dbTable)) { return ""; } String[] split = ("-"); if ( == 2) { return split[1]; } return ""; } /** *

将逗号拼接字符串转List * * @param str * @return */ public static List getListByStr(String str) { if (y(str)) { return ayList(); } return ((",")); } /** *

根据操作类型获取对应集合 * * @param binLogItem * @return */ public static Map getOptMap(BinLogItem binLogItem) { //

获取操作类型 EventType eventType = ntType(); if (isWrite(eventType) || isUpdate(eventType)) { return er(); } if (isDelete(eventType)) { return ore(); } return null; } /** *

获取操作类型 * * @param binLogItem * @return */ public static Integer getOptType(BinLogItem binLogItem) { //

获取操作类型 EventType eventType = ntType(); if (isWrite(eventType)) { return 1; } if (isUpdate(eventType)) { return 2;

4864842219 return 2; } if (isDelete(eventType)) { return 3; } return null; } /** *

根据storeId获取imgUrl */ public static String getImgUrl(Long storeId) { if (storeId == null) { return ""; } //获取url SearchStoreLogo searchStoreLogo = new SearchStoreLogo(); reId(storeId); List searchStoreLogos = List(searchStoreLogo); if (mpty(searchStoreLogos)) { SearchStoreLogo storeLogo = (0); if (storeLogo != null) { return reLogo(); } } return ""; } /** *

格式化date * * @param date * @return */ public static Date getDateFormat(Date date) { if (date == null) { return null; } String dateFormat = "yyyy-MM-dd HH:mm:ss"; String strDate = (date, dateFormat); if (y(strDate)) { return null; } Date formatDate = (strDate, dateFormat); return formatDate; }}BinLogListener

11121314package

/** * BinLogListener监听器 * * @author zrj * @since 2021/7/26 **/@FunctionalInterface;public interface BinLogListener { void onEvent(BinLogItem item);}MysqlBinLogListener82936373839464748package

import

import

import

import

import

import

import

import

import

import

import

import

import static

import static

import static

/** *

数据库监听器 * * @author zrj * @since 2021/7/26 **/@Slf4jpublic class MysqlBinLogListener implements istener { @Option(name = "-binlog-consume_threads", usage = "the thread num of consumer") private int consumerThreads = erThreads; private BinaryLogClient parseClient; private BlockingQueue queue; private final ExecutorService consumer; //

存放每张数据表对应的listener private Multimap listeners; private Conf conf; private Map> dbTableCols; private String dbTable; /** *

监听器初始化 * * @param conf */ public MysqlBinLogListener(Conf conf) {Slf4j;Option;Resource;IOException;Serializable;Map;*;EventType.*;Map;able;ArrayListMultimap;Multimap;;BinaryLogClient;*;EventDeserializer;

495657585966676869767778798687888996979899112113 public MysqlBinLogListener(Conf conf) { BinaryLogClient client = new BinaryLogClient(t(), t(), rname(), swd()); EventDeserializer eventDeserializer = new EventDeserializer(); //patibilityMode(//序列化 // _AND_TIME_AS_LONG, // _AND_BINARY_AS_BYTE_ARRAY //); ntDeserializer(eventDeserializer); lient = client; = new ArrayBlockingQueue<>(1024); = conf; ers = (); eCols = new ConcurrentHashMap<>(); er = edThreadPool(consumerThreads); } /** *

监听处理 * * @param event */ @Override public void onEvent(Event event) { EventType eventType = der().getEventType(); if (eventType == _MAP) { TableMapEventData tableData = a(); String db = abase(); String table = le(); dbTable = getdbTable(db, table); } //

只处理添加删除更新三种操作 if (isWrite(eventType) || isUpdate(eventType) || isDelete(eventType)) { if (isWrite(eventType)) { WriteRowsEventData data = a(); for (Serializable[] row : s()) { if (nsKey(dbTable)) { BinLogItem item = omInsertOrDeleted(row, (dbTable), eventType); able(dbTable); (item); } } } if (isUpdate(eventType)) { UpdateRowsEventData data = a(); for ( row : s()) { if (nsKey(dbTable)) { BinLogItem item = omUpdate(row, (dbTable), eventType); able(dbTable); (item); } } } if (isDelete(eventType)) { DeleteRowsEventData data = a(); for (Serializable[] row : s()) { if (nsKey(dbTable)) { BinLogItem item = omInsertOrDeleted(row, (dbTable), eventType); able(dbTable); (item); } } } }

75975971162 } /** *

注册监听 * * @param db

数据库 * @param table

操作表 * @param listener

监听器 * @throws Exception */ public void regListener(String db, String table, BinLogListener listener) throws Exception { String dbTable = getdbTable(db, table); //

获取字段集合 Map cols = getColMap(conf, db, table); //

保存字段信息 (dbTable, cols); //

保存当前注册的listener (dbTable, listener); } /** *

开启多线程消费 * * @throws IOException */ public void parse() throws IOException { erEventListener(this); for (int i = 0; i < consumerThreads; i++) { (() -> { while (true) { if (() > 0) { try { BinLogItem item = (); String dbtable = able(); (dbtable).forEach(binLogListener -> t(item)); } catch (InterruptedException e) { tackTrace(); } } (leep); } }); } t(); }}TourBinLogListener


本文标签: 操作 获取 类型 监听