Canal记录mysql的binlog日志监听


缘起

公司有一个app客户端,在调取某些数据时,需要从不同的项目中获得数据,并处理这些数据,导致显示到客户端非常慢,现需要对这个问题进行优化。数据库大都是mysql服务器,所以选用了适合它的开源框架canal来处理数据。

在使用canal监听mysql的二进制日志的过程中,发现可用性还是比较强的,优点如下
  • 能够针对不同的表进行监听(订阅)
  • Canal客户端出现异常时,服务器可以将产品记录,重启客户端时可以将未消费的数据继续消费,做到不丢失数据(服务端记录未消费的产品)
  • Canal服务端出现异常,导致客户端无法消费数据时, 重启Canal服务器可以从出现异常时产生的数据,也可以做到不丢失数据。(服务端可以从上次无法生产的地方继续生产数据)

客户端配置如下:

SimpleCanalClientTest.java
package com.alibaba.otter.canal.example;
import java.net.InetSocketAddress;
import org.apache.commons.lang.exception.ExceptionUtils;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
/**
 * 单机模式的测试例子
 *
 * @author jianghang 2013-4-15 下午04:19:20
 * @version 1.0.4
 */
public class SimpleCanalClientTest extends AbstractCanalClientTest {
    public SimpleCanalClientTest(String destination){
        super(destination);
    }
    public static void main(String args[]) {
        System.out.println(System.getProperty(“java.class.path”));//系统的classpaht路径
        System.out.println(System.getProperty(“user.dir”));//用户的当前路径
        // 根据ip,直接创建链接,无HA的功能
        String destination = “example”;
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(“10.100.200.104″,//AddressUtils.getHostIp(),
            11111), destination,”sys_canal” , “sys_canal”);
        final SimpleCanalClientTest clientTest = new SimpleCanalClientTest(destination);
        clientTest.setConnector(connector);
        clientTest.start();
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                try {
                    logger.info(“## stop the canal client”);
                    clientTest.stop();
                } catch (Throwable e) {
                    logger.warn(“##something goes wrong when stopping canal:\n{}”, ExceptionUtils.getFullStackTrace(e));
                } finally {
                    logger.info(“## canal client is down.”);
                }
            }
        });
    }
}
AbstractCanalClientTest.java
package com.alibaba.otter.canal.example;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import org.apache.commons.lang.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.CanalEntry.TransactionBegin;
import com.alibaba.otter.canal.protocol.CanalEntry.TransactionEnd;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
/**
 * 测试基类
 *
 * @author jianghang 2013-4-15 下午04:17:12
 * @version 1.0.4
 */
