【发布时间】:2021-06-27 04:03:11
【问题描述】:
我正在尝试将数据加载到 jet pipeLine 阶段内的 IMap 中,但出现错误
这是我的代码
public static Pipeline pipeLineStage(JetInstance jet) {
Pipeline pipeLine = Pipeline.create();
BatchStage<DataModel> dbValue = pipeLine.readFrom(Sources.jdbc(
"jdbc:postgresql://localhost/postgres?user=postgres&password=root",
"SELECT id1, id2, id3, id4\r\n"
+ " FROM public.tbl_test where id1='3'",
resultSet -> new DataModel(resultSet.getString(2), resultSet.getString(3), resultSet.getString(4))));
dbValue.filter(model -> model.getId2().equals("person"))
.map(model -> JsonUtil.mapFrom(model.getObject_value())).map(map -> {
IMap<Object, Object> map1 = jet.getMap("map1");
map1.put("employee_id", map.get("id"));
return map;
}).writeTo(Sinks.logger());
return pipeLine;
}
错误:-
Exception in thread "main" java.lang.IllegalArgumentException: "mapFn" must be serializable
at com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:203)
*如果我将数据存储在普通的Map 中,我不会收到任何错误,并且只有当我存储在IMap 对象中时才会出现错误,并且在上面的代码中我使用的是模型类,即@987654326 @ 并实现了public class DataModel implements Serializable {}.....任何建议也会有所帮助..谢谢*
【问题讨论】:
-
您能分享一下错误吗?此外,您不应该在
map()转换步骤中放置数据,而是在writeTo()最后一步中放置数据。 -
您无法从在 Jet 集群上执行的代码中访问
jet(或任何其他资源)。要访问 Hazelcast 地图,请使用提供的mapUsingIMap或mapUsingService。 -
我想在 IMap 中输入键和值?我不认为 mapUsingIMap 会支持这一点。或者你能举一些例子吗?
-
如果要写入地图,则需要 IMap 接收器。从映射步骤发出您要插入的条目并添加
.writeTo(Sinks.map(...))。 -
此代码示例说明了使用 Jet Pipeline 处理数据并将结果存储到 IMap 的方法:github.com/hazelcast/hazelcast-jet/blob/master/examples/…
标签: java hazelcast-imap hazelcast-jet