package spark

import java.util.Properties

import java.util.HashMap
import org.apache.kafka.clients.producer._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.sql.SQLContext
import org.apache.spark.{ SparkContext, SparkConf }
import spark.bean.orders

object SelectFromOneTable {
  /**
   * Demo job: reads every row of the MySQL table `flow` through Spark SQL's
   * JDBC reader and publishes each row's string form to the Kafka topic
   * `sun_test` on a local broker.
   */
  def main(args: Array[String]): Unit = {
    val brokers = "localhost:9092"
    val topic = "sun_test"

    // New-API producer configuration. Fix: the legacy old-producer keys
    // "serializer.class" and "producer.type" are ignored (warned as unknown)
    // by org.apache.kafka.clients.producer.KafkaProducer, so they are removed.
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    val sparkConf = new SparkConf().setAppName("Spark SQL Test Case").setMaster("local")
    val sparkContext = new SparkContext(sparkConf)

    try {
      val sqlContext = new SQLContext(sparkContext)
      // SECURITY(review): credentials are hard-coded in the JDBC URL; in a real
      // deployment move user/password into the Properties object or external config.
      val url = "jdbc:mysql://localhost:3306/sun_test?user=root&password=Sun@123"
      val prop = new Properties()

      // collect() pulls the entire table to the driver — acceptable for a demo,
      // but would not scale to large tables (prefer foreachPartition there).
      val rows = sqlContext.read.jdbc(url, "flow", prop).collect()

      for (row <- rows) {
        println(row)
        producer.send(new ProducerRecord[String, String](topic, null, row.toString()))
      }

      // Fix: send() is asynchronous; flush before shutdown so buffered records
      // are actually delivered instead of being dropped at JVM exit.
      producer.flush()
    } finally {
      // Fix: the original leaked the producer and never stopped the SparkContext.
      producer.close()
      sparkContext.stop()
    }
  }
}

 

相关文章:

  • 2022-12-23
  • 2021-08-23
  • 2021-12-29
  • 2021-12-12
  • 2021-10-20
  • 2021-08-03
  • 2022-12-23
  • 2021-12-01
猜你喜欢
  • 2021-11-02
  • 2022-12-23
  • 2021-11-21
  • 2022-12-23
  • 2021-06-10
  • 2021-12-23
  • 2022-12-23
相关资源
相似解决方案