【问题标题】:Java Apache Storm Spout empty deserialized LinkedList object attributeJava Apache Storm Spout 空的反序列化 LinkedList 对象属性
【发布时间】:2015-11-05 15:31:51
【问题描述】:

我有一个 spout 类,它有几个整数和字符串属性,它们按预期进行了序列化/反序列化。该类还有 1 个保存字节数组的 LinkedList。反序列化对象时,此 LinkedList 始终为空。

我已经在所有的 spout 方法中添加了日志语句,并且可以看到 spout 的 'activate' 方法被调用,之后 LinkedList 为空。当“停用”方法发生这种情况时,我没有看到任何日志。

在没有调用 'deactivate' 方法的情况下调用 spout 'activate' 方法似乎很奇怪。当调用 'activate' 方法时,没有重新提交拓扑。

我在 spout 构造函数中也有一个 log 语句,它在 LinkedList 被清空之前不会被调用。

我还反复验证了在 spout 类中的任何地方都没有调用任何会完全清空 LinkedList 的方法。有 1 个点使用 poll 方法,紧随其后的是一条 log 语句来记录新的 LinkedList 大小。

我找到了这个参考,它指出 Kryo 被用于序列化,但它可能只是用于序列化元组数据。 http://storm.apache.org/documentation/Serialization.html

Storm 使用 Kryo 进行序列化。 Kryo 是一种灵活且快速的 产生小序列化的序列化库。

默认情况下,Storm 可以序列化原始类型、字符串、字节数组、 ArrayList、HashMap、HashSet 和 Clojure 集合类型。如果你 想在你的元组中使用另一种类型,你需要注册一个 自定义序列化程序。

这篇文章听起来像是 Kryo 可能只是用于序列化和传递元组,但如果它也用于 Spout 对象,我无法弄清楚如何使用 LinkedList,因为 ArrayLists 和 HashMaps 并不是真的FIFO 队列的一个很好的替代方案。我必须推出自己的 LinkedList 吗?

public class MySpout extends BaseRichSpout
{

    private SpoutOutputCollector _collector;
    private LinkedList<byte[]> messages = new LinkedList<byte[]>();

    public MyObject()
    {
        queue = new LinkedList<ObjectType>();
    }

    public void add(byte[] message)
    {
        messages.add(message);
    }

    @Override
    public void open( Map conf, TopologyContext context,
            SpoutOutputCollector collector )
    {
        _collector = collector;

        try
        {           
            Logger.getInstance().addMessage("Opening Spout");
            // ####### Open client connection here to read messages
        }
        catch (MqttException e)
        {
            e.printStackTrace();
        }
    }

    @Override
    public void close()
    {
        Logger.getInstance().addMessage("Close Method Called!!!!!!!!!!!!!!!!!");
    }

    @Override
    public void activate()
    {
        Logger.getInstance().addMessage("Activate Method Called!!!!!!!!!!!!!!!!!");
    }

    @Override
    public void nextTuple()
    {

        if (!messages.isEmpty())
        {
            System.out.println("Tuple emitted from spout");            
            _collector.emit(new Values(messages.poll()));
            Logger.getInstance().addMessage("Tuple emitted from spout. Remaining in queue: " + messages.size());
            try
            {
                Thread.sleep(1);
            }
            catch (InterruptedException e)
            {
                // TODO Auto-generated catch block
                Logger.getInstance().addMessage("Sleep thread interrupted in nextTuple(). " + Logger.convertStacktraceToString(e));
                e.printStackTrace();
            }
        }
    }
}

编辑:

Java Serialization of referenced objects is "losing values"? http://www.javaspecialists.eu/archive/Issue088.html

上面的 SO 链接和 java 专家文章调用了与我所看到的类似的具体示例,问题是执行序列化/反序列化缓存。但是因为 Storm 正在做这项工作,我不确定可以针对这个问题做些什么。

归根结底,更大的问题似乎是 Storm 一开始就突然序列化/反序列化数据。

编辑:

就在 Spout 被激活之前,大量日志消息在不到一秒的时间内通过,内容如下:

执行者 MyTopology-1-1447093098:[X Y] 不存在

在这些消息之后,有一个日志:

为拓扑 ID MyTopology-1-1447093098 设置新分配:#backtype.storm.daemon.common.Assignment{:master-code-dir ...

【问题讨论】:

  • 你对nextTuple()的实现是什么?
  • 完成。我没有添加数据如何进入消息 LinkedList 的方法,因为这不是问题。

标签: java serialization apache-storm


【解决方案1】:

如果我正确理解您的问题,您在客户端实例化您的 spout,通过addMessage() 添加消息,通过addSpout() 将 spout 提供给 TopologyBuilder,然后将拓扑提交到您的集群?当拓扑启动时,您希望 spout 消息列表包含您添加的消息吗?如果这是正确的,那么您的使用模式很奇怪......

我猜这个问题与用于将拓扑提交到集群的 Thrift 有关。不使用 Java 序列化,我假设 Thrift 代码不会序列化实际对象。据我了解的代码,拓扑 jar 是二进制的,拓扑 structure 是通过 Thrift 提供的。在执行拓扑的工作人员上,通过new 创建新的 spout/bolt 对象。因此,不会发生 Java 序列化/反序列化,并且 LinkedList 是空的。由于new的调用,当然也不是null

顺便说一句:你说得对 Kryo,它只用于传送数据(即元组)。

作为一种变通方法,您可以将LinkedList 添加到MapStormSubmitter.submitTopology(...)。在Spout.open(...) 中,您应该从Map 参数中获得正确的消息副本。但是,正如我已经提到的,您的使用模式很奇怪——您可能需要重新考虑这一点。一般来说,spout应该以某种方式实现,即可以从外部数据源获取nextTuple()中的数据。

【讨论】:

  • 在将 spout 提交给拓扑构建器之前,我不会添加数据。数据通过 Spout 中的套接字连接进入,实际上是使用 MQTT。我没有包含所有这些代码,因为消息完美地通过 MQTT 客户端连接传入。问题是,经过一段可变的时间(从 2 分钟到 30 分钟),Spout 会突然再次运行“激活”方法,此时“消息”LinkedList 将突然为空。我自己没有重置 LinkedList,也没有调用 Spout 构造函数。
  • 如果第二次调用activate,那么在spout之前一定有一些错误。因此,Storm 重新启动 spout,这意味着创建了一个新的 spout 实例。所有内部状态都丢失了...如果要保留内部状态,则需要在外部备份并在启动时加载。许多人使用 Kafka 作为可靠的外部资源,并使用 KafkaSpout 将其状态存储在 Zookeeper 中。为了调试导致 spout 崩溃的错误,您应该详细查看日志文件(也许提高日志级别会有所帮助)。
  • 这听起来很合理,但我在 Spout 构造函数中打开了日志记录。我看到在提交 Topology 时创建了 Spout 对象,但是当稍后调用“activate”方法时,Spout 构造函数的日志记录不显示。
  • 不知道这种情况...对不起。
猜你喜欢
  • 2019-02-18
  • 1970-01-01
  • 1970-01-01
  • 2020-05-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-12-30
  • 2018-08-14
相关资源
最近更新 更多