【问题标题】:Simple cep querying with esper framework使用 esper 框架进行简单的 cep 查询
【发布时间】:2020-03-04 23:32:32
【问题描述】:

我正在尝试用 esper 做一些简单的 cep。我有一些作为文本文件给出的固定流数据,我的目标是当给定一些模式以输出流中的每个事件时,找到了多少匹配项。

所有模式都将由某个固定的窗口长度确定,这意味着它们将类似于 - 找到每个 A 和每个 B 后面没有 C,它们之间的间隔不超过 6 个事件(A 和 B 不必然相邻)(当然模式可能更复杂)。 为了使用 esper 的时间机制来模拟这一点,我给每个事件一个计数属性——第一个事件有 count=1,第二个有 count=2,依此类推,在发送每个事件后,我将时间提前 1 秒(参见代码)。在计算出现的匹配次数时,事件的计数也用作其 ID。

在几次尝试都没有成功后,我对 esper 框架感到有些沮丧。我尝试使用“每个 A -> 每个 B 其中 timer:within(X sec)”这样的模式,但我发现当我使用它时,我在侦听器函数中收到的 newData 对象似乎聚合了结果,而不仅仅是每次调用函数时保存新结果。另外,当在一段时间后使用这种模式时,运行变得如此缓慢,最终几乎停止。我也尝试过,使用 -

的语法
select * from Event 
match_recognize (
   measures A as X, B as Y
   pattern (A B)
   define 
     *something*
)

但我的理解是它只匹配相邻的 A、B 事件,这不是我的意图。 有没有一些简单的方法可以用 esper 进行一些固定长度的窗口 cep 查询?我应该尝试另一个框架吗?我附上了整个代码,并欢迎任何帮助。

public class Main {
    static int count = 0;
    static Map<Integer, Integer> eventToOccurrences = new HashMap<>();
    static String statementName = "mystatement";
    static int toPrint = 1000;
    static int eventsNum = 9999999;

    public static void main(String[] _s) throws IOException {
        EPCompiler compiler = EPCompilerProvider.getCompiler();
        Configuration configuration = new Configuration();
        configuration.getCommon().addEventType(Event.class);
        CompilerArguments args = new CompilerArguments(configuration);

        EPCompiled epCompiled;
        try {
            final var pattern = 
                 "select * from pattern [every e1=Event(type='A') -> every e2=Event(type='B') where timer:within(5 sec)];";
            epCompiled = compiler.compile("@name('" + statementName + "') " + pattern, args);
        }
        catch (EPCompileException ex) {
            // handle exception here
            throw new RuntimeException(ex);
        }

        EPRuntime runtime = EPRuntimeProvider.getDefaultRuntime(configuration);
        EPDeployment deployment;
        try {
            deployment = runtime.getDeploymentService().deploy(epCompiled);
        }
        catch (EPDeployException ex) {
            // handle exception here
            throw new RuntimeException(ex);
        }

        EPStatement statement =
                runtime.getDeploymentService().getStatement(deployment.getDeploymentId(), statementName);

        statement.addListener((newData, oldData, s, r) -> {
            final var events = ((MapEventBean) newData[0]).getProperties();
            for (var b : events.values()) {
               BeanEventBean event = (BeanEventBean)b;
               int count = (int) event.get("count");
               int occurrences = eventToOccurrences.getOrDefault(count, 0);
               eventToOccurrences.put(count, occurrences + 1);
            }
        });

        sendEvents(runtime);

        var fileName = "output.txt";
        FileWriter fileWriter = new FileWriter(fileName);
        PrintWriter printWriter = new PrintWriter(fileWriter);

        for (int count = 0; count < eventsNum; count++) {
            int occurrences = eventToOccurrences.getOrDefault(count, 0);
            printWriter.println(Integer.toString(occurrences));
        }
        printWriter.close();
    }

