CamelInAction第三章数据转换
第三章数据转换
本章包括:
使⽤EIPs和Java两种⽅式装换数据
转换XML格式的数据
通⽤数据格式装换
编写转换器
理解Camel的类型装换机制
在现实⽣活中,⼈们说不⽤的语⾔,在IT世界中,有不同的协议。当进⾏系统集成时,软件⼯程师经常需要在各种协议之间充当调解⼈。为了解决这个问题,使⽤的数据模型必须从⼀种形式转换到另⼀个协议,以适应任何协议发的接收者都能够理解。中介和数据转换是⼀个关键的特性在camel中。
camel提供了许多数据转换技术,不久,我们将介绍他们。但⾸先我们⾸先概述介绍camel数据转换。
数据转换包括两⽅⾯的内容:1.数据格式装换---消息体的格式从⼀种形式转换为另⼀种形式:XML---->json。2、数据类型的转换---消息体的格式从⼀种类型转换为另⼀种类型:java.lang.Stringis----> javax.jms.TextMessage.
图3.1说明了消息体转换从⼀种形式到另中形式的原则。这种转换原则适⽤于任何格式转换和类型转换。在Camel应⽤中你⾯临的数据转换⼤多数情况下都是格式转换,此时你需要在两个协议之间做格式调解。Camel内置了类型转换机制,可以⾃动在两种类型之间转换,这样就⼤⼤减轻了终端⽤户处理类型转换的负担。
3.1.1 Camel的数据转换
在Camel中,典型的数据转换有以下六种形式,见列表3.1:
转换形式
路由中的数据转换
描述
在路由中,你可以显⽰的进⾏数据转换,使⽤Message Translator或者Content Enricher企业集成模式。这样你可以使⽤java代码进⾏数据映射。
转换形式
使⽤组件进⾏数据转换
描述
Camel提供了⼀些组件⽤于数据转换,例如XSLT组件⽤于XML转换。
转换形式
数据格式转换
描述
Camel中的数据格式转换都是成对格式出现的,⼀种格式转换为另⼀种格式,反之亦然。
转换形式
使⽤模板进⾏数据转换
描述
Camel提供了⼀些组件⽤于模板转换,例如 Apache Velocity组件。
转换形式
使⽤Camel的类型转换机制进⾏数据类型转换
描述
Camel有⼀个复杂的类型转换程序机制,此机制按需激活。这样⽅便了常见类型间的转换,如java.lang.Integer类型到java.lang.String类型的换⾏,java.io.File 类型到java.lang.String类型间的转换。
转换形式
组件适配器中的消息转换
描述
Camel的许多组件适应各种常⽤的协议,因此,需要能够对这些协议的消息进⾏转换。通常这些组件结合使⽤⾃定义数据转换和类型转换器。这种情况⽆缝地发⽣,只有组件创建着需要考虑。第⼗⼀章详解。
3.2 使⽤EIP(企业集成模式)和java代码进⾏数据转换
数据映射是指两个不同的数据模型之间的映射的过程,数据映射是数据集成的⼀个关键因素。⽬前关于数据模型的标准有很多,这些标准由不同的机构或者委员会制定。因此,你会经常发现⾃⼰需要从公司的⾃定义数据模型映射到⼀个标准的数据模型。
Camel中的数据映射给⽤户提供了很⼤的⾃由,因为它允许你使⽤java代码,并不局限于使⽤特定数据映射⼯具,使⽤⼯具,起初可能看起来优雅,但结果可能出现不可能解决的问题。
在本节中,我们将会看到如何使⽤Processor进⾏数据映射,Processor是⼀个Camel API。Camel也可以使⽤bean进⾏数据映射,这是⼀个好的做法,因为它允许你的映射逻辑独⽴于Camel API。
3.2.1 使⽤消息转换器模式
消息转换器模式如图3.2所⽰。该模式可以将消息从⼀种格式转换为另⼀种格式,类似于设计模式中的适配器模式。
Camel提供了三种⽅式来使⽤此模式:
1、使⽤Processor
2、使⽤Bean
3、使⽤<transform>
使⽤Processor
Processor是Camel中的⼀个接⼝使⽤Processor,此接⼝只有⼀个⽅法:
public void process(Exchange exchange) throws Exception;
从上述⽅法可以看出,Processor是⼀个可以直接操作Camel的Exchange对象的API。它让你可以访问所有在Camel的CamelContext中传输的部分。CamelContext对象可以通过Exchange的getCamelContext⽅法得到。
让我们看⼀个例⼦。在骑⼠汽车零部件系统中,每天都会把收到的订单输出为CSV格式的⽂件。公司使⽤了⼀个⾃定义格式的订单实体,但是为了使事情变得简单,他们使⽤了⼀个HTTP服务,此服务会根据输⼊的⽇期参数返回⼀个订单列表。你⾯对的挑战就在于:把HTTP服务返回的数据映射为CSV格式,并写到⽂件中。
因为你想从⼀个快速原型开始,您决定使⽤Camel的Processor.
代码列表3.1 使⽤Processor将⼀个⾃定义格式转换为CSV格式。
⾸先从exchange中获取⾃定义格式的内容。它是String类型的,所以你传⼊了String参数,Exchange返回了String类型的结果。接着你从⾃定义格式中提取数据到本地变量。⾃定义格式的内容可以是任意的,但在这个例⼦中,它是⼀个定长的⾃定义格式。接着你通过构建⼀个以逗号分隔的字符串将⾃定义格式映射为CSV格式。最后,使⽤CSV格式的负载替换了⾃定义格式的负载。
你可以在下列路由中使⽤上⾯的OrderToCsvProcessor:
from("quartz://report?cron=0+0+6+*+*+?")
.to(" ")
.process(new OrderToCsvProcessor())
.to("file://riders/orders?fileName=report-${header.Date}.csv");
上述路由使⽤Quartz组件安排⼯作在6点每天运⾏⼀次。然后它调⽤返回⾃定义格式的HTTP服务来检索昨天收到的订单。接下来,它使⽤OrderToCSVProcessor 从⾃定义格式映射到CSV格式将结果写⼊⽂件。
等价的路由在Spring XML中配置如下:
<bean id="csvProcessor" class="camelinaction.OrderToCsvProcessor"/>
<camelContext xmlns=" ">
<route>
<from uri="quartz://report?cron=0+0+6+*+*+?"/>
<to uri=" ;date=yesterday"/>
<process ref="csvProcessor"/>
<to uri="file://riders/orders?fileName=report-${header.Date}.csv"/>
</route>
</camelContext>
注意:如何使⽤Exchange的getOut和getIn⽅法
replaceall()
Exchange定义了两个检索消息的⽅法:getIn和getOut。getIn⽅法返回传⼊的消息,getOut⽅法访问出站消息。
有两种场景,Camel终端⽤户需要决定使⽤哪个⽅法:
1、只读场景,例如打印传⼊消息的⽇志;
2、可写的场景:例如转换消息格式的时候;
在第⼆种场景中,你可能会使⽤getOut⽅法,这在理论上是没错的。但是实践中,这种⽅式有缺点:传⼊的消息headers 和 attachments将会丢失。这可能不是你想要的。所以你必须拷贝headers 和 attachments到出站消息中,这个步骤通常是冗长乏味的。变通的⽅式是使⽤getIn⽅法设置消息内容的变化,永远不要使⽤getOut。
使⽤Processor有⼀个缺点:必须和Camel的API绑定。
使⽤bean进⾏数据转换
使⽤bean进⾏数据转换是⼀个很好的实践,因为这种⽅式允许你使⽤任何你需要的java代码或者java类库。Camel对这点没有强加任何限制。Camel可以调⽤你开发的任意bean,甚⾄你可以使⽤已经存在的bean,⽽不需要重写或者重新编译他们。
让我们使⽤ bean代替前⾯的那个Processor。
代码列表3.2
代码列表3.1和3.2的第⼀个明显的区别是,代码列表3.2没有使⽤任何Camel的接⼝。这意味着你的bean彻底独⽴于Camel API。另⼀个区别:你可以对⽅法签名进⾏命名,在上述代码列表中,⽅法是⼀个名为map的静态⽅法。
⽅法签名定义了协议,意味着第⼀个参数String custom,对应将要转换的消息体。⽅法返回类型为String类型,对应转换后的数据类型为String类型。运⾏
时,Camel会绑定⽅法签名。这⾥我们不深究更多的细节,第四章将会详细介绍。相应的DSL和Sping XML如下:
from("quartz://report?cron=0+0+6+*+*+?")
.to(" ")
.bean(new OrderToCsvBean())
.to("file://riders/orders?fileName=report-${header.Date}.csv");
<bean id="csvBean" class="camelinaction.OrderToCsvBean"/>
<camelContext xmlns=" ">
<route>
<from uri="quartz://report?cron=0+0+6+*+*+?"/>
<to uri=" ;date=yesterday"/>
<bean ref="csvBean"/>
<to uri="file://riders/orders?fileName=report-${header.Date}.csv"/>
</route>
</camelContext>
在DSL中使⽤TRANSFORM()⽅法进⾏数据转换
TRANSFORM()是可以⽤在路由中的⼀个⽅法,可以⽤来进⾏消息转换。利⽤表达式,TRANSFORM()
具有极⼤的灵活性,有时还可以节省时间。例如,假设你需要使⽤<br/>标记取代所有HTML格式数据中的的换⾏符。此时可以使⽤Camel内置的表达式和正则表达式的搜索和替换:
from("direct:start")
.transform(body().regexReplaceAll("\n", "<br/>"))
.to("mock:result");
上述路由使⽤transform()⽅法告诉Camel:消息要使⽤表达式进⾏转换。Camel利⽤建造者模式从路由中的表达式创建了整个表达式。这是通过⽅法调⽤链接在⼀起,这是建造者模式的本质。
在这个例⼦中,你联合使⽤了body()和regexReplaceAll()表达式,表达式应该这样读:获取body,并执⾏⼀个正则表达式,使⽤<br/>替换所有的\n,两个表达式组成了⼀个联合Camel表达式。
Direct组件
这个例⼦中使⽤到了Direct组件作为路由的输⼊源(from("direct:start"))。Direct组件提供⽣产者和消费者之间的直接调⽤。但是调⽤只在同⼀个Camel应⽤中有效,所以外部系统不能直接给direct组件发消息。这个组件经常⽤于Camel应⽤中的路由连接或者路由测试。
Camel⽀持使⽤⾃定义表达式。当时想使⽤java代码对其完全控制时,可以使⽤⾃定义表达式。例如:前⾯的例⼦可有如下实现⽅式:
from("direct:start")
.transform(new Expression() {
public <T> T evaluate(Exchange exchange, Class<T> type) {
String body = In().getBody(String.class);
body = placeAll("\n", "<br/>");
body = "<body>" + body + "</body>";
return (T) body;
}
})
.to("mock:result");
在Spring XML中使⽤<transform>进⾏数据转换
这种⽅式不像上⼀种⽅式那样强⼤。在Spring XML中,建造者模式的表达式不可⽤,因为XML底层没有⼀种真正的编程语⾔。你可做的就是调⽤⼀个bean的⽅法或者使⽤脚本语⾔。
让我们看下他是如何⼯作的。下⾯的路由通过调⽤⼀个bean的⽅法作为表达式:
<bean id="htmlBean" class="camelinaction.HtmlBean"/>
<camelContext id="camel" xmlns=" ">
<route>
<from uri="direct:start"/>
<transform>
<method bean="htmlBean" method="toHtml"/>
</transform>
<to uri="mock:result"/>
</route>
</camelContext>
 ⾸先,您声明⼀个常规的spring bean⽤于转换消息。然后,在路由中,您使⽤<transform>和<method>调⽤了这个Bean。htmlBean的实现⾮常简单:
