Использование интеграционного шаблона Poll Enrich в WSO2 EI
Блоги
Навигационные полоски
Представьте ситуацию - вам надо забрать сообщение из очереди брокера по пришедшему REST запросу. Вполне нормальная задача, для которой есть отдельный шаблон EIP, который называется Poll Enrich - https://camel.apache.org/components/3.4.x/eips/pollEnrich-eip.html/. В WSO2 EI мы столкнулись с тем, что данная задача не имеет решения из коробки. Да, конечно, есть возможность получения сообщений из очередей и топиков брокера, но во всех сценариях это является точкой входа в маршрут:
- Прослушивание очереди - https://ei.docs.wso2.com/en/7.2.0/micro-integrator/use-cases/examples/jms_examples/consuming-jms/
- Использование Message Store и Message Processor - https://ei.docs.wso2.com/en/7.2.0/micro-integrator/develop/creating-artifacts/creating-a-message-processor/
Ни один из вариантов не даёт возможности вставить чтение из очереди в середине интеграционного маршрута. Можно воспользоваться Class медиатором и написать всё на Java. Но есть более элегантное решение - воспользоваться инфраструктурой коннекторов. Поискав в магазине готовых коннекторов WSO2, можно найти коннектор JMS - https://store.wso2.com/store/assets/esbconnector/details/e57e41ab-7795-47f1-af4d-071fed5755da. Из документации по размещённой ссылке мы попадаем на Github - https://github.com/wso2-extensions/esb-connector-jms/blob/1.0.1/README.md и становится ясно, что коннектор работает только на публикацию сообщений в брокер. Этот коннектор мы и будем дорабатывать до возможности чтения сообщений. Точнее на основе этого проекта сделаем отдельный под чтение.
Как создать новый коннектор можно прочитать тут - https://ei.docs.wso2.com/en/7.2.0/micro-integrator/develop/customizations/creating-new-connector/. Создав проект коннектора из архетипа Maven, перенесём и изменим нужное содержание из коннектора JMS. В перенесённом поменяем дискрипторы с publish на consume и зададим новое имя коннектора - https://github.com/wso2-extensions/esb-connector-jms/tree/1.0.1/src/main/resources.
В файле connector.xml будет примерно следующее:
<connector>
<component name="jms_custom" package="org.wso2.carbon.connector" >
<dependency component="config" />
<dependency component="consume" />
<description>JMS Message Consumer</description>
</component>
<icon>icon/icon.png</icon>
</connector>
Далее, займёмся Java кодом - изменим имена файлов и их содержание.
Фактическое изменение кода предполагается только в классе JMSConsumer (JMSPublisher до переименования - https://github.com/wso2-extensions/esb-connector-jms/blob/1.0.1/src/main/java/org/wso2/carbon/connector/jms/JMSPublisher.java ). В этот класс была добавлена логика чтения сообщения из очереди и записи сообщения в property:
try {
Message message = messageConsumer.receive(1000);
if (message != null) {
String text = ((TextMessage) message).getText();
log.info(">>>Got JMS message - " + text);
buildSynapseMessage(new ByteArrayInputStream(text.getBytes()), "CUSTOM_JMS_CONSUMER", msgCtx, "text/plain");
}
} catch (Exception e) {
handleException("Failed to consume JMS message", e);
}
и далее сохранение содержание в контексте (механизм подсмотрен в другом коннекторе - https://github.com/wso2-extensions/esb-connector-file )
try {
org.apache.axis2.context.MessageContext axis2MsgCtx = ((org.apache.synapse.core.axis2.Axis2MessageContext) msgCtx).getAxis2MessageContext();
Builder builder = selectSynapseMessageBuilder(msgCtx, contentType);
OMElement documentElement = builder.processDocument(inputStream, contentType, axis2MsgCtx);
//We need this to build the complete message before closing the stream
documentElement.toString();
if (StringUtils.isNotEmpty(contentPropertyName)) {
msgCtx.setProperty(contentPropertyName, documentElement);
} else {
msgCtx.setEnvelope(TransportUtils.createSOAPEnvelope(documentElement));
}
} catch (AxisFault e) {
throw new Exception("Axis2 error while building message from Stream", e);
}
После сборки проекта можно найти файл с расширением zip в папке target - это и есть нужный нам коннектор.
Далее создадим интеграционный проект в студии и маршрут с использованием коннектора. Чтобы иметь возможность применить коннектор в редакторе, нужно сделать его импорт, как описано тут - https://ei.docs.wso2.com/en/7.2.0/micro-integrator/develop/creating-artifacts/adding-connectors/#importing-connectors. Осталось создать, например REST API и внутри маршрута добавить наш коннектор:
<?xml version="1.0" encoding="UTF-8"?>
<api context="/consumer" name="JMSConsumerAPI" xmlns="http://ws.apache.org/ns/synapse">
<resource methods="GET" uri-template="/consume">
<inSequence>
<log level="full"/>
<jms_custom.init>
<connectionFactoryName>QueueConnectionFactory</connectionFactoryName>
<javaNamingProviderUrl>amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5675'</javaNamingProviderUrl>
<destinationType>queue</destinationType>
<connectionPoolSize>20</connectionPoolSize>
<destinationName>JMSConsumerTestQueue</destinationName>
<javaNamingFactoryInitial>org.wso2.andes.jndi.PropertiesFileInitialContextFactory</javaNamingFactoryInitial>
</jms_custom.init>
<jms_custom.consumeMessage/>
<log level="custom">
<property expression="$ctx:CUSTOM_JMS_CONSUMER" name="JMSMessage1"/>
</log>
<log level="full"/>
<respond/>
</inSequence>
<outSequence/>
<faultSequence/>
</resource>
</api>
Для тестов использовался брокер WSO2 из состава WSO2 EI 6.6.0. В панели управления брокера создайте очередь JMSConsumerTestQueue и отправьте несколько тестовых сообщений. После этого, при помощи curl или Postman, отправьте сообщения в REST API по адресу http://localhost:8290/consumer/consume методом GET. В логах должно появиться следующее:
[2021-09-02 17:35:11,464] INFO {LogMediator} - {api:JMSConsumerAPI} JMSMessage1 = test
Обращаю внимание, что данный пример был сделан для проверки концепции.
Удачных интеграций!
- 6.2 (12)
- 7.0 (12)
- activiti (14)
- apache camel (6)
- camel (11)
- devcon (6)
- devops (5)
- emdev (9)
- emdev limited (9)
- entaxy (13)
- esb (10)
- fuse (5)
- gartner (7)
- google apps (6)
- jboss (5)
- liferay (143)
- liferay 7.1 (11)
- liferay dxp (11)
- liferay7 (12)
- openshift (8)
- osgi (5)
- redhat (15)
- rest (6)
- wso2 (70)
- wso2 api-m (10)
- wso2 ei (8)
- wso2ei (5)
- wso2esb (7)
- wso2is (8)
- емдев (11)
Сайт использует файлы cookie. Они позволяют узнавать вас и получать информацию о вашем пользовательском опыте. Это нужно, чтобы улучшать сайт. Посещая страницы сайта и предоставляя свои данные, вы позволяете нам предоставлять их сторонним партнерам. Если вы согласны, продолжайте пользоваться сайтом. Если нет – установите специальные настройки в браузере или обратитесь в техподдержку.