C#使⽤Canal监控Mysql
⼀、canal安装与配置
1、电脑中⾸先需要下载安装canal,可以去上⾯下载(更多版本选择),也可以从下⾯的地址进⾏提取
canal⽀持多种语⾔使⽤。
2、下载完成,将它解压,如:
3、修改D:\Tools\alibaba\canal\conf\example下的instance.properties(刚解压出的example⽂件夹中,可以将除了instance.properties⽂件以外的⽂件全部删除)修改instance.properties⽂件的相关mysql数据库配置
4、完成配置以后,进⼊bin⽂件夹,windows双击startup.bat⽂件(注意,canal需要java运⾏环境,如果电脑没有java环境的,可以只配置⼀个jre)
如上图显⽰,canal就正常运⾏了。接下来就只需要开启mysql的binlog⽇志,在⾃⼰的程序中使⽤canal即可
⼆、对 mysql的操作
1、查看binlog是否启⽤,在mysql中执⾏show variables like '%log_bin%';
2、电脑,右键管理,打开服务,到mysql的my.ini配置⽂件位置,添加
server_id=1918
log_bin = mysql-bin
#binlog_format="ROW"
binlog_format="MIXED" #开启MIXED模式
#binlog_format="STATEMENT"
3、重启mysql服务就可以启⽤binlog。开启MIXED模式才可看见执⾏的语句
------三种模式详细解释:
互联⽹公司使⽤MySQL的功能较少(不⽤存储过程、触发器、函数),选择默认的Statement level
⽤到MySQL的特殊功能(存储过程、触发器、函数)则选择Mixed模式
⽤到MySQL的特殊功能(存储过程、触发器、函数),⼜希望数据最⼤化⼀直则选择Row模式
完成对mysql的配置,可以在mysql的安装⽬录下的data⽂件夹中看到000001⽂件,就是binlog⽇志⽂件。如:
三、C#代码
上⾯也有详细的说明。
1、⾸先需要⽤到canal的包,去nuget下载CanalSharp.Client
2、在代码中添加引⽤
using CanalSharp.Client.Impl;
//canal 配置的destination,默认为example
mysql下载下来没安装包var destination = "example";
//创建⼀个简单CanalClient连接对象(此对象不⽀持集)传⼊参数分别为canal地址、端⼝、destination、⽤户名、密码
var connector = CanalConnectors.NewSingleConnector("127.0.0.1", 11111, destination, "", "");//这⾥可以就这样
//连接Canal
connector.Connect();
//订阅,同时传⼊Filter,如果不传则以Canal的Filter为准。Filter是⼀种过滤规则,通过该规则的表数据变更才会传递过来
//允许所有数据 .*\\..*
//允许某个库数据库名\\..*
//允许某些表库名.表名,库名.表名"
connector.Subscribe(".*\\..*");
while(true)
{
//获取数据1024表⽰数据⼤⼩单位为字节
var message = connector.Get(1024);
//批次id 可⽤于回滚
var batchId = message.Id;
if(batchId == -1|| message.Entries.Count <= 0)
{
Thread.Sleep(300);
continue;
}
PrintEntrys(message.Entries);
}
/
//<summary>
///输出数据
///</summary>
///<param name="entrys">⼀个entry表⽰⼀个数据库变更</param>
private void PrintEntrys(List<Entry> entrys)
{
foreach(var entry in entrys)
{
if(entry.EntryType == EntryType.Transactionbegin || entry.EntryType == EntryType.Transactionend)
{
continue;
}
RowChange rowChange = null;
try
{
//获取⾏变更
rowChange = RowChange.Parser.ParseFrom(entry.StoreValue);
}
catch(Exception ex)
{
logMan.Error("mysql获取⾏变更错误:"+ ex.Message, ex);
}
if(rowChange != null)
{
//变更类型insert/update/delete 等等
EventType eventType = rowChange.EventType;
//输出insert/update/delete 变更类型列数据
foreach(var rowData in rowChange.RowDatas)
{
if(eventType == EventType.Delete)
{
PrintColumn(rowData.BeforeColumns.ToList(), entry.Header.TableName);
}
else if(eventType == EventType.Insert)//添加
{
PrintColumn(rowData.AfterColumns.ToList(), entry.Header.TableName);
}
else
{
if(entry.Header.TableName == "tablename1")//这⾥可以过滤⾃⼰想监测的表名
PrintColumn(rowData.AfterColumns.ToList(), entry.Header.TableName, rowData.BeforeColumns.ToList());
}
}
}
}
}
private void PrintColumn(List<Column> columns, string tbName, List<Column> oldColumns = null)
{
//修改的值,以及修改列的id值
string newNum = "", userid = "";
foreach(var column in columns)
{
if(column.Updated)
{
//输出列列值是否变更也可以拼出执⾏的语句
Console.WriteLine($"{column.Name} :{column.Value} update= {column.Updated}");
}
}
}
附上⼀个完整的控制台应⽤。完成⼀、⼆中对mysql,canal的配置后,可以直接运⾏下列的控制台进⾏测试using CanalSharp.Client.Impl;
using Com.Alibaba.Otter.Canal.Protocol;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
namespace ConsoleApp1
{
class Program
{
static void Main(string[] args)
{
//canal 配置的destination,默认为example
var destination = "example";
//创建⼀个简单CanalClient 连接对象(此对象不⽀持集)传⼊参数分别为canal 地址、端⼝、destination、⽤户名、密码
var connector = CanalConnectors.NewSingleConnector("127.0.0.1", 11111, destination, "", "");
//连接Canal
connector.Connect();
//订阅,同时传⼊Filter。Filter是⼀种过滤规则,通过该规则的表数据变更才会传递过来
//允许所有数据 .*\\..*
//允许某个库数据库名\\..*
//允许某些表库名.表名,库名.表名
connector.Subscribe(".*\\..*");
Console.WriteLine("监控创建成功,开始监控。");
while(true)
{
//获取数据1024表⽰数据⼤⼩单位为字节
var message = connector.Get(1024);
//批次id 可⽤于回滚
var batchId = message.Id;
if(batchId == -1|| message.Entries.Count <= 0)
{
Thread.Sleep(300);
continue;
}
PrintEntry(message.Entries);
}
}
///<summary>
///输出数据
/
//</summary>
///<param name="entrys">⼀个entry表⽰⼀个数据库变更</param>
private static void PrintEntry(List<Entry> entrys)
{
Console.WriteLine("监控到改动,进⼊监控输出。");
foreach(var entry in entrys)
{
if(entry.EntryType == EntryType.Transactionbegin || entry.EntryType == EntryType.Transactionend)
{
continue;
}
RowChange rowChange = null;
try
{
//获取⾏变更
rowChange = RowChange.Parser.ParseFrom(entry.StoreValue);
Console.WriteLine("⾏变更。");
}
catch(Exception ex)
{
//_logger.LogError(ex.ToString());
}
if(rowChange != null)
{
//变更类型insert/update/delete 等等
EventType eventType = rowChange.EventType;
//输出binlog信息表名数据库名变更类型
//_logger.LogInformation(
//$"================> binlog[{entry.Header.LogfileName}:{entry.Header.LogfileOffset}] , name[{entry.Header.SchemaName},{entry.Header.TableName}] , eventType :{eventType}");
Console.WriteLine("改变的数据库名:"+ entry.Header.SchemaName);
//输出insert/update/delete 变更类型列数据
foreach(var rowData in rowChange.RowDatas)
{
if(eventType == EventType.Delete)
{
PrintColumn(rowData.BeforeColumns.ToList(), entry.Header.TableName);
}
else if(eventType == EventType.Insert)
{
PrintColumn(rowData.AfterColumns.ToList(), entry.Header.TableName);
}
else
{
PrintColumn(rowData.AfterColumns.ToList(), entry.Header.TableName);
}
}
}
}
}
///<summary>
///输出每个列的详细数据
///</summary>
///<param name="columns"></param>
private static void PrintColumn(List<Column> columns, string tbName)
{
Console.WriteLine("有改动");
foreach(var column in columns)
{
//if (column.Name == lookName) userid = column.Value;
//Console.WriteLine($"{column.Name} :{column.Value} update= {column.Updated}");
if(column.Updated)
{
//输出列明列值是否变更
Console.WriteLine($"修改的表名:{tbName},列名:{column.Name} :{column.Value} update= {column.Updated}");
//修改以后的值column.Name=
//if (column.Name == alterName) newNum = column.Value;
//if (column.Name == lookName) userid = column.Value;
//Console.WriteLine($"update {tbName} set {alterName}={column.Value} where {lookName}={userid}");
}
}
}
}
}
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论