【问题标题】:How to work with wso2CEP3.0.0 and activemq 5.8.0如何使用 wso2CEP3.0.0 和 activemq 5.8.0
【发布时间】:2023-03-16 19:18:02
【问题描述】:

我正在使用 wso2cep 3.0.0 和 activemq5.8.0 根据 CEP 文档,我希望使用 CEP 发布事件。 为此,我开始使用 2 定义 QUEUES 的 activemq,名称为 jmsProxy 用于传入消息,JmsProxy 用于输出消息。我在 CEP 库中添加了所需的 jar activemq-broker-5.8.0.jar,activemq-client-5.8.0.jar,axiom.jar,geronimo-j2ee-management_1.1_spec-1.0.1.jar,geronimo-jms_1.1_spec-1.1.1 .jar,hawtbuf-1.2.jar,xpp3-1.1.4c.jar,xstream-1.4.4.jar

我的配置是这样的 InputEventAdaptor

<?xml version="1.0" encoding="UTF-8"?>
<inputEventAdaptor name="jmsProxy" statistics="disable" trace="enable"
  type="jms" xmlns="http://wso2.org/carbon/eventadaptormanager">
  <property name="java.naming.provider.url">tcp://localhost:61616</property>
  <property name="transport.jms.SubscriptionDurable">true</property>
  <property name="transport.jms.DurableSubscriberName">jmsProxy</property>
  <property name="transport.jms.UserName">admin</property>
  <property name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</property>
  <property name="transport.jms.Password">admin</property>
  <property name="transport.jms.ConnectionFactoryJNDIName">QueueConnectionFactory</property>
  <property name="transport.jms.DestinationType">queue</property>
</inputEventAdaptor>

上面的传入消息将从 jmsProxy 队列中选择消息 但它无法从 jmsProxy Queue 中选择消息。我将如何启动它以将消息放入 CEPoutputEventAdaptor

<?xml version="1.0" encoding="UTF-8"?>
<outputEventAdaptor name="JmsProxy" statistics="disable" trace="disable"
  type="jms" xmlns="http://wso2.org/carbon/eventadaptormanager">
  <property name="java.naming.security.principal">admin</property>
  <property name="java.naming.provider.url">tcp://localhost:61616</property>
  <property name="java.naming.security.credentials">admin</property>
  <property name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</property>
  <property name="transport.jms.ConnectionFactoryJNDIName">QueueConnectionFactory</property>
  <property name="transport.jms.DestinationType">queue</property>
</outputEventAdaptor>

像这样的事件生成器配置

<?xml version="1.0" encoding="UTF-8"?>
<eventBuilder name="ReadingsDtoBuilder" statistics="disable"
    trace="disable" xmlns="http://wso2.org/carbon/eventbuilder">
<from eventAdaptorName="jmsProxy" eventAdaptorType="jmsProxy">
    <property name="transport.jms.Destination">JmsProxy</property>
</from>
<mapping customMapping="disable"
    parentXpath="//ReadingsLiteTaildtos" type="xml">
    <property>
        <from xpath="//ReadingsLiteTaildto/ParameterId"/>
        <to name="meta_parameterId" type="string"/>
    </property>
    <property>
        <from xpath="//ReadingsLiteTaildto/Slno"/>
        <to name="meta_slno" type="string"/>
    </property>
    <property>
        <from xpath="//ReadingsLiteTaildto/FinalValue"/>
        <to name="finalValue" type="int"/>
    </property>
    <property>
        <from xpath="//ReadingsLiteTaildto/InputText"/>
        <to name="inputText" type="string"/>
    </property>
    <property>
        <from xpath="//ReadingsLiteTaildto/InputValue"/>
        <to name="inputValue" type="double"/>
    </property>
</mapping>
<to streamName="org.sample.readings.dto.stream" version="1.0.0"/>
</eventBuilder>

执行计划可以如下.like this

