Использование интеграционного шаблона Poll Enrich в WSO2 EI

Представьте ситуацию - вам надо забрать сообщение из очереди брокера по пришедшему REST запросу. Вполне нормальная задача, для которой есть отдельный шаблон EIP, который называется Poll Enrich - https://camel.apache.org/components/3.4.x/eips/pollEnrich-eip.html/. В WSO2 EI мы столкнулись с тем, что данная задача не имеет решения из коробки. Да, конечно, есть возможность получения сообщений из очередей и топиков брокера, но во всех сценариях это является точкой входа в маршрут:

Ни один из вариантов не даёт возможности вставить чтение из очереди в середине интеграционного маршрута. Можно воспользоваться Class медиатором и написать всё на Java. Но есть более элегантное решение - воспользоваться инфраструктурой коннекторов. Поискав в магазине готовых коннекторов WSO2, можно найти коннектор JMS - https://store.wso2.com/store/assets/esbconnector/details/e57e41ab-7795-47f1-af4d-071fed5755da. Из документации по размещённой ссылке мы попадаем на Github - https://github.com/wso2-extensions/esb-connector-jms/blob/1.0.1/README.md и становится ясно, что коннектор работает только на публикацию сообщений в брокер. Этот коннектор мы и будем дорабатывать до возможности чтения сообщений. Точнее на основе этого проекта сделаем отдельный под чтение.

Как создать новый коннектор можно прочитать тут - https://ei.docs.wso2.com/en/7.2.0/micro-integrator/develop/customizations/creating-new-connector/. Создав проект коннектора из архетипа Maven, перенесём и изменим нужное содержание из коннектора JMS. В перенесённом поменяем дискрипторы с publish на consume и зададим новое имя коннектора - https://github.com/wso2-extensions/esb-connector-jms/tree/1.0.1/src/main/resources.

В файле connector.xml будет примерно следующее:

<connector>

    <component name="jms_custom" package="org.wso2.carbon.connector" >

        <dependency component="config" />

        <dependency component="consume" />

        <description>JMS Message Consumer</description>

    </component>

    <icon>icon/icon.png</icon>

</connector>

Далее, займёмся Java кодом - изменим имена файлов и их содержание. 

Фактическое изменение кода предполагается только в классе JMSConsumer (JMSPublisher до переименования - https://github.com/wso2-extensions/esb-connector-jms/blob/1.0.1/src/main/java/org/wso2/carbon/connector/jms/JMSPublisher.java ). В этот класс была добавлена логика чтения сообщения из очереди и записи сообщения в property:

try {

    Message message = messageConsumer.receive(1000);

    if (message != null) {

    String text = ((TextMessage) message).getText();

    log.info(">>>Got JMS message - " + text);

    buildSynapseMessage(new ByteArrayInputStream(text.getBytes()), "CUSTOM_JMS_CONSUMER", msgCtx, "text/plain");

    }

} catch (Exception e) {

handleException("Failed to consume JMS message", e);

}

и далее сохранение содержание в контексте (механизм подсмотрен в другом коннекторе - https://github.com/wso2-extensions/esb-connector-file )

try {

            org.apache.axis2.context.MessageContext axis2MsgCtx = ((org.apache.synapse.core.axis2.Axis2MessageContext) msgCtx).getAxis2MessageContext();

            Builder builder = selectSynapseMessageBuilder(msgCtx, contentType);

            OMElement documentElement = builder.processDocument(inputStream, contentType, axis2MsgCtx);

            //We need this to build the complete message before closing the stream

            documentElement.toString();

            if (StringUtils.isNotEmpty(contentPropertyName)) {

                msgCtx.setProperty(contentPropertyName, documentElement);

            } else {

                msgCtx.setEnvelope(TransportUtils.createSOAPEnvelope(documentElement));

            }

        } catch (AxisFault e) {

            throw new Exception("Axis2 error while building message from Stream", e);

        }

После сборки проекта можно найти файл с расширением zip в папке target - это и есть нужный нам коннектор.

Далее создадим интеграционный проект в студии и маршрут с использованием коннектора. Чтобы иметь возможность применить коннектор в редакторе, нужно сделать его импорт, как описано тут - https://ei.docs.wso2.com/en/7.2.0/micro-integrator/develop/creating-artifacts/adding-connectors/#importing-connectors. Осталось создать, например REST API и внутри маршрута добавить наш коннектор:

<?xml version="1.0" encoding="UTF-8"?>

<api context="/consumer" name="JMSConsumerAPI" xmlns="http://ws.apache.org/ns/synapse">

    <resource methods="GET" uri-template="/consume">

        <inSequence>

            <log level="full"/>

            <jms_custom.init>

                <connectionFactoryName>QueueConnectionFactory</connectionFactoryName>

                <javaNamingProviderUrl>amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5675'</javaNamingProviderUrl>

                <destinationType>queue</destinationType>

                <connectionPoolSize>20</connectionPoolSize>

                <destinationName>JMSConsumerTestQueue</destinationName>

                <javaNamingFactoryInitial>org.wso2.andes.jndi.PropertiesFileInitialContextFactory</javaNamingFactoryInitial>

            </jms_custom.init>

            <jms_custom.consumeMessage/>

            <log level="custom">

                <property expression="$ctx:CUSTOM_JMS_CONSUMER" name="JMSMessage1"/>

            </log>

            <log level="full"/>

            <respond/>

        </inSequence>

        <outSequence/>

        <faultSequence/>

    </resource>

</api>

Для тестов использовался брокер WSO2 из состава WSO2 EI 6.6.0. В панели управления брокера создайте очередь JMSConsumerTestQueue и отправьте несколько тестовых сообщений. После этого, при помощи curl или Postman, отправьте сообщения в REST API по адресу http://localhost:8290/consumer/consume методом GET. В логах должно появиться следующее:

[2021-09-02 17:35:11,464]  INFO {LogMediator} - {api:JMSConsumerAPI} JMSMessage1 = test

Обращаю внимание, что данный пример был сделан для проверки концепции.

Удачных интеграций!

02.09.2021