【问题标题】:Kafka-spark Streaming processing jobs synchronicallyKafka-spark Streaming 同步处理作业
【发布时间】:2018-10-26 09:41:34
【问题描述】:

我正在尝试一个使用 Kafka-connect 和 spark 的简单测试

我写了一个自定义的 kafka-connect 来创建这个源记录

SourceRecord sr = new SourceRecord(null,
                    null,
                    destTopic,
                   Schema.STRING_SCHEMA,
                    cleanPath);

在火花中我收到这样的消息

val kafkaConsumerParams = Map[String, String](
      "metadata.broker.list" -> prop.getProperty("kafka_host"),
      "zookeeper.connect" -> prop.getProperty("zookeeper_host"),
      "group.id" -> prop.getProperty("kafka_group_id"),
      "schema.registry.url" -> prop.getProperty("schema_registry_url"),
      "auto.offset.reset" -> prop.getProperty("auto_offset_reset")
    )
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaConsumerParams, topicsSet)

val ds = messages.foreachRDD(rdd => {
          val toPrint = rdd.map(t => {
            val file_path = t._2

            val startTime = DateTime.now()


            Thread.sleep(1000 * 60)
            1
      }).sum()
        LogUtils.getLogger(classOf[DeviceManager]).info(" toPrint = " + toPrint +" (number of flows calculated)")
      })
    }

当我使用连接器向所需主题发送多条消息时(在我的测试中它有 6 个分区) 睡眠线程获取所有消息,但同步而不是异步执行它们。

当我创建一个简单的测试生产者时,睡眠是异步完成的。

我还创建了 2 个简单的消费者,并尝试了连接器和生产者,这两个任务都被异步消费了 这意味着我的问题在于火花接收从连接器发送的消息的方式。 我不明白为什么这些任务的行为方式与我从生产者那里发送时的方式不同。

我什至打印了火花收到的记录,它们完全一样

生产者发送记录

1: {partition=2, offset=11, value=something, key=null}
2: {partition=5, offset=9, value=something2, key=null}

连接发送记录

1: {partition=3, offset=9, value=something, key=null}

我的项目中使用的版本是

    <scala.version>2.11.7</scala.version>
    <confluent.version>4.0.0</confluent.version>
    <kafka.version>1.0.0</kafka.version>
    <java.version>1.8</java.version>
    <spark.version>2.0.0</spark.version>

依赖

 <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${confluent.version}</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-schema-registry-client</artifactId>
            <version>${confluent.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.11</artifactId>
            <version>1.6.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-graphx_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector_2.11</artifactId>
            <version>2.0.0-RC1</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.8.0</version>
        </dependency>
<dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${confluent.version}</version>
            <scope>${global.scope}</scope>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-connect-avro-converter</artifactId>
            <version>${confluent.version}</version>
            <scope>${global.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>${kafka.version}</version>
        </dependency>

【问题讨论】:

  • 如果您使用的是 Spark2,您需要在您的 POM 中更新 &lt;version&gt;1.6.3&lt;/version&gt; 以反映这一点

标签: scala apache-spark apache-kafka spark-streaming confluent-platform


【解决方案1】:

我们无法异步运行 Spark-Kafka 流作业。但是我们可以像 Kafka 消费者那样并行运行它们。为此,我们需要在SparkConf() 中设置以下配置:

sparkConf.set("spark.streaming.concurrentJobs","4")

默认情况下,其值为"1"。但我们可以将其覆盖为更高的值。

我希望这会有所帮助!

【讨论】:

  • 它确实,此外,我添加了一些配置来实现我想要的。通过将参数“spark.streaming.receiver.maxRate”和“spark.streaming.kafka.maxRatePerPartition”设置为 1,并将 kafka 主题分区设置为 1,我设法实现了我正在寻找的行为。这样,我“减轻”了 kafka 的分区职责,让 spark 自己完成。现在,对于从 kafka 收到的每条消息,spark 都会将其发送给他的任何工作人员并并行完成,当然还有“spark.streaming.concurrentJobs”限制。
猜你喜欢
  • 1970-01-01
  • 2018-07-15
  • 2017-02-06
  • 2017-08-24
  • 2017-03-29
  • 2021-05-22
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多