    static void sendEvents(EPRuntime runtime){
        BufferedReader reader;
        try {
            reader = new BufferedReader(new FileReader("synthetic_FOR_TRAIN_values.txt"));
            String line = reader.readLine();
            while (line != null) {
                String[] s = line.split(",");
                final var event = new Event(s[0], Double.parseDouble(s[1]), count);
                runtime.getEventService().sendEventBean(event,"Event");
                // read next line
                line = reader.readLine();
                count += 1;
                runtime.getEventService().advanceTime(count*1000);
                if(count % toPrint == 0){
                    System.out.println(Integer.toString(count));
                }
            }
            reader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

public class Event {
    private String type;
    private double value;
    private int count;

    public Event(String type, double value, int count) {
        this.type = type;
        this.value = value;
        this.count = count;
    }

    public String getType() {
        return type;
    }
    public int getCount() {
        return count;
    }
    public double getValue() {
        return value;
    }
}

【问题讨论】:

    标签: complex-event-processing esper


    【解决方案1】:

    这里有一些东西。

    侦听器接收多个事件。但是,您发布的侦听器代码仅查看“newData [0]”并忽略所有其他输出。侦听器应遍历所有输出事件,以便您看到所有输出,即“for (EventBean event : newData)”。否则,如果代码只打印第一个输出行,您可能会感到困惑。

    “每个 A -> 每个 B”为您提供了曾经发生的所有 A 事件和曾经发生的所有 B 事件的所有可能组合。这意味着在 100 个 A 事件到达之后,当一个 B 事件到达时,有 100 个输出行,每个都有特定的 A-B 组合。这也将耗尽内存,因为运行时将记住曾经发生的所有 A 事件。如果您按照我之前所说的那样修复侦听器代码,您就会看到这一点。

    不幸的是,您的帖子实际上并未说明要求。用例是什么?预期输入和预期输出是多少?无信息。所以我给出一些一般性的建议。

    当订单定义明确时的典型模式是(向 B 添加过滤器或根据自己的喜好添加模式保护):

    select * from pattern [every a=A -> b=B]
    

    无需排序即可方便的典型连接(添加 where 子句或更改数据窗口或根据自己的喜好进行外部连接):

    select * from A#lastevent as a, B#lastevent as B 
    

    或者更改匹配识别以允许介于两者之间的任何内容:

    match_recognize .... pattern (A anything* B)
    

    如果“preceeding”要求是关于到达顺序,那么例如“match_recognize .... pattern (A {0,6}B C)”或“pattern[A -> [0:6]B -> C] ”。如果“前面的”要求是关于比较时间戳和事件到达乱序使用“where A.after(B)”或类似的连接。加入不关心到达顺序。

    【讨论】:

    • 因此,如果我使用“select * from pattern [every a=A -> b=B]”,我最终会在发送所有事件后,考虑 A 前面的每个组合乙?因此,类似地,如果我希望 A 在 B 之前的每个组合在 C 之前我应该​​使用“从模式中选择 * [每个 a = A -> 每个 b = B - > c = C]”?感谢您的帮助!
    • “每个 a=A -> b=B”和“每个 a=A -> 每个 b=B”之间存在很大差异。第二个every意味着所有可能的组合。对机制的理解在文档“Pattern Walkthru”esper.espertech.com/release-8.4.0/reference-esper/html_single/… 中。我会改用连接。
    • 我需要一个恒定大小的滑动窗口,例如我想找到每个 A 前面的 B 前面的 C 其中 A 和 C 彼此相距不超过 6 个事件(我将添加更多条件当然,最终)。我对 esper 的语法是如何工作的感到有点困惑,所以如果你能给我一些非常有用的语法,谢谢!
    • 加入#length(6)。建议使用在线应用程序之一esperonline.net 学习和尝试不同的东西
    • 所以看起来像:“从 A#length(6) 中选择 * 作为 a,B#length(6) 作为 B,C#length(6) 作为 c,...,F #length(6) as f"?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-12-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多