【问题标题】:Kafka topic object into spark data frame conversion and writing into HDFSKafka主题对象转换成spark数据帧并写入HDFS
【发布时间】:2020-01-06 09:33:14
【问题描述】:

我正在尝试在火花编码中创建 kafka 消费者,同时创建我得到异常。我的目标是我必须从主题中读取并需要写入 HDFS 路径。

scala> df2.printSchema()
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

scala> print(df1)
[key: binary, value: binary ... 5 more fields]

即使将这 6 个值作为输入,我也没有在该主题中提供任何输入。

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import spark.implicits._
object Read {  
  def main(args: Array[String]): Unit = {  

    val spark = SparkSession.builder()
    .appName("spark Oracle Kafka")
    .master("local")
    .getOrCreate()
val df2 = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka server ip address i have given")
  .option("subscribe", "topic20190904")
  .load()

print(df1)//it is return some values 
df2.show() it's throwing exception i hope it's not dataframe.
df2.write.parquet("/user/xrrn5/abcd")// I am getting java.lang.AbstractMethodError
java.lang.AbstractMethodError  at rg.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala)

【问题讨论】:

  • 您是否考虑过使用 Kafka Connect 将数据写入 HDFS?它专门为此而设计,是 Apache Kafka 的一部分。
  • 谢谢罗宾...我可以有任何代码在 kafka 连接中做同样的事情吗...我的目标是我必须将它从 kafka 主题写到 HDFS 它可以是 scala 或 kafka connect o 火花...

标签: scala dataframe apache-spark apache-kafka apache-kafka-connect


【解决方案1】:

要将数据从 Kafka 写入 HDFS,您实际上不需要任何代码 - 您可以使用 Kafka Connect,它是 Apache Kafka 的一部分。这是一个示例配置:

{
  "name": "hdfs-sink",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "1",
    "topics": "test_hdfs",
    "hdfs.url": "hdfs://localhost:9000",
    "flush.size": "3",
    "name": "hdfs-sink"
  }
}

See here 用于连接器的文档,here 用于使用 Kafka Connect 的一般介绍和概述。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-04-23
    • 2018-12-25
    • 2019-07-12
    • 1970-01-01
    • 2020-09-05
    • 2021-06-11
    • 1970-01-01
    • 2018-09-01
    相关资源
    最近更新 更多