【发布时间】:2015-11-05 15:31:51
【问题描述】:
我有一个 spout 类,它有几个整数和字符串属性,它们按预期进行了序列化/反序列化。该类还有 1 个保存字节数组的 LinkedList。反序列化对象时,此 LinkedList 始终为空。
我已经在所有的 spout 方法中添加了日志语句,并且可以看到 spout 的 'activate' 方法被调用,之后 LinkedList 为空。当“停用”方法发生这种情况时,我没有看到任何日志。
在没有调用 'deactivate' 方法的情况下调用 spout 'activate' 方法似乎很奇怪。当调用 'activate' 方法时,没有重新提交拓扑。
我在 spout 构造函数中也有一个 log 语句,它在 LinkedList 被清空之前不会被调用。
我还反复验证了在 spout 类中的任何地方都没有调用任何会完全清空 LinkedList 的方法。有 1 个点使用 poll 方法,紧随其后的是一条 log 语句来记录新的 LinkedList 大小。
我找到了这个参考,它指出 Kryo 被用于序列化,但它可能只是用于序列化元组数据。 http://storm.apache.org/documentation/Serialization.html
Storm 使用 Kryo 进行序列化。 Kryo 是一种灵活且快速的 产生小序列化的序列化库。
默认情况下,Storm 可以序列化原始类型、字符串、字节数组、 ArrayList、HashMap、HashSet 和 Clojure 集合类型。如果你 想在你的元组中使用另一种类型,你需要注册一个 自定义序列化程序。
这篇文章听起来像是 Kryo 可能只是用于序列化和传递元组,但如果它也用于 Spout 对象,我无法弄清楚如何使用 LinkedList,因为 ArrayLists 和 HashMaps 并不是真的FIFO 队列的一个很好的替代方案。我必须推出自己的 LinkedList 吗?
public class MySpout extends BaseRichSpout
{
private SpoutOutputCollector _collector;
private LinkedList<byte[]> messages = new LinkedList<byte[]>();
public MyObject()
{
queue = new LinkedList<ObjectType>();
}
public void add(byte[] message)
{
messages.add(message);
}
@Override
public void open( Map conf, TopologyContext context,
SpoutOutputCollector collector )
{
_collector = collector;
try
{
Logger.getInstance().addMessage("Opening Spout");
// ####### Open client connection here to read messages
}
catch (MqttException e)
{
e.printStackTrace();
}
}
@Override
public void close()
{
Logger.getInstance().addMessage("Close Method Called!!!!!!!!!!!!!!!!!");
}
@Override
public void activate()
{
Logger.getInstance().addMessage("Activate Method Called!!!!!!!!!!!!!!!!!");
}
@Override
public void nextTuple()
{
if (!messages.isEmpty())
{
System.out.println("Tuple emitted from spout");
_collector.emit(new Values(messages.poll()));
Logger.getInstance().addMessage("Tuple emitted from spout. Remaining in queue: " + messages.size());
try
{
Thread.sleep(1);
}
catch (InterruptedException e)
{
// TODO Auto-generated catch block
Logger.getInstance().addMessage("Sleep thread interrupted in nextTuple(). " + Logger.convertStacktraceToString(e));
e.printStackTrace();
}
}
}
}
编辑:
Java Serialization of referenced objects is "losing values"? http://www.javaspecialists.eu/archive/Issue088.html
上面的 SO 链接和 java 专家文章调用了与我所看到的类似的具体示例,问题是执行序列化/反序列化缓存。但是因为 Storm 正在做这项工作,我不确定可以针对这个问题做些什么。
归根结底,更大的问题似乎是 Storm 一开始就突然序列化/反序列化数据。
编辑:
就在 Spout 被激活之前,大量日志消息在不到一秒的时间内通过,内容如下:
执行者 MyTopology-1-1447093098:[X Y] 不存在
在这些消息之后,有一个日志:
为拓扑 ID MyTopology-1-1447093098 设置新分配:#backtype.storm.daemon.common.Assignment{:master-code-dir ...
【问题讨论】:
-
你对
nextTuple()的实现是什么? -
完成。我没有添加数据如何进入消息 LinkedList 的方法,因为这不是问题。
标签: java serialization apache-storm