Блоги

Apache Camel и RabbitMQ: Дружба через боль

В рамках работы над нашей интеграционной платформой Entaxy мы решили проверить как всё будет функционировать, если переключиться с брокера ActiveMQ Artemis, который мы используем в базе на что-то другое. В качестве подопытного кролика мы, собственно, и выбрали Кролика(RabbitMQ). Нам важно было проверить работу очередей. Критериями успешности считались:

  • Штатная работа потребителя очереди в начале маршрута
  • Штатная работа в рамках компонента pollEnrich
  • Обеспечение гарантированной доставки - сообщение не удаляется из очереди до момента подтверждения доставки или до момента достижения заданного количества попыток повторной отправки

Для проверки были использованы Java 11, Apache Camel 3.11.3 на Apache Karaf 4.2.9, Apache ActiveMQ Artemis 2.17.0 и RabbitMQ 3.9.8. 

С версии 3.8 в Apache Camel есть целых 3 компонента, которые так или иначе связаны с AMQP и RabbitMQ - это AMQP, rabbitmq и spring-rabbitmq. Для работы с Artemis мы использовали простой компонент AMQP. Все критерии проверки были пройдены - маршрут, который начинается с 

<from uri="amqp:queue:foo" />

и  маршрут, содержащий в себе

<pollEnrich>
    <constant>amqp:queue:foo</constant>
</pollEnrich>

работают в штатном режиме, вычитывая сообщения одно за одним. В случае polling consumer, для обеспечения гарантированной доставки потребовалось добавить транзакционность - после этого сообщение стало исчезать из очереди только после заданного количества попыток повторной отправки (естественно, в случае возникновения ошибки). Если речь идёт об обычном консьюмере, то достаточно указать в параметрах acknowledgementModeName=CLIENT_ACKNOWLEDGE. Итоговый рабочий вариант выглядит следующим образом:

<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 blueprint-1.0.0.xsd ">

    <!-- Artemis Connection Factory -->
    <bean id="connectionFactory" class="org.apache.qpid.jms.JmsConnectionFactory">
       <property name="remoteURI" value="amqp://localhost:5672" />
       <property name="username" value="entaxy" />
       <property name="password" value="entaxy" />
     </bean>

    <camelContext
        xmlns="http://camel.apache.org/schema/blueprint"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://camel.apache.org/schema/blueprint camel-blueprint-3.4.5.xsd">

        <route id="consume">
            <from uri="amqp:queue:foo?acknowledgementModeName=CLIENT_ACKNOWLEDGE" />
            <onException>
                <exception>java.lang.Exception</exception>
                <redeliveryPolicy maximumRedeliveries="2" redeliveryDelay="10000" retryAttemptedLogLevel="WARN"/>
                <handled><constant>true</constant></handled>
                <to uri="direct:error" />
            </onException>
            
            <log message=">>>Consumer start"/>

            <throwException exceptionType="java.lang.NullPointerException"
                                message="Not supported, sorjan"/>


            <log message=">>>Body:  ${body}"/>
            <log message=">>>Headers:  ${headers}"/>
            <when>
                <simple>${body} == null</simple>
                <log message=">>>@@@@@@@@@@@@@@@@@@@@@@ Body is null!!!!"/>
            </when>            
        </route>

        <route>
            <from uri="direct:error" />
            <log message=">>>Error happens body:  ${body}"/>
        </route>
    </camelContext>
</blueprint>

 

Проверку RabbitMQ мы начали с компонента rabbitmq. Настройка компонента достаточно проста - надо только добавить ConnectionFactory:

<bean id="rabbitConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">
      <property name="host" value="localhost"/>
      <property name="port" value="5672"/>
      <property name="username" value="guest"/>
      <property name="password" value="guest"/>
    </bean>

Первое с чем мы столкнулись и после чего решили отложить этот компонент - это невозможность чтения 1-го сообщения за раз. Есть ряд настроек, типа prefetchCount, но они не применяются - консьюмер вычитывает сразу пачку сообщений, нарушая тем самым принцип гарантированной доставки - при рестарте Карафа эти сообщения исчезают. Если попробовать выставить значение для prefetchSize > 0, то при попытке запуска получим вот такое в логах:

Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method<connection.close>(reply-code=540, reply-text=NOT_IMPLEMENTED - prefetch_size!=0 (1), class-id=60, method-id=10)

Следующий компонент, к которому мы обратились - это spring-rabbitmq, доступный с версии Apache Camel 3.8. Как видно из названия, этот компонент использует внутри себя Spring, в частности, spring-amqp. Начало проверки выглядит вполне себе многообещающим - 

<from uri="spring-rabbitmq:ex1?queues=bar&amp;prefetchCount=1" />

и без использования транзакций вычитывает одно сообщение за раз и отправляет подтверждение о доставке только в конце маршрута. Если добавить в маршрут ошибку, то сработает redeliveryPolicy, который определяется в обработчике ошибок:

<onException>
   <exception>java.lang.Exception</exception>
   <redeliveryPolicy maximumRedeliveries="2" redeliveryDelay="10000" retryAttemptedLogLevel="WARN"/>
  <handled><constant>true</constant></handled>
  <to uri="direct:error" />
</onException>

 Всё несколько печальнее при использовании в pollEnrich - вычитывается одно сообщение за раз, но подтверждение о доставке отправляется сразу. Мгновенное подтверждение доставки нас не устраивает, так как в маршруте может случиться исключение и сообщение, теоретически, будет потеряно. В отличие от обычного консьюмера, polling консьюмер использует класс RabbitTemplate, в котором есть поддержка транзакций (флаг channelTransacted), но Camel никак это не использует. Как итог - при использовании RabbitMQ в модели поллинга есть риск потери сообщений. 

В рамках этого блога мы попытались обобщить возможности работы из Apache Camel с 2-мя брокерами, которые используют AMQP протокол - это Apache ActiveMQ Artemis и RabbitMQ. В случае с RabbitMQ есть вопросы к работе стандартных компонентов Camel - они не смогли обеспечить полноценный объём функционала.

21.12.2021