Publish Records and Fix Errors

Overview

This is a continue of Split and Transform Rest, but publish the individual JSON Account records to message input queues, including an error queue in the event of a processing error. You also develop a strategy to correct erroneous messages and redeliver them.

The route implements an interception strategy when IllegalArgumentException is thrown. The erroneous record is published to a special topic and an error code and error message are added.

A separate route listens for error messages. When this route is triggered, the erroneous message and message code are saved into an error table with the status of ERROR. To correct the record, you run a script against the error table and modify the status to FIXED. You use another route to poll the database for FIXED records. This route republishes the message on the original queue and updates the status to CLOSED.

Goals

  • Publish records from the processing to an error topic

  • Implement a custom error handling strategy to capture the erroneous messages

  • Fix the messages and republish them

index.png

camelContext

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:camel="http://camel.apache.org/schema/spring"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
    <!-- Define a traditional camel context here -->
    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
        <propertyPlaceholder id="properties" location="route.properties"/>

        <endpoint id="csv2json" uri="dozer:csv2json2?sourceModel=org.acme.Customer&amp;targetModel=org.globex.Account&amp;marshalId=json&amp;unmarshalId=csv&amp;mappingFile=transformation.xml"/>
        <!-- CSV Input & JSon OutPut DataFormat -->
        <dataFormats>
            <bindy classType="org.acme.Customer" id="csv" type="Csv"/>
            <json id="json" library="Jackson"/>
        </dataFormats>
        <restConfiguration bindingMode="off" component="servlet" contextPath="/rest"/>
        <rest apiDocs="true" id="rest-entry" path="/service">
            <post id="rest-entry-post" uri="/customers">
                <to uri="direct:inbox"/>
            </post>
        </rest>
        <route id="_injectroute" streamCache="true">
            <from id="_fromIR1" uri="direct:inbox"/>
            <setExchangePattern id="_setExchangePattern3" pattern="InOnly"/>
            <to id="_to1" uri="amqp:queue:inputQueue"/>
            <transform id="_transform1">
                <constant>Processed the customer data</constant>
            </transform>
        </route>
        <route id="_route1" streamCache="true">
            <!-- Consume files from input directory -->
            <from id="_from1" uri="amqp:queue:inputQueue"/>
            <onException id="_onException1">
                <exception>java.lang.IllegalArgumentException</exception>
                <handled>
                    <constant>true</constant>
                </handled>
                <log id="_log1" message=">> Exception : ${body}"/>
                <setExchangePattern id="_setExchangePattern1" pattern="InOnly"/>
                <to id="_to2" uri="direct:error"/>
            </onException>
            <split id="_split1">
                <tokenize token=";"/>
                <to id="_to3" ref="csv2json"/>
                <setExchangePattern id="_setExchangePattern2" pattern="InOnly"/>
                <to id="_to4" uri="amqp:queue:accountQueue"/>
                <log id="_log2" message=">> Completed JSON: ${body}"/>
            </split>
        </route>
        <!-- Publish the error code and error message on a topic -->
        <route id="direct-error-queue">
            <from id="_from2" uri="direct:error"/>
            <setHeader headerName="error-code" id="_setHeader1">
                <constant>111</constant>
            </setHeader>
            <setHeader headerName="error-message" id="_setHeader2">
                <simple>${exception.message}</simple>
            </setHeader>
            <setHeader headerName="message" id="_setHeader3">
                <simple>${body}</simple>
            </setHeader>
            <log id="_log3" logName="org.fuse.usecase"
                loggingLevel="DEBUG" message="!!!! ERROR NOTIFICATION SEND"/>
            <to id="error-queue-endpoint" uri="amqp:"/>
        </route>
        <!-- Consume the Topic message and publish it into the DB -->
        <route id="error-queue-sql">
            <from id="_from3" uri="amqp:"/>
            <log id="_log4" logName="org.fuse.usecase"
                loggingLevel="DEBUG" message="!!!! NOTIFICATION RECEIVED"/>
            <log id="_log5" logName="org.fuse.usecase"
                loggingLevel="DEBUG" message=">> Error code : ${header.error-code}, Error Message : ${header.error-message}"/>
            <to id="_to5" uri="sql:insert into USECASE.T_ERROR(ERROR_CODE,ERROR_MESSAGE,MESSAGE,STATUS)                      values (:#${header.error-code}, :#${header.error-message}, :#${header.message}, 'ERROR');"/>
        </route>
        <!-- Inject correct record/message and update their status to CLOSE -->
        <route id="sql-queue-input">
            <from id="_from4" uri="sql:select MESSAGE, ID from USECASE.T_ERROR where STATUS = 'FIXED' ?consumer.onConsume=update USECASE.T_ERROR set STATUS='CLOSE' where ID = :#ID"/>
            <setBody id="_setBody1">
                <simple>${body[message]}</simple>
            </setBody>
            <log id="_log6" message=">> Body : ${body}"/>
            <to id="_to6" uri="amqp:queue:inputQueue"/>
        </route>

    </camelContext>
