MySQL数据实时增量同步到Kafka-Flume 写在前⾯的话
  需求,将MySQL⾥的数据实时增量同步到Kafka。接到活⼉的时候,第⼀个想法就是通过读取MySQL的binlog⽇志,将数据写到Kafka。不过对⽐了⼀些⼯具,例如:Canel,Databus,Puma等,这些都是需要部署server和client的。其中server端是由这些⼯具实现,配置了就可以读binlog,⽽client端是需要我们动⼿编写程序的,远没有达到我即插即⽤的期望和懒⼈的标准。
同步的格式
  原作者的插件flume-ng-sql-source只⽀持csv的格式,如果开始同步之后,数据库表需要增减字段,则会给开发者造成很⼤的困扰。所以我添加了⼀个分⽀版本,⽤来将数据以JSON的格式,同步到kafka,字段语义更加清晰。
  将此jar包下载之后,和相应的数据库驱动包,⼀起放到flume的lib⽬录之下即可。
处理机制
flume-ng-sql-source在【status.file.name】⽂件中记录读取数据库表的偏移量,进程重启后,可以接着上次的进度,继续增量读表。
启动说明
说明:启动命令⾥的【YYYYMM=201711】,会传⼊到flume.properties⾥⾯,替换${YYYYMM}
[test@localhost ~]$ YYYYMM=201711 bin/flume-ng agent -c conf -f conf/flume.properties -n sync &
-c:表⽰配置⽂件的⽬录,在此我们配置了flume-env.sh,也在conf⽬录下;
-f:指定配置⽂件,这个配置⽂件必须在全局选项的--conf参数定义的⽬录下,就是说这个配置⽂件要在前⾯配置的conf⽬录下⾯;
-n:表⽰要启动的agent的名称,也就是我们flume.properties配置⽂件⾥⾯,配置项的前缀,这⾥我们配的前缀是【sync】;
flume的配置说明
flume-env.sh
# 配置JVM堆内存和java运⾏参数,配置-DpropertiesImplementation参数是为了在flume.properties配置⽂件中使⽤环境变量
export JAVA_OPTS="-Xms512m -Xmx512m -Dcom.sun.management.jmxremote -DpropertiesImplementation=org.de.EnvVarResolverProperties"
flume.properties
# 数据来源
sync.sources = s-1
# 数据通道
sync.channels = c-1
# 数据去处,这⾥配置了failover,根据下⾯的优先级配置,会先启⽤k-1,k-1挂了后再启⽤k-2
sync.sinks = k-1 k-2
#这个是配置failover的关键,需要有⼀个sink group
sync.sinkgroups = g-1
sync.sinkgroups.g-1.sinks = k-1 k-2
#处理的类型是failover
sync.sinkgroups.pe = failover
#优先级,数字越⼤优先级越⾼,每个sink的优先级必须不相同
sync.sinkgroups.g-1.processor.priority.k-1 = 5
sync.sinkgroups.g-1.processor.priority.k-2 = 10
#设置为10秒,当然可以根据你的实际状况更改成更快或者很慢
sync.sinkgroups.g-1.processor.maxpenalty = 10000
>> 数据通道的定义
# 数据量不⼤,直接放内存。其实还可以放在JDBC,kafka或者磁盘⽂件等
sync.pe = memory
# 通道队列的最⼤长度
sync.channels.c-1.capacity = 100000
# putList和takeList队列的最⼤长度,sink从capacity中抓取batchsize个event,放到这个队列。所以此参数最好⽐capacity⼩,⽐sink的batchsize⼤。
# 官⽅定义:The maximum number of events the channel will take from a source or give to a sink per transaction.
sync.ansactionCapacity = 1000
sync.channels.c-1.byteCapacityBufferPercentage = 20
### 默认值的默认值等于JVM可⽤的最⼤内存的80%,可以不配置
# sync.channels.c-1.byteCapacity = 800000
>####sql source>>>##
# source s-1⽤到的通道,和sink的通道要保持⼀致,否则就GG了
sync.sources.s-1.channels=c-1
>#### For each one of the sources, the type is defined
sync.pe = org.keedio.flume.source.SQLSource
sync.sources.tion.url = jdbc:mysql://192.168.1.10/testdb?useSSL=false
>#### Hibernate Database connection properties
sync.sources.tion.user = test
sync.sources.tion.password = 123456
sync.sources.tion.autocommit = true
sync.sources.s-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
sync.sources.tion.driver_class = sql.jdbc.Driver
sync.sources.s-1.run.query.delay=10000
sync.sources.s-1.status.file.path = /home/test/apache-flume-1.8.0-bin/status
# ⽤上${YYYYMM}环境变量,是因为我⽤的测试表⽰⼀个⽉表,每个⽉的数据会放到相应的表⾥。使⽤⽅式见上⾯的启动说明
sync.sources.s-1.status.file.name = test_${YYYYMM}.status
>### Custom query
sync.sources.s-1.start.from = 0
sync.sources.s-1.custom.query = select * from t_test_${YYYYMM} where id > $@$ order by id asc
sync.sources.s-1.batch.size = 100
sync.sources.ws = 100
sync.sources.tion.provider_class = tion.C3P0ConnectionProvider
sync.sources.s-1.hibernate.c3p0.min_size=5
sync.sources.s-1.hibernate.c3p0.max_size=20
>#### sinks 1
# sink k-1⽤到的通道,和source的通道要保持⼀致,否则取不到数据
sync.sinks.k-1.channel = c-1
sync.pe = org.apache.flume.sink.kafka.KafkaSink
sync.sinks.pic = sync-test
sync.sinks.k-1.kafka.bootstrap.servers = localhost:9092
sync.sinks.k-1.kafka.producer.acks = 1
# 每批次处理的event数量
sync.sinks.k-1.kafka.flumeBatchSize  = 100
>#### sinks 2
# sink k-2⽤到的通道,和source的通道要保持⼀致,否则取不到数据
sync.sinks.k-2.channel = c-1
sync.pe = org.apache.flume.sink.kafka.KafkaSink
sync.sinks.pic = sync-test
sync.sinks.k-2.kafka.bootstrap.servers = localhost:9092
sync.sinks.k-2.kafka.producer.acks = 1
sync.sinks.k-2.kafka.flumeBatchSize  = 100
flume各部分参数含义
batchData的⼤⼩见参数:batchSize
PutList和TakeList的⼤⼩见参数:transactionCapactiy
Channel总容量⼤⼩见参数:capacity
问题记录
异常:Exception in thread "PollableSourceRunner-SQLSource-src-1" java.lang.AbstractMethodError:
org.keedio.flume.MaxBackOffSleepInterval()J
分析:由于我⽤的是flume1.8,⽽flume-ng-sql-1.4.3插件对应的flume-ng-core版本是1.5.2,1.8版本⾥的PollableSource接⼝多了两个⽅法getBackOffSleepIncrement(); getMaxBackOffSleepInterval();在失败补偿暂停线程处理时,需要⽤到这个⽅法。
解决⽅法:更新flume-ng-sql-1.4.3⾥依赖的flume-ng-core版本为1.8.0,并在源代码【SQLSource.java】⾥添加这两个⽅法即可。
@Override
public long getBackOffSleepIncrement() {
return 1000;
}
@Override
public long getMaxBackOffSleepInterval() {
kafka命令
return 5000;
}

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。

发表评论