Please support the author's new books, 《深入理解Kafka:核心设计与实践原理》 and 《RabbitMQ实战指南》, and follow the author's WeChat official account: 朱小厮的博客.
Original article: https://honeypps.com/backend/read-mysql-binlog-by-using-openreplicator/
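Open Replicator is a MySQL binlog parser written in Java. It first connects to MySQL the way an ordinary MySQL slave does, then receives and parses the binlog, and finally delivers the parsed binlog events to the application through callbacks. It can be used for real-time push of MySQL data changes, multi-master to single-slave data synchronization, and similar scenarios. Open Replicator currently supports only MySQL 5.0 and later.
Open Replicator project: https://github.com/whitesock/open-replicator
[Figure: structure of binlog event parsing]
The rest of this article assumes some familiarity with the binlog; see 《MySQL Binlog解析》 for background.
Here open-replicator is used to parse binlog events (binlog-format = row). The binlog contains two kinds of operations, DDL and DML: for DDL the output is the SQL statement, and for DML it is the affected row data. Examples:
DDL (CREATE, ALTER, DROP, TRUNCATE; used to define or change table structure):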
{
"eventId": 1,
"databaseName": "canal_test",
"tableName": "`company`",
"eventType": 2,
"timestamp": 1477033198000,
"timestampReceipt": 1477033248780,
"binlogName": "mysql-bin.000006",
"position": 353,
"nextPostion": 468,
"serverId": 2,
"before": null,
"after": null,
"isDdl": true,
"sql": "DROP TABLE `company` /* generated by server */"
}
DML (INSERT, UPDATE, DELETE; operates on the data in the tables. SELECT does not modify data and is not written to the binlog):
{
"eventId": 0,
"databaseName": "canal_test",
"tableName": "person",
"eventType": 24,
"timestamp": 1477030734000,
"timestampReceipt": 1477032161988,
"binlogName": "mysql-bin.000006",
"position": 242,
"nextPostion": 326,
"serverId": 2,
"before": {
"id": "3",
"sex": "f",
"address": "shanghai",
"age": "23",
"name": "zzh3"
},
"after": {
"id": "3",
"sex": "m",
"address": "shanghai",
"age": "23",
"name": "zzh3"
},
"isDdl": false,
"sql": null
}
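Since a DML event carries both the `before` and `after` images of a row, a consumer can work out which columns actually changed. The following is only a minimal sketch and is not part of the original project: the class `RowDiff` and its method `changedColumns` are made-up names, and the two maps simply mirror the `before`/`after` fields of the sample above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

// Hypothetical helper, not part of the original project.
final class RowDiff {

    // Returns, for every column whose value differs, "oldValue -> newValue".
    static Map<String, String> changedColumns(Map<String, String> before,
                                              Map<String, String> after) {
        Map<String, String> changes = new TreeMap<>();
        for (Map.Entry<String, String> entry : after.entrySet()) {
            String oldValue = before.get(entry.getKey());
            if (!Objects.equals(oldValue, entry.getValue())) {
                changes.put(entry.getKey(), oldValue + " -> " + entry.getValue());
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        Map<String, String> before = new LinkedHashMap<>();
        before.put("id", "3");
        before.put("sex", "f");
        before.put("address", "shanghai");
        Map<String, String> after = new LinkedHashMap<>(before);
        after.put("sex", "m");
        System.out.println(changedColumns(before, after)); // prints {sex=f -> m}
    }
}
```

For an INSERT only `after` is set and for a DELETE only `before`, so a real consumer would have to handle a null map before calling such a helper.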
The relevant classes are as follows:
CDCEvent.java
package or;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import com.google.code.or.binlog.BinlogEventV4;
import com.google.code.or.binlog.BinlogEventV4Header;
import com.google.code.or.binlog.impl.event.AbstractBinlogEventV4;
public class CDCEvent {
private long eventId = 0;// unique identifier of the event
private String databaseName = null;
private String tableName = null;
private int eventType = 0;// event type
private long timestamp = 0;// time the event occurred [MySQL server time]
private long timestampReceipt = 0;// time open-replicator received the event [CDC processing time]
private String binlogName = null;// binlog file name
private long position = 0;
private long nextPostion = 0;
private long serverId = 0;
private Map<String,String> before = null;
private Map<String,String> after = null;
private Boolean isDdl= null;
private String sql = null;
private static AtomicLong uuid = new AtomicLong(0);
public CDCEvent(){}
public CDCEvent(final AbstractBinlogEventV4 are, String databaseName, String tableName){
this.init(are);
this.databaseName = databaseName;
this.tableName = tableName;
}
private void init(final BinlogEventV4 be){
this.eventId = uuid.getAndAdd(1);
BinlogEventV4Header header = be.getHeader();
this.timestamp = header.getTimestamp();
this.eventType = header.getEventType();
this.serverId = header.getServerId();
this.timestampReceipt = header.getTimestampOfReceipt();
this.position = header.getPosition();
this.nextPostion = header.getNextPosition();
this.binlogName = header.getBinlogFileName();
}
@Override
public String toString(){
StringBuilder builder = new StringBuilder();
builder.append("{ eventId:").append(eventId);
builder.append(",databaseName:").append(databaseName);
builder.append(",tableName:").append(tableName);
builder.append(",eventType:").append(eventType);
builder.append(",timestamp:").append(timestamp);
builder.append(",timestampReceipt:").append(timestampReceipt);
builder.append(",binlogName:").append(binlogName);
builder.append(",position:").append(position);
builder.append(",nextPostion:").append(nextPostion);
builder.append(",serverId:").append(serverId);
builder.append(",isDdl:").append(isDdl);
builder.append(",sql:").append(sql);
builder.append(",before:").append(before);
builder.append(",after:").append(after).append("}");
return builder.toString();
}
// Getters and setters omitted
}
open-replicator does its parsing through a registered listener. The most important part of the whole process is shown below:
InstanceListener.java
package or;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import or.keeper.TableInfoKeeper;
import or.manager.CDCEventManager;
import or.model.ColumnInfo;
import or.model.TableInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.code.or.binlog.BinlogEventListener;
import com.google.code.or.binlog.BinlogEventV4;
import com.google.code.or.binlog.impl.event.DeleteRowsEvent;
import com.google.code.or.binlog.impl.event.FormatDescriptionEvent;
import com.google.code.or.binlog.impl.event.QueryEvent;
import com.google.code.or.binlog.impl.event.TableMapEvent;
import com.google.code.or.binlog.impl.event.UpdateRowsEvent;
import com.google.code.or.binlog.impl.event.WriteRowsEvent;
import com.google.code.or.binlog.impl.event.XidEvent;
import com.google.code.or.common.glossary.Column;
import com.google.code.or.common.glossary.Pair;
import com.google.code.or.common.glossary.Row;
import com.google.code.or.common.util.MySQLConstants;
public class InstanceListener implements BinlogEventListener{
private static final Logger logger = LoggerFactory.getLogger(InstanceListener.class);
@Override
public void onEvents(BinlogEventV4 be) {
if(be == null){
logger.error("binlog event is null");
return;
}
int eventType = be.getHeader().getEventType();
switch(eventType){
case MySQLConstants.FORMAT_DESCRIPTION_EVENT:
{
logger.trace("FORMAT_DESCRIPTION_EVENT");
break;
}
case MySQLConstants.TABLE_MAP_EVENT:// every ROW_EVENT is preceded by a TABLE_MAP_EVENT carrying table info such as tableId, tableName and databaseName; the ROW_EVENT itself carries only the tableId
{
TableMapEvent tme = (TableMapEvent)be;
TableInfoKeeper.saveTableIdMap(tme);
logger.trace("TABLE_MAP_EVENT:tableId:{}",tme.getTableId());
break;
}
case MySQLConstants.DELETE_ROWS_EVENT:
{
DeleteRowsEvent dre = (DeleteRowsEvent) be;
long tableId = dre.getTableId();
logger.trace("DELETE_ROWS_EVENT:tableId:{}",tableId);
TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);
String databaseName = tableInfo.getDatabaseName();
String tableName = tableInfo.getTableName();
List<Row> rows = dre.getRows();
for(Row row:rows){
List<Column> before = row.getColumns();
Map<String,String> beforeMap = getMap(before,databaseName,tableName);
if(beforeMap !=null && beforeMap.size()>0){
CDCEvent cdcEvent = new CDCEvent(dre,databaseName,tableName);
cdcEvent.setIsDdl(false);
cdcEvent.setSql(null);
cdcEvent.setBefore(beforeMap);
CDCEventManager.queue.addLast(cdcEvent);
logger.info("cdcEvent:{}",cdcEvent);
}
}
break;
}
case MySQLConstants.UPDATE_ROWS_EVENT:
{
UpdateRowsEvent upe = (UpdateRowsEvent)be;
long tableId = upe.getTableId();
logger.info("UPDATE_ROWS_EVENT:tableId:{}",tableId);
TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);
String databaseName = tableInfo.getDatabaseName();
String tableName = tableInfo.getTableName();
List<Pair<Row>> rows = upe.getRows();
for(Pair<Row> p:rows){
List<Column> colsBefore = p.getBefore().getColumns();
List<Column> colsAfter = p.getAfter().getColumns();
Map<String,String> beforeMap = getMap(colsBefore,databaseName,tableName);
Map<String,String> afterMap = getMap(colsAfter,databaseName,tableName);
if(beforeMap!=null && afterMap!=null && beforeMap.size()>0 && afterMap.size()>0){
CDCEvent cdcEvent = new CDCEvent(upe,databaseName,tableName);
cdcEvent.setIsDdl(false);
cdcEvent.setSql(null);
cdcEvent.setBefore(beforeMap);
cdcEvent.setAfter(afterMap);
CDCEventManager.queue.addLast(cdcEvent);
logger.info("cdcEvent:{}",cdcEvent);
}
}
break;
}
case MySQLConstants.WRITE_ROWS_EVENT:
{
WriteRowsEvent wre = (WriteRowsEvent)be;
long tableId = wre.getTableId();
logger.trace("WRITE_ROWS_EVENT:tableId:{}",tableId);
TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);
String databaseName = tableInfo.getDatabaseName();
String tableName = tableInfo.getTableName();
List<Row> rows = wre.getRows();
for(Row row: rows){
List<Column> after = row.getColumns();
Map<String,String> afterMap = getMap(after,databaseName,tableName);
if(afterMap!=null && afterMap.size()>0){
CDCEvent cdcEvent = new CDCEvent(wre,databaseName,tableName);
cdcEvent.setIsDdl(false);
cdcEvent.setSql(null);
cdcEvent.setAfter(afterMap);
CDCEventManager.queue.addLast(cdcEvent);
logger.info("cdcEvent:{}",cdcEvent);
}
}
break;
}
case MySQLConstants.QUERY_EVENT:
{
QueryEvent qe = (QueryEvent)be;
TableInfo tableInfo = createTableInfo(qe);
if(tableInfo == null)
break;
String databaseName = tableInfo.getDatabaseName();
String tableName = tableInfo.getTableName();
logger.trace("QUERY_EVENT:databaseName:{},tableName:{}",databaseName,tableName);
CDCEvent cdcEvent = new CDCEvent(qe,databaseName,tableName);
cdcEvent.setIsDdl(true);
cdcEvent.setSql(qe.getSql().toString());
CDCEventManager.queue.addLast(cdcEvent);
logger.info("cdcEvent:{}",cdcEvent);
break;
}
case MySQLConstants.XID_EVENT:{
XidEvent xe = (XidEvent)be;
logger.trace("XID_EVENT: xid:{}",xe.getXid());
break;
}
default:
{
logger.trace("DEFAULT:{}",eventType);
break;
}
}
}
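/**
* A ROW_EVENT carries no column names. They have to be read from MySQL through MysqlConnection (described later)
* and then matched by position against the List<Column> taken from the event.
*
* @param cols column values of one row
* @param databaseName database the row belongs to
* @param tableName table the row belongs to
* @return column name to value map, or null if the columns cannot be resolved
*/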
private Map<String,String> getMap(List<Column> cols, String databaseName, String tableName){
Map<String,String> map = new HashMap<>();
if(cols == null || cols.size()==0){
return null;
}
String fullName = databaseName+"."+tableName;
List<ColumnInfo> columnInfoList = TableInfoKeeper.getColumns(fullName);
if(columnInfoList == null)
return null;
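if(columnInfoList.size() != cols.size()){
// The cached column list may be stale (e.g. after an ALTER TABLE): reload it and look it up again
TableInfoKeeper.refreshColumnsMap();
columnInfoList = TableInfoKeeper.getColumns(fullName);
if(columnInfoList == null || columnInfoList.size() != cols.size())
{
logger.warn("columnInfoList.size is not equal to cols.");
return null;
}
}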
for(int i=0;i<columnInfoList.size(); i++){
if(cols.get(i).getValue()==null)
map.put(columnInfoList.get(i).getName(),"");
else
map.put(columnInfoList.get(i).getName(), cols.get(i).toString());
}
return map;
}
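/**
* Extracts table information from the SQL text. A QUERY_EVENT is scoped to a database, unlike a ROW_EVENT, which is scoped to a table,
* so the table name has to be parsed out of the SQL and wrapped in a TableInfo object.
*
* @param qe the query event
* @return the table info, or null if no table name can be found in the SQL
*/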
private TableInfo createTableInfo(QueryEvent qe){
String sql = qe.getSql().toString().toLowerCase();
TableInfo ti = new TableInfo();
String databaseName = qe.getDatabaseName().toString();
String tableName = null;
if(checkFlag(sql,"table")){
tableName = getTableName(sql,"table");
} else if(checkFlag(sql,"truncate")){
tableName = getTableName(sql,"truncate");
} else{
logger.warn("can not find table name from sql:{}",sql);
return null;
}
ti.setDatabaseName(databaseName);
ti.setTableName(tableName);
ti.setFullName(databaseName+"."+tableName);
return ti;
}
private boolean checkFlag(String sql, String flag){
String[] ss = sql.split(" ");
for(String s:ss){
if(s.equals(flag)){
return true;
}
}
return false;
}
private String getTableName(String sql, String flag){
String[] ss = sql.split("\\.");
String tName = null;
if (ss.length > 1) {
String[] strs = ss[1].split(" ");
tName = strs[0];
} else {
String[] strs = sql.split(" ");
boolean start = false;
for (String s : strs) {
if (s.indexOf(flag) >= 0) {
start = true;
continue;
}
if (start && !s.isEmpty()) {
tName = s;
break;
}
}
}
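if(tName == null){
return null;
}
// String is immutable, so the result must be assigned back; otherwise the backticks stay in the name (e.g. `company`)
tName = tName.replace("`", "").replace(";", "");
// strip a trailing "(" and what follows, as in: create table person(....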
int index = tName.indexOf('(');
if(index>0){
tName = tName.substring(0, index);
}
return tName;
}
}
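Splitting the SQL on spaces and dots, as getTableName does, is easy to trip up (extra whitespace, IF EXISTS, a dot inside a comment). One possible alternative is a regular expression. The sketch below is an assumption-laden illustration and not the author's code: `DdlTableName` and `extract` are made-up names, only the statement shapes named in the comment are covered, and table names are assumed to consist of letters, digits and underscores.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical alternative to getTableName; covers only the statement shapes listed below.
final class DdlTableName {

    // CREATE/ALTER/DROP [TEMPORARY] TABLE or TRUNCATE [TABLE], an optional IF [NOT] EXISTS,
    // an optional database qualifier, then the table name (letters, digits, underscore only).
    private static final Pattern TABLE = Pattern.compile(
            "\\b(?:(?:create|alter|drop)\\s+(?:temporary\\s+)?table|truncate(?:\\s+table)?)\\s+"
            + "(?:if\\s+(?:not\\s+)?exists\\s+)?"
            + "(?:`?\\w+`?\\.)?`?(\\w+)`?",
            Pattern.CASE_INSENSITIVE);

    // Returns the table name without backticks, or empty if the SQL is not a table-level DDL.
    static Optional<String> extract(String sql) {
        Matcher m = TABLE.matcher(sql);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(extract("DROP TABLE `company` /* generated by server */"));
        System.out.println(extract("create table canal_test.person(id int)"));
        System.out.println(extract("create database demo"));
    }
}
```

Unlike createTableInfo, this sketch does not lower-case the SQL first, so the table name keeps its original case.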
The TableInfo.java used above is as follows:
package or.model;
public class TableInfo {
private String databaseName;
private String tableName;
private String fullName;
// Getters and setters omitted
@Override
public boolean equals(Object o){
if(this == o)
return true;
if(o == null || this.getClass()!=o.getClass())
return false;
TableInfo tableInfo = (TableInfo)o;
if(!this.databaseName.equals(tableInfo.getDatabaseName()))
return false;
if(!this.tableName.equals(tableInfo.getTableName()))
return false;
if(!this.fullName.equals(tableInfo.getFullName()))
return false;
return true;
}
@Override
public int hashCode(){
int result = this.tableName.hashCode();
result = 31*result+this.databaseName.hashCode();
result = 31*result+this.fullName.hashCode();
return result;
}
}
Next, the information extracted from TABLE_MAP_EVENT needs to be stored somewhere: TableInfoKeeper.java
package or.keeper;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import or.MysqlConnection;
import or.model.ColumnInfo;
import or.model.TableInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.code.or.binlog.impl.event.TableMapEvent;
public class TableInfoKeeper {
private static final Logger logger = LoggerFactory.getLogger(TableInfoKeeper.class);
private static Map<Long,TableInfo> tabledIdMap = new ConcurrentHashMap<>();
private static Map<String,List<ColumnInfo>> columnsMap = new ConcurrentHashMap<>();
static{
columnsMap = MysqlConnection.getColumns();
}
public static void saveTableIdMap(TableMapEvent tme){
long tableId = tme.getTableId();
tabledIdMap.remove(tableId);
TableInfo table = new TableInfo();
table.setDatabaseName(tme.getDatabaseName().toString());
table.setTableName(tme.getTableName().toString());
table.setFullName(tme.getDatabaseName()+"."+tme.getTableName());
tabledIdMap.put(tableId, table);
}
public static synchronized void refreshColumnsMap(){
Map<String,List<ColumnInfo>> map = MysqlConnection.getColumns();
if(map.size()>0){
// logger.warn("refresh and clear cols.");
columnsMap = map;
// logger.warn("refresh and switch cols:{}",map);
}
else
{
logger.error("refresh columnsMap error.");
}
}
public static TableInfo getTableInfo(long tableId){
return tabledIdMap.get(tableId);
}
public static List<ColumnInfo> getColumns(String fullName){
return columnsMap.get(fullName);
}
}
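To make the interplay between the two maps easier to see, here is a stripped-down sketch of the lookup chain: a TABLE_MAP_EVENT registers tableId to table name, and a later row event resolves its values to column names by position. It uses plain strings instead of the open-replicator types, and `RowResolverSketch` with its methods is a made-up name for illustration only.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for TableInfoKeeper plus InstanceListener.getMap.
final class RowResolverSketch {
    // tableId -> "database.table", filled when a TABLE_MAP_EVENT arrives
    private final Map<Long, String> tableNameById = new ConcurrentHashMap<>();
    // "database.table" -> column names in table order, as read from MySQL metadata
    private final Map<String, List<String>> columnsByTable = new ConcurrentHashMap<>();

    void registerColumns(String fullName, List<String> columnNames) {
        columnsByTable.put(fullName, columnNames);
    }

    void onTableMap(long tableId, String databaseName, String tableName) {
        tableNameById.put(tableId, databaseName + "." + tableName);
    }

    // Maps the values of one row to column names; returns null if they cannot be resolved.
    Map<String, String> onRow(long tableId, List<String> values) {
        String fullName = tableNameById.get(tableId);
        if (fullName == null) {
            return null; // no TABLE_MAP_EVENT seen for this tableId
        }
        List<String> names = columnsByTable.get(fullName);
        if (names == null || names.size() != values.size()) {
            return null; // unknown table or stale column metadata
        }
        Map<String, String> row = new LinkedHashMap<>();
        for (int i = 0; i < names.size(); i++) {
            String value = values.get(i);
            row.put(names.get(i), value == null ? "" : value); // NULL becomes "", as in getMap
        }
        return row;
    }

    public static void main(String[] args) {
        RowResolverSketch resolver = new RowResolverSketch();
        resolver.registerColumns("canal_test.person", Arrays.asList("id", "name", "address"));
        resolver.onTableMap(42L, "canal_test", "person");
        System.out.println(resolver.onRow(42L, Arrays.asList("3", "zzh3", null)));
    }
}
```

Returning null for an unknown tableId is a choice made for this sketch; the listener in this article calls tableInfo.getDatabaseName() without a null check.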
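As mentioned in InstanceListener above, some information has to be read directly from MySQL, such as the column information of the tables. The class that does this, MysqlConnection, is as follows: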
package or;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import or.model.BinlogInfo;
import or.model.BinlogMasterStatus;
import or.model.ColumnInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MysqlConnection {
private static final Logger logger = LoggerFactory.getLogger(MysqlConnection.class);
private static Connection conn;
private static String host;
private static int port;
private static String user;
private static String password;
public static void setConnection(String hostArg, int portArg, String userArg, String passwordArg){
try {
if(conn == null || conn.isClosed()){
Class.forName("com.mysql.jdbc.Driver");
host = hostArg;
port = portArg;
user = userArg;
password = passwordArg;
conn = DriverManager.getConnection("jdbc:mysql://"+host+":"+port+"/",user,password);
logger.info("connected to mysql {}:{} as {}",host,port,user);
}
} catch (ClassNotFoundException e) {
logger.error(e.getMessage(),e);
} catch (SQLException e) {
logger.error(e.getMessage(),e);
}
}
public static Connection getConnection(){
try {
if(conn == null || conn.isClosed()){
setConnection(host,port,user,password);
}
} catch (SQLException e) {
logger.error(e.getMessage(),e);
}
return conn;
}
/**
* Reads the column information of every table.
*
* @return
*/
public static Map<String,List<ColumnInfo>> getColumns(){
Map<String,List<ColumnInfo>> cols = new HashMap<>();
Connection conn = getConnection();
try {
DatabaseMetaData metaData = conn.getMetaData();
ResultSet r = metaData.getCatalogs();
String tableType[] = {"TABLE"};
while(r.next()){
String databaseName = r.getString("TABLE_CAT");
ResultSet result = metaData.getTables(databaseName, null, null, tableType);
while(result.next()){
String tableName = result.getString("TABLE_NAME");
// System.out.println(result.getInt("TABLE_ID"));
String key = databaseName +"."+tableName;
ResultSet colSet = metaData.getColumns(databaseName, null, tableName, null);
cols.put(key, new ArrayList<ColumnInfo>());
while(colSet.next()){
ColumnInfo columnInfo = new ColumnInfo(colSet.getString("COLUMN_NAME"),colSet.getString("TYPE_NAME"));
cols.get(key).add(columnInfo);
}
}
}
} catch (SQLException e) {
logger.error(e.getMessage(),e);
}
return cols;
}
/**
* Reference:
* mysql> show binary logs
* +------------------+-----------+
* | Log_name | File_size |
* +------------------+-----------+
* | mysql-bin.000001 | 126 |
* | mysql-bin.000002 | 126 |
* | mysql-bin.000003 | 6819 |
* | mysql-bin.000004 | 1868 |
* +------------------+-----------+
*/
public static List<BinlogInfo> getBinlogInfo(){
List<BinlogInfo> binlogList = new ArrayList<>();
Connection conn = null;
Statement statement = null;
ResultSet resultSet = null;
try {
conn = getConnection();
statement = conn.createStatement();
resultSet = statement.executeQuery("show binary logs");
while(resultSet.next()){
BinlogInfo binlogInfo = new BinlogInfo(resultSet.getString("Log_name"),resultSet.getLong("File_size"));
binlogList.add(binlogInfo);
}
} catch (Exception e) {
logger.error(e.getMessage(),e);
} finally{
try {
if(resultSet != null)
resultSet.close();
if(statement != null)
statement.close();
if(conn != null)
conn.close();
} catch (SQLException e) {
logger.error(e.getMessage(),e);
}
}
return binlogList;
}
/**
* Reference:
* mysql> show master status;
* +------------------+----------+--------------+------------------+
* | File | Position | Binlog_Do_DB | Binlog_Ignore_DB |
* +------------------+----------+--------------+------------------+
* | mysql-bin.000004 | 1868 | | |
* +------------------+----------+--------------+------------------+
* @return
*/
public static BinlogMasterStatus getBinlogMasterStatus(){
BinlogMasterStatus binlogMasterStatus = new BinlogMasterStatus();
Connection conn = null;
Statement statement = null;
ResultSet resultSet = null;
try {
conn = getConnection();
statement = conn.createStatement();
resultSet = statement.executeQuery("show master status");
while(resultSet.next()){
binlogMasterStatus.setBinlogName(resultSet.getString("File"));
binlogMasterStatus.setPosition(resultSet.getLong("Position"));
}
} catch (Exception e) {
logger.error(e.getMessage(),e);
} finally{
try {
if(resultSet != null)
resultSet.close();
if(statement != null)
statement.close();
if(conn != null)
conn.close();
} catch (SQLException e) {
logger.error(e.getMessage(),e);
}
}
return binlogMasterStatus;
}
/**
* Gets the server_id of the MySQL server that open-replicator connects to
* @return
*/
public static int getServerId(){
int serverId=6789;
Connection conn = null;
Statement statement = null;
ResultSet resultSet = null;
try {
conn = getConnection();
statement = conn.createStatement();
resultSet = statement.executeQuery("show variables like 'server_id'");
while(resultSet.next()){
serverId = resultSet.getInt("Value");
}
} catch (Exception e) {
logger.error(e.getMessage(),e);
} finally{
try {
if(resultSet != null)
resultSet.close();
if(statement != null)
statement.close();
if(conn != null)
conn.close();
} catch (SQLException e) {
logger.error(e.getMessage(),e);
}
}
return serverId;
}
}
Additional classes used by the code above (BinlogInfo.java; BinlogMasterStatus.java; ColumnInfo.java):
package or.model;
public class BinlogInfo {
private String binlogName;
private Long fileSize;
// All-args constructor, getters and setters omitted
}
package or.model;
public class BinlogMasterStatus {
private String binlogName;
private long position;
// Getters and setters omitted
}
package or.model;
public class ColumnInfo {
private String name;
private String type;
// All-args constructor, getters and setters omitted
}
Finally, the parsed events need to be stored somewhere. To keep the design simple, a ConcurrentLinkedDeque is used here (CDCEventManager.java):
package or.manager;
import java.util.concurrent.ConcurrentLinkedDeque;
import or.CDCEvent;
public class CDCEventManager {
public static final ConcurrentLinkedDeque<CDCEvent> queue = new ConcurrentLinkedDeque<>();
}
With all the preparation done, the binlog can now be parsed:
package or.test;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import or.CDCEvent;
import or.InstanceListener;
import or.MysqlConnection;
import or.OpenReplicatorPlus;
import or.manager.CDCEventManager;
import or.model.BinlogMasterStatus;
import com.google.code.or.OpenReplicator;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
public class OpenReplicatorTest {
private static final Logger logger = LoggerFactory.getLogger(OpenReplicatorTest.class);
private static final String host = "xx.xx.xx.60";
private static final int port = 3306;
private static final String user = "****";
private static final String password = "****";
public static void main(String[] args){
OpenReplicator or = new OpenReplicator ();
or.setUser(user);
or.setPassword(password);
or.setHost(host);
or.setPort(port);
MysqlConnection.setConnection(host, port, user, password);
// or.setServerId(MysqlConnection.getServerId());
// the serverId configured here is the id of open-replicator itself (acting as a slave), not the serverId of the master
BinlogMasterStatus bms = MysqlConnection.getBinlogMasterStatus();
or.setBinlogFileName(bms.getBinlogName());
// or.setBinlogFileName("mysql-bin.000004");
or.setBinlogPosition(4);
or.setBinlogEventListener(new InstanceListener());
try {
or.start();
} catch (Exception e) {
logger.error(e.getMessage(),e);
}
Thread thread = new Thread(new PrintCDCEvent());
thread.start();
}
public static class PrintCDCEvent implements Runnable{
@Override
public void run() {
while(true){
if(CDCEventManager.queue.isEmpty() == false)
{
CDCEvent ce = CDCEventManager.queue.pollFirst();
Gson gson = new GsonBuilder().setPrettyPrinting().serializeNulls().create();
String prettyStr1 = gson.toJson(ce);
System.out.println(prettyStr1);
}
else{
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
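The PrintCDCEvent loop polls the deque and sleeps for a second whenever it is empty, so an event can wait up to a second before it is printed. If that latency matters, a blocking queue is one alternative: `take()` parks the consumer until an element arrives. The sketch below uses plain strings in place of CDCEvent; the class name, the `STOP` marker and `drainUntilStop` are made up for illustration and do not appear in the original project.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical consumer built on a BlockingQueue instead of a polled ConcurrentLinkedDeque.
final class BlockingConsumerSketch {
    static final String STOP = "__stop__"; // made-up end-of-stream marker for this sketch

    // Consumes events until the STOP marker arrives (or the thread is interrupted).
    static List<String> drainUntilStop(BlockingQueue<String> queue) {
        List<String> seen = new ArrayList<>();
        try {
            while (true) {
                String event = queue.take(); // blocks while the queue is empty
                if (STOP.equals(event)) {
                    return seen;
                }
                seen.add(event);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
            return seen;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            queue.add("event-1");
            queue.add("event-2");
            queue.add(STOP);
        });
        producer.start();
        System.out.println(drainUntilStop(queue)); // prints [event-1, event-2]
        producer.join();
    }
}
```

An unbounded LinkedBlockingQueue keeps the listener from ever blocking, at the cost of unbounded memory if the consumer falls behind; a bounded queue would trade that the other way.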
After running for a long time, the following problem shows up:
16-10-21 10:41:49.365 ERROR[binlog-parser-1 AbstractBinlogParser.run:247] failed to parse binlog
java.io.EOFException: null
at com.google.code.or.io.util.ActiveBufferedInputStream.read(ActiveBufferedInputStream.java:169) ~[open-replicator-1.0.7.jar:na]
at com.google.code.or.io.impl.XInputStreamImpl.doFill(XInputStreamImpl.java:236) ~[open-replicator-1.0.7.jar:na]
at com.google.code.or.io.impl.XInputStreamImpl.read(XInputStreamImpl.java:213) ~[open-replicator-1.0.7.jar:na]
at com.google.code.or.io.impl.XInputStreamImpl.readInt(XInputStreamImpl.java:141) ~[open-replicator-1.0.7.jar:na]
at com.google.code.or.io.impl.XInputStreamImpl.readInt(XInputStreamImpl.java:61) ~[open-replicator-1.0.7.jar:na]
at com.google.code.or.binlog.impl.ReplicationBasedBinlogParser.doParse(ReplicationBasedBinlogParser.java:91) ~[open-replicator-1.0.7.jar:na]
at com.google.code.or.binlog.impl.AbstractBinlogParser$Task.run(AbstractBinlogParser.java:244) ~[open-replicator-1.0.7.jar:na]
at java.lang.Thread.run(Unknown Source) [na:1.7.0_80]
16-10-21 10:41:49.371 INFO [binlog-parser-1 TransportImpl.disconnect:121] disconnected from xx.xx.xx.60:3306
A first-cut fix (extend OpenReplicator and add a retry mechanism):
package or;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.code.or.OpenReplicator;
public class OpenReplicatorPlus extends OpenReplicator{
private static final Logger logger = LoggerFactory.getLogger(OpenReplicatorPlus.class);
private volatile boolean autoRestart = true;
@Override
public void stopQuietly(long timeout, TimeUnit unit){
super.stopQuietly(timeout, unit);
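if(autoRestart){
try {
TimeUnit.SECONDS.sleep(10);
logger.error("Restart OpenReplicator");
// Assumed completion of the retry: start again with the binlog file name and position configured on this instance
this.start();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
logger.error(e.getMessage(),e);
}
}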
}
}
Finally, in OpenReplicatorTest.java, simply change OpenReplicator or = new OpenReplicator (); to OpenReplicator or = new OpenReplicatorPlus ();.
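One thing to watch with any retry: as far as the code in this article shows, the binlog file name and position are set only once in main (position 4), so a reconnect would presumably replay events from there. A possible remedy is to remember where the last processed event ended and feed that back through setBinlogFileName and setBinlogPosition before reconnecting. The sketch below shows only the bookkeeping; `BinlogCheckpoint` is a made-up class, and the wiring described in the comments is an assumption rather than something the original project does.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical bookkeeping for the resume point; not part of the original project.
final class BinlogCheckpoint {

    // Immutable pair of binlog file name and offset within that file.
    static final class Position {
        final String binlogName;
        final long offset;

        Position(String binlogName, long offset) {
            this.binlogName = binlogName;
            this.offset = offset;
        }
    }

    private final AtomicReference<Position> last;

    BinlogCheckpoint(String initialBinlogName, long initialOffset) {
        this.last = new AtomicReference<>(new Position(initialBinlogName, initialOffset));
    }

    // Assumed call site: the listener, after an event was handled, passing the
    // CDCEvent's binlogName and nextPostion fields.
    void advance(String binlogName, long nextPosition) {
        last.set(new Position(binlogName, nextPosition));
    }

    // Assumed call site: just before a restart, to pass the values on to
    // setBinlogFileName(...) and setBinlogPosition(...).
    Position current() {
        return last.get();
    }

    public static void main(String[] args) {
        BinlogCheckpoint checkpoint = new BinlogCheckpoint("mysql-bin.000006", 4L);
        checkpoint.advance("mysql-bin.000006", 326L);
        Position p = checkpoint.current();
        System.out.println(p.binlogName + ":" + p.offset); // prints mysql-bin.000006:326
    }
}
```

The checkpoint lives only in memory here, so it would not survive a restart of the whole process; persisting it is a separate design decision.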
And that's it.
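References
- 谈谈对Canal(增量数据订阅与消费)的理解 (On understanding Canal: incremental data subscription and consumption)
- MySQL主备复制原理、实现及异常处理 (MySQL master-slave replication: principles, implementation, and exception handling)
- https://github.com/whitesock/open-replicator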