</beans>

本地测试

1. 创建并启动 AMQ broker
$ cd amq-broker-7.2.2/
$ ./bin/artemis create  --user admin --password password --role admin --allow-anonymous y ./instances/broker1
$ cd instances/broker1/
$ ./bin/artemis run
2. 启动 Spring Boot camel route
$ cd fuse-get-started/rest-publish-and-fix-errors/
$ mvn spring-boot:run
3. 测试
$ curl -k http://localhost:8080/rest/service/customers -X POST  -d 'Rotobots,NA,true,Bill,Smith,100 N Park Ave.,Phoenix,AZ,85017,602-555-1100;BikesBikesBikes,NA,true,George,Jungle,1101 Smith St.,Raleigh,NC,27519,919-555-0800;CloudyCloud,EU,true,Fred,Quicksand,202 Barney Blvd.,Rock City,MI,19728,313-555-1234;ErrorError,,,EU,true,Fred,Quicksand,202 Barney Blvd.,Rock City,MI,19728,313-555-1234' -H 'content-type: text/html'
Processed the customer data

部署到 OpenShift

1. 部署 AMQ
$ oc new-app --template=amq-broker-72-basic \
   -e AMQ_PROTOCOL=openwire,amqp,stomp,mqtt,hornetq \
   -e AMQ_USER=admin \
   -e AMQ_PASSWORD=password \
   -e AMQ_ROLE=admin
2. 部署 PostgreSQL
$ oc new-app \
    -e POSTGRESQL_USER=postgres \
    -e POSTGRESQL_PASSWORD=postgres \
    -e POSTGRESQL_DATABASE=sampledb \
    postgresql-persistent
3. 连接到 PostgreSQL
$ oc get pods | grep postgresql
$ oc rsh postgresql-1-wczss
sh-4.2$ createdb -h localhost -p 5432 -U postgres sampledb
sh-4.2$ PGPASSWORD=$POSTGRESQL_PASSWORD psql -h postgresql $POSTGRESQL_DATABASE $POSTGRESQL_USER
psql (9.6.10)
Type "help" for help.

sampledb=#
4. 创建表
CREATE SCHEMA USECASE;
CREATE TABLE USECASE.T_ACCOUNT (
    id  SERIAL PRIMARY KEY,
    CLIENT_ID integer,
    SALES_CONTACT VARCHAR(30),
    COMPANY_NAME VARCHAR(50),
    COMPANY_GEO CHAR(20) ,
    COMPANY_ACTIVE BOOLEAN,
    CONTACT_FIRST_NAME VARCHAR(35),
    CONTACT_LAST_NAME VARCHAR(35),
    CONTACT_ADDRESS VARCHAR(255),
    CONTACT_CITY VARCHAR(40),
    CONTACT_STATE VARCHAR(40),
    CONTACT_ZIP VARCHAR(10),
    CONTACT_EMAIL VARCHAR(60),
    CONTACT_PHONE VARCHAR(35),
    CREATION_DATE TIMESTAMP,
    CREATION_USER VARCHAR(255)
);
CREATE TABLE USECASE.T_ERROR (
    ID SERIAL PRIMARY KEY,
    ERROR_CODE VARCHAR(4) NOT NULL,
    ERROR_MESSAGE VARCHAR(255),
    MESSAGE VARCHAR(512),
    STATUS CHAR(6)
);
5. 部署 Spring Boot camel route
$ cd fuse-get-started/rest-publish-and-fix-errors/
$ mvn fabric8:deploy -Popenshift
6. 测试
$ oc expose svc rest-publish-and-fix-errors
$ curl -k http://rest-publish-and-fix-errors-user4-fuse.apps.0d94.openshift.opentlc.com/rest/service/customers -X POST  -d 'Rotobots,NA,true,Bill,Smith,100 N Park Ave.,Phoenix,AZ,85017,602-555-1100;BikesBikesBikes,NA,true,George,Jungle,1101 Smith St.,Raleigh,NC,27519,919-555-0800;CloudyCloud,EU,true,Fred,Quicksand,202 Barney Blvd.,Rock City,MI,19728,313-555-1234;ErrorError,,,EU,true,Fred,Quicksand,202 Barney Blvd.,Rock City,MI,19728,313-555-1234' -H 'content-type: text/html'
Processed the customer data
7. 更新数据库中 ERROR 状态
sampledb=# SELECT id, error_code, status FROM USECASE.T_ERROR;
 id | error_code | status
----+------------+--------
  1 | 111        | ERROR
(1 row)

sampledb=# UPDATE USECASE.T_ERROR SET STATUS='FIXED' WHERE ID=1;
UPDATE 1

results matching ""

    No results matching ""