【Posted】: 2021-03-06 05:35:02
【Problem description】:
My Spark session:
spark = SparkSession \
    .builder \
    .appName("Demo") \
    .master("local[3]") \
    .config("spark.streaming.stopGracefullyOnShutdown", "true") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .getOrCreate()
Mongo URI:
input_uri_weld = 'mongodb://127.0.0.1:27017/db.coll1'
output_uri_weld = 'mongodb://127.0.0.1:27017/db.coll1'
Function that writes each streaming micro-batch to Mongo:
def save_to_mongodb_collection(current_df, epoc_id, mongodb_collection_name):
    current_df.write \
        .format("com.mongodb.spark.sql.DefaultSource") \
        .mode("append") \
        .option("spark.mongodb.output.uri", output_uri_weld) \
        .save()
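For context, a function with this (dataframe, epoch id) signature is normally attached to a Structured Streaming query through foreachBatch. A minimal sketch of that wiring, assuming the parsed stream is the df_parsed DataFrame used below and using a hypothetical collection name and checkpoint path:

# Sketch (not in the original post): wiring the batch writer into a
# streaming query. foreachBatch passes (batch_df, epoch_id), so a lambda
# adapts it to the three-argument helper above; "coll1" and the
# checkpoint path are assumptions for illustration.
query = df_parsed.writeStream \
    .foreachBatch(lambda batch_df, epoch_id:
                  save_to_mongodb_collection(batch_df, epoch_id, "coll1")) \
    .option("checkpointLocation", "/tmp/demo-checkpoint") \
    .start()
query.awaitTermination()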
Kafka stream:
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_broker) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "earliest") \
    .load()
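df_parsed is referenced below but its definition is not shown in the post. A minimal sketch of one plausible derivation, assuming the Kafka value is a UTF-8 JSON payload and a hypothetical schema named payload_schema:

from pyspark.sql.functions import col, from_json

# Hypothetical (not in the original post): Kafka delivers key/value as
# binary, so the value is cast to string and parsed against an assumed
# schema before being written to Mongo.
df_parsed = kafka_df \
    .select(from_json(col("value").cast("string"), payload_schema).alias("data")) \
    .select("data.*")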
Writing to Mongo:
mongo_writer = df_parsed.write \
    .format('com.mongodb.spark.sql.DefaultSource') \
    .mode('append') \
    .option("spark.mongodb.output.uri", output_uri_weld) \
    .save()
And my spark.conf file:
spark.jars.packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,org.apache.spark:spark-avro_2.12:3.0.1,com.datastax.spark:spark-cassandra-connector_2.12:3.0.0
Error:
java.lang.ClassNotFoundException: Failed to find data source: com.mongodb.spark.sql.DefaultSource. Please find packages at http://spark.apache.org/third-party-projects.html
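Reading the error together with the conf file: the spark.jars.packages line in spark.conf lists the Kafka, Avro, and Cassandra connectors but not the MongoDB one, and spark.jars.packages is resolved when the application is launched, so a value already present in spark-defaults.conf may be the one that actually takes effect rather than the one set later via SparkSession.builder. A hedged sketch of the conf line with the Mongo connector (same version as in the builder code) appended:

spark.jars.packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,org.apache.spark:spark-avro_2.12:3.0.1,com.datastax.spark:spark-cassandra-connector_2.12:3.0.0,org.mongodb.spark:mongo-spark-connector_2.12:3.0.1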
【Comments】:
-
@OneCricketeer You deleted the wrong question. This one should be edited/deleted, since I forgot to put "problem with ..." in the title
-
I didn't delete any question. You created a duplicate post
Tags: mongodb apache-spark apache-kafka