"Гальваническая развязка" при помощи Apache Camel и Kafka - как сделать асинхронное синхронным?
Некоторые наши заказчики имеют вполне себе чёткое требование от службы информационной безопасности - нельзя делать прямые tcp запросы из интернетов в защищённый сегмент, а-та-та! В этом случае на помощь приходит брокер - либо классический JMS (AMQP) типа Active MQ или Rabbit MQ, либо потоковый - Kafka, Nats. Тогда, брокер устанавливается в DMZ, а потребители находятся в приватном сегменте. В итоге сообщения попадают в очереди или топики из DMZ, временно сохраняются. а потребители из приватного сегмента сами отправляют запрос на вычитку из очереди и не нарушаются требования безопасности. Есть источники, где описано как это делать при помощи разных инструментов - например тут и тут. В этом блоге я хотел бы описать как это делается при помощи Apache Camel и Kafka.
Для начала опишу сценарий, который необходимо реализовать:
- Пользователь отправляет HTTP запрос в REST сервис.
-
В DMZ этот запрос отправляется в топик Kafka с названием in. При этом важно - HTTP соединение остаётся в режиме ожидания ответа!
- Система-потребитель топика in читает сообщение из топика и делает дальнейшие манипуляции (это нас в рамках блога интересовать не будет).
- Как только все манипуляции во внутренних системах сделаны сообщение с ответом отправляется в выходной топик Kafka с названием out.
- Входное и выходное сообщение имеют одинаковое свойство, по которому их можно сопоставить.
- После получения сообщения формируется ответ для пользователя на основании входящего HTTP сообщения и ответа из топика out.
- Сформированное сообщение отправляется пользователю в HTTP response.
Для пользователя это всё выглядит как синхронное взаимодействие, хотя под капотом живёт полный асинхрон. В данном реализации были использованы Apache Camel 3.9.0, запущенный на Apache Karaf 4.3.1. В качестве брокера использовался Kafka 2.11-2.2.1.
Давайте разберём ключевые моменты реализации.
- На Apache Camel реализуем простой REST сервис
<restConfiguration component="jetty" port="9095" bindingMode="auto"/>
<rest path="/say">
<post uri="/bye">
<to uri="direct:postbye"/>
</post>
</rest>
- Пришедшее в теле json сообщение отправляем в топик Kafka
<to uri="kafka:in?brokers=localhost:9092&groupId=hello" />
- Затем этот же json отправляем в аггрегатор
- И ставим этот поток в режим ожидания - подробнее на процессоре onCompleteProcessor остановимся позже
<onCompletion mode="BeforeConsumer">
<process ref="onCompleteProcessor" />
</onCompletion>
- В агрегаторе мы ждём второго сообщения с коррелирующим значением поля из json
<aggregate strategyRef="myAggregator" completionSize="2">
<correlationExpression>
<header>UniqueRequestID</header>
</correlationExpression>
<log message="@@@@After aggregation!!!!" loggingLevel="INFO"/>
</aggregate>
- В процессоре onCompleteProcessor происходит периодическая проверка агрегированных сообщений
- Как только находится подходящая пара сообщений происходит формирование json HTTP ответа, который и возвращается пользователю.
- 6.2 (12)
- 7.0 (12)
- activiti (14)
- apache camel (6)
- camel (11)
- devcon (6)
- devops (5)
- emdev (9)
- emdev limited (9)
- entaxy (13)
- esb (10)
- fuse (5)
- gartner (7)
- google apps (6)
- jboss (5)
- liferay (143)
- liferay 7.1 (11)
- liferay dxp (11)
- liferay7 (12)
- openshift (8)
- osgi (5)
- redhat (15)
- rest (6)
- wso2 (70)
- wso2 api-m (10)
- wso2 ei (8)
- wso2ei (5)
- wso2esb (7)
- wso2is (8)
- емдев (11)
Сайт использует файлы cookie. Они позволяют узнавать вас и получать информацию о вашем пользовательском опыте. Это нужно, чтобы улучшать сайт. Посещая страницы сайта и предоставляя свои данные, вы позволяете нам предоставлять их сторонним партнерам. Если вы согласны, продолжайте пользоваться сайтом. Если нет – установите специальные настройки в браузере или обратитесь в техподдержку.