【Question title】: monotonically_increasing_id() is not supported with streaming DataFrames/Datasets
【Posted】: 2021-11-29 01:50:59
【Question description】:

First, I computed the similarity between different columns of a DataFrame in BATCH MODE, and it worked well.

Now I am trying to stream data from Kafka into PySpark and do the same work as in BATCH MODE, but when I try I get this error:

It says monotonically_increasing_id() is not supported, but I need this function for my algorithm to work. Here is the code:

    start = timer()
    #idoffre=str(df3["idoffre"])
    idoffre="2F12EE1B-E548-4246-8E49-4CC453A50953"
    print("\n idoffre=",idoffre,"\n")
    demandeurs = get_offre_demandeurs(idoffre)
    """
    print("\tOffre : ")
    print(o)
    print("\t Demandeurs : ")
    i = 1
    for d in demandeurs:
        print("\n Demandeur N°",i," : ")
        print(d)
        i += 1
    print()
    """
    print("idoffre = ",idoffre,"\n")
    for d in demandeurs:
        print(d['iddemandeur'], "\t",d['fichenamereference'])
    print()
    df=spark.createDataFrame(demandeurs)
    df_ofr=df3
    #df_ofr.select("idoffre","metieroffert","wilaya").show()
    
    df_sim_metier=df.selectExpr("metierprincipal","iddemandeur").crossJoin(df_ofr.selectExpr("metieroffert","idoffre")).sort("idoffre")
    df_sim_salaire=df.selectExpr("salairesouhaite","iddemandeur").crossJoin(df_ofr.selectExpr("salaireoffert","idoffre")).sort("idoffre")
    df_sim_maitrise_info=df.selectExpr("niveaumaitriseoutilinformatique AS niveaumaitriseoutilinformatique_dem ","iddemandeur").crossJoin(df_ofr.selectExpr("niveaumaitriseoutilinformatique AS niveaumaitriseoutilinformatique_ofr","idoffre")).sort("idoffre")
    df_sim_instruction=df.selectExpr("niveauinstruction AS niveauinstruction_dem","iddemandeur").crossJoin(df_ofr.selectExpr("niveauinstruction AS niveauinstruction_ofr","idoffre")).sort("idoffre")
    df_sim_qualification=df.selectExpr("niveauqualification AS niveauqualification_dem","iddemandeur").crossJoin(df_ofr.selectExpr("niveauqualification AS niveauqualification_ofr","idoffre")).sort("idoffre")
    df_sim_permis=df.selectExpr("categoriepermisconduire AS categoriepermisconduire_dem","iddemandeur").crossJoin(df_ofr.selectExpr("categoriepermisconduire AS categoriepermisconduire_ofr","idoffre")).sort("idoffre")
    df_sim_exp=df.selectExpr("experience AS experience_dem","iddemandeur").crossJoin(df_ofr.selectExpr("experience AS experience_ofr","idoffre")).sort("idoffre")
    df_sim_age=df.selectExpr("datenaissance","iddemandeur").crossJoin(df_ofr.selectExpr("ageminimum","agemaximum","idoffre")).sort("idoffre")
    df_sim_melitaire=df.selectExpr("situationmilitaire","iddemandeur").crossJoin(df_ofr.selectExpr("servicemilitaire","idoffre")).sort("idoffre")
    df_sim_typecontrat=df.selectExpr("typecontrat AS typecontrat_dem","iddemandeur").crossJoin(df_ofr.selectExpr("typecontrat AS typecontrat_ofr","idoffre")).sort("idoffre")

    df_sim_instruction=df_sim_instruction.withColumn("niveauinstruction_dem",when(col("niveauinstruction_dem")=="Sans Niveau",0).otherwise(when(col("niveauinstruction_dem")=="PRIMAIRE",1).otherwise(when(col("niveauinstruction_dem")=="MOYEN",2).otherwise(when(col("niveauinstruction_dem")=="Secondaire 1AS",3).otherwise(when(col("niveauinstruction_dem")=="Secondaire 2AS",4).otherwise(when(col("niveauinstruction_dem")=="Secondaire 3AS",5).otherwise(when(col("niveauinstruction_dem")=="UNIVERSITAIRE",6).otherwise(when(col("niveauinstruction_dem")=="Supérieur 1",7).otherwise(when(col("niveauinstruction_dem")=="Supérieur 2",8))))))))))
    df_sim_instruction=df_sim_instruction.withColumn("niveauinstruction_ofr",when(col("niveauinstruction_ofr")=="Sans Niveau",0).otherwise(when(col("niveauinstruction_ofr")=="PRIMAIRE",1).otherwise(when(col("niveauinstruction_ofr")=="MOYEN",2).otherwise(when(col("niveauinstruction_ofr")=="Secondaire 1AS",3).otherwise(when(col("niveauinstruction_ofr")=="Secondaire 2AS",4).otherwise(when(col("niveauinstruction_ofr")=="Secondaire 3AS",5).otherwise(when(col("niveauinstruction_ofr")=="UNIVERSITAIRE",6).otherwise(when(col("niveauinstruction_ofr")=="Supérieur 1",7).otherwise(when(col("niveauinstruction_ofr")=="Supérieur 2",8))))))))))  
    
    df_sim_qualification=df_sim_qualification.withColumn("niveauqualification_dem",when(col("niveauqualification_dem")=="Sans Qualification",0).otherwise(when(col("niveauqualification_dem")=="Personnel d'aide",1).otherwise(when(col("niveauqualification_dem")=="Personnel Qualifié",2).otherwise(when(col("niveauqualification_dem")=="Techniciens et techniciens supérieurs",3).otherwise(when(col("niveauqualification_dem")=="Cadres et cadres superieurs",4).otherwise(when(col("niveauqualification_dem")=="Personnel hautement qualifie",5)))))))
    df_sim_qualification=df_sim_qualification.withColumn("niveauqualification_ofr",when(col("niveauqualification_ofr")=="Sans Qualification",0).otherwise(when(col("niveauqualification_ofr")=="Personnel d'aide",1).otherwise(when(col("niveauqualification_ofr")=="Personnel Qualifié",2).otherwise(when(col("niveauqualification_ofr")=="Techniciens et techniciens supérieurs",3).otherwise(when(col("niveauqualification_ofr")=="Cadres et cadres superieurs",4).otherwise(when(col("niveauqualification_ofr")=="Personnel hautement qualifie",5)))))))

    
    
    
    #df_spec4=df_spec4.withColumn("similarite_act_comp_spec",groupby('iddemandeur').agg((sum("similarity act_comp") / count('iddemandeur')).alias('similarite_act_comp_spec')))
    df_sim_metier=df_sim_metier.withColumn("similarite_metier",when(col("metierprincipal")==col("metieroffert"),1).otherwise(0)).drop("metierprincipal","metieroffert")
    df_sim_salaire=df_sim_salaire.withColumn("similarite_salaire",when(col("salaireoffert")=="None",1).otherwise(when(col("salairesouhaite")=="None",1).otherwise(when(col("salairesouhaite").cast(DoubleType()) <= col("salaireoffert").cast(DoubleType()),1).otherwise(0)))).drop("salairesouhaite","salaireoffert")
    df_sim_maitrise_info=df_sim_maitrise_info.withColumn("similarite_informatique",when(col("niveaumaitriseoutilinformatique_ofr")=="None",0).otherwise(when(col("niveaumaitriseoutilinformatique_ofr").cast(IntegerType())<=col("niveaumaitriseoutilinformatique_dem").cast(IntegerType()),1).otherwise(0))).drop("niveaumaitriseoutilinformatique_dem","niveaumaitriseoutilinformatique_ofr")
    df_sim_instruction=df_sim_instruction.withColumn("similarite_instruction",when(col("niveauinstruction_dem")>=col("niveauinstruction_ofr"),1).otherwise(0)).drop("niveauinstruction_dem","niveauinstruction_ofr")
    df_sim_qualification=df_sim_qualification.withColumn("similarite_qualification",when(col("niveauqualification_dem")>=col("niveauqualification_ofr"),1).otherwise(0)).drop("niveauqualification_dem","niveauqualification_ofr")
    df_sim_permis=df_sim_permis.withColumn("similarite_permis",when(col("categoriepermisconduire_ofr")=="None",0).otherwise(when(col("categoriepermisconduire_dem")==col("categoriepermisconduire_ofr"),1).otherwise(0))).drop("categoriepermisconduire_dem","categoriepermisconduire_ofr")
    df_sim_exp=df_sim_exp.withColumn("similarite_exp",when(ceil(col("experience_dem").cast(DoubleType()))>=ceil(col("experience_ofr").cast(DoubleType())),1).otherwise(0)).drop("experience_dem","experience_ofr")
    df_sim_melitaire=df_sim_melitaire.withColumn("similarite_militaire",when(col("situationmilitaire")==col("servicemilitaire"),1).otherwise(0)).drop("situationmilitaire","servicemilitaire")
    df_sim_typecontrat=df_sim_typecontrat.withColumn("similarite_contrat",when(col("typecontrat_dem")==col("typecontrat_ofr"),1).otherwise(0)).drop("typecontrat_dem","typecontrat_ofr")
    df_sim_age=df_sim_age.withColumn("similarite_age",when(col("agemaximum")=="None",when(col("ageminimum")=="None",1).otherwise(when(col("ageminimum")<=floor(datediff(current_date(),col("datenaissance").cast(DateType()))/365.25),1).otherwise(0))).otherwise(when(col("agemaximum")>=floor(datediff(current_date(),col("datenaissance").cast(DateType()))/365.25),1).otherwise(0))).drop("datenaissance","ageminimum","agemaximum")
    #df_sim_act_base=spark.createDataFrame()
    #df_sim_comp_base=spark.createDataFrame()
    #df_sim_act_comp_spec=spark.createDataFrame()
    df_sim_act_base=similarite_act_base(df,df_ofr)
    df_sim_comp_base=similarite_comp_base(df,df_ofr)
    df_sim_act_comp_spec=similarite_act_comp_spec(df,df_ofr)
    
    
    #ofr_size=len(df_ofr.columns)-2
    ofr_size=13
    df_Columns=["niveaumaitriseoutilinformatique","servicemilitaire"]
    #df_ofr=df_ofr.withColumn("column_numbers",when((col("niveaumaitriseoutilinformatique")=="None")&(col("servicemilitaire")=="None"),ofr_size-2).otherwise(when((col("niveaumaitriseoutilinformatique")=="None")|(col("servicemilitaire")=="None"),ofr_size-1).otherwise(ofr_size)))
    df_ofr2=df_ofr
    df_ofr=df_ofr.withColumn("column_numbers",count_nones(df_ofr,df_Columns)).select("niveaumaitriseoutilinformatique","servicemilitaire","column_numbers")
    df_ofr=df_ofr.withColumn("Nbr",ofr_size- col("column_numbers"))
    df_ofr2=df_ofr.select("Nbr")
    df_ofr2=df_ofr2.crossJoin(df.select("iddemandeur"))
    #df_ofr2=df_ofr2.select("Nbr")
    #df_ofr2.show(30)
    #offre_size=df_ofr.select("column_numbers").collect()[0][0]
    #offre_size
    
    

    df_sim_metier=df_sim_metier.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))
    df_sim_salaire=df_sim_salaire.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre")
    df_sim_maitrise_info=df_sim_maitrise_info.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre")
    df_sim_instruction=df_sim_instruction.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre")
    df_sim_qualification=df_sim_qualification.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre")
    df_sim_permis=df_sim_permis.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre")
    df_sim_exp=df_sim_exp.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre")
    df_sim_melitaire=df_sim_melitaire.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre")
    df_sim_typecontrat=df_sim_typecontrat.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre")
    df_sim_age=df_sim_age.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre")
    df_sim_act_base=df_sim_act_base.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre","activitebase_ofr","activitebase_dm")
    df_sim_comp_base=df_sim_comp_base.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre","comp_base_ofr","comp_base_dm")
    df_sim_act_comp_spec=df_sim_act_comp_spec.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre")


    df_sim1 = df_sim_metier.join(df_sim_salaire, on=["row_index"]).join(df_sim_maitrise_info, on=["row_index"]).join(df_sim_instruction, on=["row_index"]).join(df_sim_qualification, on=["row_index"]).join(df_sim_permis, on=["row_index"]).join(df_sim_exp, on=["row_index"]).join(df_sim_melitaire, on=["row_index"]).join(df_sim_typecontrat, on=["row_index"]).join(df_sim_age, on=["row_index"]).join(df_sim_act_base, on=["row_index"]).join(df_sim_comp_base, on=["row_index"]).join(df_sim_act_comp_spec, on=["row_index"]).drop("row_index")

    #df_sim1.select("similarite_permis","similarite_exp").show()
    df_sim1=df_sim1.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))
    df_ofr2=df_ofr2.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur")
    df_sim1=df_ofr2.join(df_sim1, on=["row_index"]).drop("row_index")
    #df_sim1.select("idoffre","iddemandeur","Nbr","Similarite_activitebase").show()
    
    
    df_sim1=df_sim1.withColumn("SIMILARITE",(col("similarite_metier")+col("similarite_salaire")+col("similarite_informatique")+col("similarite_instruction")+col("similarite_qualification")+col("similarite_permis")+col("similarite_exp")+col("similarite_militaire")+col("similarite_contrat")+col("similarite_age")+col("similarite_act_comp_spec")+col("Similarite_activitebase")+col("Similarite_comp_base")) / col("Nbr"))
    #df_sim1=df_sim1.withColumn("SIMILARITE",(col("similarite_metier")+col("similarite_salaire")+col("similarite_informatique")+col("similarite_instruction")+col("similarite_qualification")+col("similarite_permis")+col("similarite_exp")+col("similarite_militaire")+col("similarite_contrat")+col("similarite_age")+col("similarite_act_comp_spec")+col("Similarite_activitebase")+col("Similarite_comp_base")) / offre_size)
    #resultat_elastic=df_sim1.collect()
    
    """
    for row in resultat_elastic:
        dict_score = {}
        dict_score['iddemandeur']=row['iddemandeur']
        dict_score['idoffre']=row['idoffre']
        dict_score['similarite_metier']=row['similarite_metier']
        dict_score['similarite_informatique']=row['similarite_informatique']
        dict_score['similarite_instruction']=row['similarite_instruction']
        dict_score['similarite_qualification']=row['similarite_qualification']
        dict_score['similarite_permis']=row['similarite_permis']
        dict_score['similarite_exp']=row['similarite_exp']
        dict_score['similarite_militaire']=row['similarite_militaire']
        dict_score['similarite_contrat']=row['similarite_contrat']
        dict_score['similarite_age']=row['similarite_age']
        dict_score['similarite_activitebase']=row['Similarite_activitebase']
        dict_score['similarite_comp_base']=row['Similarite_comp_base']
        dict_score['similarite_act_comp_spec']=row['similarite_act_comp_spec']
        dict_score['similarite']=row['SIMILARITE']
        dict_score['count_attributs']=row['Nbr']
        es.index(index="test_score2",body=dict_score)
        print ("\n\n -- SCORE FINAL ", dict_score['iddemandeur'],' : ',dict_score['similarite'])
        
        print(dict_score['similarite_activitebase'])
        print(dict_score['similarite_comp_base'])
        print(dict_score['similarite_act_comp_spec'])
    
    """
    end = timer()
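As a side note, the long nested when/otherwise chains above can be generated from an ordinary Python dict instead of being written out by hand. A minimal sketch of the idea (the level names and ranks are copied from the code above; `level_case_expr` is a hypothetical helper, not part of the original code):

```python
# Ordinal ranks for the education levels, as encoded in the chains above.
INSTRUCTION_LEVELS = {
    "Sans Niveau": 0, "PRIMAIRE": 1, "MOYEN": 2,
    "Secondaire 1AS": 3, "Secondaire 2AS": 4, "Secondaire 3AS": 5,
    "UNIVERSITAIRE": 6, "Supérieur 1": 7, "Supérieur 2": 8,
}

def level_case_expr(column, levels):
    """Build a SQL CASE expression equivalent to the nested when/otherwise chain."""
    branches = " ".join(
        "WHEN {} = '{}' THEN {}".format(column, name, rank)
        for name, rank in levels.items()
    )
    return "CASE {} END".format(branches)

# Usage with pyspark.sql.functions.expr, e.g.:
# df_sim_instruction = df_sim_instruction.withColumn(
#     "niveauinstruction_dem",
#     expr(level_case_expr("niveauinstruction_dem", INSTRUCTION_LEVELS)))
```

The same helper covers the qualification levels with a second dict, which keeps the rank tables in one place instead of repeating them per column.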

Note that I am using Spark 3.1.2 and streaming with Spark Structured Streaming.

Is there a way to replace monotonically_increasing_id() so that this code works properly in streaming?

【Question comments】:

    Tags: apache-spark pyspark apache-kafka apache-spark-sql spark-streaming


    【Solution 1】:

    Since you are streaming from Kafka, you can try using the offset or timestamp provided by your Kafka source to order your data.

    For example, adapted from the Spark Kafka documentation:

    df = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
      .option("subscribe", "topic1") \
      .load()
    
    df = df.selectExpr(
        "CAST(key AS STRING)",
        "CAST(value AS STRING)",
        "offset AS row_index",
        "timestamp AS source_timestamp",
    )
    

    The offset will now serve as your row_index, or you can try:

    df_sim1=df_sim1.withColumn('row_index', row_number().over(Window.orderBy("source_timestamp")))
    df_ofr2=df_ofr2.withColumn('row_index', row_number().over(Window.orderBy("source_timestamp"))).drop("iddemandeur")
    

    Let me know if this works for you.
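For what it's worth, a third option is to avoid row indices entirely: every df_sim_* frame in the question already carries the natural key (iddemandeur, idoffre), so the frames can be joined on that key instead of on a synthetic row_index. This also sidesteps a second pitfall: if I remember correctly, non-time-based window functions such as row_number().over(Window.orderBy(...)) are rejected on streaming DataFrames as well. A minimal sketch (`join_all` is a hypothetical helper, not part of the original code):

```python
from functools import reduce

# Natural key shared by every per-attribute similarity frame.
KEY = ["iddemandeur", "idoffre"]

def join_all(frames, key=KEY):
    """Inner-join a sequence of DataFrames on a shared natural key.

    Unlike monotonically_increasing_id(), a plain equi-join on a key is
    allowed on streaming DataFrames (subject to the usual stream-stream
    join restrictions).
    """
    return reduce(lambda left, right: left.join(right, on=key), frames)

# In the question's code: keep iddemandeur/idoffre on every df_sim_* frame
# (do not drop them), skip the row_index columns, then e.g.:
# df_sim1 = join_all([df_sim_metier, df_sim_salaire, df_sim_maitrise_info,
#                     df_sim_instruction, df_sim_qualification, ...])
```

The helper is pure Python, so it works unchanged for the batch version of the pipeline too.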

    【Discussion】:
