【发布时间】:2018-06-12 02:02:24
【问题描述】:
我在 scala 中有以下代码。我正在使用 spark sql 从 hadoop 中提取数据,对结果执行一些分组,对其进行序列化,然后将该消息写入 Kafka。
我已经编写了代码 - 但我想以实用的方式编写它。我应该创建一个具有函数“getCategories”的新类来从 Hadoop 中获取类别吗?我不知道如何处理这个问题。
这里是代码
class ExtractProcessor {
def process(): Unit = {
implicit val formats = DefaultFormats
val spark = SparkSession.builder().appName("test app").getOrCreate()
try {
val df = spark.sql("SELECT DISTINCT SUBCAT_CODE, SUBCAT_NAME, CAT_CODE, CAT_NAME " +
"FROM CATEGORY_HIERARCHY " +
"ORDER BY CAT_CODE, SUBCAT_CODE ")
val result = df.collect().groupBy(row => (row(2), row(3)))
val categories = result.map(cat =>
category(cat._1._1.toString(), cat._1._2.toString(),
cat._2.map(subcat =>
subcategory(subcat(0).toString(), subcat(1).toString())).toList))
val jsonMessage = write(categories)
val kafkaKey = java.security.MessageDigest.getInstance("SHA-1").digest(jsonMessage.getBytes("UTF-8")).map("%02x".format(_)).mkString.toString()
val key = write(kafkaKey)
Logger.log.info(s"Json Message: ${jsonMessage}")
Logger.log.info(s"Kafka Key: ${key}")
KafkaUtil.apply.send(key, jsonMessage, "testTopic")
}
这里是卡夫卡代码
class KafkaUtil {
def send(key: String, message: String, topicName: String): Unit = {
val properties = new Properties()
properties.put("bootstrap.servers", "localhost:9092")
properties.put("client.id", "test publisher")
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](properties)
try {
val record = new ProducerRecord[String, String](topicName, key, message)
producer.send(record)
}
finally {
producer.close()
Logger.log.info("Kafka producer closed...")
}
}
}
object KafkaUtil {
def apply: KafkaUtil = {
new KafkaUtil
}
}
另外,对于编写单元测试,我应该在功能方法中测试什么。在 OOP 中我们对业务逻辑进行单元测试,但在我的 scala 代码中几乎没有任何业务逻辑。
感谢任何帮助。
提前致谢, 苏约格
【问题讨论】:
标签: scala unit-testing functional-programming apache-kafka