NiFiProcessors概述
为了能够创建⼀个⾼效的NiFi数据数据流程,我们需要了解有哪些Processor类型可以使⽤。每个新的NiFi版本Processor数量都会增加,当前NiFi版本为1.12.1,内置了288个类型的Proccessor,这些Processor提供从多个不同系统接收数据、路由、转换、处理、拆分和聚合数据以及将数据分发到多个系统的功能。
按照功能分类:
1. procesor 分类说明
1.1 数据摄取
GetFile: 此“Processor”监控本地磁盘或者⽹络连接磁盘中的⽂件⽂件夹,读取⽂件内容封装为⼀个FlowFile,⽂件的属性将会转化为FlowFile的属性。默认情况下源⽂件会被删除(也可选择保留)。会忽略没有读取权限的⽂件。
GetFTP: 下载FtpServer上的⽂件,读取⽂件内容创建FlowFile。读取完成后原始⽂件会被删除。此processor的⽬的是移动⽂件,⽽不是复制⽂件。
GetSFTP: 下载SFtpServer上的⽂件,读取⽂件内容创建FlowFile。读取完成后原始⽂件会被删除。此processor的⽬的是移动⽂件,⽽不是复制⽂件。
GetJMSQueue: 由ActiveMQ JMS消息队列读取⼀条消息,并更具消息内容创建FlowFile。可以选择是否将JMS属性复制为FlowFile 的属性。
GetJMSTopic: 由ActiveMQ JMS消息队列的topic读取⼀条消息,并更具消息内容创建FlowFile。可以选择是否将JMS属性复制为FlowFile的属性。⽀持持久订阅和⾮持久订阅。
GetHTTP: ⼜⼀个http或者https地址下载内容并创建FlowFile加载到NiFi。这个Processor会记录ETag和Last-Modified Date,以避免重复的获取数据。
ListenHTTP: 启动⼀个http server,监听连接。任何Post Request的content都会被读取转化为FlowFile,会返回⼀个200响应给请求端。
ListenUDP: 监听传⼊的UDP数据包,并为每个数据包或每⼀组数据包创建⼀个FlowFile(取决于配置),并将FlowFile发送
到“success”关系。
GetHDFS: 监视HDFS中⽤户指定的⽬录。 每当有新⽂件进⼊HDFS时,就会将其复制到NiFi中并从HDFS中删除。 GetHDFS的设计⽬标是移动⽂件,⽽不是复制⽂件。 如果该GetHDFS在集中运⾏,为了从HDFS复制数据并使其保持完整,只能允许在主节点上运⾏。如果需要从集中的多个节点流转输数据,则要试⽤ListHDFS处理器。
ListHDFS / FetchHDFS: ListHDFS监视HDFS中⽤户指定的⽬录,并发出⼀个FlowFile,其中包含它遇到的每个⽂件的⽂件名。然后,它通过分布式缓存在整个NiFi集中保持这种状态。然后,这些FlowFile可以在集中分发到FetchHDFS处理器,后者负责获取这些⽂件的实际内容,并发送包含从HDFS获取的内容的FlowFile。
FetchS3Object: 从AWS简单存储服务(S3)获取对象的内容。发出的DataFlow包含从S3接收的内容。
GetKafka: 由kafka(0.8.0版本)取回数据,将每个消息封装为FlowFile也可以将多条封装为⼀个FlowFlie(需要⾃定义分隔符)。其他kafka版本有对应的ConsumeKafkaRecord和ConsumeKafka Processor。
GetMongo: 执⾏⼀个⽤户定义的查询语句,将返回结果写⼊新的FlowFile中。
GetTwitter: 允许⽤户注册⼀个过滤器来监听Twitter“garden hose”或“Enterprise endpoint”,为接收到的每个tweet创建⼀个FlowFile。
1.2. 数据转化
CompressContent: 压缩、解压FlowFile的内容(Content)。
ConvertCharacterSet: 将内容从⼀个字符集编码到另⼀个字符集的字符集
EncryptContent: 加密、揭秘FlowFile的内容(Content)。
ReplaceText: 使⽤正则表达式修改⽂本内容
TransformXml: 使⽤XSLT转化XML内容
JoltTransformJSON: 使⽤ JOLT转化JSON(内容)
1.3. 路由与中介
ControlRate: 控制数据传输到后续Processor的速率
DetectDuplicate: 根据⼀些⽤户定义的条件,监视重复的FlowFiles。 常与HashContent结合使⽤。
DistributeLoad: 可以定义1-n个输出关系,每个关系将输出n分之⼀的数据,可以⽤来负载均衡或者采样数据。
MonitorActivity: 当⽤户定义的时间段过去⽽没有任何数据通过流中的特定点时,发送通知。 (可选)在数据流恢复时发送通知。
RouteOnAttribute: 根据FlowFile包含的属性路由数据。
ScanAttribute: 扫描FlowFile上⽤户定义的属性集,检查是否有属性与⽤户定义的词典相匹配,来决定路由。
RouteOnContent: 搜索FlowFile的内容以查看其是否与任何⽤户定义的正则表达式匹配。 根据是否匹配来决定FlowFile流向。
ScanContent: 在FlowFile的内容中搜索⽤户定义的词典中存在条⽬,并根据这些条⽬的存在或不存在进⾏路由。 该词典可以包含⽂本条⽬或⼆进制条⽬。
ValidateXml: 使⽤⽤户定义的XMLschema验证FlowFile的内容是否有效来路由FlowFile。
1.4. 数据库访问
ConvertJSONToSQL: 将json转化为insert或者update sql,发送给"PutSQL Processor"
ExecuteSQL:执⾏⼀个⽤户定义的sql,将返回结果转化为Avro格式并做为⼀个FlowFile的content。
PutSQL: 通过执⾏由FlowFile的内容定义的sqlddm语句来更新数据库
SelectHiveQL: 对Apache配置Hive数据库执⾏⽤户定义的HiveQL SELECT命令,将结果写⼊Avro或CSV格式的FlowFile
PutHiveQL: 通过执⾏由FlowFile内容定义的HiveQL DDM语句来更新配置的Hive数据库
PutInfluxDB:将FlowFile内容中的'line protocol'数据插⼊到influxdb中。
1.5. 属性提取
EvaluateJsonPath: ⽤户提供JSONPath表达式(类似于XPath,⽤于XML解析/提取),然后根据表达式内容对这些json进⾏计算提取,以替换FlowFile内容或将值提取到⽤户命名的Attribute中。
EvaluateXPath: ⽤户提供XPath表达式,然后根据XML内容对这些表达式进⾏计算提取,以替换FlowFile内容或将值提取到⽤户命名的Attribute中。
EvaluateXQuery: ⽤户提供⼀个XQuery查询,然后根据XML内容对该查询进⾏计算提取,以替换FlowFile内容或将该值提取到⽤户命名的Attribute中。
ExtractText: 使⽤⼀个或多个正则表达是有FlowFile内容中提取数据,以替换FlowFile内容或将该值提取到⽤户命名的Attribute中。
HashAttribute: 针对⽤户定义的⼀个属性列表串联执⾏哈希函数。
HashContent: 针对FlowFile的内容执⾏哈希函数,并将哈希值添加为属性。
IdentifyMimeType: Evaluates the content of a FlowFile in order to determine what type of file the FlowFile encapsulates.
This Processor is capable of detecting many different MIME Types, such as images, word processor documents, text, and compression formats just to name a few.
UpdateAttribute: 向FlowFile添加或更新任意数量的⽤户定义属性。 这对于添加静态配置的值以及使⽤表达式语⾔动态派⽣属性值很有⽤。 该处理器还提供了⼀个“⾼级⽤户界⾯”,允许⽤户根据⽤户提供的规则有条件地更新属性。
1.6. 系统交互
ExecuteProcess: 执⾏⼀个系统命令,将命令的标准输出捕获做为新建FlowFile的Content。命令接收任何输出。 例如可以使⽤“tail”命令补货⽂件内容。
ExecuteStreamCommand: 执⾏⼀个系统命令,将输⼊的FlowFile内容做为命令的输⼊,将命令的输出做为输出FlowFile的内容。1.7.数据出⼝/数据发送
PutEmail: 向配置的邮件地址发送电⼦邮件。 FlowFile的内容可以选择作为附件发送。
PutFile: 将FlowFile的内容写⼊本地(或⽹络连接)⽂件系统上的⽬录中。
PutFTP: 将FlowFile的内容复制到远程FTP服务器。
PutSFTP: 将FlowFile的内容复制到远程SFTP服务器。
PutJMS: 将FlowFile的内容作为JMS消息发送到JMS代理,可以选择基于属性添加JMS属性。
PutSQL: 以SQL DDL语句(INSERT,UPDATE或DELETE)的形式执⾏FlowFile的内容。 FlowFile的内容必须是有效的SQL语句。
可以将属性⽤作参数,以便可以将FlowFile的内容参数化为SQL语句,以避免SQL注⼊攻击。
PutKafka: 将FlowFile的内容作为消息发送到Apache Kafka,特别是针对0.8.x版本。 FlowFile可以作为单个消息发送,也可以使⽤分隔符(例如可以指定换⾏符)发送,以便为单个FlowFile发送许多消息。
PutMongo: 将FlowFile的内容作为INSERT或UPDATE发送到Mongo。
1.8. 拆分与合并
SplitText: SplitText接收单个FlowFile,其内容为⽂本,然后根据配置的⾏数将其拆分为1个或多个FlowFile。例如,处理器可以配置为将FlowFile拆分为多个FlowFile,每个⽂件只有1⾏。
SplitJson: 允许⽤户根据JSON元素将由数组或许多⼦对象组成的JSON对象拆分为FlowFile。
SplitXml: 允许⽤户将XML消息拆分为许多FlowFile,每个⽂件都包含原始⽂件的⼀部分。通常在将⼏个XML元素与“包装”元素结合在⼀起时使⽤。然后,此处理器允许将那些元素拆分为单独的XML元素。
UnpackContent: 解压缩不同类型的存档格式,例如ZIP和TAR。然后,存档中的每个⽂件都将作为单个FlowFile进⾏传输。
MergeContent: 此处理器负责将多个FlowFile合并为⼀个FlowFile。可以通过将FlowFiles的内容与可选的页眉,页脚和分界符连接在⼀起,或通过指定存档格式(例如ZIP或TAR)来合并FlowFiles。 FlowFiles可以基于公共属性进⾏装箱,如果通过其他拆分过程将它们分开,则可以对其进⾏“碎⽚整理”。每个bin的最⼩和最⼤⼤⼩由⽤户指定,具体取决于元素的数量或FlowFiles内容的总⼤⼩,还可以分配⼀个可选的Timeout,以便FlowFiles仅在特定时间内等待其bin变满多少时间。
transform和convert的区别SegmentContent: 根据⼀些已配置的数据⼤⼩,将FlowFile分割为可能较⼩的FlowFile。不针对任何类型的分界符执⾏拆分,仅基于字节偏移量执⾏。在传输FlowFiles之前使⽤它,以便通过并⾏发送许多不同的⽚段来提供较低的延迟。在另⼀⽅⾯,这些FlowFiles然后可以由MergeContent处理器使⽤碎⽚整理模式重新组装。
SplitContent: 将单个FlowFile拆分为可能的多个FlowFile,类似于SegmentContent。但是,使⽤SplitContent时,不对任意字节边界执⾏拆分,⽽是指定了要在其上拆分内容的字节序列。
1.9. http
GetHTTP: 将基于远程HTTP或HTTPS的URL的内容下载到NiFi中。处理器将记住ETag和上次修改⽇期,以确保不会重复摄取数据。
ListenHTTP: 启动HTTP(或HTTPS)服务器并监听传⼊的连接。对于任何传⼊的POST请求,将请求的内容将作为FlowFile内容,并返回200响应。
InvokeHTTP: 执⾏由⽤户配置的HTTP请求。该处理器⽐GetHTTP和PostHTTP具有更多的⽤途,但是需要更多的配置。该处理器不能⽤作“源处理器”,并且必须具有传⼊的FlowFiles才能被触发执⾏其任务。
PostHTTP: 执⾏HTTP POST请求,将FlowFile的内容作为Post的内容。在⽆法使⽤s2s的情况下,通常将它与ListenHTTP结合使⽤,以便在两个不同的NiFi实例之间传输数据。 注意:HTTP可以作s2s传输使⽤协议以及现有的RAW套接字传输。它还⽀持HTTP代理。建议使⽤HTTP Site-to-Site,因为它具有更⾼的可扩展性,并且可以使⽤输⼊/输出端⼝提供双向数据传输,并具有更好的⽤户⾝份验证和授权。
HandleHttpRequest / HandleHttpResponse: HandleHttpRequest处理器是⼀个源处理器,它启动类似于ListenHTTP的嵌⼊式HTTP(S)服务器。但是,它不会向客户端发送响应。相反,将以HTTP请求的主体作为其内容和属性(所有典型Servlet参数,标头等)作为属性来发送FlowFile。然后,在FlowFile处理完毕后,HandleHttpResponse便能够将响应发送回客户端。始终希望这些处理器可以相互结合使⽤,并允许⽤户在NiFi中直观地创建Web服务。这对于将前端添加到⾮基于Web的协议或围绕NiFi已执⾏的某些功能(例如数据格式转换)添加简单的Web服务特别有⽤
1.10. amazon web services
FetchS3Object: 获取存储在Amazon Simple Storage Service(S3)中的对象的内容。 然后将其写⼊FlowFile的内容。
PutS3Object: 使⽤配置的凭证,密钥和存储桶名称将FlowFile的内容写⼊Amazon S3对象。
PutSNS: 将FlowFile的内容作为通知发送到Amazon Simple Notification Service(SNS)。
GetSQS: 从Amazon Simple Queuing Service(SQS)中提取⼀条消息,并将消息的内容写⼊FlowFile的内容。
PutSQS: 将FlowFile的内容作为消息发送到Amazon Simple Queuing Service(SQS)
DeleteSQS:从Amazon Simple Queuing Service(SQS)中删除⼀条消息。 可以将其与GetSQS结合使⽤,以便从SQS接收消息,对其进⾏⼀些处理,然后仅在对象成功完成处理后才从队列中删除该对象。
2.操作属性
每个FlowFile创建时都有⼏个属性,并且属性将在FlowFile的⽣命周期内发⽣变化。 属性这个概念提供了三个主要好处:
⾸先,它允许⽤户在数据流程中制定路由决策,根据属性决定数据流经不同的流程路径。
其次,保存了数据元数据,很多Processor需要这些元数据做为参数。以使处理器的配置取决于数据本⾝。例如,PutFile Processor 能够使⽤属性来知道每个FlowFile的存储位置,⽽每个FlowFile的⽬录和⽂件名属性可能不同。
最后,属性为数据提供了⾮常有价值的上下⽂。属性可以做为搜索数据的条件,这允许⽤户搜索与特定条件匹配的出处数据,并且还允许⽤户在检查出处事件的详细信息时查看此上下⽂。通过这样做,⽤户只需浏览⼀下随内容⼀起携带的上下⽂,就能够理解为什么以⼀种或另⼀种⽅式处理数据。
2.1 ⼀般属性(系统属性)
所有FlowFile都会有的属性⼀个最⼩的属性集合,⼀般由NiFi进⾏维护 :
filename: ⽂件名,可以⽤来把数据保存到本地磁盘或者远程⽂件系统(nas,hdfs等)。
path: 路径,存储数据的本地或者远程⽂件夹
uuid: 每个FlowFile都有的唯⼀值,⽤于在nifi中唯⼀标识⼀个FlowFile.
entryDate: 数据进⼊NiFi的时间. 是长整数,表⽰"1970.1.1 00:00:00" 到当前时间点的毫秒数.
lineageStartDate: ⼀个FlowFile被复制、合并、拆分都会创建⼀个或者多个⼦FlowFile。这些⼦FlowFile再经过转换就构成了⼀个⾎缘链。A此值表⽰最早的祖先进⼊系统的⽇期和时间。 考虑另⼀种意义,该属性表⽰FlowFile通过系统的延迟。 该值是⼀个数字,表⽰⾃1970年1⽉1⽇午夜(UTC)以来的毫秒数。
fileSize: 内容的字节数,即⽂件的⼤⼩。
这些属性都是系统⽣成的,不能修改。
2.2 提取属性
NiFi提供了⼏种开箱即⽤的Processor,⽤于从FlowFiles中提取属性。同样,这也是构建⾃定义Processor的常见的例⼦。 许多Processor被编写来处理特定的数据格式,并从FlowFile的内容中提取相关信息,创建属性来保存该信息,以便随后就如何路由或处理数据做出决策。
2.3 添加⽤户⾃定义属性
除了使Processor能够从FlowFile内容中提取特定信息到属性外,⽤户还需要在数据流程中的特定位置向每个FlowFile添加⽤户定义属性。“UpdateAttribute Processor”专为此⽬的⽽设计。通过单击“属性”选项卡右上⾓的“ +”按钮,⽤户可以在“配置”对话框中向Processor添加新属性。然后提⽰⽤户输⼊属性的名称,然后输⼊值。对于此UpdateAttribute Processor处理的每个FlowFile,将为会把⽤户定义的属性添加到FlowFile属性中。属性的名称将与添加的属性的名称相同。属性的值将与属性的值相同,属性的值也可以包含表达语⾔。这允许基于其他属性修改或添加属性。例如,如果要在处理⽂件的主机名和⽇期前加上⽂件名,则可以通过添加名称为“
filename”和值为“${hostname()}-${now():format('yyyy-dd-MM')}-${filename}”的属性来实现,这需要⽤户熟悉NiFi表达式语⾔。除了固定添加⼀组定义的属性外,UpdateAttribute Processor还具有⼀个⾼级UI,该UI允许⽤户配置⼀组规则,在这些规则上应添加属性。要使⽤此功能,请在“配置”对话框的“属性”标签中,点击对话框底部的“⾼级”按钮。这是提供专门针对此processor定制的UI,其他Processor
没有此功能。在此UI内,⽤户能够配置规则引擎,从本质上讲,指定必须匹配的规则才能将已配置的属性添加到FlowFile。
UpdateAttribute 属性配置窗⼝

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。