public class HtmlBean {
public static String toHtml(String body) {
body = placeAll("\n", "<br/>");
body = "<body>" + body + "</body>";
return body;
}
}
在Camel中,你也可以使⽤脚本语⾔作为表达式。例如,你可以使⽤Groovy,MVEL,JavaScript或者Camel的脚本语⾔(名为Simple)(见附录A)。在这⾥我们
只演⽰Simple语⾔,Simple语⾔可以只⽤占位符构建字符串信息:
<transform>
<simple>Hello ${body} how are you?</simple>
</transform>
3.2.2 使⽤Content Enricher企业集成模式
Content Enricher企业集成模式的阐述见图3.3。这种模式描述了这样⼀种场景,⽤来⾃另⼀个消息源返回的数据来丰富⼀个消息的数据。
为了⽅便理解这个模式,我们再看其实汽车零部件系统。事实证明你在代码清单3.1中所做的数据映射是不能满⾜需求的。订单被堆积在了FTP服务器上,你的⼯作就是以某种⽅式将这些信息合并到现有的报告⽂件中。图3.4描述了这个场景。
---在图3.4中,使⽤Quartz在每天6点准时开始路由,
---从HTTP server中获取⾃定义格式的订单数据
---订单数据被转换为CSV格式
---此时,你必须执⾏额外的内容丰富步骤,使⽤从FTP服务器中获得的数据进⾏丰富。
---接着,最终⽣成的订单报告别写⼊⽂件服务器
在介绍上图路由的实现⽅式之前,我们需要看⼀下在Camel中Content Enricher企业集成模式是如何实现的。Camel提供了两个DSL操作⽤于实现这个模式:
1、pollEnrich---这个操作使⽤⼀个消费者合并来⾃另⼀个源的数据
2、enrich---这个操作使⽤⼀个⽣产者合并来⾃另⼀个源的数据
pollEnrich和enrich两个操作的区别
两者的区别在于pollEnrich使⽤⼀个消费者合并来⾃另⼀个源的数据,⽽enrich使⽤⼀个⽣产者合并来⾃另⼀个源的数据.了解其中的不同是⾮常重要的:file组件可以使⽤这两个操作,但是使⽤enrich操作,会把信息写⼊到⼀个⽂件中;使⽤pollEnrich操作,将会读取⽂件内容作为消息源,这种情况⾮常类似
上⾯那个场景。HTTP组件只能使⽤enrich操作,它允许您调⽤⼀个外部HTTP服务,使⽤它的返回结果作为源。
Camel使⽤org.apache.camel.processor.AggregationStrategy接⼝来合并原消息和丰富源数据:
Exchange aggregate(Exchange oldExchange, Exchange newExchange);
你需要实现aggregate⽅法。这个⽅法有两个参数:第⼀个参数oldExchange,包含原消息的exchange,第⼆个参数newExchange,是丰富源数据的exchange。你的⼯作就是使⽤java代码合并丰富源数据到原消息,并返回合并结果。听起来可能有些迷惑,让我们看下实际的例⼦。
为了解决上述骑⼠汽车零部件系统中的问题,你需要使⽤pollEnrich操作,因为这个操作是从FTP服务器上轮询获取⽂件的。
使⽤pollEnrich丰富消息
代码列表3.3展⽰了如何使⽤pollEnrich从远程的FTP服务器上获取额外的订单,并使⽤AggregationStrategy将这些订单数据合并到已有消息中。
这个路由每天下午六点被Quartz组件触发。调⽤HTTP服务获取订单,并使⽤Process把他们转换为CS
V格式。此时,你需要使⽤来⾃远程FTP服务器上的订单来丰富已有订单数据,这点使⽤pollEnrich来完成,此操作消费远程⽂件。
为了合同数据,你说⽤了AggregationStrategy。⾸先,判断是否有数据被消费。如果newExchange为null,那么没有远程⽂件被消费,你只需返回已有数据;如果有远程⽂件,你通过连接现有的数据与新数据实现合并,并把合并后的数据设置到oldExchange中。接着,返回合并后的数据(oldExchange)。最后,使⽤file组件⽣成CSV报告⽂件。
PollEnrich使⽤⼀个轮询消费者来检索消息,这个操作提供了三种超时模式:
---pollEnrich(timeout = -1) 轮询消息并等待,直到⼀个消息到达。这种模式在消息到达前将阻塞。
---pollEnrich(timeout = 0) 如果消息存在,⽴即检索消息;否则返回null。这种模式不会等待消息的到达,所以此模式不会阻塞。这是默认选中的模式。
---pollEnrich(timeout > 0) 检索消息,如果没有消息存在,等待,最多等待到超时。这种模式将可能阻塞。
最佳实践⽅式是使⽤timeout=0或者给timeout赋⼀个固定值,避免没有消息到达时⼀直等待。
注意:
Enrich和pollEnrich不能访问当前的exchange中的信息。也就是说,例如,你不能通过设置当前exchange的filename头部,使pollEnrich选择⼀个特定的⽂件,因为pollEnrich⽆法读取当前exchange的头部信息。这⼀点在Camel的未来版本中可能会⽀持。
使⽤enrich来丰富消息
当你使⽤请求-响应消息返回的数据来丰富当前消息时,需要使⽤enrich操作。⽐如⽤webService返回的结果来丰富当前消息。来看另⼀个例⼦,使⽤Spring XML,使⽤TCP传输丰富当前的消息:
<bean id="quoteStrategy"
class="camelinaction.QuoteStrategy"/>
<route>
<from uri="activemq:queue:quotes"/>
<enrich url="mina:tcp://riders:9876?textline=true&sync=true"
strategyRef="quoteStrategy"/>
<to uri="log:quotes"/>
</route>
在这⾥你使⽤了Camel的mina组件来进⾏TCP传输,通过使⽤sync=true配置选项实现了请求--响应消息。要合并原消息和远程返回的数据,<enrich>元素需要
引⽤⼀个AggregationStrategy对象,代码中通过strategyRef属性实现了这⼀点。正如你所看到的,strategyRef属性所引⽤的quoteStrategy是个bean的id,此id 就是AggregationStrategy的实现类bean的id,这个地⽅也是合并发⽣所在地。
3.3 XML格式的数据转换(此部分略)
3.4 数据格式转换
在Camel中,数据格式转换时可插拔的数据转换,可将消息从⼀种形式转换为另⼀种形式。Camel中,每⼀种数据格式转换都由接⼝
org.apace.camel.spi.DataFormat代表,此接⼝包含两个⽅法:
1、marshal⽅法:封存⼀个消息为另⼀种形式,⽐如封存java对象为XML, CSV, EDI, HL7等数据模型;即对象--->⼆进制
2、unmarshal⽅法:进⾏⼀种反向操作,将某种格式的数据转换为消息,即⼆进制--->对象
见图3.6所⽰。
在3.3节我们提到了数据格式,我们讨论了XML转换。本节将深度介绍数据格式,这次我们不使⽤XML数据类型,使⽤CSV和JSON数据类型。还将学习如何创建⾃定义的数据格式。我们⾸先简要看⼀下Camel提供的开箱即⽤的数据格式。
3.4.1Camel提供的数据格式
Camel为⼀些众所周知的数据模型提供了⼀系列的数据格式,如表3.3所⽰。
如表中所⽰,Camel提供了18中开箱即⽤的数据格式,我们只介绍其中最常⽤的三种。详细信息参见:
3.4.2 使⽤CSV数据格式
3.4.3 使⽤Bindy数据格式
3.4.4 使⽤json数据格式
JSON(JavaScript对象表⽰法)是⼀种数据交换格式,Camel提供了两个组件,⽀持JSON数据格式:camel-xstream和camel-jackson。这⼀节关注camel-jackson。
回到骑⼠汽车配件系统,你现在必须实现⼀个新的服务,此服务返回JSON格式的订单摘要。在Camel中实现这个服务是⾮常容易的。快速构建的⼀个模型见代码列表3.9
<bean id="orderService" class="camelinaction.OrderServiceBean"/>
<camelContext id="camel" xmlns=" ">
<dataFormats>
<json id="json" library="Jackson"/>
</dataFormats>
<route>
<from uri="jetty:// :8080/order"/>
<bean ref="orderService" method="lookup"/>
<marshal ref="json"/>
</route>
</camelContext>
⾸先你需要设置的JSON数据格式,指定使⽤Jackson类库。然后你定义⼀个路由,使⽤Jetty作为HTTP服务端点。这个服务将请求路由到orderService bean上⾯,调⽤bean的lookup⽅法。此⽅法返回的结果被marshal为json格式的数据,然后返回给HTTP客户端。
orderService bean可以有这样⼀个⽅法:
public PurchaseOrder lookup(@Header(name = "id") String id)
这个签名允许您实现查逻辑。在4.5.3节中你会了解更多关于@Header注释。
注意这个bean⽅法返回⼀个POJO对象,JSON类库可以对这个POJO进⾏marshal,⽐如你使⽤了代码列表3.7中的PurchaseOrder对象,那么会⽣成下⾯的json字符串:
{"name":"Camel in Action","amount":1.0,"price":49.95}
HTTP服务本⾝可以被⼀个GET请求调⽤,使⽤id作为请求参数:
:8080/order/service?id=123.
3.4.5 配置Camel数据格式
3.4.6 ⾃定义数据格式
在项⽬中,⼀个经常要做的事情就是⽤⾃定义的数据格式进⾏数据转换。这⼀节,我们将看看如何开发⼀个数据格式,可以翻转字符串
开发⾃定义数据格式是⾮常简单的,因为你只需要实现Camel的⼀个接⼝:org.apache.camel.spi.DataFormat.让我们看看如何实现⼀个字符串翻转功能的数据格式。见代码列表3.11.
---实现DataFormat接⼝:marshal和unmarshal两个⽅法。
---marshal⽅法需要向OutputStream中输出结果,所以你需要以byte[]的形式获取消息负载(使⽤Camel的类型装换器实现),接着在⼀个⼯具⽅法中翻转它;接着将数据写⼊OutputStream;
---unmarshal⽅法中:再次使⽤Camel的类型转换器获取消息负载。unmarshal⽅法同样对消息负载进⾏了翻转。在unmarshal⽅法中直接返回了数据⽽不是写⼊IO流中。
在路由中使⽤这个新数据格式,需要把这个数据格式定义为Spring bean,并且在<marshal>和<unmarshal>元素中引⽤这个bean:
<bean id="reverse" class="camelinaction.ReverseDataFormat"/>
<camelContext id="camel" xmlns=" ">
<route>
<from uri="direct:marshal"/>

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。