"Гальваническая развязка" при помощи Apache Camel и Kafka - как сделать асинхронное синхронным?

Некоторые наши заказчики имеют вполне себе чёткое требование от службы информационной безопасности - нельзя делать прямые tcp запросы из интернетов в защищённый сегмент, а-та-та! В этом случае на помощь приходит брокер - либо классический JMS (AMQP) типа Active MQ или Rabbit MQ, либо потоковый - Kafka, Nats. Тогда, брокер устанавливается в DMZ, а потребители находятся в приватном сегменте. В итоге сообщения попадают в очереди или топики из DMZ, временно сохраняются. а потребители из приватного сегмента сами отправляют запрос на вычитку из очереди и не нарушаются требования безопасности. Есть источники, где описано как это делать при помощи разных инструментов - например тут и тут. В этом блоге я хотел бы описать как это делается при помощи Apache Camel и Kafka.

Для начала опишу сценарий, который необходимо реализовать:

  1. Пользователь отправляет HTTP запрос в REST сервис.
  2. В DMZ этот запрос отправляется в топик Kafka с названием in. При этом важно - HTTP соединение остаётся в режиме ожидания ответа!

  3. Система-потребитель топика in читает сообщение из топика и делает дальнейшие манипуляции (это нас в рамках блога интересовать не будет).
  4. Как только все манипуляции во внутренних системах сделаны сообщение с ответом отправляется в выходной топик Kafka с названием out.
  5. Входное и выходное сообщение имеют одинаковое свойство, по которому их можно сопоставить.
  6. После получения сообщения формируется ответ для пользователя на основании входящего HTTP сообщения и ответа из топика out.
  7. Сформированное сообщение отправляется пользователю в HTTP response.

Для пользователя это всё выглядит как синхронное взаимодействие, хотя под капотом живёт полный асинхрон. В данном реализации были использованы Apache Camel 3.9.0, запущенный на Apache Karaf 4.3.1. В качестве брокера использовался Kafka 2.11-2.2.1.

Давайте разберём ключевые моменты реализации.

  • На Apache Camel реализуем простой REST сервис

<restConfiguration component="jetty" port="9095" bindingMode="auto"/>

    <rest path="/say">

    <post uri="/bye">

      <to uri="direct:postbye"/>

    </post>

  </rest>

  • Пришедшее в теле json сообщение отправляем в топик Kafka

<to uri="kafka:in?brokers=localhost:9092&amp;groupId=hello" />

  • Затем этот же json отправляем в аггрегатор
  • И ставим этот поток в режим ожидания - подробнее на процессоре onCompleteProcessor остановимся позже

    <onCompletion mode="BeforeConsumer">

        <process ref="onCompleteProcessor" />

    </onCompletion>

  • В агрегаторе мы ждём второго сообщения с коррелирующим значением поля из json

<aggregate strategyRef="myAggregator" completionSize="2">

      <correlationExpression>

        <header>UniqueRequestID</header>

      </correlationExpression>

      <log message="@@@@After aggregation!!!!" loggingLevel="INFO"/>

    </aggregate>

  • В процессоре onCompleteProcessor происходит периодическая проверка агрегированных сообщений
  • Как только находится подходящая пара сообщений происходит формирование json HTTP ответа, который и возвращается пользователю.
18.05.2021