<?xml version="1.0" encoding="UTF-8"?>
<executionPlan name="ReadingsAnalyzer" statistics="disable"
  trace="disable" xmlns="http://wso2.org/carbon/eventprocessor">
  <description>This execution plan analyzes readings and triggers notifications based on     threshold.</description>
  <siddhiConfiguration>
    <property name="siddhi.enable.distributed.processing">false</property>
    <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
  </siddhiConfiguration>
  <importedStreams>
    <stream as="readings" name="org.sample.readings.dto.stream" version="1.0.0"/>
  </importedStreams>
  <queryExpressions><![CDATA[from readings[finalValue > 100]
select *
insert into notificationStream;]]></queryExpressions>
  <exportedStreams>
    <stream name="notificationStream" valueOf="notificationStream" version="1.0.0"/>
  </exportedStreams>
</executionPlan>

我在 stream-manager-config.xml 中定义了流,类似于以下内容。

<streamDefinition name="org.sample.readings.dto.stream" version="1.0.0">
<metaData>
        <property name="parameterId" type="STRING"/>
        <property name="slno" type="STRING"/>
</metaData>
    <payloadData>
        <property name="finalValue" type="INT"/>
        <property name="inputText" type="STRING"/>
        <property name="inputValue" type="DOUBLE"/>
    </payloadData>
</streamDefinition>
<streamDefinition name="notificationStream" version="1.0.0">
<metaData>
        <property name="parameterId" type="STRING"/>
        <property name="slno" type="STRING"/>
</metaData>
    <payloadData>
        <property name="finalValue" type="INT"/>
        <property name="inputText" type="STRING"/>
        <property name="inputValue" type="DOUBLE"/>
    </payloadData>
</streamDefinition>.

当我向我的 jmsProxy 队列发送任何消息时,它看起来一切正常,该消息没有反映给 CEP 事件,我在 CEP

中收到此消息
[2014-02-18 11:57:53,159]  INFO - {EventBuilderDeployer}  Event Builder undeployed successfully : ReadingsDtoBuilder.xml
[2014-02-18 11:57:53,160]  INFO - {EventBuilderDeployer}  Event builder deployment held back and in inactive state :ReadingsDtoBuilder, Waiting for Input Event Adaptor dependency :jmsProxy
[2014-02-18 12:03:58,006]  INFO - {InputEventAdaptorConfigurationFilesystemInvoker}  Input Event Adaptor configuration deleted from file system : jmsProxy.xml
[2014-02-18 12:03:58,006]  INFO - {InputEventAdaptorDeployer}  Input Event Adaptor undeployed successfully : jmsProxy.xml
[2014-02-18 12:03:58,008]  INFO - {InputEventAdaptorConfigurationFilesystemInvoker}  Input Event Adaptor configuration saved in th filesystem : jmsProxy
[2014-02-18 12:03:58,009]  INFO - {InputEventAdaptorDeployer}  Input Event Adaptor deployed successfully and in active state : jmsProxy
[2014-02-18 12:03:58,009]  INFO - {EventBuilderDeployer}  Event Builder undeployed successfully : ReadingsDtoBuilder.xml
[2014-02-18 12:03:58,009]  INFO - {EventBuilderDeployer}  Event builder deployment held back and in inactive state :ReadingsDtoBuilder, Waiting for Input Event Adaptor dependency :jmsProxy

【问题讨论】:

标签: wso2 wso2esb wso2carbon complex-event-processing siddhi


【解决方案1】:

看你的配置,好像是事件生成器配置不对。事件生成器配置中的事件适配器类型应为“jms”。

<from eventAdaptorName="jmsProxy" eventAdaptorType="jms">

请按上述方式更正此问题以使其正常工作。您可以启用跟踪 [1] 以查看消息是否到达 CEP 的特定组件。

[1]https://docs.wso2.org/display/CEP300/CEP+Event+Tracer

【讨论】:

    猜你喜欢
    • 2019-05-11
    • 1970-01-01
    • 1970-01-01
    • 2016-04-14
    • 1970-01-01
    • 1970-01-01
    • 2011-08-02
    • 1970-01-01
    • 2017-10-07
    相关资源
    最近更新 更多