Часть 3. Принцип работы WSO2 API-M Analytics Server. Stream Integrator вместо Analytics Server.

Этой статьёй мы продолжаем цикл исследований работы сервера аналитики WSO2 API Manager и рассмотрим как использовать WSO2 Streaming Integrator. Безусловно, всё, что рассматривается ниже уже есть из коробки в сервере аналитики, но рассмотренный пример может быть полезен для:

  • Более глубокого понимания принципов работы
  • Если вам не нужен полноценный сервер аналитики со всеми виджетами и настройками, а интересует только обработка событий или сохранение их в готовую систему

Давайте начнём с подготовки WSO2 Streaming Integrator. Для этой статьи использовалась версия 7.0.2. Скачав и распаковав продукт сделайте следующие настройки:

  • Задайте отступ для портов - если запускаете все продукты на одной машине: https://ei.docs.wso2.com/en/7.1.0/streaming-integrator/ref/configuring-default-ports/#common-ports. Для этого блога было установлено значение 5.
  • Установите расширение для обработки wso2event - siddhi-map-wso2event и siddhi-io-wso2event, скопировав из в $SI_HOME/lib и перезапустив Streaming Integrator. Для этого блога использовались версии 5.0.1.
  • Далее - создаём Siddhi приложение со следующим содержанием:
@App:name('WSO2EventSampleApp')

@App:description('Receive WSO2 events')

@source(type='grpc', receiver.url='grpc://localhost:9806/org.wso2.grpc.EventService/consume', enable.ssl="FALSE" , @map(type='json'))
@source(type = 'wso2event', wso2.stream.id = 'org.wso2.apimgt.statistics.request:3.1.0', @map(type = 'wso2event'))
define stream InComingRequestStream (meta_clientType string,
    applicationConsumerKey string,
    applicationName string,
    applicationId string,
    applicationOwner string,
    apiContext string,
    apiName string,
    apiVersion string,
    apiResourcePath string,
    apiResourceTemplate string,
    apiMethod string,
    apiCreator string,
    apiCreatorTenantDomain string,
    apiTier string,
    apiHostname string,
    username string,
    userTenantDomain string,
    userIp string,
    userAgent string,
    requestTimestamp long,
    throttledOut bool,
    responseTime long,
    serviceTime long,
    backendTime long,
    responseCacheHit bool,
    responseSize long,
    protocol string,
    responseCode int,
    destination string,
    securityLatency long,
    throttlingLatency long,
    requestMedLat long,
    responseMedLat long,
    backendLatency long,
    otherLatency long,
    gatewayType string,
    label string);

@sink(type='file', @map(type='xml'), file.uri='/your/path/here/wso2event.xml')
@sink(type='log')
define stream TransformedWSO2Eventtream (meta_clientType string,
    applicationConsumerKey string,
    applicationName string,
    applicationId string,
    applicationOwner string,
    apiContext string,
    apiName string,
    apiVersion string,
    apiResourcePath string,
    apiResourceTemplate string,
    apiMethod string,
    apiCreator string,
    apiCreatorTenantDomain string,
    apiTier string,
    apiHostname string,
    username string,
    userTenantDomain string,
    userIp string,
    userAgent string,
    requestTimestamp long,
    throttledOut bool,
    responseTime long,
    serviceTime long,
    backendTime long,
    responseCacheHit bool,
    responseSize long,
    protocol string,
    responseCode int,
    destination string,
    securityLatency long,
    throttlingLatency long,
    requestMedLat long,
    responseMedLat long,
    backendLatency long,
    otherLatency long,
    gatewayType string,
    label string);

@info(name='Trim the event values')
from InComingRequestStream
select  meta_clientType, applicationConsumerKey, applicationName, applicationId, applicationOwner, apiContext,apiName, apiVersion, apiResourcePath, apiResourceTemplate, ifThenElse(str:length(apiMethod)>20,str:substr(apiMethod,0,20),apiMethod) as apiMethod,ifThenElse(str:length(apiCreator) > 150,str:substr(apiCreator,0,150), apiCreator) as  apiCreator, ifThenElse(str:length(apiCreatorTenantDomain) > 150,str:substr(apiCreatorTenantDomain,0,150), apiCreatorTenantDomain) as apiCreatorTenantDomain, apiTier, ifThenElse(str:length(apiHostname)>200, str:substr(apiHostname,0,200), apiHostname) as apiHostname, ifThenElse(str:length(username) > 150,str:substr(username,0,150), username) as username, ifThenElse(str:length(userTenantDomain) > 150,str:substr(userTenantDomain,0,150), userTenantDomain) as userTenantDomain, userIp, userAgent, requestTimestamp, throttledOut, responseTime, serviceTime, backendTime, responseCacheHit, responseSize, protocol, responseCode, destination, securityLatency, throttlingLatency, requestMedLat, responseMedLat, backendLatency, otherLatency, gatewayType, label
insert into TransformedWSO2Eventtream;    

