【发布时间】:2018-12-22 19:10:59
【问题描述】:
我有一种情况,我正在尝试使用来自 kafka 的火花流进行流式传输。流是直接流。我能够创建一个流,然后开始流式传输,还能够通过流式获取 kafka 的任何更新(如果有)。
当我有新请求流式传输新主题时,问题就出现了。由于每个 jvm 只能有 1 个 SparkStreaming 上下文,因此我无法为每个新请求创建一个新流。
我想出来的方法是
一旦创建了 DStream 并且 spark 流已经在进行中,只需将新流附加到它。这似乎不起作用,createDStream(用于新主题2)不返回流并且停止进一步处理。在第一个请求(比如 topic1)上继续流式传输。
其次,我想停止流,创建 DStream,然后重新开始流。我不能使用相同的流上下文(它会抛出一个在流停止后无法添加作业的异常),如果我为新主题(topic2)创建一个新流,旧的流主题(topic1)会丢失并且它会流式传输只有新的。
这是代码,看看
JavaStreamingContext javaStreamingContext;
if(null == javaStreamingContext) {
javaStreamingContext = JavaStreamingContext(sparkContext, Durations.seconds(duration));
} else {
StreamingContextState streamingContextState = javaStreamingContext.getState();
if(streamingContextState == StreamingContextState.STOPPED) {
javaStreamingContext = JavaStreamingContext(sparkContext, Durations.seconds(duration));
}
}
Collection<String> topics = Arrays.asList(getTopicName(schemaName));
SparkVoidFunctionImpl impl = new SparkVoidFunctionImpl(getSparkSession());
KafkaUtils.createDirectStream(javaStreamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, getKafkaParamMap()))
.map((stringStringConsumerRecord) -> stringStringConsumerRecord.value())
.foreachRDD(impl);
if (javaStreamingContext.getState() == StreamingContextState.ACTIVE) {
javaStreamingContext.start();
javaStreamingContext.awaitTermination();
}
不用担心 SparkVoidFunctionImpl,这是一个自定义类,是 VoidFunction 的实现。
以上是方法 1,我不停止现有的流式传输。当一个新的请求进入这个方法时,它没有得到一个新的流对象,它试图创建一个 dstream。问题是 DStream 对象永远不会返回。
KafkaUtils.createDirectStream(javaStreamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, getKafkaParamMap()))
这不会返回 dstream,控件只是终止而没有错误。进一步的步骤不会执行。
我尝试了很多事情并阅读了多篇文章,但我相信这是一个非常常见的生产级别问题。任何流式传输都将针对多个不同的主题进行,并且每个主题的处理方式都不同。
请帮忙
【问题讨论】:
-
您的目标是一次阅读多个主题吗?它们真的需要动态添加/删除吗?
-
不是一下子,把它们当成一个rest api请求。所以时间T1,一个请求来到流主题Topic1。我创建了一个 dstream 并开始流式传输。现在在时间 T2 > T1,第二个休息 api 请求来到流主题 2,所以主题 1 的流已经在进行中并且一些客户端正在订阅它。其他一些客户希望 Topic2 被流式传输。我的选择是停止流式传输并为 topic2 创建一个 dstream,但随后我将流式传输到 topic1。第二种选择是在不停止流的情况下创建一个 dStream,这似乎不起作用
标签: java apache-spark apache-kafka streaming spark-streaming