【发布时间】:2020-10-21 04:33:52
【问题描述】:
我最近开始使用 apache spark,遇到了一个需求,我需要读取 kafka 流并在 cassandra 中提供数据。这样做时,我遇到了一个问题,因为流是基于 SQL 的,而 cassandra 连接器在 rdd 上(我可能在这里错了,请纠正我)我正在努力让它工作。不知何故,我现在让它工作了,但不确定这是否是真正的实现方式。
下面是代码
架构
StructType getSchema(){
StructField[] structFields = new StructField[]{
new StructField("id", DataTypes.LongType, true, Metadata.empty()),
new StructField("name", DataTypes.StringType, true, Metadata.empty()),
new StructField("cat", DataTypes.StringType, true, Metadata.empty()),
new StructField("tag", DataTypes.createArrayType(DataTypes.StringType), true, Metadata.empty())
};
return new StructType(structFields);
}
流式阅读器
Dataset<Row> results = kafkaDataset.select(
col("key").cast("string"),
from_json(col("value").cast("string"), getSchema()).as("value"),
col("topic"),
col("partition"),
col("offset"),
col("timestamp"),
col("timestampType"));
results.select("value.*")
.writeStream()
.foreachBatch(new VoidFunction2<Dataset<Row>, Long>() {
@Override
public void call(Dataset<Row> dataset, Long batchId) throws Exception {
ObjectMapper mapper = new ObjectMapper();
List<DealFeedSchema> list = new ArrayList<>();
List<Row> rowList = dataset.collectAsList();
if (!rowList.isEmpty()) {
rowList.forEach(row -> {
if (row == null) logger.info("Null DataSet");
else {
try {
list.add(mapper.readValue(row.json(), DealFeedSchema.class));
} catch (JsonProcessingException e) {
logger.error("error parsing Data", e);
}
}
});
JavaRDD<DealFeedSchema> rdd = new JavaSparkContext(session.sparkContext()).parallelize(list);
javaFunctions(rdd).writerBuilder(Constants.CASSANDRA_KEY_SPACE,
Constants.CASSANDRA_DEAL_TABLE_SPACE, mapToRow(DealFeedSchema.class)).saveToCassandra();
}
}
}).
start().awaitTermination();
虽然这很好用,但我需要知道是否有更好的方法来做到这一点,如果有的话,请告诉我如何实现它。
提前致谢。 对于那些正在寻找一种方法的人,您可以参考此代码作为替代方法.. :)
【问题讨论】:
-
解决这个问题的方法是结合以下两个答案
标签: java apache-spark apache-kafka spark-structured-streaming spark-cassandra-connector