Parsing the MySQL binlog with OpenReplicator

Published 2016-11-07


Readers are welcome to support the author's books, 《深入理解Kafka:核心设计与实践原理》 and 《RabbitMQ实战指南》, and to follow the author's WeChat official account: 朱小厮的博客.


Original article: https://honeypps.com/backend/read-mysql-binlog-by-using-openreplicator/

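Open Replicator is a MySQL binlog parser written in Java. It first connects to MySQL (just as an ordinary MySQL slave would), then receives and parses the binlog, and finally delivers the parsed binlog events to the application through callbacks. Open Replicator can be used for real-time push of MySQL data changes, multi-master to single-slave data synchronization, and similar scenarios. It currently supports only MySQL 5.0 and later.
Open Replicator project page: https://github.com/whitesock/open-replicator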

[Figure: structure of binlog event parsing]

Before reading on, you should have some understanding of the binlog; see 《MySQL Binlog解析》.

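Here open-replicator is used to parse binlog events (with binlog-format = row). The binlog contains two kinds of operations, DDL and DML: for DDL a single SQL statement is output, and for DML the affected row data is output. For example: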

DDL (CREATE, ALTER, DROP, TRUNCATE; used mainly to define or change table structure):

{
  "eventId": 1,
  "databaseName": "canal_test",
  "tableName": "`company`",
  "eventType": 2,
  "timestamp": 1477033198000,
  "timestampReceipt": 1477033248780,
  "binlogName": "mysql-bin.000006",
  "position": 353,
  "nextPostion": 468,
  "serverId": 2,
  "before": null,
  "after": null,
  "isDdl": true,
  "sql": "DROP TABLE `company` /* generated by server */"
}

DML (UPDATE, INSERT, DELETE, which operate on the data in the tables; SELECT is also DML but is never written to the binlog):

    {
      "eventId": 0,
      "databaseName": "canal_test",
      "tableName": "person",
      "eventType": 24,
      "timestamp": 1477030734000,
      "timestampReceipt": 1477032161988,
      "binlogName": "mysql-bin.000006",
      "position": 242,
      "nextPostion": 326,
      "serverId": 2,
      "before": {
        "id": "3",
        "sex": "f",
        "address": "shanghai",
        "age": "23",
        "name": "zzh3"
      },
      "after": {
        "id": "3",
        "sex": "m",
        "address": "shanghai",
        "age": "23",
        "name": "zzh3"
      },
      "isDdl": false,
      "sql": null
    }
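The eventType field in both samples is the numeric binlog event type code taken from the event header. As a reading aid, here is a minimal sketch that maps the codes relevant to this article to their names. EventTypeNames is a hypothetical helper and not part of Open Replicator; the numbers follow the MySQL binlog event numbering, with the row events in their v1 form (23 to 25), which is what the sample output above shows.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: turns the numeric eventType from the JSON output into a readable name.
final class EventTypeNames {
    private static final Map<Integer, String> NAMES = new HashMap<>();
    static {
        NAMES.put(2, "QUERY_EVENT");               // DDL statements arrive as query events
        NAMES.put(15, "FORMAT_DESCRIPTION_EVENT");
        NAMES.put(16, "XID_EVENT");
        NAMES.put(19, "TABLE_MAP_EVENT");
        NAMES.put(23, "WRITE_ROWS_EVENT");         // INSERT (v1 row event)
        NAMES.put(24, "UPDATE_ROWS_EVENT");        // UPDATE (v1 row event)
        NAMES.put(25, "DELETE_ROWS_EVENT");        // DELETE (v1 row event)
    }

    static String nameOf(int eventType) {
        String name = NAMES.get(eventType);
        return name != null ? name : "UNKNOWN(" + eventType + ")";
    }

    public static void main(String[] args) {
        System.out.println(nameOf(2));   // eventType of the DROP TABLE sample
        System.out.println(nameOf(24));  // eventType of the person-table sample
    }
}
```

So the first sample (eventType 2) is a query event carrying the DDL text, and the second (eventType 24) is a row update carrying before and after images.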
    

The relevant classes are as follows:
CDCEvent.java

    package or;
    
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicLong;
    
    import com.google.code.or.binlog.BinlogEventV4;
    import com.google.code.or.binlog.BinlogEventV4Header;
    import com.google.code.or.binlog.impl.event.AbstractBinlogEventV4;
    
    public class CDCEvent {
    	private long eventId = 0;// unique event identifier
    	private String databaseName = null;
    	private String tableName = null;
    	private int eventType = 0;// binlog event type code
    	private long timestamp = 0;// time the event occurred [MySQL server time]
    	private long timestampReceipt = 0;// time Open Replicator received the event [CDC processing time]
    	private String binlogName = null;// binlog file name
    	private long position = 0;
    	private long nextPostion = 0;
    	private long serverId = 0;
    	private Map<String,String> before = null;
    	private Map<String,String> after = null;
    	private Boolean isDdl= null;
    	private String sql = null;
    	
    	private static AtomicLong uuid = new AtomicLong(0);
    	public CDCEvent(){}
    		
    	public CDCEvent(final AbstractBinlogEventV4 are, String databaseName, String tableName){
    		this.init(are);
    		this.databaseName = databaseName;
    		this.tableName = tableName;
    	}
    	
    	private void init(final BinlogEventV4 be){
    		this.eventId = uuid.getAndAdd(1);
    		BinlogEventV4Header header = be.getHeader();
    		
    		this.timestamp = header.getTimestamp();
    		this.eventType = header.getEventType();
    		this.serverId = header.getServerId();
    		this.timestampReceipt = header.getTimestampOfReceipt();
    		this.position = header.getPosition();
    		this.nextPostion = header.getNextPosition();
    		this.binlogName = header.getBinlogFileName();
    	}
    	
    	@Override
    	public String toString(){
    		StringBuilder builder = new StringBuilder();
    		builder.append("{ eventId:").append(eventId);
    		builder.append(",databaseName:").append(databaseName);
    		builder.append(",tableName:").append(tableName);
    		builder.append(",eventType:").append(eventType);
    		builder.append(",timestamp:").append(timestamp);
    		builder.append(",timestampReceipt:").append(timestampReceipt);
    		builder.append(",binlogName:").append(binlogName);
    		builder.append(",position:").append(position);
    		builder.append(",nextPostion:").append(nextPostion);
    		builder.append(",serverId:").append(serverId);
    		builder.append(",isDdl:").append(isDdl);
    		builder.append(",sql:").append(sql);
    		builder.append(",before:").append(before);
    		builder.append(",after:").append(after).append("}");
    		
    		return builder.toString();
    	}
    // Getters and setters omitted
    }
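Because CDCEvent stores both the server-side timestamp and the receipt timestamp, the difference between the two gives a rough idea of how far the reader is behind the server. The sketch below is only an illustration; ReceiptLag is a hypothetical helper, and the two values are copied from the DDL sample output earlier in the article.

```java
// Hypothetical helper: delay between the server writing an event and the listener receiving it.
final class ReceiptLag {
    static long lagMillis(long timestamp, long timestampReceipt) {
        return timestampReceipt - timestamp;
    }

    public static void main(String[] args) {
        // timestamp and timestampReceipt from the DROP TABLE sample
        System.out.println(lagMillis(1477033198000L, 1477033248780L) + " ms"); // prints "50780 ms"
    }
}
```

A large value does not necessarily mean a slow connection: when parsing starts from the beginning of a binlog file, old events are replayed and the lag simply reflects their age.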
    

open-replicator parsing works by registering a listener; the most important steps of the whole process are in the class below:
InstanceListener.java

    package or;
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    import or.keeper.TableInfoKeeper;
    import or.manager.CDCEventManager;
    import or.model.ColumnInfo;
    import or.model.TableInfo;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.google.code.or.binlog.BinlogEventListener;
    import com.google.code.or.binlog.BinlogEventV4;
    import com.google.code.or.binlog.impl.event.DeleteRowsEvent;
    import com.google.code.or.binlog.impl.event.FormatDescriptionEvent;
    import com.google.code.or.binlog.impl.event.QueryEvent;
    import com.google.code.or.binlog.impl.event.TableMapEvent;
    import com.google.code.or.binlog.impl.event.UpdateRowsEvent;
    import com.google.code.or.binlog.impl.event.WriteRowsEvent;
    import com.google.code.or.binlog.impl.event.XidEvent;
    import com.google.code.or.common.glossary.Column;
    import com.google.code.or.common.glossary.Pair;
    import com.google.code.or.common.glossary.Row;
    import com.google.code.or.common.util.MySQLConstants;
    
    public class InstanceListener implements BinlogEventListener{
    	private static final Logger logger = LoggerFactory.getLogger(InstanceListener.class);
    	
    	@Override
    	public void onEvents(BinlogEventV4 be) {
    		if(be == null){
    			logger.error("binlog event is null");
    			return;
    		}
    		
    		int eventType = be.getHeader().getEventType();
    		switch(eventType){
    			case MySQLConstants.FORMAT_DESCRIPTION_EVENT:
    			{
    				logger.trace("FORMAT_DESCRIPTION_EVENT");
    				break;
    			}
    			case MySQLConstants.TABLE_MAP_EVENT:// every ROW_EVENT is preceded by a TABLE_MAP_EVENT carrying table information such as tableId, tableName and databaseName, whereas the ROW_EVENT itself only carries the tableId
    			{
    				TableMapEvent tme = (TableMapEvent)be;
    				TableInfoKeeper.saveTableIdMap(tme);
    				logger.trace("TABLE_MAP_EVENT:tableId:{}",tme.getTableId());
    				break;
    			}
    			case MySQLConstants.DELETE_ROWS_EVENT:
    			{
    				DeleteRowsEvent dre = (DeleteRowsEvent) be;
    				long tableId = dre.getTableId();
    				logger.trace("DELETE_ROW_EVENT:tableId:{}",tableId);
    				
    				TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);
    				String databaseName = tableInfo.getDatabaseName();
    				String tableName = tableInfo.getTableName();
    				
    				List<Row> rows = dre.getRows();
    				for(Row row:rows){
    					List<Column> before = row.getColumns();
    					Map<String,String> beforeMap = getMap(before,databaseName,tableName);
    					if(beforeMap !=null && beforeMap.size()>0){
    						CDCEvent cdcEvent = new CDCEvent(dre,databaseName,tableName);
    						cdcEvent.setIsDdl(false);
    						cdcEvent.setSql(null);
    						cdcEvent.setBefore(beforeMap);
    						CDCEventManager.queue.addLast(cdcEvent);
    						logger.info("cdcEvent:{}",cdcEvent);
    					}
    				}
    				break;
    			}
    			case MySQLConstants.UPDATE_ROWS_EVENT:
    			{
    				UpdateRowsEvent upe = (UpdateRowsEvent)be;
    				long tableId = upe.getTableId();
    				logger.info("UPDATE_ROWS_EVENT:tableId:{}",tableId);
    				
    				TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);
    				String databaseName = tableInfo.getDatabaseName();
    				String tableName = tableInfo.getTableName();
    				
    				List<Pair<Row>> rows = upe.getRows();
    				for(Pair<Row> p:rows){
    					List<Column> colsBefore = p.getBefore().getColumns();
    					List<Column> colsAfter = p.getAfter().getColumns();
    					
    					Map<String,String> beforeMap = getMap(colsBefore,databaseName,tableName);
    					Map<String,String> afterMap = getMap(colsAfter,databaseName,tableName);
    					if(beforeMap!=null && afterMap!=null && beforeMap.size()>0 && afterMap.size()>0){
    						CDCEvent cdcEvent = new CDCEvent(upe,databaseName,tableName);
    						cdcEvent.setIsDdl(false);
    						cdcEvent.setSql(null);
    						cdcEvent.setBefore(beforeMap);
    						cdcEvent.setAfter(afterMap);
    						CDCEventManager.queue.addLast(cdcEvent);
    						logger.info("cdcEvent:{}",cdcEvent);
    					}
    				}
    				break;
    			}
    			case MySQLConstants.WRITE_ROWS_EVENT:
    			{
    				WriteRowsEvent wre = (WriteRowsEvent)be;
    				long tableId = wre.getTableId();
    				logger.trace("WRITE_ROWS_EVENT:tableId:{}",tableId);
    				
    				TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);
    				String databaseName = tableInfo.getDatabaseName();
    				String tableName = tableInfo.getTableName();
    				
    				List<Row> rows = wre.getRows();
    				for(Row row: rows){
    					List<Column> after = row.getColumns();
    					Map<String,String> afterMap = getMap(after,databaseName,tableName);
    					if(afterMap!=null && afterMap.size()>0){
    						CDCEvent cdcEvent = new CDCEvent(wre,databaseName,tableName);
    						cdcEvent.setIsDdl(false);
    						cdcEvent.setSql(null);
    						cdcEvent.setAfter(afterMap);
    						CDCEventManager.queue.addLast(cdcEvent);
    						logger.info("cdcEvent:{}",cdcEvent);
    					}
    				}
    				break;
    			}
    			case MySQLConstants.QUERY_EVENT:
    			{
    				QueryEvent qe = (QueryEvent)be;
    				TableInfo tableInfo = createTableInfo(qe);
    				if(tableInfo == null)
    					break;
    				String databaseName = tableInfo.getDatabaseName();
    				String tableName = tableInfo.getTableName();
    				logger.trace("QUERY_EVENT:databaseName:{},tableName:{}",databaseName,tableName);
    				
    				CDCEvent cdcEvent = new CDCEvent(qe,databaseName,tableName);
    				cdcEvent.setIsDdl(true);
    				cdcEvent.setSql(qe.getSql().toString());
    				
    				CDCEventManager.queue.addLast(cdcEvent);
    				logger.info("cdcEvent:{}",cdcEvent);
    
    				break;
    			}
    			case MySQLConstants.XID_EVENT:{
    				XidEvent xe = (XidEvent)be;
    				logger.trace("XID_EVENT: xid:{}",xe.getXid());
    				break;
    			}
    			default:
    			{
    				logger.trace("DEFAULT:{}",eventType);
    				break;
    			}
    		}
    		
    	}
    	
    	/**
    	 * A ROW_EVENT carries no column names, so they are read through MysqlConnection (described below)
    	 * and then mapped onto the List<Column> taken from the event.
    	 * 
    	 * @param cols
    	 * @param databaseName
    	 * @param tableName
    	 * @return
    	 */
    	private Map<String,String> getMap(List<Column> cols, String databaseName, String tableName){
    		Map<String,String> map = new HashMap<>();
    		if(cols == null || cols.size()==0){
    			return null;
    		}
    		
    		String fullName = databaseName+"."+tableName;
    		List<ColumnInfo> columnInfoList = TableInfoKeeper.getColumns(fullName);
    		if(columnInfoList == null)
    			return null;
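    		if(columnInfoList.size() != cols.size()){
    			// The cached schema may be stale (e.g. after ALTER TABLE): refresh it, then re-read the list.
    			TableInfoKeeper.refreshColumnsMap();
    			columnInfoList = TableInfoKeeper.getColumns(fullName);
    			if(columnInfoList == null || columnInfoList.size() != cols.size())
    			{
    				logger.warn("columnInfoList.size is not equal to cols.");
    				return null;
    			}
    		}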
    		
    		for(int i=0;i<columnInfoList.size(); i++){
    			if(cols.get(i).getValue()==null)
    				map.put(columnInfoList.get(i).getName(),"");
    			else
    				map.put(columnInfoList.get(i).getName(), cols.get(i).toString());
    		}
    		
    		return map;
    	}
    
    	/**
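    	 * Extracts the table information from the SQL text. A QUERY_EVENT is scoped to a database, unlike a ROW_EVENT, which is scoped to a table,
    	 * so the table name has to be parsed out of the SQL and wrapped in a TableInfo object.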
    	 * 
    	 * @param qe
    	 * @return
    	 */	
    	private TableInfo createTableInfo(QueryEvent qe){
    		String sql = qe.getSql().toString().toLowerCase();
    		
    		TableInfo ti = new TableInfo();
    		String databaseName = qe.getDatabaseName().toString();
    		String tableName = null;
    		if(checkFlag(sql,"table")){
    			tableName = getTableName(sql,"table");
    		} else if(checkFlag(sql,"truncate")){
    			tableName = getTableName(sql,"truncate");
    		} else{
    			logger.warn("can not find table name from sql:{}",sql);
    			return null;
    		}
    		ti.setDatabaseName(databaseName);
    		ti.setTableName(tableName);
    		ti.setFullName(databaseName+"."+tableName);
    		
    		return ti;
    	}
    
    	private boolean checkFlag(String sql, String flag){
    		String[] ss = sql.split(" ");
    		for(String s:ss){
    			if(s.equals(flag)){
    				return true;
    			}
    		}
    		return false;
    	}
    	
    	private String getTableName(String sql, String flag){
    		String[] ss = sql.split("\\.");
    		String tName = null;
    		if (ss.length > 1) {
    			String[] strs = ss[1].split(" ");
    			tName = strs[0];
    		} else {
    			String[] strs = sql.split(" ");
    			boolean start = false;
    			for (String s : strs) {
    				if (s.indexOf(flag) >= 0) {
    					start = true;
    					continue;
    				}
    				if (start && !s.isEmpty()) {
    					tName = s;
    					break;
    				}
    			}
    		}
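    		if(tName == null){
    			return null;
    		}
    		// Strings are immutable, so the stripped result has to be assigned back.
    		tName = tName.replaceAll("`", "").replaceAll(";", "");
    		
    		// strip a trailing "(" as in [create table person(....]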
    		int index = tName.indexOf('(');
    		if(index>0){
    			tName = tName.substring(0, index);
    		}
    		
    		return tName;
    	}
    }
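Splitting the SQL on spaces and dots works for simple statements but is fragile. As an alternative, here is a minimal sketch that pulls the table name out of common DDL statements with a regular expression. DdlTableNames and tableNameOf are hypothetical names, not part of Open Replicator, and the pattern only covers CREATE/ALTER/DROP/TRUNCATE with plain word-character identifiers; for a multi-table DROP it returns only the first table.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts the (first) table name from a DDL statement.
final class DdlTableNames {
    private static final Pattern DDL_TABLE = Pattern.compile(
        "\\b(?:create\\s+(?:temporary\\s+)?table(?:\\s+if\\s+not\\s+exists)?"
      + "|alter\\s+table"
      + "|drop\\s+(?:temporary\\s+)?table(?:\\s+if\\s+exists)?"
      + "|truncate(?:\\s+table)?)"
      + "\\s+(?:`?(\\w+)`?\\s*\\.\\s*)?`?(\\w+)`?",   // optional "db." prefix, optional backticks
        Pattern.CASE_INSENSITIVE);

    // Returns the table name without backticks, or null if the statement is not a recognized DDL.
    static String tableNameOf(String sql) {
        Matcher m = DDL_TABLE.matcher(sql);
        return m.find() ? m.group(2) : null;
    }

    public static void main(String[] args) {
        System.out.println(tableNameOf("DROP TABLE `company` /* generated by server */"));
        System.out.println(tableNameOf("create table person(id int)"));
        System.out.println(tableNameOf("ALTER TABLE canal_test.person ADD COLUMN x INT"));
        System.out.println(tableNameOf("BEGIN"));
    }
}
```

Unlike the listener code above, this sketch does not lowercase the statement first, so the table name keeps its original case.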
    

The TableInfo.java used above is as follows:

    package or.model;
    
    public class TableInfo {
    
    	private String databaseName;
    	private String tableName;
    	private String fullName;
    	// Getters and setters omitted
    	
    	@Override
    	public boolean equals(Object o){
    		if(this == o)
    			return true;
    		if(o == null || this.getClass()!=o.getClass())
    			return false;
    		TableInfo tableInfo = (TableInfo)o;
    		if(!this.databaseName.equals(tableInfo.getDatabaseName()))
    			return false;
    		if(!this.tableName.equals(tableInfo.getTableName()))
    			return false;
    		if(!this.fullName.equals(tableInfo.getFullName()))
    			return false;
    		return true;
    	}
    	
    	@Override
    	public int hashCode(){
    		int result = this.tableName.hashCode();
    		result = 31*result+this.databaseName.hashCode();
    		result = 31*result+this.fullName.hashCode();
    		return result;
    	}
    }
    

Next we need somewhere to keep the information extracted from TABLE_MAP_EVENT, TableInfoKeeper.java:

    package or.keeper;
    
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    import or.MysqlConnection;
    import or.model.ColumnInfo;
    import or.model.TableInfo;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.google.code.or.binlog.impl.event.TableMapEvent;
    
    public class TableInfoKeeper {
    
    	private static final Logger logger = LoggerFactory.getLogger(TableInfoKeeper.class);
    	
    	private static Map<Long,TableInfo> tabledIdMap = new ConcurrentHashMap<>();
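    	private static volatile Map<String,List<ColumnInfo>> columnsMap = new ConcurrentHashMap<>();// reassigned by refreshColumnsMap(), so volatile keeps the new map visible to other threads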
    	
    	static{
    		columnsMap = MysqlConnection.getColumns();
    	}
    	
    	public static void saveTableIdMap(TableMapEvent tme){
    		long tableId = tme.getTableId();
    		tabledIdMap.remove(tableId);
    		
    		TableInfo table = new TableInfo();
    		table.setDatabaseName(tme.getDatabaseName().toString());
    		table.setTableName(tme.getTableName().toString());
    		table.setFullName(tme.getDatabaseName()+"."+tme.getTableName());
    		
    		tabledIdMap.put(tableId, table);
    	}
    	
    	public static synchronized void refreshColumnsMap(){
    		Map<String,List<ColumnInfo>> map = MysqlConnection.getColumns();
    		if(map.size()>0){
    //			logger.warn("refresh and clear cols.");
    			columnsMap = map;
    //			logger.warn("refresh and switch cols:{}",map);
    		}
    		else
    		{
    			logger.error("refresh columnsMap error.");
    		}
    	}
    	
    	public static TableInfo getTableInfo(long tableId){
    		return tabledIdMap.get(tableId);
    	}
    	
    	public static List<ColumnInfo> getColumns(String fullName){
    		return columnsMap.get(fullName);
    	}
    }
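To make the interplay of the two caches concrete, here is a simplified sketch of the lookup chain: the tableId from a row event resolves to a full table name, the full name resolves to the ordered column names, and the positional row values are then paired with those names. RowDecoderSketch is a hypothetical stand-in that uses plain strings instead of the real Open Replicator event classes, and the tableId 70 in the example is made up.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the two caches kept by TableInfoKeeper.
final class RowDecoderSketch {
    // tableId -> "database.table", filled from each TABLE_MAP_EVENT
    private final Map<Long, String> tableIdToName = new HashMap<>();
    // "database.table" -> ordered column names, loaded from the database metadata
    private final Map<String, List<String>> columnNames = new HashMap<>();

    void onTableMap(long tableId, String database, String table) {
        tableIdToName.put(tableId, database + "." + table);
    }

    void registerColumns(String fullName, List<String> names) {
        columnNames.put(fullName, names);
    }

    // Pairs positional row values with column names; null when the table or schema is unknown or stale.
    Map<String, String> decode(long tableId, List<String> values) {
        String fullName = tableIdToName.get(tableId);
        if (fullName == null) return null;
        List<String> names = columnNames.get(fullName);
        if (names == null || names.size() != values.size()) return null;
        Map<String, String> row = new LinkedHashMap<>();
        for (int i = 0; i < names.size(); i++) {
            String v = values.get(i);
            row.put(names.get(i), v == null ? "" : v);   // same null-to-empty convention as getMap
        }
        return row;
    }

    public static void main(String[] args) {
        RowDecoderSketch d = new RowDecoderSketch();
        d.registerColumns("canal_test.person", Arrays.asList("id", "sex", "address", "name"));
        d.onTableMap(70L, "canal_test", "person");
        System.out.println(d.decode(70L, Arrays.asList("3", "m", "shanghai", "zzh3")));
    }
}
```

The size check is the reason a schema refresh is needed after ALTER TABLE: once the column count in the event no longer matches the cached list, the values cannot be paired with names reliably.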
    

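As mentioned in InstanceListener above, some information has to be read directly from MySQL, such as the column information of the tables. The class that does this, MysqlConnection, is as follows: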

    package or;
    
    import java.sql.Connection;
    import java.sql.DatabaseMetaData;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    import or.model.BinlogInfo;
    import or.model.BinlogMasterStatus;
    import or.model.ColumnInfo;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class MysqlConnection {
    	private static final Logger logger = LoggerFactory.getLogger(MysqlConnection.class);
    	
    	private static Connection conn;
    	
    	private static String host;
    	private static int port;
    	private static String user;
    	private static String password;
    	
    	public static void setConnection(String hostArg, int portArg, String userArg, String passwordArg){
    		try {
    			if(conn == null || conn.isClosed()){
    				Class.forName("com.mysql.jdbc.Driver");
    				
    				host = hostArg;
    				port = portArg;
    				user = userArg;
    				password = passwordArg;
    				
    				conn = DriverManager.getConnection("jdbc:mysql://"+host+":"+port+"/",user,password);
    				logger.info("connected to mysql {}:{} as user {}",host,port,user);
    			}
    		} catch (ClassNotFoundException e) {
    			logger.error(e.getMessage(),e);
    		} catch (SQLException e) {
    			logger.error(e.getMessage(),e);
    		}
    	}
    	
    	public static Connection getConnection(){
    		try {
    			if(conn == null || conn.isClosed()){
    				setConnection(host,port,user,password);
    			}
    		} catch (SQLException e) {
    			logger.error(e.getMessage(),e);
    		}
    		return conn;
    	}
    
    	/**
    	 * Fetches the column information of every table
    	 * 
    	 * @return
    	 */
    	public static Map<String,List<ColumnInfo>> getColumns(){
    		Map<String,List<ColumnInfo>> cols = new HashMap<>();
    		Connection conn = getConnection();
    		
    		try {
    			DatabaseMetaData metaData = conn.getMetaData();
    			ResultSet r = metaData.getCatalogs();
    			String tableType[] = {"TABLE"};
    			while(r.next()){
    				String databaseName = r.getString("TABLE_CAT");
    				ResultSet result = metaData.getTables(databaseName, null, null, tableType);
    				while(result.next()){
    					String tableName = result.getString("TABLE_NAME");
    //					System.out.println(result.getInt("TABLE_ID"));
    					String key = databaseName +"."+tableName;
    					ResultSet colSet = metaData.getColumns(databaseName, null, tableName, null);
    					cols.put(key, new ArrayList<ColumnInfo>());
    					while(colSet.next()){
    						ColumnInfo columnInfo = new ColumnInfo(colSet.getString("COLUMN_NAME"),colSet.getString("TYPE_NAME"));
    						cols.get(key).add(columnInfo);
    					}
    					
    				}
    			}
    		} catch (SQLException e) { 
    			logger.error(e.getMessage(),e);
    		}	
    		return cols;
    	}
    	
    	/**
    	 * For reference:
    	 * mysql> show binary logs
    	 *  +------------------+-----------+
    	 *	| Log_name         | File_size |
    	 *	+------------------+-----------+
    	 *	| mysql-bin.000001 |       126 |
    	 *	| mysql-bin.000002 |       126 |
    	 *	| mysql-bin.000003 |      6819 |
    	 *	| mysql-bin.000004 |      1868 |
    	 *	+------------------+-----------+
    	 */
    	public static List<BinlogInfo> getBinlogInfo(){
    		List<BinlogInfo> binlogList = new ArrayList<>();
    		
    		Connection conn = null;
    		Statement statement = null;
    		ResultSet resultSet = null;
    		
    		try {
    			conn = getConnection();
    			statement = conn.createStatement();
    			resultSet = statement.executeQuery("show binary logs");
    			while(resultSet.next()){
    				BinlogInfo binlogInfo = new BinlogInfo(resultSet.getString("Log_name"),resultSet.getLong("File_size"));
    				binlogList.add(binlogInfo);
    			}
    		} catch (Exception e) {
    			logger.error(e.getMessage(),e);
    		} finally{
    			try {
    				if(resultSet != null)
    					resultSet.close();
    				if(statement != null)
    					statement.close();
    				if(conn != null)
    					conn.close();
    			} catch (SQLException e) {
    				logger.error(e.getMessage(),e);
    			}
    		}
    		
    		return binlogList;
    	}
    
    	/**
    	 * For reference:
    	 * mysql> show master status;
    	 * 	+------------------+----------+--------------+------------------+
    	 * 	| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB |
    	 * 	+------------------+----------+--------------+------------------+
    	 * 	| mysql-bin.000004 |     1868 |              |                  |
    	 * 	+------------------+----------+--------------+------------------+
    	 * @return
    	 */
    	public static BinlogMasterStatus getBinlogMasterStatus(){
    		BinlogMasterStatus binlogMasterStatus = new BinlogMasterStatus();
    		
    		Connection conn = null;
    		Statement statement = null;
    		ResultSet resultSet = null;
    		
    		try {
    			conn = getConnection();
    			statement = conn.createStatement();
    			resultSet = statement.executeQuery("show master status");
    			while(resultSet.next()){
    				binlogMasterStatus.setBinlogName(resultSet.getString("File"));
    				binlogMasterStatus.setPosition(resultSet.getLong("Position"));
    			}
    		} catch (Exception e) {
    			logger.error(e.getMessage(),e);
    		} finally{
    			try {
    				if(resultSet != null)
    					resultSet.close();
    				if(statement != null)
    					statement.close();
    				if(conn != null)
    					conn.close();
    			} catch (SQLException e) {
    				logger.error(e.getMessage(),e);
    			}
    		}
    		
    		return binlogMasterStatus;
    	}
    
    	/**
    	 * Gets the server_id of the MySQL server that open-replicator connects to
    	 * @return
    	 */
    	public static int getServerId(){
    		int serverId=6789;
    		Connection conn = null;
    		Statement statement = null;
    		ResultSet resultSet = null;
    		
    		try {
    			conn = getConnection();
    			statement = conn.createStatement();
    			resultSet = statement.executeQuery("show variables like 'server_id'");
    			while(resultSet.next()){
    				serverId = resultSet.getInt("Value");
    			}
    		} catch (Exception e) {
    			logger.error(e.getMessage(),e);
    		} finally{
    			try {
    				if(resultSet != null)
    					resultSet.close();
    				if(statement != null)
    					statement.close();
    				if(conn != null)
    					conn.close();
    			} catch (SQLException e) {
    				logger.error(e.getMessage(),e);
    			}
    		}
    		
    		return serverId;
    	}
    }
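The three query methods above repeat the same close-in-finally boilerplate. Since Java 7, try-with-resources does this automatically and closes resources in the reverse order of their declaration. The sketch below shows the mechanism with a hypothetical Tracked class standing in for Statement and ResultSet, so that it runs without a database; with real JDBC objects the try header would declare the Statement and the ResultSet instead.

```java
import java.util.ArrayList;
import java.util.List;

final class TryWithResourcesSketch {
    static final List<String> LOG = new ArrayList<>();

    // Hypothetical stand-in for Statement / ResultSet: records when it is closed.
    static final class Tracked implements AutoCloseable {
        private final String name;
        Tracked(String name) { this.name = name; }
        @Override public void close() { LOG.add("close " + name); }
    }

    static void runQuery() {
        // Both resources are closed when the block exits, even if the body throws.
        try (Tracked statement = new Tracked("statement");
             Tracked resultSet = new Tracked("resultSet")) {
            LOG.add("read rows");
        }
    }

    public static void main(String[] args) {
        runQuery();
        System.out.println(LOG);
    }
}
```

Note that the methods above also close the shared static connection in their finally blocks, which forces getConnection() to reconnect on the next call; whether to keep the connection open between queries is a separate design decision.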
    

Auxiliary classes used by the code above (BinlogInfo.java; BinlogMasterStatus.java; ColumnInfo.java):

    package or.model;
    public class BinlogInfo {
    	private String binlogName;
    	private Long fileSize;
    	// Getters and setters omitted
    }
    
    package or.model;
    public class BinlogMasterStatus {
    	private String binlogName;
    	private long position;
    	// Getters and setters omitted
    }
    
    package or.model;
    public class ColumnInfo {
    	private String name;
    	private String type;
    	// Getters and setters omitted
    }
    

Finally, we need somewhere to store the parsed events. To keep the design simple, a ConcurrentLinkedDeque is used (CDCEventManager.java):

    package or.manager;
    import java.util.concurrent.ConcurrentLinkedDeque;
    import or.CDCEvent;
    public class CDCEventManager {
    	public static final ConcurrentLinkedDeque<CDCEvent> queue = new ConcurrentLinkedDeque<>();
    }
    

With all the preparation done, we can now parse the binlog:

    package or.test;
    
    import java.util.concurrent.TimeUnit;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import or.CDCEvent;
    import or.InstanceListener;
    import or.MysqlConnection;
    import or.OpenReplicatorPlus;
    import or.manager.CDCEventManager;
    import or.model.BinlogMasterStatus;
    
    import com.google.code.or.OpenReplicator;
    import com.google.gson.Gson;
    import com.google.gson.GsonBuilder;
    import com.google.gson.JsonElement;
    import com.google.gson.JsonParser;
    
    public class OpenReplicatorTest {
    	private static final Logger logger = LoggerFactory.getLogger(OpenReplicatorTest.class);
    	private static final String host = "xx.xx.xx.60";
    	private static final int port = 3306;
    	private static final String user = "****";
    	private static final String password = "****";
    	
    	public static void main(String[] args){
    		OpenReplicator or = new OpenReplicator ();
    		or.setUser(user);
    		or.setPassword(password);
    		or.setHost(host);
    		or.setPort(port);
    		MysqlConnection.setConnection(host, port, user, password);
    		
    //		or.setServerId(MysqlConnection.getServerId());
    		// the serverId configured here is the id of open-replicator itself (acting as a slave), not the master's serverId
    		
    		BinlogMasterStatus bms = MysqlConnection.getBinlogMasterStatus();
    		or.setBinlogFileName(bms.getBinlogName());
    //		or.setBinlogFileName("mysql-bin.000004");
    		or.setBinlogPosition(4);
    		or.setBinlogEventListener(new InstanceListener());
    		try {
    			or.start();
    		} catch (Exception e) {
    			logger.error(e.getMessage(),e);
    		}
    		
    		Thread thread = new Thread(new PrintCDCEvent());
    		thread.start();
    	}
    
    	public static class PrintCDCEvent implements Runnable{
    		@Override
    		public void run() {
    			while(true){
    				if(CDCEventManager.queue.isEmpty() == false)
    				{
    					CDCEvent ce = CDCEventManager.queue.pollFirst();
    					Gson gson = new GsonBuilder().setPrettyPrinting().serializeNulls().create();
    					String prettyStr1 = gson.toJson(ce);
    					System.out.println(prettyStr1);	
    				}
    				else{
    					try {
    						TimeUnit.SECONDS.sleep(1);
    					} catch (InterruptedException e) {
    						e.printStackTrace();
    					}
    				}
    			}
    		}		
    	}
    }
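The printing thread above polls the deque and sleeps for one second whenever it is empty, which adds up to a second of latency per event burst. One possible alternative, sketched here under the assumption that a blocking queue is acceptable in place of the ConcurrentLinkedDeque, is to let the consumer block in take(). BlockingConsumerSketch and the stop marker are hypothetical, and events are represented as plain strings to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class BlockingConsumerSketch {
    static final String POISON = "__STOP__";   // hypothetical shutdown marker

    // Collects events until the marker arrives; take() blocks while the queue is empty.
    static List<String> drain(BlockingQueue<String> queue) {
        List<String> seen = new ArrayList<>();
        try {
            while (true) {
                String event = queue.take();
                if (POISON.equals(event)) break;
                seen.add(event);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve the interrupt and stop consuming
        }
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(new Runnable() {
            @Override public void run() {
                queue.add("event-0");
                queue.add("event-1");
                queue.add(POISON);
            }
        });
        producer.start();
        System.out.println(drain(queue));
        producer.join();
    }
}
```

In the listener, the equivalent change would be to add events to such a queue instead of calling addLast on the deque.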
    

After running for a long time, you will hit the following problem:

    16-10-21 10:41:49.365 ERROR[binlog-parser-1 AbstractBinlogParser.run:247] failed to parse binlog
    java.io.EOFException: null
    	at com.google.code.or.io.util.ActiveBufferedInputStream.read(ActiveBufferedInputStream.java:169) ~[open-replicator-1.0.7.jar:na]
    	at com.google.code.or.io.impl.XInputStreamImpl.doFill(XInputStreamImpl.java:236) ~[open-replicator-1.0.7.jar:na]
    	at com.google.code.or.io.impl.XInputStreamImpl.read(XInputStreamImpl.java:213) ~[open-replicator-1.0.7.jar:na]
    	at com.google.code.or.io.impl.XInputStreamImpl.readInt(XInputStreamImpl.java:141) ~[open-replicator-1.0.7.jar:na]
    	at com.google.code.or.io.impl.XInputStreamImpl.readInt(XInputStreamImpl.java:61) ~[open-replicator-1.0.7.jar:na]
    	at com.google.code.or.binlog.impl.ReplicationBasedBinlogParser.doParse(ReplicationBasedBinlogParser.java:91) ~[open-replicator-1.0.7.jar:na]
    	at com.google.code.or.binlog.impl.AbstractBinlogParser$Task.run(AbstractBinlogParser.java:244) ~[open-replicator-1.0.7.jar:na]
    	at java.lang.Thread.run(Unknown Source) [na:1.7.0_80]
    16-10-21 10:41:49.371 INFO [binlog-parser-1 TransportImpl.disconnect:121] disconnected from xx.xx.xx.60:3306
    

A preliminary fix (extend OpenReplicator and add a retry mechanism):

    package or;
    
    import java.util.concurrent.TimeUnit;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import com.google.code.or.OpenReplicator;
    
    public class OpenReplicatorPlus extends OpenReplicator{
    	private static final Logger logger = LoggerFactory.getLogger(OpenReplicatorPlus.class);
    	private volatile boolean autoRestart = true;
    	@Override
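    	public void stopQuietly(long timeout, TimeUnit unit){
    		super.stopQuietly(timeout, unit);
    		if(autoRestart){
    			try {
    				TimeUnit.SECONDS.sleep(10);
    				logger.error("Restart OpenReplicator");
    				// Assumption: calling start() again is enough to reconnect, using the binlog
    				// file name and position currently configured on this instance.
    				start();
    			} catch (InterruptedException e) {
    				Thread.currentThread().interrupt();
    			} catch (Exception e) {
    				logger.error(e.getMessage(), e);
    			}
    		}
    	}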
    }
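The retry idea can also be expressed independently of OpenReplicator. The following is a minimal sketch of a bounded retry loop with a fixed delay; Startable is a hypothetical interface standing in for anything with a start() method that may throw, and the flaky object in main only simulates two failed connection attempts.

```java
import java.util.concurrent.TimeUnit;

final class RestartSketch {
    // Hypothetical stand-in for a component with a start() that may throw, such as a replicator.
    interface Startable {
        void start() throws Exception;
    }

    // Tries start() up to maxAttempts times, sleeping delayMillis after each failure.
    // Returns the number of attempts used, or -1 if every attempt failed or the thread was interrupted.
    static int startWithRetry(Startable target, int maxAttempts, long delayMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                target.start();
                return attempt;
            } catch (Exception e) {
                System.err.println("start failed (attempt " + attempt + "): " + e.getMessage());
            }
            try {
                TimeUnit.MILLISECONDS.sleep(delayMillis);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                return -1;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        final int[] calls = {0};
        Startable flaky = new Startable() {
            @Override public void start() throws Exception {
                if (++calls[0] < 3) throw new java.io.EOFException("simulated disconnect");
            }
        };
        System.out.println(startWithRetry(flaky, 5, 10)); // prints 3
    }
}
```

Bounding the number of attempts avoids retrying forever against a server that is permanently unreachable; the limit and the delay are tuning choices.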
    

Finally, in OpenReplicatorTest.java simply change OpenReplicator or = new OpenReplicator (); to OpenReplicator or = new OpenReplicatorPlus ();.

All done!


