Split and Transform Files

Overview

In this lab, you develop Camel routes that read from rest, split them into individual records, and transform them into Java objects that are written as individual JSON files. The transformation is done using the Fuse Integration Editor. You also develop exception handling logic to process incorrectly formatted records, the result persist to AMQ.

Goals

  • Build an Apache Camel route that consumes CSV records from REST API and transforms the records into Java objects using the Bindy transformation component

  • Split the list of received Java objects and apply a Java-to-Java transformation using the Fuse Integration Editor (from org.acme.Customer to org.globex.Account)

  • Map the fields as defined within the table

  • Transform the org.globex.Account Java objects into individual JSON records

  • Move the JSON record to a message broker Queue

  • Implement the DLQ pattern to move erroneous CSV records into an error queue

  • Design a unit test to validate the data transformations

index.png

camelContext

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:camel="http://camel.apache.org/schema/spring"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
    <!-- Define a traditional camel context here -->
    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
        <propertyPlaceholder id="properties" location="route.properties"/>
        <endpoint id="csv2json" uri="dozer:csv2json2?sourceModel=org.acme.Customer&amp;targetModel=org.globex.Account&amp;marshalId=json&amp;unmarshalId=csv&amp;mappingFile=transformation.xml"/>
        <!-- CSV Input & JSon OutPut DataFormat -->
        <dataFormats>
            <bindy classType="org.acme.Customer" id="csv" type="Csv"/>
            <json id="json" library="Jackson"/>
        </dataFormats>
        <restConfiguration bindingMode="off" component="servlet" contextPath="/rest"/>
        <rest apiDocs="true" id="rest-entry" path="/service">
            <post id="rest-entry_post" uri="/customers">
                <to uri="direct:inbox"/>
            </post>
        </rest>
        <!--
             Transformation Endpoint doing csv2java conversion
             Step 1 : csv record is converted java using camel bindy
             Step 2 : Dozer transformation of Customer to Account
             Step 3 : Java to Json conversion using json jackson
        -->
        <route id="_route1" streamCache="true">
            <!-- Consume files from input directory -->
            <from id="_from1" uri="direct:inbox"/>
            <onException id="_onException1">
                <exception>java.lang.IllegalArgumentException</exception>
                <handled>
                    <constant>true</constant>
                </handled>
                <log id="_log1" message=">> Exception : ${body}"/>
                <setExchangePattern id="_setExchangePattern1" pattern="InOnly"/>
                <to id="_to1" uri="amqp:queue:errorQueue"/>
            </onException>
            <split id="_split1">
                <tokenize token=";"/>
                <to id="_to2" ref="csv2json"/>
                <setExchangePattern id="_setExchangePattern2" pattern="InOnly"/>
                <to id="_to3" uri="amqp:queue:accountQueue"/>
                <log id="_log2" message=">> Completed JSON: ${body}"/>
            </split>
            <transform id="_transform1">
                <constant>Processed the customer data</constant>
            </transform>
        </route>
    </camelContext>
</beans>

本地运行

1. 创建并启动 AMQ broker
$ cd amq-broker-7.2.0/
$ ./bin/artemis create  --user admin --password password --role admin --allow-anonymous y ./instances/broker1
$ cd instances/broker1/
$ ./bin/artemis run
2. 启动 Spring Boot camel route
$ cd fuse-get-started/rest-split-transform-amq/ && mvn clean install
$ java -jar target/rest-split-transform-amq-1.0.0.jar
3. 执行 REST Post 触发一次流程
$ curl -k http://localhost:8080/rest/service/customers -X POST  -d 'Rotobots,NA,true,Bill,Smith,100 N Park Ave.,Phoenix,AZ,85017,602-555-1100;BikesBikesBikes,NA,true,George,Jungle,1101 Smith St.,Raleigh,NC,27519,919-555-0800;CloudyCloud,EU,true,Fred,Quicksand,202 Barney Blvd.,Rock City,MI,19728,313-555-1234;ErrorError,,,EU,true,Fred,Quicksand,202 Barney Blvd.,Rock City,MI,19728,313-555-1234' -H 'content-type: text/html'
Processed the customer data
4. 查看 Spring Boot camel route 运行日志
10:49:15.742 [AmqpProvider :(1):[amqp://localhost:5672]] INFO  o.a.q.jms.sasl.SaslMechanismFinder - Best match for SASL auth was: SASL-PLAIN
10:49:15.904 [AmqpProvider :(1):[amqp://localhost:5672]] INFO  org.apache.qpid.jms.JmsConnection - Connection ID:fc401104-03e1-4f44-804b-4d995c702cd6:1 connected to remote Broker: amqp://localhost:5672
10:49:16.013 [XNIO-3 task-1] INFO  _route1 - >> Completed JSON: {"company":{"name":"Rotobots","geo":"NA","active":true},"contact":{"firstName":"Bill","lastName":"Smith","streetAddr":"100 N Park Ave.","city":"Phoenix","state":"AZ","zip":"85017","phone":"602-555-1100"}}
10:49:16.017 [XNIO-3 task-1] INFO  _route1 - >> Completed JSON: {"company":{"name":"BikesBikesBikes","geo":"NA","active":true},"contact":{"firstName":"George","lastName":"Jungle","streetAddr":"1101 Smith St.","city":"Raleigh","state":"NC","zip":"27519","phone":"919-555-0800"}}
10:49:16.021 [XNIO-3 task-1] INFO  _route1 - >> Completed JSON: {"company":{"name":"CloudyCloud","geo":"EU","active":true},"contact":{"firstName":"Fred","lastName":"Quicksand","streetAddr":"202 Barney Blvd.","city":"Rock City","state":"MI","zip":"19728","phone":"313-555-1234"}}
10:49:16.024 [XNIO-3 task-1] INFO  _route1 - >> Exception : ErrorError,,,EU,true,Fred,Quicksand,202 Barney Blvd.,Rock City,MI,19728,313-555-1234

5. 查看 AMQ 控制台

查看 AMQ 控制台会发现有两个 Queue 创建:accountQueue、errorQueue。

部署到 OpenShift

1. 登录到 OpenShift
$ oc login https://master.apps.example.com:8443
2. 部署 Spring Boot Camel route 到 OpenShift
$ mvn fabric8:deploy -Popenshift
3. Expose rest-split-transform-amq
$ oc expose svc rest-split-transform-amq
4. 执行 REST Post 触发一次流程
$ curl -k http://HOST/rest/service/customers -X POST  -d 'Rotobots,NA,true,Bill,Smith,100 N Park Ave.,Phoenix,AZ,85017,602-555-1100;BikesBikesBikes,NA,true,George,Jungle,1101 Smith St.,Raleigh,NC,27519,919-555-0800;CloudyCloud,EU,true,Fred,Quicksand,202 Barney Blvd.,Rock City,MI,19728,313-555-1234;ErrorError,,,EU,true,Fred,Quicksand,202 Barney Blvd.,Rock City,MI,19728,313-555-1234' -H 'content-type: text/html'

results matching ""

    No results matching ""