【发布时间】:2020-03-04 23:32:32
【问题描述】:
我正在尝试用 esper 做一些简单的 cep。我有一些作为文本文件给出的固定流数据,我的目标是当给定一些模式以输出流中的每个事件时,找到了多少匹配项。
所有模式都将由某个固定的窗口长度确定,这意味着它们将类似于 - 找到每个 A 和每个 B 后面没有 C,它们之间的间隔不超过 6 个事件(A 和 B 不必然相邻)(当然模式可能更复杂)。 为了使用 esper 的时间机制来模拟这一点,我给每个事件一个计数属性——第一个事件有 count=1,第二个有 count=2,依此类推,在发送每个事件后,我将时间提前 1 秒(参见代码)。在计算出现的匹配次数时,事件的计数也用作其 ID。
在几次尝试都没有成功后,我对 esper 框架感到有些沮丧。我尝试使用“每个 A -> 每个 B 其中 timer:within(X sec)”这样的模式,但我发现当我使用它时,我在侦听器函数中收到的 newData 对象似乎聚合了结果,而不仅仅是每次调用函数时保存新结果。另外,当在一段时间后使用这种模式时,运行变得如此缓慢,最终几乎停止。我也尝试过,使用 -
的语法select * from Event
match_recognize (
measures A as X, B as Y
pattern (A B)
define
*something*
)
但我的理解是它只匹配相邻的 A、B 事件,这不是我的意图。 有没有一些简单的方法可以用 esper 进行一些固定长度的窗口 cep 查询?我应该尝试另一个框架吗?我附上了整个代码,并欢迎任何帮助。
public class Main {
static int count = 0;
static Map<Integer, Integer> eventToOccurrences = new HashMap<>();
static String statementName = "mystatement";
static int toPrint = 1000;
static int eventsNum = 9999999;
public static void main(String[] _s) throws IOException {
EPCompiler compiler = EPCompilerProvider.getCompiler();
Configuration configuration = new Configuration();
configuration.getCommon().addEventType(Event.class);
CompilerArguments args = new CompilerArguments(configuration);
EPCompiled epCompiled;
try {
final var pattern =
"select * from pattern [every e1=Event(type='A') -> every e2=Event(type='B') where timer:within(5 sec)];";
epCompiled = compiler.compile("@name('" + statementName + "') " + pattern, args);
}
catch (EPCompileException ex) {
// handle exception here
throw new RuntimeException(ex);
}
EPRuntime runtime = EPRuntimeProvider.getDefaultRuntime(configuration);
EPDeployment deployment;
try {
deployment = runtime.getDeploymentService().deploy(epCompiled);
}
catch (EPDeployException ex) {
// handle exception here
throw new RuntimeException(ex);
}
EPStatement statement =
runtime.getDeploymentService().getStatement(deployment.getDeploymentId(), statementName);
statement.addListener((newData, oldData, s, r) -> {
final var events = ((MapEventBean) newData[0]).getProperties();
for (var b : events.values()) {
BeanEventBean event = (BeanEventBean)b;
int count = (int) event.get("count");
int occurrences = eventToOccurrences.getOrDefault(count, 0);
eventToOccurrences.put(count, occurrences + 1);
}
});
sendEvents(runtime);
var fileName = "output.txt";
FileWriter fileWriter = new FileWriter(fileName);
PrintWriter printWriter = new PrintWriter(fileWriter);
for (int count = 0; count < eventsNum; count++) {
int occurrences = eventToOccurrences.getOrDefault(count, 0);
printWriter.println(Integer.toString(occurrences));
}
printWriter.close();
}
static void sendEvents(EPRuntime runtime){
BufferedReader reader;
try {
reader = new BufferedReader(new FileReader("synthetic_FOR_TRAIN_values.txt"));
String line = reader.readLine();
while (line != null) {
String[] s = line.split(",");
final var event = new Event(s[0], Double.parseDouble(s[1]), count);
runtime.getEventService().sendEventBean(event,"Event");
// read next line
line = reader.readLine();
count += 1;
runtime.getEventService().advanceTime(count*1000);
if(count % toPrint == 0){
System.out.println(Integer.toString(count));
}
}
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public class Event {
private String type;
private double value;
private int count;
public Event(String type, double value, int count) {
this.type = type;
this.value = value;
this.count = count;
}
public String getType() {
return type;
}
public int getCount() {
return count;
}
public double getValue() {
return value;
}
}
【问题讨论】:
标签: complex-event-processing esper