【问题标题】:How to Insert KafkaToSparkstreaming JsonData in Hive DataBase如何在 Hive 数据库中插入 Kafka Spark 流式 Json 数据
【发布时间】:2017-05-13 17:19:16
【问题描述】:

我的流式 JsonData 在这里:

{
    "visibility": "public",
    "response": "yes",
    "guests": 0,
    "member": {
        "member_id": 145170662,
        "photo": "http:\/\/photos2.meetupstatic.com\/photos\/member\/b\/8\/1\/7\/thumb_255947127.jpeg",
        "member_name": "Claude"
    },
    "rsvp_id": 1645188104,
    "mtime": 1483074100082,
    "event": {
        "event_name": "Kayakers' 65th (Sun) :  FishMkt Reserve to Ascot(Garvey Pk)",
        "event_id": "236414110",
        "time": 1483833600000,
        "event_url": "https:\/\/www.meetup.com\/swankayakers\/events\/236414110\/"
    },
    "group": {
        "group_topics": [{
            "urlkey": "kayaking",
            "topic_name": "Kayaking"
        }, {
            "urlkey": "newintown",
            "topic_name": "New In Town"
        }, {
            "urlkey": "socialnetwork",
            "topic_name": "Social Networking"
        }, {
            "urlkey": "wellness",
            "topic_name": "Wellness"
        }, {
            "urlkey": "outdoors",
            "topic_name": "Outdoors"
        }, {
            "urlkey": "multicultural-couples",
            "topic_name": "Multicultural Couples"
        }, {
            "urlkey": "scuba-diving",
            "topic_name": "Scuba Diving"
        }, {
            "urlkey": "singles-over-50",
            "topic_name": "Singles Over 50"
        }, {
            "urlkey": "scuba-diving-adventures",
            "topic_name": "Scuba Diving Adventures"
        }, {
            "urlkey": "female-fitness",
            "topic_name": "female fitness"
        }, {
            "urlkey": "fun-females-over-40",
            "topic_name": "Fun Females Over 40"
        }, {
            "urlkey": "singles-over-40",
            "topic_name": "Singles Over 40"
        }, {
            "urlkey": "couples-over-40",
            "topic_name": "Couples over 40+"
        }, {
            "urlkey": "kayaking-and-canoeing",
            "topic_name": "Kayaking and Canoeing"
        }, {
            "urlkey": "nature",
            "topic_name": "Nature"
        }],
        "group_city": "Perth",
        "group_country": "au",
        "group_id": 18906617,
        "group_name": "Swan River* Kayaking",
        "group_lon": 115.84,
        "group_urlname": "swankayakers",
        "group_lat": -31.96
    }
}

我从 Kafka 读取 JSON 数据到 Spark Streaming 并在 Hive 数据库中插入 JSON 值,使用 SparkSql(Data-Set) 在下面的代码中使用

data.write().mode("overwrite").saveAsTable("dk.jsondata");     

我正在使用这些版本:

 spark 2.0.0,
 kafka_2.11-0.8.2.1,
 Hive 1.2.1

