Posted: 2019-03-13 17:05:04
Question:
My code below gives an error when reading AVRO-formatted producer records in Spark Structured Streaming: the symbols row.key and row.value are not found. Please help me fix it. I want to read the data in Spark and write it to Cassandra in our Hadoop system. As far as I can tell, this is the only way to read AVRO source data in Spark Structured Streaming; please let me know if there is any other way to read Kafka data in AVRO format from the producer.
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.schemaregistry.client.rest.RestService
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.spark.sql.SparkSession
import scala.collection.JavaConverters._
object ReadKafkaAvro {
case class DeserializedFromKafkaRecord(key: String, value: String)
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.appName("ReadKafkaAvro")
.config("spark.master", "local")
.getOrCreate()
import spark.implicits._
val schemaRegistryURL = "http://vtorppsdv01.corp.moneris.com:8081"
val topics = "b24_tx_financial_formatted_clean"
val subjectValueName = topics + "-value"
spark.sparkContext.setLogLevel("ERROR")
val restService = new RestService(schemaRegistryURL)
val valueRestResponseSchema = restService.getLatestVersion(subjectValueName)
//Use Avro parsing classes to get Avro Schema
val parser = new Schema.Parser
val topicValueAvroSchema: Schema = parser.parse(valueRestResponseSchema.getSchema)
//key schema is typically just string but you can do the same process for the key as the value
val keySchemaString = "\"string\""
val keySchema = parser.parse(keySchemaString)
//Create a map with the Schema registry url.
//This is the only Required configuration for Confluent's KafkaAvroDeserializer.
val props = Map("schema.registry.url" -> schemaRegistryURL)
val client = new CachedSchemaRegistryClient(schemaRegistryURL, 20)
//Declare SerDe vars before the Spark Structured Streaming map to avoid a non-serializable class exception.
var keyDeserializer: KafkaAvroDeserializer = null
var valueDeserializer: KafkaAvroDeserializer = null
//Create structured streaming DF to read from the topic.
val rawTopicMessageDF = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "vtorppsdv01.corp.moneris.com:9093,vtorppsdv02.corp.moneris.com:9093,vtorppsdv03.corp.moneris.com:9093")
.option("subscribe", topics)
.option("specific.avro.reader", "true")
.option("startingOffsets", "earliest")
.option("group_id","b24_ptlf_eim_processing")
.option("security.protocol","SSL")
.option("ssl.keystore.location","C:\\Users\\pawan.likhi\\Desktop\\spark code\\SimpleKafkaConsumer\\kafka-eim-dev.jks")
.option("ssl.keystore.password","BW^1=|sY$j")
.option("ssl.key.password","BW^1=|sY$j")
.option("ssl.truststore.location","C:\\Users\\pawan.likhi\\Desktop\\spark code\\SimpleKafkaConsumer\\cpbp-ca-dev.jks")
.option("ssl.truststore.password","iB>3v$6m@9")//remove for prod
.load()
//instantiate the SerDe classes if not already, then deserialize!
val deserializedTopicMessageDS = rawTopicMessageDF.map {
row =>
if (keyDeserializer == null) {
keyDeserializer = new KafkaAvroDeserializer
keyDeserializer.configure(props.asJava, true) //isKey = true
}
if (valueDeserializer == null) {
valueDeserializer = new KafkaAvroDeserializer
valueDeserializer.configure(props.asJava, false) //isKey = false
}
//row is a generic Row, so the key and value columns must be read with
//getAs; row.key and row.value do not exist, which is the compile error.
val keyBytes = row.getAs[Array[Byte]]("key")
val valueBytes = row.getAs[Array[Byte]]("value")
//Pass the Avro schema. The topic name is actually unused inside the
//Confluent deserializer; it is only required by the signature. Weird right?
val deserializedKeyString = keyDeserializer.deserialize(topics, keyBytes, keySchema).toString
val deserializedValueJsonString = valueDeserializer.deserialize(topics, valueBytes, topicValueAvroSchema).toString
DeserializedFromKafkaRecord(deserializedKeyString, deserializedValueJsonString)
}
val deserializedDSOutputStream = deserializedTopicMessageDS.writeStream
.outputMode("append")
.format("console")
.option("truncate", false)
.start()
.awaitTermination()
}
}
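For reference, on Spark 2.4+ the bundled spark-avro package offers another route for the value column. Its from_avro expects plain Avro binary, so the 5-byte Confluent wire-format header (a magic byte plus a 4-byte schema id) has to be stripped first. A minimal sketch under that assumption, reusing rawTopicMessageDF and topicValueAvroSchema from above:

import org.apache.spark.sql.avro.from_avro
import org.apache.spark.sql.functions.expr

//substring is 1-based in Spark SQL: skip the 5-byte Confluent header,
//then decode against the JSON form of the registry schema.
val valueDF = rawTopicMessageDF.select(
  from_avro(expr("substring(value, 6, length(value) - 5)"), topicValueAvroSchema.toString) as "value"
)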
Comments:
-
Have you seen this? github.com/AbsaOSS/ABRiS
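For context, ABRiS wires the Confluent Schema Registry into a from_avro column function. A minimal sketch, assuming the current ABRiS 4.x API (the API at the time of this question differed) and reusing the topic and registry URL from the question:

import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.AbrisConfig

//Fetch the latest value schema for the topic from the Schema Registry
//and decode the Confluent wire format in one column expression.
val abrisConfig = AbrisConfig
  .fromConfluentAvro
  .downloadReaderSchemaByLatestVersion
  .andTopicNameStrategy("b24_tx_financial_formatted_clean")
  .usingSchemaRegistry("http://vtorppsdv01.corp.moneris.com:8081")

val decodedDF = rawTopicMessageDF.select(from_avro(col("value"), abrisConfig) as "value")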
-
Also, if you are loading data from Kafka into Cassandra or HDFS, Confluent generally recommends Kafka Connect. I believe Landoop has a Cassandra loader.
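A Kafka Connect sink along those lines is configured with a properties file rather than Spark code. A rough sketch of the Lenses (formerly Landoop) stream-reactor Cassandra sink; the connector class and connect.cassandra.* property names follow its docs as best recalled and should be verified against the version you deploy, and the host, keyspace, and table are placeholders:

name=cassandra-sink-b24
connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
topics=b24_tx_financial_formatted_clean
connect.cassandra.contact.points=<cassandra-host>
connect.cassandra.key.space=<keyspace>
connect.cassandra.kcql=INSERT INTO <table> SELECT * FROM b24_tx_financial_formatted_clean
# Decode Confluent Avro values with the same Schema Registry as the question
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://vtorppsdv01.corp.moneris.com:8081
key.converter=org.apache.kafka.connect.storage.StringConverter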
-
I've found that Kafka data in AVRO format isn't easy to read. I developed code in Spark Streaming using Twitter Bijection, but I get an inversion failure on the bytes. Any suggestions?
-
Error: Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 time, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): com.twitter.bijection.InversionFailure: Failed to invert: [B@5335860
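That InversionFailure is expected when pointing a plain Avro decoder at Confluent-serialized bytes: the Confluent wire format prepends a magic byte (0) and a 4-byte schema id before the Avro body, which Bijection's generic Avro injection cannot invert. A minimal sketch of decoding the wire format by hand with plain Avro classes (decodeConfluentAvro is a hypothetical helper, reusing topicValueAvroSchema from the question):

import java.nio.ByteBuffer
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

def decodeConfluentAvro(payload: Array[Byte], schema: Schema): GenericRecord = {
  val header = ByteBuffer.wrap(payload)
  require(header.get() == 0, "Unexpected magic byte") //wire-format version marker
  val schemaId = header.getInt() //registry schema id; unused when the schema is known
  //The Avro body starts after the 5-byte header
  val decoder = DecoderFactory.get().binaryDecoder(payload, 5, payload.length - 5, null)
  new GenericDatumReader[GenericRecord](schema).read(null, decoder)
}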
Tags: scala apache-spark apache-kafka avro