【发布时间】:2020-01-06 09:33:14
【问题描述】:
我正在尝试在火花编码中创建 kafka 消费者,同时创建我得到异常。我的目标是我必须从主题中读取并需要写入 HDFS 路径。
scala> df2.printSchema()
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
scala> print(df1)
[key: binary, value: binary ... 5 more fields]
即使将这 6 个值作为输入,我也没有在该主题中提供任何输入。
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import spark.implicits._
object Read {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("spark Oracle Kafka")
.master("local")
.getOrCreate()
val df2 = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "kafka server ip address i have given")
.option("subscribe", "topic20190904")
.load()
print(df1)//it is return some values
df2.show() it's throwing exception i hope it's not dataframe.
df2.write.parquet("/user/xrrn5/abcd")// I am getting java.lang.AbstractMethodError
java.lang.AbstractMethodError at rg.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala)
【问题讨论】:
-
您是否考虑过使用 Kafka Connect 将数据写入 HDFS?它专门为此而设计,是 Apache Kafka 的一部分。
-
谢谢罗宾...我可以有任何代码在 kafka 连接中做同样的事情吗...我的目标是我必须将它从 kafka 主题写到 HDFS 它可以是 scala 或 kafka connect o 火花...
标签: scala dataframe apache-spark apache-kafka apache-kafka-connect