【Question Title】: Row.key and row.value not working in Spark Structured Streaming code
【Posted】: 2019-03-13 17:05:04
【Problem Description】:

My code below gives errors on row.key and row.value when reading AVRO-formatted data from a producer in Spark Structured Streaming; please help me fix it. The error I get is that the symbols row.key and row.value are not found. I want to read the data in Spark and write it to Cassandra on our Hadoop system. This looks like the only way to read AVRO source data in Spark Structured Streaming; please let me know if there is any other way to read Kafka data in AVRO format from the producer.

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.schemaregistry.client.rest.RestService
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.spark.sql.SparkSession
import scala.collection.JavaConverters._


object ReadKafkaAvro {

  case class DeserializedFromKafkaRecord(key: String, value: String)

  def main(args: Array[String]): Unit = {



    val spark = SparkSession
      .builder
      .appName("ReadKafkaAvro")
      .config("spark.master", "local")
      .getOrCreate()

    import spark.implicits._

    val schemaRegistryURL = "http://vtorppsdv01.corp.moneris.com:8081"
    val topics = "b24_tx_financial_formatted_clean"
    val subjectValueName = topics + "-value"

    spark.sparkContext.setLogLevel("ERROR")
    val restService = new RestService(schemaRegistryURL)

    val valueRestResponseSchema = restService.getLatestVersion(subjectValueName)

    //Use Avro parsing classes to get Avro Schema
    val parser = new Schema.Parser
    val topicValueAvroSchema: Schema = parser.parse(valueRestResponseSchema.getSchema)

    //key schema is typically just string but you can do the same process for the key as the value
    val keySchemaString = "\"string\""
    val keySchema = parser.parse(keySchemaString)



    //Create a map with the Schema registry url.
    //This is the only Required configuration for Confluent's KafkaAvroDeserializer.
    val props = Map("schema.registry.url" -> schemaRegistryURL)
    val client = new CachedSchemaRegistryClient(schemaRegistryURL, 20)

    //Declare SerDe vars before using Spark structured streaming map. Avoids non serializable class exception.
    var keyDeserializer: KafkaAvroDeserializer = null
    var valueDeserializer: KafkaAvroDeserializer = null

    //Create structured streaming DF to read from the topic.
    val rawTopicMessageDF = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "vtorppsdv01.corp.moneris.com:9093,vtorppsdv02.corp.moneris.com:9093,vtorppsdv03.corp.moneris.com:9093")
      .option("subscribe", topics)
       .option("specific.avro.reader", "true")
      .option("startingOffsets", "earliest")
      .option("group_id","b24_ptlf_eim_processing")
      .option("security.protocol","SSL")
      .option("ssl.keystore.location","C:\\Users\\pawan.likhi\\Desktop\\spark code\\SimpleKafkaConsumer\\kafka-eim-dev.jks")
      .option("ssl.keystore.password","BW^1=|sY$j")
      .option("ssl.key.password","BW^1=|sY$j")
      .option("ssl.truststore.location","C:\\Users\\pawan.likhi\\Desktop\\spark code\\SimpleKafkaConsumer\\cpbp-ca-dev.jks")
      .option("ssl.truststore.password","iB>3v$6m@9")//remove for prod
      .load()



    //instantiate the SerDe classes if not already, then deserialize!
    val deserializedTopicMessageDS = rawTopicMessageDF.map{
      row =>
        if (keyDeserializer == null) {
          keyDeserializer = new KafkaAvroDeserializer
          keyDeserializer.configure(props.asJava, true)  //isKey = true
        }
        if (valueDeserializer == null) {
          valueDeserializer = new KafkaAvroDeserializer
          valueDeserializer.configure(props.asJava, false) //isKey = false
        }

        // Pass the Avro schema. These two calls are where compilation fails:
        // row is an org.apache.spark.sql.Row, which has no key/value members.
        val deserializedKeyString = keyDeserializer.deserialize(topics, row.key, keySchema).toString // the topic argument is unused inside deserialize(); it is only required by the signature
        val deserializedValueJsonString = valueDeserializer.deserialize(topics, row.value, topicValueAvroSchema).toString

        DeserializedFromKafkaRecord(deserializedKeyString, deserializedValueJsonString)
    }

    val deserializedDSOutputStream = deserializedTopicMessageDS.writeStream
      .outputMode("append")
      .format("console")
      .option("truncate", false)
      .start()
      .awaitTermination()

  }
}
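For context on the compile error itself: readStream ... load() returns an untyped DataFrame (a Dataset[Row]), and Row has no key or value members, which is why those symbols are not found. A minimal sketch of the usual workaround (untested here), reusing the definitions from the code above and pulling the binary columns out with getAs:

    // Same map as above, but reading the Kafka key/value columns via Row.getAs
    val deserializedTopicMessageDS = rawTopicMessageDF.map { row =>
      if (keyDeserializer == null) {
        keyDeserializer = new KafkaAvroDeserializer
        keyDeserializer.configure(props.asJava, true) // isKey = true
      }
      if (valueDeserializer == null) {
        valueDeserializer = new KafkaAvroDeserializer
        valueDeserializer.configure(props.asJava, false) // isKey = false
      }
      // Row has no .key/.value fields; fetch the binary columns by name instead
      val keyBytes = row.getAs[Array[Byte]]("key")
      val valueBytes = row.getAs[Array[Byte]]("value")
      DeserializedFromKafkaRecord(
        keyDeserializer.deserialize(topics, keyBytes, keySchema).toString,
        valueDeserializer.deserialize(topics, valueBytes, topicValueAvroSchema).toString)
    }

An alternative is to first build a typed Dataset, e.g. rawTopicMessageDF.selectExpr("key", "value").as[(Array[Byte], Array[Byte])], and map over the tuples.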

【Question Comments】:

  • Have you seen this? github.com/AbsaOSS/ABRiS (a minimal sketch of this approach follows these comments)
  • Also, if you are loading data from Kafka into Cassandra or HDFS, Confluent generally recommends Kafka Connect. I believe Landoop has a Cassandra loader.
  • I found that Kafka data in AVRO format is not easy to read. I developed code in Spark Streaming using Twitter Bijection, but I am getting an inversion failure on the bytes; any suggestions?
  • Error: Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): com.twitter.bijection.InversionFailure: Failed to invert: [B@5335860
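A minimal sketch of the ABRiS suggestion above, assuming a recent ABRiS release (the 4.x AbrisConfig API; older versions used a SchemaManager-based API) and reusing the question's topic, Schema Registry URL, and the streaming DataFrame rawTopicMessageDF:

    import org.apache.spark.sql.functions.col
    import za.co.absa.abris.avro.functions.from_avro
    import za.co.absa.abris.config.AbrisConfig

    // Fetch the latest value schema for the topic from the Schema Registry
    val abrisConfig = AbrisConfig
      .fromConfluentAvro
      .downloadReaderSchemaByLatestVersion
      .andTopicNameStrategy("b24_tx_financial_formatted_clean")
      .usingSchemaRegistry("http://vtorppsdv01.corp.moneris.com:8081")

    // from_avro understands Confluent's wire format (magic byte + schema id),
    // so no hand-rolled deserializer map is needed
    val decodedDF = rawTopicMessageDF.select(from_avro(col("value"), abrisConfig).as("data"))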

Tags: scala apache-spark apache-kafka avro


【Solution 1】:

I found that it is not easy to read Kafka data in AVRO format. I developed code in Spark Streaming (DStreams) using Twitter Bijection, but I am getting an inversion failure when inverting the bytes:

Error : Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): com.twitter.bijection.InversionFailure: Failed to invert: [B@5335860

The new code I used:

import com.twitter.bijection.Injection
import com.twitter.bijection.avro.GenericAvroCodecs
import io.confluent.kafka.schemaregistry.client.rest.RestService
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe


object ReadKafkaAvro1 {

  object Injection {

    val schemaRegistryURL = "http://vtorppsdv01.corp.moneris.com:8081"
    val topics = "b24_tx_financial_formatted_clean"
    val subjectValueName = topics + "-value"
    val restService = new RestService(schemaRegistryURL)
    val valueRestResponseSchema = restService.getLatestVersion(subjectValueName)
    val parser = new Schema.Parser()
    // val schema = parser.parse(getClass.getResourceAsStream("src\\main\\resources\\b24_tx_financial_formatted_clean.avsc"))
    val schema = parser.parse(valueRestResponseSchema.getSchema)
    val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
  }

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("ReadKafkaAvro").setMaster("local[*]")
    val streamingCtx = new StreamingContext(conf,Seconds(30))
    val schemaRegistryURL1 = "http://vtorppsdv01.corp.moneris.com:8081"
    val topics = Array("b24_tx_financial_formatted_clean")

    streamingCtx.sparkContext.setLogLevel("ERROR")

    val kafkaParms = Map[String,Object]("bootstrap.servers" -> "vtorppsdv01.corp.moneris.com:9093,vtorppsdv02.corp.moneris.com:9093,vtorppsdv03.corp.moneris.com:9093",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.ByteArrayDeserializer",
      "group.id" -> "b24_ptlf_eim_processing" ,
      "auto.offset.reset" -> "earliest",
      "auto.commit.interval.ms" -> "2000",
      "schema.registry.url" -> schemaRegistryURL1,
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "security.protocol" -> "SSL",
      "ssl.keystore.location" -> "C:\\Users\\pawan.likhi\\Desktop\\spark code\\SimpleKafkaConsumer\\kafka-eim-dev.jks",
      "ssl.keystore.password" -> "BW^1=|sY$j",
      "ssl.key.password" -> "BW^1=|sY$j",
      "ssl.truststore.location" -> "C:\\Users\\pawan.likhi\\Desktop\\spark code\\SimpleKafkaConsumer\\cpbp-ca-dev.jks",
      "ssl.truststore.password" -> "iB>3v$6m@9",
      "ssl.keystore.type" -> "JCEKS",
      "ssl.truststore.type" -> "JCEKS",
      "specific.avro.reader" -> "True"
    )

    val inputStream = KafkaUtils.createDirectStream[String,Array[Byte]](streamingCtx,PreferConsistent,Subscribe[String,Array[Byte]](topics,kafkaParms))



    val recordStream = inputStream.map(msg => Injection.injection.invert(msg.value()).get)
    // .map(record => (record.get("AuthorizationTransactionSource"), record.get("AuthorizationTransactionSourceID")))

    inputStream.map(x => (x.key, x.value)).print()

    recordStream.print()

    streamingCtx.start()
    streamingCtx.awaitTermination()


  }
}

【Comments】:

  • Bijection does not use the Schema Registry. With this approach you are just re-implementing internal steps that Confluent's KafkaAvroDeserializer already performs (see the sketch below). If you are getting an error, this should not be the accepted answer.
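For what it's worth, the inversion fails because messages produced with Confluent's KafkaAvroSerializer are framed in Confluent's wire format: a magic byte (0x0) and a 4-byte schema ID precede the Avro binary payload, which a plain Avro decoder such as Bijection's GenericAvroCodecs cannot parse. A hedged sketch of stripping that frame before inverting, reusing inputStream and Injection from the code above (and, as the comment notes, merely re-implementing what KafkaAvroDeserializer already does):

    import java.util.Arrays

    // Confluent wire format: [magic byte 0x0][4-byte schema id][Avro binary payload]
    def stripConfluentFrame(payload: Array[Byte]): Array[Byte] = {
      require(payload.length > 5 && payload(0) == 0x0, "not Confluent-framed Avro")
      Arrays.copyOfRange(payload, 5, payload.length)
    }

    val recordStream = inputStream.map(msg =>
      Injection.injection.invert(stripConfluentFrame(msg.value())).get)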