『NiFi学习之路』⼊门——下载、安装与简单使⽤⼀、概述
“光说不练假把式。” 官⽹上的介绍多少让⼈迷迷糊糊的,各种⾼⼤上的词语仿佛让 NiFi 离我们越来越远。
实践是最好的⽼师。那就让我们试⽤⼀下 NiFi 吧!
⼆、安装
由于我的整个学习和使⽤过程都是在 Linux 下完成,所以,整个教程也是⾯向 Linux ⽤户的。
其他系统下的使⽤⽅法类似,如有其他系统的⽤户,那么还请有选择性的进⾏参考本教程。
NiFi 是免安装地,仅需从官⽹下载压缩包,然后解压,旋即完成了整个安装过程。
官⽹下载压缩包:,如何压缩包的选择参考⽂章
tar --命令解压,关于解命令烦请参考⽂章
解压结束即完成整个安装过程
三、配置
我们需要对 NiFi 的配置⽂件(存放在[nifi_install_location]/config)进⾏个性化配置。
由于⽬前我们只是简单的上⼿,所以⼤部分配置项都选择了默认的配置。我们仅对 NiFi 进⾏基本的配置。
nifi.properties⽂件针对的是 NiFi 的配置,仅需做如下修改:
session下载# web properties #
nifi.web.war.directory=./lib
nifi.web.http.host=192.168.203.7 # 设置成本机的 ip
nifi.web.http.port=9191
nifi.web.https.host=
nifi.web.https.port=
nifi.web.jetty.working.directory=./work/jetty
nifi.web.jetty.threads=200
其他⽂件在本教程场景下⽆需修改。
四、启动
通过[nifi_install_location]/bin/nifi.sh进⾏start、restart、stop、status等操作。
./bin/nifi.sh start启动 nifi。
浏览器中访问192.168.203.7:9191/nifi(此处的ip与端⼝同你的配置项相关)
即可出现 nifi 的操作界⾯
由于 nifi 加载配置信息较慢,稍作等待。
五、简单使⽤
设定⼀个这样的场景:
统计每次收到的消息中,单词出现的频次。WORD COUNT
5.0 先修:操作⾯板与 Processor
熟悉操作⾯板的各个组件。如何添加、启动、停⽌、删除⼀个 Processor,如何对 Processor 进⾏基本配置?
对于上述问题,请⾃⾏在界⾯上进⾏尝试和练习。
官⽹上也提供了⼀些 Youtube 上的教学视频,请参考这⾥
5.1 涉及到的组件
GenerateFlowFile
ExecuteScript
PutFile
5.2 部署组件
从 Processor 拖取上述组件到操作⾯板上,依照描述的数据流动⽅式将组件串起。
串联组件时,有时需要对组件进⾏关系的选择。即选择上⼀组件分发的正确/错误消息分发到下⼀组件。
同时,末端的组件你需要在它的 setting pannel 指定⾃⼰处理 Success、Failure。
对 Processor 的配置如下:
GenerateFlowFile
Profile -> Custom Text 填⼊需要统计单词频次的⽂章/内容
如:
Apache NiFi supports powerful and scalable directed graphs of data routing, transformation, and system mediation logic. Some of the high-level capabilities and objectives of Apache NiFi include: Scheduling -> Run Duration 调节⾄ 2s,放缓数据⽣成的速度
ExecuteScript
Profile -> ScriptEngine 选择 Groovy
ScriptBody 填⼊如下 Groovy 代码:
// Code from
import org.apachemons.io.IOUtils
import java.nio.charset.*
def flowFile = ()
if(!flowFile) return
flowFile = session.write(flowFile, {inputStream, outputStream ->
def wordCount = [:]
def tellTaleHeart = String(inputStream, StandardCharsets.UTF_8)
def words = tellTaleHeart.split(/(!|\?|-|\.|\"|:|;|,|\s)+/)*.toLowerCase()
words.each { word ->
def currentWordCount = (word)
if(!currentWordCount) {
wordCount.put(word, 1)
}
else {
wordCount.put(word, currentWordCount + 1)
}
}
def outputMapString = wordCount.inject("", {k,v -> k += "${v.key}: ${v.value}\n"})
outputStream.Bytes(StandardCharsets.UTF_8))
} as StreamCallback)
flowFile = session.putAttribute(flowFile, 'filename', 'telltale_heart_wordcount')
PutFile
Profile -> Directory 填⼊存放结果的⽂件夹,如/home/lbh/logs/result
启动各个 Processor 后,就能清晰地看到数据在 Processors 之间流动。
查看 PutFile 中设置的⽂件存放⽬录,能够看到存放着统计结果的⽂件telltale_heart_wordcount
打开telltale_heart_wordcount⽂件,查看到统计结果如下:
此⽂在我的上同步发布,地址为:

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。