Убедившись в том, что приложение успешно стартовало можно остановить Stream Integrator и перейти к настройке WSO2 API Manager. Для этого блога использовалась версия продукта 3.1.0. Из настроек потребуется сделать следующее:

  • В deployment.toml потребуется активировать работу с аналитикой и настроить порты. В нашем случае выглядит так:
[apim.analytics]
enable = true
receiver_url = "tcp://localhost:7616"
#store_api_url = "https://localhost:7444"
#username = "$ref{super_admin.username}"
#password = "$ref{super_admin.password}"
#event_publisher_type = "default"
#event_publisher_impl = "org.wso2.carbon.apimgt.usage.publisher.APIMgtUsageDataBridgeDataPublisher"
#publish_response_size = true

[[apim.analytics.url_group]]
analytics_url = ["tcp://localhost:7616"]
analytics_auth_url = ["ssl://localhost:7716"]
#type = "loadbalance"

#[[apim.analytics.url_group]]
#analytics_url =["tcp://analytics1:7612","tcp://analytics2:7612"]
#analytics_auth_url =["ssl://analytics1:7712","ssl://analytics2:7712"]
#type = "failover"
  • После этого запустите продукт и создайте тестовый API.
  • Затем запустите Streaming Integrator
  • Сделайте пробные вызовы API

В итоге, в логах интегратора и в файле /your/path/here/wso2event.xml должна появится следующая информация:

<events>
    <event>
        <meta_clientType>{&quot;correlationID&quot;:&quot;7324c2ac-95fe-46ce-bb3a-fef4a4332af1&quot;,&quot;keyType&quot;:&quot;PRODUCTION&quot;}</meta_clientType>
        <applicationConsumerKey>BTXW199T94mxk6s55CgCCc9UQU4a</applicationConsumerKey>
        <applicationName>DefaultApplication</applicationName>
        <applicationId>1</applicationId>
        <applicationOwner>admin</applicationOwner>
        <apiContext>/hw/1.0.0</apiContext>
        <apiName>HW</apiName>
        <apiVersion>1.0.0</apiVersion>
        <apiResourcePath>/</apiResourcePath>
        <apiResourceTemplate>/*</apiResourceTemplate>
        <apiMethod>GET</apiMethod>
        <apiCreator>admin</apiCreator>
        <apiCreatorTenantDomain>carbon.super</apiCreatorTenantDomain>
        <apiTier>Gold</apiTier>
        <apiHostname>localhost</apiHostname>
        <username>admin@carbon.super</username>
        <userTenantDomain>carbon.super</userTenantDomain>
        <userIp>127.0.0.1</userIp>
        <userAgent>Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_5) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.1.1 Safari/605.1.15</userAgent>
        <requestTimestamp>1591624150668</requestTimestamp>
        <throttledOut>false</throttledOut>
        <responseTime>83</responseTime>
        <serviceTime>1</serviceTime>
        <backendTime>82</backendTime>
        <responseCacheHit>false</responseCacheHit>
        <responseSize>0</responseSize>
        <protocol>https-8243</protocol>
        <responseCode>200</responseCode>
        <destination>http://www.mocky.io/v2/5185415ba171ea3a00704eed</destination>
        <securityLatency>0</securityLatency>
        <throttlingLatency>0</throttlingLatency>
        <requestMedLat>0</requestMedLat>
        <responseMedLat>0</responseMedLat>
        <backendLatency>82</backendLatency>
        <otherLatency>0</otherLatency>
        <gatewayType>SYNAPSE</gatewayType>
        <label>Synapse</label>
    </event>
</events>

Таким образом, мы получаем переотправку событий в нужную нам нам целевую систему - для этого примера это файловая система и журналы.

Обмен данными и событиями между продуктами и компонентами WSO2 происходит, в частности, при помощи библиотеки databridge. На стороне WSO2 API Manager компоненты моста находятся в $APIM_HOME/repository/components/plugins, на стороне Streaming Integrator - $SI_HOME/wso2/lib/features.

09.06.2020