所以请使用 SparkJava 为这个任务提供一个解决方案

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.api.java.function.VoidFunction2;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    /*import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.catalog.Function;*/
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.AbstractJavaDStreamLike;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    import kafka.serializer.StringDecoder;
    import scala.Tuple2;
    //Kafka To SparkStreaming and Json Parsing Working Code and DataAnalysis
    public class KafkaToSparkStreaming 
    {
        public static  void main(String arr[]) throws InterruptedException
        {

            JavaSparkContext sc=null;
            JavaStreamingContext ssc=null;
            SQLContext sqlContext=null;        
            SparkConf conf = new SparkConf();
            conf.set("spark.app.name", "SparkReceiver"); //The name of application. This will appear in the UI and in log data.
            //conf.set("spark.ui.port", "7077");    //Port for application's dashboard, which shows memory and workload data.
            conf.set("dynamicAllocation.enabled","false");  //Which scales the number of executors registered with this application up and down based on the workload
            //conf.set("spark.cassandra.connection.host", "localhost"); //Cassandra Host Adddress/IP
            conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer");  //For serializing objects that will be sent over the network or need to be cached in serialized form.
            //conf.setMaster("local");
            conf.set("spark.streaming.stopGracefullyOnShutdown", "true");

            sc = new JavaSparkContext(conf);         
            // Create the context with 2 seconds batch size
            ssc = new JavaStreamingContext(sc, new Duration(2000));
            Map<String, String> kafkaParams = new HashMap<String, String>();
            kafkaParams.put("zookeeper.connect", "localhost:2181"); //Make all kafka data for this cluster appear under a particular path. 
            kafkaParams.put("group.id", "testgroup");   //String that uniquely identifies the group of consumer processes to which this consumer belongs
            kafkaParams.put("metadata.broker.list", "localhost:9092"); //Producer can find a one or more Brokers to determine the Leader for each topic.
            kafkaParams.put("serializer.class", "kafka.serializer.StringEncoder"); //Serializer to use when preparing the message for transmission to the Broker.
            kafkaParams.put("request.required.acks", "1");  //Producer to require an acknowledgement from the Broker that the message was received.
            Set<String> topics = Collections.singleton("RsvpsJsonData2");
            //Create an input DStream for Receiving data from socket
            JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
                    String.class, 
                    String.class, 
                    StringDecoder.class, 
                    StringDecoder.class, 
                    kafkaParams, topics);

            System.out.println(directKafkaStream);
            directKafkaStream.print();
            JavaDStream<Long> data=directKafkaStream.count();
            data.print();        
            //json parsing
            JavaDStream<String> json = directKafkaStream.map(new Function<Tuple2<String,String>, String>() {
                public String call(Tuple2<String,String> message) throws Exception {
                    return message._2();
                };
            });
            //json.print();

            final SparkSession spark = SparkSession
                    .builder()
                    .appName("Json Parsing Example")
                    .getOrCreate();;

                    json.foreachRDD(new VoidFunction<JavaRDD<String>>() {
                        @Override
                        public void call(JavaRDD<String> rdd) {
                            if(!rdd.isEmpty()){
                                Dataset<Row> data = spark.read().json(rdd).select("mtime");
                                data.printSchema();
                                data.show();
                                data.createOrReplaceTempView("jsonData");
                                data.filter("rsvp_id");
                                data.write().mode("overwrite").saveAsTable("dk.jsondata");                    
                            }
                        }
                    });   
                    ssc.start();            
                    ssc.awaitTermination();  
        }
    }

请给出解决方案

【问题讨论】:

  • 到底是什么问题?它不工作吗?
  • jsondata 在 hive 中插入不起作用
  • 如何在hive中插入json数据
  • json 数据插入配置单元不起作用

标签: apache-spark hive apache-kafka apache-spark-sql spark-streaming


【解决方案1】:

您可以在创建 sparksession 时尝试使用 .enableHiveSupport()

final SparkSession spark = SparkSession
                    .builder()
                    .appName("Json Parsing Example")
                    .getOrCreate();

final SparkSession spark = SparkSession
                    .builder()
                    .appName("Json Parsing Example")
                    .enableHiveSupport()
                    .getOrCreate();

【讨论】:

    【解决方案2】:

    它在 Spark 2.2.0 和 Kafka 0.10.0.0 上发生了变化

    import org.apache.spark.streaming.kafka010.KafkaUtils;
    //Create direct kafka stream with brokers and topics
    JavaInputDStream<ConsumerRecord<String, String>> messages =
        KafkaUtils.createDirectStream(
            jssc,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
        );
    

    【讨论】:

      猜你喜欢
      • 2020-08-21
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-05-25
      • 2020-04-10
      • 2019-10-25
      • 1970-01-01
      • 2018-10-03
      相关资源
      最近更新 更多