⼤数据项⽬实战之新闻话题的实时统计分析
摘要: 本⽂讲解⼀个完整的企业级⼤数据项⽬实战,实时|离线统计分析⽤户的搜索话题,并⽤酷炫的前端界⾯展⽰出来。这些指标对⽹站的精准营销、运营都有极⼤帮助。
前⾔:本⽂是⼀个完整的⼤数据项⽬实战,实时|离线统计分析⽤户的搜索话题,并⽤酷炫的前端界⾯展⽰出来。这些指标对⽹站的精准营销、运营都有极⼤帮助。架构⼤致是按照企业标准来的,从⽇志的采集、转化处理、实时计算、JAVA后台开发、WEB前端展⽰,⼀条完整流程线下来,甚⾄每个节点都⽤的⾼可⽤架构,都考虑了故障转移和容错性。所⽤到的框架包括:
Hadoop(HDFS+MapReduce+Yarn)+Flume+KafKa+Hbase+Hive+Spark(SQL、Structured Streaming
)+Hue+Mysql+SpringMVC+Mybatis+Websocket+AugularJs+Echarts。所涉及到的语⾔包括:JAVA、Scala、Shell。
由于本⽂并⾮零基础教学,所以只讲架构和流程,基础性知识⾃⾏查缺补漏。Github已经上传完整项⽬代码:liuyanling41-Github
最终效果图如下:
项⽬架构图如下:
环境准备
**
模拟⽹站实时产⽣⽇志信息**
获取数据源,本⽂是利⽤搜狗的数据:搜狗实验室
编写java类模拟实时采集⽹站⽇志。主要利⽤Java中的输⼊输出流。写好后打成jar包传到服务器上
public class ReadWebLog {
private static String readFileName;
private static String writeFileName;
public static void main(String args[]) {
readFileName = args[0];
writeFileName = args[1];
readFile(readFileName);
}
public static void readFile(String fileName) {
try {
FileInputStream fis = new FileInputStream(fileName);
InputStreamReader isr = new InputStreamReader(fis, "GBK");
//以上两步已经可以从⽂件中读取到⼀个字符了,但每次只读取⼀个字符不能满⾜⼤数据的需求。故需使⽤BufferedReader,它具有缓冲的作⽤,可以⼀次读取多个字符
前端websocket怎么用BufferedReader br = new BufferedReader(isr);
int count = 0;
while (br.readLine() != null) {
String line = br.readLine();
count++;
// 显⽰⾏号
Thread.sleep(300);
String str = new Bytes("UTF8"), "GBK");
System.out.println("row:" + count + ">>>>>>>>" + line);
writeFile(writeFileName, line);
}
isr.close();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void writeFile(String fileName, String conent) {
try {
FileOutputStream fos = new FileOutputStream(fileName, true);
OutputStreamWriter osw = new OutputStreamWriter(fos);
BufferedWriter bw = new BufferedWriter(osw);
bw.write("\n");
bw.write(conent);
bw.close();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
- 编写采集⽇志的shell脚本
vim weblog.sh
#/bin/bash
echo "start log"
java -jar /home/weblog.jar /usr/local/weblog.log /home/weblogs.log 运⾏效果图
Flume Agent2采集⽇志信息
主要通过设置Source、Channel、Sink来完成⽇志采集。配置flume配置⽂件 f
a2.sources = r2
a2.channels = c2
a2.sinks = k2
a2.pe = exec
#来源于weblogs.log⽂件
a2.sources.r2mand = tail -F /home/weblogs.log
a2.sources.r2.channels = c2
a2.pe = memory
a2.channels.c2.capacity = 10000
a2.ansactionCapacity = 100
a2.channels.c2.keep-alive = 10
a2.pe = avro
a2.sinks.k2.channel = c2
落地点是master机器的5555端⼝(主机名和端⼝号都必须与master机器的flume配置保持⼀致)
a2.sinks.k2.hostname = master
a2.sinks.k2.port = 5555
编写shell脚本,⽅便运⾏。vim flume.sh
#/bin/bash
echo “flume agent2 start”
bin/flume-ng agent --conf conf --name a2 --conf-file f -logger=INFO,console 运⾏的时候直接 ./flume.sh 即可
Flume Agent3采集⽇志信息
各⽅⾯配置都和Agent2完全⼀样、省略。
Flume Agent1整合⽇志信息
f
#Flume Agent1实时整合⽇志信息
a1.sources = r1
a1.channels = kafkaC hbaseC
a1.sinks = kafkaS hbaseS
flume + hbase
a1.pe = avro
a1.sources.r1.channels = kafkaC hbaseC
a1.sources.r1.bind = master
a1.sources.r1.port = 5555
a1.pe = memory
a1.channels.hbaseC.capacity = 10000
a1.ansactionCapacity = 10000
a1.pe = asynchbase
a1.sinks.hbaseS.table = weblogs
a1.lumnFamily = info
a1.sinks.hbaseS.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
a1.sinks.hbaseS.serializer.payloadColumn = datatime,userid,searchname,retorder,cliorder,cliurl
a1.sinks.hbaseS.channel = hbaseC
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论