【Question Title】: pyspark streaming DStreams to kafka topic
【Posted】: 2018-11-18 00:49:38
【Question Description】:

As simply as possible: is it possible to stream a DStream into a Kafka topic?

I have a Spark streaming job that does all of the data processing; now I want to push the processed data to a Kafka topic. Is it possible to do this in pyspark?

【Question Discussion】:

    Tags: pyspark apache-kafka kafka-consumer-api kafka-producer-api


    【Solution 1】:

    It is best to convert your rows to JSON before writing to Kafka; otherwise, explicitly specify the key and value columns being written to Kafka (a sketch of that variant follows the example below).

        # jdf is assumed to be a streaming DataFrame built earlier in the job.
        # Serialize the whole row into a JSON string in a column named "value";
        # the Kafka sink writes the "value" column (and "key", if present) to the topic.
        # Note: the Structured Streaming Kafka sink only needs the broker list,
        # not a ZooKeeper address.
        query = jdf.selectExpr("to_json(struct(*)) AS value") \
            .writeStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "localhost:9092") \
            .option("topic", "test-spark") \
            .option("checkpointLocation", "/root/") \
            .outputMode("append") \
            .start()
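
    If you want the key/value-column variant instead, here is a minimal sketch; the `id` column and the checkpoint path are only illustrative and assume your DataFrame has a suitable key column:

        # Select an explicit key and value; the Kafka sink expects string or binary columns.
        query = jdf.selectExpr(
                "CAST(id AS STRING) AS key",
                "to_json(struct(*)) AS value") \
            .writeStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "localhost:9092") \
            .option("topic", "test-spark") \
            .option("checkpointLocation", "/root/checkpoint-keyed") \
            .outputMode("append") \
            .start()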
    

    【Discussion】:

      【Solution 2】:

      If your messages are in Avro format, you can serialize them and write them to Kafka directly from the DStream handler.

          from pyspark import SparkContext
          from pyspark.streaming import StreamingContext
          from pyspark.streaming.kafka import KafkaUtils
          from confluent_kafka.avro import AvroProducer

          # Assumes sc (SparkContext), key_schema / value_schema (parsed Avro schemas),
          # var_bootstrap_servr, var_schema_url and var_topic_tgt_name are defined elsewhere.
          ssc = StreamingContext(sc, 2)
          topic = "test"
          brokers = "localhost:9092"

          def handler(message):
              # collect() pulls the batch to the driver, so the producer runs there.
              records = message.collect()
              var_kafka_parms_tgt = {'bootstrap.servers': var_bootstrap_servr,
                                     'schema.registry.url': var_schema_url}
              # Create the producer once per batch rather than once per record.
              avroProducer = AvroProducer(var_kafka_parms_tgt,
                                          default_key_schema=key_schema,
                                          default_value_schema=value_schema)
              for record in records:
                  # <data processing: whatever you want, producing var_val_value / var_val_key>
                  avroProducer.produce(topic=var_topic_tgt_name,
                                       value=var_val_value, key=var_val_key)
              avroProducer.flush()

          # The handler must be defined before it is registered on the stream.
          kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
          kvs.foreachRDD(handler)
          ssc.start()
          ssc.awaitTermination()
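
      The snippet above assumes `key_schema` and `value_schema` are already parsed Avro schemas and that the connection variables are set. A minimal sketch of that setup using `confluent_kafka.avro.loads` (the schema definitions, broker address, Schema Registry URL and topic name below are only illustrative):

          from confluent_kafka import avro

          var_bootstrap_servr = "localhost:9092"    # illustrative broker address
          var_schema_url = "http://localhost:8081"  # illustrative Schema Registry URL
          var_topic_tgt_name = "test-avro"          # illustrative target topic

          # Parse Avro schemas from JSON strings.
          key_schema = avro.loads('{"type": "string"}')
          value_schema = avro.loads('''
          {
              "type": "record",
              "name": "Payload",
              "fields": [
                  {"name": "id", "type": "string"},
                  {"name": "val", "type": "string"}
              ]
          }
          ''')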
      

      【Discussion】:
