【Question Title】: java.lang.ClassCastException: java.lang.Integer cannot be cast to org.apache.spark.sql.catalyst.expressions.UnsafeRow
【Posted on】: 2020-07-22 14:59:23
【Question Description】:

I am new to Spark Structured Streaming programming. I get the error above after using F.approx_count_distinct; my code is below. What I want is a dataframe for detecting fraud, but first I need to check whether the same card_number appears with more than one card holder. Can anyone help me? Thanks in advance.

from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import from_json, col
from pyspark.sql import functions as F
from pyspark.sql.functions import when
from pyspark.sql.types import *
conf = SparkConf().setAppName("Pruebas").setMaster("local")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
sparkSQL = SparkSession \
    .builder \
    .appName("SparkSQL") \
    .master("local") \
    .getOrCreate()
broker="localhost:9092"
topic = "transacts"

# Build the streaming dataframe
df = sparkSQL \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", broker) \
    .option("failOnDataLoss", "false") \
    .option("subscribe", topic) \
    .option("startingOffsets", "latest") \
    .option("includeTimestamp", "true") \
    .load()

# Define the schema we will use for the json
schema = StructType([StructField("card_owner", StringType(), True),
                     StructField("card_number", StringType(), True),
                     StructField("geography", StringType(), True),
                     StructField("target", StringType(), True),
                     StructField("amount", StringType(), True),
                     StructField("currency", StringType(), True)])


# decode the json
# decoding the json produces a set of subcolumns inside the value field
df = df.withColumn("value", from_json(df["value"].cast("string"), schema))
df.printSchema()

# select the message timestamp and the json columns
df = df.select("timestamp","value.*")
df1 = df.groupBy(df.card_number) \
    .agg(F.approx_count_distinct(df.card_owner).alias('titulares')) \
    .filter(F.col('titulares') > 1)

df1 = df1.selectExpr("'a' as key", "to_json(struct(*)) as value")

query = df1.writeStream \
    .outputMode("complete") \
    .format("kafka") \
    .option("topic", "aux_topic1") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("checkpointLocation", "hdfs://localhost:9000/checkpoints") \
    .start()
#query.awaitTermination(200)

# From json back to a dataframe

topic1= "aux_topic1"
df1 = sparkSQL \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", broker) \
.option("failOnDataLoss", "false") \
.option("subscribe", topic1) \
.option("startingOffsets", "latest") \
.option("includeTImestamp", "true") \
.load()

# Define the schema we will use for the json
schema = StructType([StructField("card_number", StringType(), True),
                     StructField("titulares", StringType(), True)])

# decode the json
df1 = df1.withColumn("value", from_json(df1["value"].cast("string"), schema))
df1.printSchema()

df1 = df1.select("timestamp","value.*")
df2 = df.join(df1, on="card_number")

# Show on the console
query1 = df2.writeStream \
    .outputMode("append") \
    .format("console") \
    .queryName("test") \
    .start()
query1.awaitTermination()

【Question Discussion】:

  • Can you share the full error trace?

Tags: python apache-spark pyspark apache-kafka spark-structured-streaming


【Solution 1】:

The problem seems to be this line:

df1 = df \
    .groupBy(df.card_number) \
    .agg(F.approx_count_distinct(df.card_owner).alias('titulares')) \
    .filter((F.col('titulares')>1))

More precisely, your filter: .filter((F.col('titulares')>1))


If you want to get all card numbers that appear more than once, the following approach will do the trick:

Here is your dataframe:

df.show()
+-----------+-------------+                                                     
|card_number|   card_owner|
+-----------+-------------+
|      12345| Andrew Smith|
|      98765|   John Brown|
|      12345| Andrew Smith|
|      98765|   John Brown|
|      33445|Maria Johnson|
+-----------+-------------+
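
For reference, a minimal snippet to rebuild this sample frame as a batch dataframe. The rows are taken from the output above; `spark` is assumed to be an existing SparkSession:

# Rebuild the sample dataframe shown above (batch, not streaming).
# Assumes `spark` is an active SparkSession.
df = spark.createDataFrame(
    [("12345", "Andrew Smith"),
     ("98765", "John Brown"),
     ("12345", "Andrew Smith"),
     ("98765", "John Brown"),
     ("33445", "Maria Johnson")],
    ["card_number", "card_owner"])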

Now get the count for each card number, filtering out those without duplicates:

>>> df \
...     .groupBy('card_number') \
...     .count() \
...     .filter('count>1') \
...     .show()
+-----------+-----+
|card_number|count|
+-----------+-----+
|      12345|    2|
|      98765|    2|
+-----------+-----+

And if you also want the card_owner:

>>> df \
...     .groupBy(['card_number', 'card_owner']) \
...     .count() \
...     .filter('count>1') \
...     .show()
+-----------+------------+-----+
|card_number|  card_owner|count|
+-----------+------------+-----+
|      12345|Andrew Smith|    2|
|      98765|  John Brown|    2|
+-----------+------------+-----+
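
If you want to plug this back into the streaming job from the question, here is a minimal sketch of the count-based variant. It is untested against the asker's setup: it simply swaps approx_count_distinct for count and reuses the decoded streaming df and the Kafka sink settings from the question:

from pyspark.sql import functions as F

# `df` is the decoded streaming dataframe from the question
# (the message timestamp plus the json subcolumns).
duplicates = df \
    .groupBy("card_number") \
    .agg(F.count("card_owner").alias("titulares")) \
    .filter(F.col("titulares") > 1)

# Same Kafka sink as in the question.
query = duplicates \
    .selectExpr("'a' as key", "to_json(struct(*)) as value") \
    .writeStream \
    .outputMode("complete") \
    .format("kafka") \
    .option("topic", "aux_topic1") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("checkpointLocation", "hdfs://localhost:9000/checkpoints") \
    .start()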

【Discussion】:

  • I had the same problem; in my case I just used F.count instead. But I don't understand why we can't filter with F.col.