【Question Title】: slice function on a DStream in Spark Streaming not working
【Posted】: 2016-03-05 05:49:53
【Question】:

Spark Streaming provides sliding-window operations for getting the RDDs of the last k seconds. But I want to try the slice function to get the RDDs of the last k seconds, so that I can query RDDs over a time range ending at the current time.

delta = timedelta(seconds=30)
datates = datamap.slice(datetime.now() - delta, datetime.now())

When I run this code I get the following error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
/home/hduser/spark-1.5.0/<ipython-input-1364-f8d325e33d4c> in <module>()
----> 1 datates = datamap.slice(datetime.now()-delta,datetime.now())

/home/hduser/spark-1.5.0/python/pyspark/streaming/dstream.pyc in slice(self, begin, end)
    411         `begin`, `end` could be datetime.datetime() or unix_timestamp
    412         """
--> 413         jrdds = self._jdstream.slice(self._jtime(begin), self._jtime(end))
    414         return [RDD(jrdd, self._sc, self._jrdd_deserializer) for jrdd in jrdds]
    415

/home/hduser/spark-1.5.0/python/pyspark/streaming/dstream.pyc in _jdstream(self)
    629
    630         jfunc = TransformFunction(self._sc, self.func, self.prev._jrdd_deserializer)
--> 631         dstream = self._sc._jvm.PythonTransformedDStream(self.prev._jdstream.dstream(), jfunc)
    632         self._jdstream_val = dstream.asJavaDStream()
    633         return self._jdstream_val

/home/hduser/spark-1.5.0/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    699         answer = self._gateway_client.send_command(command)
    700         return_value = get_return_value(answer, self._gateway_client, None,
--> 701                 self._fqn)
    702
    703         for temp_arg in temp_args:

/home/hduser/spark-1.5.0/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling None.org.apache.spark.streaming.api.python.PythonTransformedDStream.
: java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after stopping a context is not supported
        at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
        at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
        at org.apache.spark.streaming.api.python.PythonDStream.<init>(PythonDStream.scala:172)
        at org.apache.spark.streaming.api.python.PythonTransformedDStream.<init>(PythonDStream.scala:189)
        at sun.reflect.GeneratedConstructorAccessor80.newInstance(Unknown Source)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:214)
        at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
        at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)

How can I fix this error? Thanks.

【Comments】:

    Tags: python pyspark spark-streaming


    【Solution 1】:

    Judging from the error message,

    "Adding new inputs, transformations, and output operations after stopping a context is not supported"

    it looks like ssc.stop() was called before the slice, where ssc.awaitTermination() was probably intended. Please share more details about how the Spark StreamingContext (ssc) is set up in your program.
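    A minimal sketch of the intended pattern, under stated assumptions (the socket source, port, and the `lines`/`datamap` names beyond what the post shows are illustrative, not from the original program): register an output operation before ssc.start() so the transformed DStream is materialized, call ssc.remember() so Spark retains enough old batches for slice() to find, and keep the context running with awaitTermination() instead of stopping it before slicing.

    ```python
    from datetime import datetime, timedelta
    import time

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="SliceExample")
    ssc = StreamingContext(sc, batchDuration=1)

    # Hypothetical input source; replace with the actual DStream used in the post.
    lines = ssc.socketTextStream("localhost", 9999)
    datamap = lines.map(lambda line: (line, 1))

    # An output operation must exist before start(); it also forces the
    # transformed DStream to be created while adding operations is still allowed.
    datamap.pprint()

    # slice() can only return RDDs Spark still remembers, so extend the
    # retention window beyond the range you plan to query.
    ssc.remember(60)

    ssc.start()

    # Query while the context is still running -- calling ssc.stop() first
    # is what produces the IllegalStateException shown above.
    time.sleep(35)  # let some batches accumulate
    delta = timedelta(seconds=30)
    datates = datamap.slice(datetime.now() - delta, datetime.now())

    # Block here instead of stopping the context.
    ssc.awaitTermination()
    ```

    The key ordering is: build the DStream graph, register outputs, set remember(), start the context, and only then call slice() on already-materialized DStreams while the context is alive.
    
    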

    【Discussion】:
