Real-time database synchronization between two servers

canal is Alibaba's incremental subscription and consumption tool for MySQL: https://github.com/alibaba/canal

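canal consists of a server and a client:

Server: it can be thought of as a MySQL server acting as the slave node of a high-availability setup. To make it work, a little configuration is needed so that the canal server requests the binlog from the real MySQL server, parses it, and keeps the parsed result in a local data structure.

Client: it can be thought of as a kind of database client. With a small amount of code it fetches the parsed binlog data (the incremental changes) held by the canal server, and can then process that data in whatever custom way is needed.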

2 Server installation

Official documentation: https://github.com/alibaba/canal/wiki/QuickStart

01 MySQL preparation

For a self-hosted MySQL, first enable binlog writing and set binlog-format to ROW mode. Configure my.cnf as follows:

vi /etc/my.cnf

Add the following three lines under the [mysqld] section:

[mysqld]
log-bin=mysql-bin # enable binlog
binlog-format=ROW # use ROW mode
server_id=1 # required for MySQL replication; must not clash with canal's slaveId
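Before restarting MySQL, the three settings can be sanity-checked from a script. The following is only a sketch: `check_binlog_options` is a hypothetical helper, and it assumes the file is simple enough for Python's `configparser` (a real my.cnf that uses `!include` directives would need extra handling).

```python
import configparser


def read_mysqld_options(text):
    """Parse my.cnf-style text and return the [mysqld] options as a dict."""
    parser = configparser.ConfigParser(
        allow_no_value=True,            # my.cnf allows bare flags such as "skip-name-resolve"
        strict=False,                   # tolerate repeated options
        interpolation=None,             # do not treat "%" specially
        inline_comment_prefixes=("#",),
    )
    parser.read_string(text)
    if not parser.has_section("mysqld"):
        return {}
    # MySQL accepts "-" and "_" interchangeably in option names.
    return {key.replace("-", "_"): value for key, value in parser.items("mysqld")}


def check_binlog_options(text):
    """Return a list of problems; an empty list means the settings look fine."""
    options = read_mysqld_options(text)
    problems = []
    if "log_bin" not in options:
        problems.append("log-bin is not set")
    if (options.get("binlog_format") or "").upper() != "ROW":
        problems.append("binlog-format is not ROW")
    if not options.get("server_id"):
        problems.append("server_id is not set")
    return problems


if __name__ == "__main__":
    with open("/etc/my.cnf", encoding="utf-8") as handle:
        print(check_binlog_options(handle.read()) or "binlog settings look fine")
```

The check only reads the file; it says nothing about whether the running server has picked the settings up, which is what the restart and the `log_bin` query are for.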
  • Grant the account that canal uses to connect to MySQL the privileges of a MySQL slave
  • 1 Restart MySQL

    sudo /etc/init.d/mysql restart
    

    2 Log in to MySQL as root and check the log_bin variable

    mysql -u root -p
    show variables like 'log_bin';
    

    If the value is ON, binlog is enabled.

    3 Add the following user and privileges in MySQL

    CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
    GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;
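The same check as `show variables like 'log_bin'` can be scripted once the account exists. This is a minimal sketch, not part of canal: `fetch_variables` and `binlog_problems` are hypothetical helpers, and the code assumes a DB-API connection whose driver uses `%s` placeholders and returns rows as tuples (the defaults in pymysql, for example).

```python
REQUIRED_NAMES = ("log_bin", "binlog_format", "server_id")


def fetch_variables(conn, names=REQUIRED_NAMES):
    """Read server variables through a DB-API connection and return them as a dict."""
    placeholders = ", ".join(["%s"] * len(names))
    cursor = conn.cursor()
    try:
        cursor.execute(
            "SHOW VARIABLES WHERE Variable_name IN (" + placeholders + ")",
            tuple(names),
        )
        return {str(name): str(value) for name, value in cursor.fetchall()}
    finally:
        cursor.close()


def binlog_problems(variables):
    """Return a list of settings that would prevent canal from reading the binlog."""
    problems = []
    if variables.get("log_bin", "").upper() != "ON":
        problems.append("log_bin is not ON")
    if variables.get("binlog_format", "").upper() != "ROW":
        problems.append("binlog_format is not ROW")
    if variables.get("server_id", "0") in ("", "0"):
        problems.append("server_id is not set")
    return problems
```

With pymysql installed, a connection could be opened as `pymysql.connect(host='127.0.0.1', user='canal', password='canal')` and passed to `fetch_variables`; an empty list from `binlog_problems` means the three settings are in place.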
    

    02 canal configuration

  • Download canal
  • wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-2/canal.deployer-1.1.5-SNAPSHOT.tar.gz
    

    Or download a release package manually:

    Address: https://github.com/alibaba/canal/releases


    mkdir -p /tmp/canal
    tar zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz  -C /tmp/canal
    cd /tmp/canal
    
    vi conf/example/instance.properties
    
    # position info: change to your own database address
    canal.instance.master.address = 127.0.0.1:3306
    # username/password: change to your own database credentials
    canal.instance.dbUsername = canal
    canal.instance.dbPassword = canal
    # the database to synchronize
    canal.instance.defaultDatabaseName =
    canal.instance.connectionCharset = UTF-8
    # table regex
    canal.instance.filter.regex = .*\\..*
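A note on `canal.instance.filter.regex`: in a properties file `\\` is an escaped backslash, so the default `.*\\..*` is the regular expression `.*\..*` applied to `schema.table` names, and canal's documentation describes several patterns as comma-separated. The sketch below approximates that selection with Python's `re`; `table_matches` is a hypothetical helper, and canal's own matcher may differ for more complex patterns.

```python
import re


def unescape_properties_value(raw):
    """Turn the doubled backslashes of a .properties value into single ones."""
    return raw.replace("\\\\", "\\")


def table_matches(filter_value, schema, table):
    """Return True if schema.table is selected by a comma-separated filter value."""
    name = f"{schema}.{table}"
    patterns = [
        part.strip()
        for part in unescape_properties_value(filter_value).split(",")
        if part.strip()
    ]
    return any(re.fullmatch(pattern, name) for pattern in patterns)


if __name__ == "__main__":
    # Raw strings keep the value exactly as it is written in instance.properties.
    print(table_matches(r".*\\..*", "shop", "orders"))
    print(table_matches(r"canal\\..*", "shop", "orders"))
```

Narrowing the filter to one schema, for example `canal\\..*`, keeps tables from other schemas out of the stream.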
    
  • Start / stop
  • sh bin/startup.sh
    sh bin/stop.sh
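After `sh bin/startup.sh`, one quick way to confirm that the server is accepting connections is to probe its TCP port. This sketch assumes the port is 11111, the one the Python client further down connects to; `wait_for_port` is a hypothetical helper, not something canal ships.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=1.0):
    """Poll until a TCP connection to host:port succeeds or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Refused or timed out: retry until the deadline passes.
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


if __name__ == "__main__":
    if wait_for_port("127.0.0.1", 11111):
        print("canal server is listening")
    else:
        print("canal server did not come up in time")
```

An open port only shows that the process is listening; whether it is actually reading the binlog still has to be checked in canal's logs or with a client.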
    

    3 Client

    GitHub page: https://github.com/haozi3156666/canal-python

    Writing your own client in Python

    pip3 install canal-python
    pip3 install protobuf
    
    import time

    from canal.client import Client
    from canal.protocol import EntryProtocol_pb2

    client = Client()
    client.connect(host='127.0.0.1', port=11111)
    client.check_valid(username=b'', password=b'')
    # Subscribe to every table of the 'example' instance.
    client.subscribe(client_id=b'1001', destination=b'example', filter=b'.*\\..*')

    try:
        while True:
            # Fetch up to 100 entries per poll.
            message = client.get(100)
            for entry in message['entries']:
                # Skip transaction markers; only row changes carry data.
                if entry.entryType in (EntryProtocol_pb2.EntryType.TRANSACTIONBEGIN,
                                       EntryProtocol_pb2.EntryType.TRANSACTIONEND):
                    continue
                row_change = EntryProtocol_pb2.RowChange()
                row_change.MergeFromString(entry.storeValue)
                header = entry.header
                database = header.schemaName
                table = header.tableName
                event_type = header.eventType
                for row in row_change.rowDatas:
                    if event_type == EntryProtocol_pb2.EventType.DELETE:
                        # A deleted row only has its previous values.
                        format_data = {c.name: c.value for c in row.beforeColumns}
                    elif event_type == EntryProtocol_pb2.EventType.INSERT:
                        # An inserted row only has its new values.
                        format_data = {c.name: c.value for c in row.afterColumns}
                    else:
                        # Updates carry both images; keep them in separate dicts.
                        format_data = {
                            'before': {c.name: c.value for c in row.beforeColumns},
                            'after': {c.name: c.value for c in row.afterColumns},
                        }
                    data = dict(
                        db=database,
                        table=table,
                        event_type=event_type,
                        data=format_data,
                    )
                    print(data)
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        # Always release the connection, including on Ctrl+C.
        client.disconnect()
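To actually synchronize the second server, each printed event has to become a write on the target database. The following is one possible sketch, not the canal-python author's method. It assumes every table has a single primary-key column named `id`, that the event type has already been converted to the strings "INSERT", "UPDATE" or "DELETE" (protobuf enum wrappers normally offer `Name()` for that), and that the target driver uses `%s` placeholders. `build_statement` is a hypothetical helper.

```python
def build_statement(db, table, kind, data, key="id"):
    """Return (sql, params) that replays one change event on the target server.

    `data` has the shape produced by the client loop: a flat column dict for
    INSERT and DELETE, and {'before': ..., 'after': ...} for UPDATE.
    `key` is the assumed primary-key column.
    """
    target = f"`{db}`.`{table}`"
    if kind == "DELETE":
        return f"DELETE FROM {target} WHERE `{key}` = %s", [data[key]]
    # INSERT and UPDATE both become an upsert of the row's new image.
    row = data["after"] if kind == "UPDATE" else data
    columns = list(row)
    column_list = ", ".join(f"`{name}`" for name in columns)
    marks = ", ".join(["%s"] * len(columns))
    sql = f"REPLACE INTO {target} ({column_list}) VALUES ({marks})"
    return sql, [row[name] for name in columns]


if __name__ == "__main__":
    sql, params = build_statement("shop", "orders", "INSERT", {"id": "1", "total": "9.50"})
    print(sql, params)
```

The resulting pair can be passed to `cursor.execute(sql, params)` on a connection to the second server. `REPLACE INTO` keeps the sketch idempotent if an event is delivered twice, but it does not cover updates that change the primary key, and canal delivers column values as strings, so NULL handling and type conversion would still need attention.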
    