⿊马头条推荐项⽬知识点总结(⼀)
实际⽣产环境中,我们要处理的数据来⾃可能各个地⽅,业务数据库,爬⾍数据库,⽇志⽂件,api⽹关买⼊数据等。
本次⿊马头条推荐项⽬中,业务数据存储在mysql中,⽤户⾏为数据存储在⽇志中,因此采⽤两种技术⼿段将业务数据和⽇志数据传输到Hadoop中。
⼀、迁移数据库
业务数据存储在mysql中,为了避免直接操作业务数据,利⽤sqoop导⼊到hive表中(底层数据就是存储在HDFS上)
业务数据不是⼀次导⼊就结束,每天都会产⽣很多新的业务数据,因此这⾥就涉及到利⽤sqoop导⼊数据到hive的⽅式。
Sqoop(发⾳:skup)是⼀款开源的⼯具,主要⽤于在Hadoop(Hive)与传统的数据库(mysql、postgresql…)间进⾏数据的传递,可以将⼀个关系型数据库(例如 : MySQL ,Oracle ,Postgres等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。
【Sqoop⽀持两种⽅式的数据导⼊,即全量数据导⼊和增量数据导⼊】。
1、sqoop导数据的⽅式⼀:全量数据导⼊
(1)概念:如同名字那样,全量数据导⼊就是⼀次性将所有需要的数据,从关系型数据库⼀次性导⼊到Hadoop⽣态中(可以是
HDFS,Hive,Hbase等)。
(2)适⽤场景:⼀次性离线分析场景
(3)代码实现:⽤sqoop import命令,具体如下:
# 全量数据导⼊
Sqoop import
1.    --connect jdbc:mysql://19
2.168.15.111/toutiao \
2.    --username root \
3.    --password 123456 \
4.    --table $table_name \
5.    --query ‘select * from $table_name where $conditions ’ \
6.    --m 5 \
7.    --hive-import \
8.    --hive-home /usr/local/hive \
9.    --hive -drop-import-delims \
10.    --target-dir /user/hive/warehouse \
11.    --create-hive-table \
12.    --hive-table toutiao.$table_name \
2、Sqoop导数据的⽅式⼆:增量数据导⼊
(1)使⽤场景:实际⽣产环境中,不断有业务相关的数据产⽣到关系型数据库,系统需要定期从数据
库向hadoop导⼊数据,导⼊数仓后,继续进⾏离线分析。我们不可能将所有的数据重新导⼀遍,此时就需要⽤sqoop的增量数据导⼊模式。
mysql下载下来是一个文件夹增量数据导⼊分两种,⼀是基于递增列的增量数据导⼊(Append),⼆是基于时间序列的增量数据导⼊(LastModified)
2.1基于递增列的增量数据导⼊(Append)
举个栗⼦,有⼀个订单表,⾥⾯每个订单有⼀个唯⼀标识⾃增列ID,在关系型数据库中以主键形式存在。之前已经将id在0~5201314之间的编号的订单导⼊到Hadoop中了(这⾥为HDFS),⼀段时间后我们需要将近期产⽣的新的订单数据导⼊Hadoop中(这⾥为HDFS),以供后续数仓进⾏分析。此时我们只需要指定–incremental 参数为append,–last-value参数为5201314即可,表⽰只从id⼤于5201314后开始导⼊。
1. sqoop import \
2.    --connect jdbc:mysql://192.168.15.111:3306/test \  #连接到指定数据库
3.    --username root \
4.    --password 123456 \
5.    --table order_table \    # 指定数据库的指定表
6.    --target-dir /user/mysql_to_hdfs  \
7.    --m 3  \
8.    –-hive import \
9.    –-incremental append \        # 指明模式
10.    –-check-column order_id        # 指明⽤于增量导⼊的参考列
11.    –-last-value 5201314 \    # 指定参考列上次导⼊的最⼤值
参数说明
–incremental append基于递增列的增量导⼊(将递增列值⼤于阈值的所有数据增量导⼊Hadoop)–check-column递增列(int)
–last-value阈值(int)
2.2基于时间序列的增量数据导⼊(LastModified)
此⽅式要求原有表中有time字段,它能指定⼀个时间戳,让Sqoop把该时间戳之后的数据导⼊⾄Hadoop(这⾥为HDFS)。⽐如我的头条业务数据库中,某篇⽂章的点赞数增加或减少了,变成了⼀个新的数据,在我指定的时间戳后⾯作为新的业务数据导⼊到了Hadoop(这⾥指HDFS),我们可以指定给merge-key参数,例如是article_id,表⽰将后续的新的记录与原有的记录合并。
代码:
sqoop import \
1.    --connect jdbc:mysql://19
2.168.15.111/toutiao \
2.    --username root \
3.    --password password \
4.    --table article_basic \  # 指定数据表导⼊Hadoop
5.    --m 4 \
6.    --target-dir /user/hive/warehouse/toutiao.db/article_basic \
7.    --incremental lastmodified \
8.    --check-column update_time \
9.    --merge-key article_id \
10.  --last-value '2012-02-01 11:0:00'
重要参数
参数说明
–incremental lastmodified基于时间列的增量导⼊(将时间列⼤于等于阈值的所有数据增量导⼊Hadoop)–check-column时间列(int)
–last-value阈值(int)
–merge-key合并列(主键,合并键值相同的记录)
这⾥注意-incremental lastmodified 模式不⽀持⽤sqoop直接导⼊到hive中,需要先导⼊到hdfs,然后建⽴hive表关联。
那怎么才能实现mysql数据迁移到hive上呢?
Sqoop将mysql数据导⼊到HDFS上,指定位置:
–target-dir /user/hive/warehouse/toutiao.db/
然后进⼊hive交互界⾯,在toutiao.db数据库中建表,表名和传上来的数据⽂件⼀样,hive就能⾃动将数据映射到hive表中。
【导⼊的过程总有⼀些坑,这⾥提供⼀些避坑指南】
1、注意:sqoop将数据导出的hdfs分⽚数据,默认⽤‘ ,’分割,⽽hive默认的分隔符是’ 001’。所以在hive中创建表的时候要指定分隔符。
2、原mysql中某些字段存在特定字符,如,、\t \n 都会导致导⼊到hadoop被hive读取失败,解
析时会认为是另⼀条数据,或者多⼀个字段。
/
//解决办法///:
导⼊时,加⼊query参数,选择特定字段,过滤相应内容,使⽤replace,char替换字符
3、mysql数据库⾥⾯字段是tinyint类型,通过sqool导⼊到hdfs,hive建表映射数据后,该字段却显⽰True,False,这是因为jdbc会把tinyint认为是java.sql.Types.BIT,然后hive就转为Boolean类型了。
///解决办法///:
在connect中加⼊⼀句话就可以了 ?tinyInt1isBit t=false 就⾏了,例如:
–connect jdbc:mysql://192.168.15.111/toutiao?tinyInt1isBit=false
关于脚本的执⾏
因为业务数据每天都会产⽣,因此每天都要导⼊数据到HDFS,根据各个公司和实际⽣产要求,可能是每天定点导⼀次数据(这个还可以认为定时导数据),也可能是每天多个时间定点导数据,这样就不适合⼈为导,应该利⽤程序⾃动导数据,这⾥我们利⽤⼀个Linux命令:crontab,设定时间⾃动导数据。
⾸先简要介绍⼀下crontab命令
Linux Crontab
Linux crontab是⽤来定期执⾏程序的命令
当安装完成操作系统之后,默认便会启动此任务调度命令
crond 命令每分钟会定期检查是否有要执⾏的⼯作,如果有要执⾏的⼯作便会⾃动执⾏该⼯作。
注意:
新创建的 cron 任务,不会马上执⾏,⾄少要过 2 分钟后才可以,当然你可以重启 cron 来马上执⾏。⽽ linux 任务调度的⼯作主要分为以下两类:
1、系统周期性所要执⾏的⼯作,⽐如写缓存数据到硬盘、⽇志清理等。在/etc⽬录下有⼀个crontab⽂件,这个就是系统任务调度的配置⽂件。
2、个⼈执⾏的⼯作:某个⽤户定期要做的⼯作,例如每隔10分钟检查邮件服务器是否有新信,这些⼯作可由每个⽤户⾃⾏设置。
cron通过 /etc/cron.allow 和 /etc/cron.deny ⽂件来限制某些⽤户是否可以使⽤ crontab 命令,
(1)当系统中有 /etc/cron.allow ⽂件时,只有写⼊此⽂件的⽤户可以使⽤ crontab 命令,没有写⼊的⽤户不能使⽤ crontab 命令。同样,如果有此⽂件,/etc/cron.deny ⽂件会被忽略,因为 /etc/cron.allow ⽂件的优先级更⾼。
(2)当系统中只有 /etc/cron.deny ⽂件时,写⼊此⽂件的⽤户不能使⽤ crontab 命令,没有写⼊⽂件的⽤户可以使⽤ crontab 命令。(3)crontab⽂件都位于/var/spool/cron/⽬录中
crontab使⽤⽅法
第⼀步:创建crontab⽂件
[root@localhost !]$ crontab -e
显⽰结果
#进⼊ crontab 编辑界⾯。会打开Vim编辑你的任务
*****执⾏的任务
执⾏这个命令的时候,打开的是⼀个空⽂件,操作⽅法和vim⼀样,⽂件⾥⾯填写需要执⾏的任务。
第⼆步:在⽂件中写下需要执⾏的任务
*****执⾏的任务
这⾥分两部分来学习,第⼀部分的五个*,表⽰设定的时间,第⼆部分表⽰要执⾏的commad或者某⽬录中的脚本。
【Part 1】* * * * *(f1 f2 f3 f4 f5)
f1 是表⽰分钟;为 * 时表⽰每分钟都要执⾏ program;为 */n 表⽰每 n 分钟个时间间隔执⾏⼀次;当 f1 为 a-b 时表⽰从第 a 分钟到第 b 分钟这段时间内要执⾏。
f2 表⽰⼩时;为 * 时表⽰每⼩时都要执⾏ program;为 */n 表⽰每 n ⼩时个时间间隔执⾏⼀次 ;f2 为 a-b 时表⽰从第 a 到第 b ⼩时都要执⾏
f3 表⽰⼀个⽉份中的第⼏⽇,program 表⽰要执⾏的程序;其他情况同上年的f1,f2
f4 表⽰⼏⽉份,program 表⽰要执⾏的程序;其他情况同上年的f1,f2
f5 表⽰⼀个星期中的第⼏天,program 表⽰要执⾏的程序;其他情况同上年的f1,f2
【Part 2】command 需要执⾏的命令
这⾥的command可以是⼀个简单的指令,例如启动某功能,也可以是指定⽬录下的脚本。
【使⽤⽰例】
实例1:每1分钟执⾏⼀次重启smb
命令:* * * * * /etc/init.d/smb restart
实例2:每两⼩时重启smb
命令:* */2 * * * /etc/init.d/smb restart
实例3:晚上11点到早上7点之间,每隔⼀⼩时重启smb
命令:* 23-7/1 * * * /etc/init.d/smb restart
实例4:隔半⼩时从mysql增量导⼊⼀次数据到hadoop
命令:* */0.5 * * * /root/toutiao_project/scripts/import_incremental.sh
—补充—
⼆、Flume收集⽤户⽇志信息
1、Flume概述
Flume 是分布式的海量⽇志采集、聚合和传输的系统。Flume 基于流式架构(指⼀条⼀条采集数据),灵活简单
流式架构处理数据的单位很⼩,来⼀条数据处理⼀条。所以MapReduce不是流式架构(是⼀个⽂件),spark也不是流式架构流式架构和实时在线处理没有必然联系,但是流式架构处理实时数据具有优势。⽐如spark不是流式架构,但是它的spark streaming 也可以处理实时数据。
2、Flume作⽤
Flume最主要的作⽤就是,实时读取服务器本地磁盘的数据,将数据写⼊到HDFS。
说⽩了就是⼀个采集数据的⼯具,使⽤简单,配置⽂件即可。
可以从⽂件,⽂件夹,http协议等地⽅采集
(⽬录:只要⽬录中的⽂件有变化,就会进⾏采集该数据
Shell命令:执⾏⼀条shell命令,shell命令的输出会既然sources)
** 为什么要⽤flume?** 优点如下:
1. flume可以和任意存储进程集成
2. 数据的数据速率⼤于写⼊⽬的存储的速率,flume会进⾏缓冲,减⼩hdfs的压⼒(怎么理解:源数据来的忽快忽慢,⽇志信息可能在
⽤户活跃期间瞬间有5G⽇志信息,在夜间等⼏乎没有⽇志信息,但是因为flume的缓冲能⼒,到达hdfs的速度相对平稳⼀些。)
3、Flume运⾏机制
flume运⾏的最⼩单元,独⽴运⾏在⼀个JVM中。⼀个agent⾥⾯包括⼀个或多个sources,channels,sinks,每个agent内部有三个组件。1)souce:数据采集组件,对接我们的源数据
2)channel:传输通道组件,通俗叫管道,数据的缓冲区,连接source和sink,将source和sink进⾏打通
3)sink:下沉组件,⽤于向下⼀级agent传递数据或者往最终的存储系统传递数据。
4、Flume安装
1.下载
Wget mirrors.tuna.tsinghua.edu/apache/flume/1.9.0/apache-flume-1.9.
2.解压
tar -zxvf apache-flume-1.9. -C /usr/local
3.重命名
mv apache-flume-1.9.0-bin.tar  flume
4.配置环境变量
vi /etc/profile
添加以下信息
export FLUME_HOME=/usr/local/flume
export PATH=$PATH:$FLUME_HOME/bin

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。