【发布时间】:2021-11-29 01:50:59
【问题描述】:
首先,我尝试在 BATCH MODE 中计算数据框中不同列之间的相似度,并且效果很好。
现在,我正在尝试将数据从 kafka 流式传输到 Pyspark 并执行与在 BATCH MODE 中相同的工作,但是当我尝试时出现此错误:
上面写着monotonically_increasing_id() is not supported,但我需要这个函数,这样我的算法才能工作,这里是代码:
start = timer()
#idoffre=str(df3["idoffre"])
idoffre="2F12EE1B-E548-4246-8E49-4CC453A50953"
print("\n idoffre=",idoffre,"\n")
demandeurs = get_offre_demandeurs(idoffre)
""""
print("\tOffre : ")
print(o)
print("\t Demandeurs : ")
i = 1
for d in demandeurs:
print("\n Demaneur N°",i," : ")
print(d)
i += 1
print()
"""
print("idoffre = ",idoffre,"\n")
for d in demandeurs:
print(d['iddemandeur'], "\t",d['fichenamereference'])
print()
df=spark.createDataFrame(demandeurs)
df_ofr=df3
#df_ofr.select("idoffre","metieroffert","wilaya").show()
df_sim_metier=df.selectExpr("metierprincipal","iddemandeur").crossJoin(df_ofr.selectExpr("metieroffert","idoffre")).sort("idoffre")
df_sim_salaire=df.selectExpr("salairesouhaite","iddemandeur").crossJoin(df_ofr.selectExpr("salaireoffert","idoffre")).sort("idoffre")
df_sim_maitrise_info=df.selectExpr("niveaumaitriseoutilinformatique AS niveaumaitriseoutilinformatique_dem ","iddemandeur").crossJoin(df_ofr.selectExpr("niveaumaitriseoutilinformatique AS niveaumaitriseoutilinformatique_ofr","idoffre")).sort("idoffre")
df_sim_instruction=df.selectExpr("niveauinstruction AS niveauinstruction_dem","iddemandeur").crossJoin(df_ofr.selectExpr("niveauinstruction AS niveauinstruction_ofr","idoffre")).sort("idoffre")
df_sim_qualification=df.selectExpr("niveauqualification AS niveauqualification_dem","iddemandeur").crossJoin(df_ofr.selectExpr("niveauqualification AS niveauqualification_ofr","idoffre")).sort("idoffre")
df_sim_permis=df.selectExpr("categoriepermisconduire AS categoriepermisconduire_dem","iddemandeur").crossJoin(df_ofr.selectExpr("categoriepermisconduire AS categoriepermisconduire_ofr","idoffre")).sort("idoffre")
df_sim_exp=df.selectExpr("experience AS experience_dem","iddemandeur").crossJoin(df_ofr.selectExpr("experience AS experience_ofr","idoffre")).sort("idoffre")
df_sim_age=df.selectExpr("datenaissance","iddemandeur").crossJoin(df_ofr.selectExpr("ageminimum","agemaximum","idoffre")).sort("idoffre")
df_sim_melitaire=df.selectExpr("situationmilitaire","iddemandeur").crossJoin(df_ofr.selectExpr("servicemilitaire","idoffre")).sort("idoffre")
df_sim_typecontrat=df.selectExpr("typecontrat AS typecontrat_dem","iddemandeur").crossJoin(df_ofr.selectExpr("typecontrat AS typecontrat_ofr","idoffre")).sort("idoffre")
df_sim_instruction=df_sim_instruction.withColumn("niveauinstruction_dem",when(col("niveauinstruction_dem")=="Sans Niveau",0).otherwise(when(col("niveauinstruction_dem")=="PRIMAIRE",1).otherwise(when(col("niveauinstruction_dem")=="MOYEN",2).otherwise(when(col("niveauinstruction_dem")=="Secondaire 1AS",3).otherwise(when(col("niveauinstruction_dem")=="Secondaire 2AS",4).otherwise(when(col("niveauinstruction_dem")=="Secondaire 3AS",5).otherwise(when(col("niveauinstruction_dem")=="UNIVERSITAIRE",6).otherwise(when(col("niveauinstruction_dem")=="Supérieur 1",7).otherwise(when(col("niveauinstruction_dem")=="Supérieur 2",8))))))))))
df_sim_instruction=df_sim_instruction.withColumn("niveauinstruction_ofr",when(col("niveauinstruction_ofr")=="Sans Niveau",0).otherwise(when(col("niveauinstruction_ofr")=="PRIMAIRE",1).otherwise(when(col("niveauinstruction_ofr")=="MOYEN",2).otherwise(when(col("niveauinstruction_ofr")=="Secondaire 1AS",3).otherwise(when(col("niveauinstruction_ofr")=="Secondaire 2AS",4).otherwise(when(col("niveauinstruction_ofr")=="Secondaire 3AS",5).otherwise(when(col("niveauinstruction_ofr")=="UNIVERSITAIRE",6).otherwise(when(col("niveauinstruction_ofr")=="Supérieur 1",7).otherwise(when(col("niveauinstruction_ofr")=="Supérieur 2",8))))))))))
df_sim_qualification=df_sim_qualification.withColumn("niveauqualification_dem",when(col("niveauqualification_dem")=="Sans Qualification",0).otherwise(when(col("niveauqualification_dem")=="Personnel d'aide",1).otherwise(when(col("niveauqualification_dem")=="Personnel Qualifié",2).otherwise(when(col("niveauqualification_dem")=="Techniciens et techniciens supérieurs",3).otherwise(when(col("niveauqualification_dem")=="Cadres et cadres superieurs",4).otherwise(when(col("niveauqualification_dem")=="Personnel hautement qualifie",5)))))))
df_sim_qualification=df_sim_qualification.withColumn("niveauqualification_ofr",when(col("niveauqualification_ofr")=="Sans Qualification",0).otherwise(when(col("niveauqualification_ofr")=="Personnel d'aide",1).otherwise(when(col("niveauqualification_ofr")=="Personnel Qualifié",2).otherwise(when(col("niveauqualification_ofr")=="Techniciens et techniciens supérieurs",3).otherwise(when(col("niveauqualification_ofr")=="Cadres et cadres superieurs",4).otherwise(when(col("niveauqualification_ofr")=="Personnel hautement qualifie",5)))))))
#df_spec4=df_spec4.withColumn("similarite_act_comp_spec",groupby('iddemandeur').agg((sum("similarity act_comp") / count('iddemandeur')).alias('similarite_act_comp_spec')))
df_sim_metier=df_sim_metier.withColumn("similarite_metier",when(col("metierprincipal")==col("metieroffert"),1).otherwise(0)).drop("metierprincipal","metieroffert")
df_sim_salaire=df_sim_salaire.withColumn("similarite_salaire",when(col("salaireoffert")=="None",1).otherwise(when(col("salairesouhaite")=="None",1).otherwise(when(col("salairesouhaite").cast(DoubleType()) <= col("salaireoffert").cast(DoubleType()),1).otherwise(0)))).drop("salairesouhaite","salaireoffert")
df_sim_maitrise_info=df_sim_maitrise_info.withColumn("similarite_informatique",when(col("niveaumaitriseoutilinformatique_ofr")=="None",0).otherwise(when(col("niveaumaitriseoutilinformatique_ofr").cast(IntegerType())<=col("niveaumaitriseoutilinformatique_dem").cast(IntegerType()),1).otherwise(0))).drop("niveaumaitriseoutilinformatique_dem","niveaumaitriseoutilinformatique_ofr")
df_sim_instruction=df_sim_instruction.withColumn("similarite_instruction",when(col("niveauinstruction_dem")>=col("niveauinstruction_ofr"),1).otherwise(0)).drop("niveauinstruction_dem","niveauinstruction_ofr")
df_sim_qualification=df_sim_qualification.withColumn("similarite_qualification",when(col("niveauqualification_dem")>=col("niveauqualification_ofr"),1).otherwise(0)).drop("niveauqualification_dem","niveauqualification_ofr")
df_sim_permis=df_sim_permis.withColumn("similarite_permis",when(col("categoriepermisconduire_ofr")=="None",0).otherwise(when(col("categoriepermisconduire_dem")==col("categoriepermisconduire_ofr"),1).otherwise(0))).drop("categoriepermisconduire_dem","categoriepermisconduire_ofr")
df_sim_exp=df_sim_exp.withColumn("similarite_exp",when(ceil(col("experience_dem").cast(DoubleType()))>=ceil(col("experience_ofr").cast(DoubleType())),1).otherwise(0)).drop("experience_dem","experience_ofr")
df_sim_melitaire=df_sim_melitaire.withColumn("similarite_militaire",when(col("situationmilitaire")==col("servicemilitaire"),1).otherwise(0)).drop("situationmilitaire","servicemilitaire")
df_sim_typecontrat=df_sim_typecontrat.withColumn("similarite_contrat",when(col("typecontrat_dem")==col("typecontrat_ofr"),1).otherwise(0)).drop("typecontrat_dem","typecontrat_ofr")
df_sim_age=df_sim_age.withColumn("similarite_age",when(col("agemaximum")=="None",when(col("ageminimum")=="None",1).otherwise(when(col("ageminimum")<=floor(datediff(current_date(),col("datenaissance").cast(DateType()))/365.25),1).otherwise(0))).otherwise(when(col("agemaximum")>=floor(datediff(current_date(),col("datenaissance").cast(DateType()))/365.25),1).otherwise(0))).drop("datenaissance","ageminimum","agemaximum")
#df_sim_act_base=spark.createDataFrame()
#df_sim_comp_base=spark.createDataFrame()
#df_sim_act_comp_spec=spark.createDataFrame()
df_sim_act_base=similarite_act_base(df,df_ofr)
df_sim_comp_base=similarite_comp_base(df,df_ofr)
df_sim_act_comp_spec=similarite_act_comp_spec(df,df_ofr)
#ofr_size=len(df_ofr.columns)-2
ofr_size=13
df_Columns=["niveaumaitriseoutilinformatique","servicemilitaire"]
#df_ofr=df_ofr.withColumn("column_numbers",when((col("niveaumaitriseoutilinformatique")=="None")&(col("servicemilitaire")=="None"),ofr_size-2).otherwise(when((col("niveaumaitriseoutilinformatique")=="None")|(col("servicemilitaire")=="None"),ofr_size-1).otherwise(ofr_size)))
df_ofr2=df_ofr
df_ofr=df_ofr.withColumn("column_numbers",count_nones(df_ofr,df_Columns)).select("niveaumaitriseoutilinformatique","servicemilitaire","column_numbers")
df_ofr=df_ofr.withColumn("Nbr",ofr_size- col("column_numbers"))
df_ofr2=df_ofr.select("Nbr")
df_ofr2=df_ofr2.crossJoin(df.select("iddemandeur"))
#df_ofr2=df_ofr2.select("Nbr")
#df_ofr2.show(30)
#offre_size=df_ofr.select("column_numbers").collect()[0][0]
#offre_size
df_sim_metier=df_sim_metier.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))
df_sim_salaire=df_sim_salaire.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre")
df_sim_maitrise_info=df_sim_maitrise_info.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre")
df_sim_instruction=df_sim_instruction.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre")
df_sim_qualification=df_sim_qualification.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre")
df_sim_permis=df_sim_permis.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre")
df_sim_exp=df_sim_exp.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre")
df_sim_melitaire=df_sim_melitaire.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre")
df_sim_typecontrat=df_sim_typecontrat.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre")
df_sim_age=df_sim_age.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre")
df_sim_act_base=df_sim_act_base.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre","activitebase_ofr","activitebase_dm")
df_sim_comp_base=df_sim_comp_base.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre","comp_base_ofr","comp_base_dm")
df_sim_act_comp_spec=df_sim_act_comp_spec.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre")
df_sim1 = df_sim_metier.join(df_sim_salaire, on=["row_index"]).join(df_sim_maitrise_info, on=["row_index"]).join(df_sim_instruction, on=["row_index"]).join(df_sim_qualification, on=["row_index"]).join(df_sim_permis, on=["row_index"]).join(df_sim_exp, on=["row_index"]).join(df_sim_melitaire, on=["row_index"]).join(df_sim_typecontrat, on=["row_index"]).join(df_sim_age, on=["row_index"]).join(df_sim_act_base, on=["row_index"]).join(df_sim_comp_base, on=["row_index"]).join(df_sim_act_comp_spec, on=["row_index"]).drop("row_index")
#df_sim1.select("similarite_permis","similarite_exp").show()
df_sim1=df_sim1.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))
df_ofr2=df_ofr2.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur")
df_sim1=df_ofr2.join(df_sim1, on=["row_index"]).drop("row_index")
#df_sim1.select("idoffre","iddemandeur","Nbr","Similarite_activitebase").show()
df_sim1=df_sim1.withColumn("SIMILARITE",(col("similarite_metier")+col("similarite_salaire")+col("similarite_informatique")+col("similarite_instruction")+col("similarite_qualification")+col("similarite_permis")+col("similarite_exp")+col("similarite_militaire")+col("similarite_contrat")+col("similarite_age")+col("similarite_act_comp_spec")+col("Similarite_activitebase")+col("Similarite_comp_base")) / col("Nbr"))
#df_sim1=df_sim1.withColumn("SIMILARITE",(col("similarite_metier")+col("similarite_salaire")+col("similarite_informatique")+col("similarite_instruction")+col("similarite_qualification")+col("similarite_permis")+col("similarite_exp")+col("similarite_militaire")+col("similarite_contrat")+col("similarite_age")+col("similarite_act_comp_spec")+col("Similarite_activitebase")+col("Similarite_comp_base")) / offre_size)
#resultat_elastic=df_sim1.collect()
"""
for row in resultat_elastic:
dict_score = {}
dict_score['iddemandeur']=row['iddemandeur']
dict_score['idoffre']=row['idoffre']
dict_score['similarite_metier']=row['similarite_metier']
dict_score['similarite_informatique']=row['similarite_informatique']
dict_score['similarite_instruction']=row['similarite_instruction']
dict_score['similarite_qualification']=row['similarite_qualification']
dict_score['similarite_permis']=row['similarite_permis']
dict_score['similarite_exp']=row['similarite_exp']
dict_score['similarite_militaire']=row['similarite_militaire']
dict_score['similarite_contrat']=row['similarite_contrat']
dict_score['similarite_age']=row['similarite_age']
dict_score['similarite_activitebase']=row['Similarite_activitebase']
dict_score['similarite_comp_base']=row['Similarite_comp_base']
dict_score['similarite_act_comp_spec']=row['similarite_act_comp_spec']
dict_score['similarite']=row['SIMILARITE']
dict_score['count_attributs']=row['Nbr']
es.index(index="test_score2",body=dict_score)
print ("\n\n -- SCORE FINAL ", dict_score['iddemandeur'],' : ',dict_score['similarite'])
print(dict_score['similarite_activitebase'])
print(dict_score['similarite_comp_base'])
print(dict_score['similarite_act_comp_spec'])
"""
end = timer()
知道我正在使用 Spark 3.1.2 并使用 Spark 结构化流进行流式处理。
有没有办法替换 monotonically_increasing_id() 以便此代码在流式传输中正常工作?
【问题讨论】:
标签: apache-spark pyspark apache-kafka apache-spark-sql spark-streaming