public class AbstractCanalClientTest {
    protected final static Logger             logger             = LoggerFactory.getLogger(AbstractCanalClientTest.class);
    protected static final String             SEP                = SystemUtils.LINE_SEPARATOR;
    protected static final String             DATE_FORMAT        = “yyyy-MM-dd HH:mm:ss”;
    protected volatile boolean                running            = false;
    protected Thread.UncaughtExceptionHandler handler            = new Thread.UncaughtExceptionHandler() {
                                                                     public void uncaughtException(Thread t, Throwable e) {
                                                                         logger.error(“parse events has an error”, e);
                                                                     }
                                                                 };
    protected Thread                          thread             = null;
    protected CanalConnector                  connector;
    protected static String                   context_format     = null;
    protected static String                   row_format         = null;
    protected static String                   transaction_format = null;
    protected String                          destination;
    static {
        context_format = SEP + “****************************************************” + SEP;
        context_format += “* Batch Id: [{}] ,count : [{}] , memsize : [{}] , Time : {}” + SEP;
        context_format += “* Start : [{}] ” + SEP;
        context_format += “* End : [{}] ” + SEP;
        context_format += “****************************************************” + SEP;
        row_format = SEP
                     + “—————-> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {} , delay : {}ms”
                     + SEP;
        transaction_format = SEP + “================> binlog[{}:{}] , executeTime : {} , delay : {}ms” + SEP;
    }
    public AbstractCanalClientTest(String destination){
        this(destination, null);
    }
    public AbstractCanalClientTest(String destination, CanalConnector connector){
        this.destination = destination;
        this.connector = connector;
    }
    protected void start() {
        Assert.notNull(connector, “connector is null”);
        thread = new Thread(new Runnable() {
            public void run() {
                process();
            }
        });
        thread.setUncaughtExceptionHandler(handler);
        thread.start();
        running = true;
    }
    protected void stop() {
        if (!running) {
            return;
        }
        running = false;
        if (thread != null) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                // ignore
            }
        }
        MDC.remove(“destination”);
    }
    protected void process() {
        int batchSize = 5 * 1024;
        while (running) {
            try {
                MDC.put(“destination”, destination);
                connector.connect();
                connector.subscribe(“gq_p2pget_fy.t_sys_org”);
                while (running) {
                    Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
//                         try {
//                         Thread.sleep(1000);
//                         } catch (InterruptedException e) {
//                         }
                    } else {
                        System.out.println(message.toString());
                        printSummary(message, batchId, size);
                        printEntry(message.getEntries());
                    }
                    //System.out.println(message.toString());
                    connector.ack(batchId); // 提交确认
                    // connector.rollback(batchId); // 处理失败, 回滚数据
                }
            } catch (Exception e) {
                logger.error(“process error!”, e);
            } finally {
                connector.disconnect();
                MDC.remove(“destination”);
            }
        }
    }
    private void printSummary(Message message, long batchId, int size) {
        long memsize = 0;
        for (Entry entry : message.getEntries()) {
            memsize += entry.getHeader().getEventLength();
        }
        String startPosition = null;
        String endPosition = null;
        if (!CollectionUtils.isEmpty(message.getEntries())) {
            startPosition = buildPositionForDump(message.getEntries().get(0));
            endPosition = buildPositionForDump(message.getEntries().get(message.getEntries().size() – 1));
        }
        SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
        logger.info(context_format, new Object[] { batchId, size, memsize, format.format(new Date()), startPosition,
                endPosition });
    }
    protected String buildPositionForDump(Entry entry) {
        long time = entry.getHeader().getExecuteTime();
        Date date = new Date(time);
        SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
        return entry.getHeader().getLogfileName() + “:” + entry.getHeader().getLogfileOffset() + “:”
               + entry.getHeader().getExecuteTime() + “(” + format.format(date) + “)”;
    }
    protected void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            long executeTime = entry.getHeader().getExecuteTime();
            long delayTime = new Date().getTime() – executeTime;
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) {
                    TransactionBegin begin = null;
                    try {
                        begin = TransactionBegin.parseFrom(entry.getStoreValue());
                    } catch (InvalidProtocolBufferException e) {
                        throw new RuntimeException(“parse event has an error , data:” + entry.toString(), e);
                    }
                    // 打印事务头信息,执行的线程id,事务耗时
                    logger.info(transaction_format,
                        new Object[] { entry.getHeader().getLogfileName(),
                                String.valueOf(entry.getHeader().getLogfileOffset()),
                                String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime) });
                    logger.info(” BEGIN —-> Thread id: {}”, begin.getThreadId());
                } else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
                    TransactionEnd end = null;
                    try {
                        end = TransactionEnd.parseFrom(entry.getStoreValue());
                    } catch (InvalidProtocolBufferException e) {
                        throw new RuntimeException(“parse event has an error , data:” + entry.toString(), e);
                    }
                    // 打印事务提交信息,事务id
                    logger.info(“—————-\n”);
                    logger.info(” END —-> transaction id: {}”, end.getTransactionId());
                    logger.info(transaction_format,
                        new Object[] { entry.getHeader().getLogfileName(),
                                String.valueOf(entry.getHeader().getLogfileOffset()),
                                String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime) });
                }
                continue;
            }
            if (entry.getEntryType() == EntryType.ROWDATA) {
                RowChange rowChage = null;
                try {
                    rowChage = RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException(“parse event has an error , data:” + entry.toString(), e);
                }
                EventType eventType = rowChage.getEventType();
                logger.info(row_format,
                    new Object[] { entry.getHeader().getLogfileName(),
                            String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
                            entry.getHeader().getTableName(), eventType,
                            String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime) });
                if (eventType == EventType.QUERY || rowChage.getIsDdl()) {
                    logger.info(” sql —-> ” + rowChage.getSql() + SEP);
                    continue;
                }
                for (RowData rowData : rowChage.getRowDatasList()) {
                    if (eventType == EventType.DELETE) {
                        printColumn(rowData.getBeforeColumnsList());
                    } else if (eventType == EventType.INSERT) {
                        printColumn(rowData.getAfterColumnsList());
                    } else {
                        printColumn(rowData.getAfterColumnsList());
                    }
                }
            }
        }
    }
    protected void printColumn(List<Column> columns) {
        for (Column column : columns) {
            StringBuilder builder = new StringBuilder();
            builder.append(column.getName() + ” : ” + column.getValue());
            builder.append(”    type=” + column.getMysqlType());
            if (column.getUpdated()) {
                builder.append(”    update=” + column.getUpdated());
            }
            builder.append(SEP);
            logger.info(builder.toString());
        }
    }
    public void setConnector(CanalConnector connector) {
        this.connector = connector;
    }
}

服务端配置

canal/conf/example/instance.properties配置如下
#################################################
## mysql serverId
canal.instance.mysql.slaveId = 1234
# position info
canal.instance.master.address = 10.100.200.104:3308
canal.instance.master.journal.name = mysqlmaster-bin.000021
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
# username/password
canal.instance.dbUsername = sys_canal
canal.instance.dbPassword = sys_canal
canal.instance.defaultDatabaseName = gq_p2pget_fy
canal.instance.connectionCharset = UTF-8
# table regex
canal.instance.filter.regex = .*\\..*
# table black regex
canal.instance.filter.black.regex =
#################################################

Reference

https://github.com/alibaba/otter(仅需2个项目,服务端项目:deployer,客户端项目:example)

分享到:

发表评论

昵称

抢个沙发呗~~~