【问题标题】:Scala spark kafka code - functional approachScala spark kafka 代码 - 函数式方法
【发布时间】:2018-06-12 02:02:24
【问题描述】:

我在 scala 中有以下代码。我正在使用 spark sql 从 hadoop 中提取数据,对结果执行一些分组,对其进行序列化,然后将该消息写入 Kafka。

我已经编写了代码 - 但我想以实用的方式编写它。我应该创建一个具有函数“getCategories”的新类来从 Hadoop 中获取类别吗?我不知道如何处理这个问题。

这里是代码

class ExtractProcessor {
  def process(): Unit = {

  implicit val formats = DefaultFormats

  val spark = SparkSession.builder().appName("test app").getOrCreate()

  try {
     val df = spark.sql("SELECT DISTINCT SUBCAT_CODE, SUBCAT_NAME, CAT_CODE, CAT_NAME " +
    "FROM CATEGORY_HIERARCHY " +
    "ORDER BY CAT_CODE, SUBCAT_CODE ")

     val result = df.collect().groupBy(row => (row(2), row(3)))
     val categories = result.map(cat =>
                    category(cat._1._1.toString(), cat._1._2.toString(),
                      cat._2.map(subcat =>
                      subcategory(subcat(0).toString(), subcat(1).toString())).toList))

     val jsonMessage = write(categories)
     val kafkaKey = java.security.MessageDigest.getInstance("SHA-1").digest(jsonMessage.getBytes("UTF-8")).map("%02x".format(_)).mkString.toString()
     val key = write(kafkaKey)

     Logger.log.info(s"Json Message: ${jsonMessage}")
     Logger.log.info(s"Kafka Key: ${key}")

     KafkaUtil.apply.send(key, jsonMessage, "testTopic")      
}

这里是卡夫卡代码

class KafkaUtil {
  def send(key: String, message: String, topicName: String): Unit = {
  val properties = new Properties()
  properties.put("bootstrap.servers", "localhost:9092")
  properties.put("client.id", "test publisher")
  properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  val producer = new KafkaProducer[String, String](properties)

  try {

    val record = new ProducerRecord[String, String](topicName, key, message)
    producer.send(record)
  }
  finally {
    producer.close()
    Logger.log.info("Kafka producer closed...")
  }
 }
}

object KafkaUtil {
  def apply: KafkaUtil = {
  new KafkaUtil
 }
}

另外,对于编写单元测试,我应该在功能方法中测试什么。在 OOP 中我们对业务逻辑进行单元测试,但在我的 scala 代码中几乎没有任何业务逻辑。

感谢任何帮助。

提前致谢, 苏约格

【问题讨论】:

    标签: scala unit-testing functional-programming apache-kafka


    【解决方案1】:

    您的代码包括 1) 将数据加载到 spark df 2) 处理数据 3)创建一个json消息 4) 向kafka发送json消息

    单元测试适用于测试纯函数。 您可以将步骤2) 提取到具有类似签名的方法中 def getCategories(df: DataFrame): Seq[Category] 并通过测试覆盖它。 在测试中,数据帧将仅从一个普通的硬编码内存序列生成。

    如果您认为步骤3) 容易出错,也可以通过单元测试覆盖它

    步骤1)4) 将包含在端到端测试中

    顺便说一句 val result = df.collect().groupBy(row => (row(2), row(3))) 效率低下。最好换成val result = df.groupBy(row => (row(2), row(3))).collect

    此外,无需为每条消息单独初始化 KafkaProducer。

    【讨论】:

    • 感谢您的回答!提取步骤 2 你的意思是在同一个类中创建 getCategories 方法?此外,您提到的步骤 1) 和 4) 将包含在端到端测试中。您是指单元测试还是集成测试?
    • 它不能在同一个类中(至少与您当前的设计相比),因为ExtractProcessor 与 spark 环境的初始化相结合。端到端是指集成测试。
    • 有没有更好的设计可以应用到上面的代码中?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-07-21
    • 2017-09-22
    • 1970-01-01
    • 1970-01-01
    • 2021-09-13
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多