I mostly use PySpark, but you can adapt this to your environment.
import pandas as pd
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

## You could add some conditional logic, or just always output two dataframes,
## one of which may be empty (a sketch of that conditional logic follows the
## one-dataframe example below).
print("pdf - two dataframe")
## create pandas dataframe
pdf = pd.DataFrame({'col1':['abc','abc','cde','cde'],'col2':[9,7,4,3],'col3':[40,50,20,25],'col4':['A','A','B','B']})
print(pdf)
## move it to spark
print("sdf")
sdf = spark.createDataFrame(pdf)
sdf.show()
# +----+----+----+----+
# |col1|col2|col3|col4|
# +----+----+----+----+
# | abc| 9| 40| A|
# | abc| 7| 50| A|
# | cde| 4| 20| B|
# | cde| 3| 25| B|
# +----+----+----+----+
## split on the col3 threshold and aggregate each side
pl = sdf.filter('col3 <= 30')\
.groupBy("col1","col4").agg(F.sum('col2').alias('sumC2'))
pr = sdf.filter('col3 > 30')\
.groupBy("col1","col4").agg(F.sum('col2').alias('sumC2'))
print("pl")
pl.show()
# +----+----+-----+
# |col1|col4|sumC2|
# +----+----+-----+
# | cde| B| 7|
# +----+----+-----+
print("pr")
pr.show()
# +----+----+-----+
# |col1|col4|sumC2|
# +----+----+-----+
# | abc| A| 16|
# +----+----+-----+
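If you need this threshold split more than once, it can be wrapped in a small helper. This is just a sketch; the name split_and_sum and its parameters are hypothetical, not part of the code above.

def split_and_sum(df, threshold):
    ## rows at or below the threshold, aggregated
    lo = df.filter(F.col('col3') <= threshold) \
        .groupBy('col1', 'col4').agg(F.sum('col2').alias('sumC2'))
    ## rows above the threshold, aggregated
    hi = df.filter(F.col('col3') > threshold) \
        .groupBy('col1', 'col4').agg(F.sum('col2').alias('sumC2'))
    return lo, hi

pl, pr = split_and_sum(sdf, 30)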
print("pdf - one dataframe")
## create pandas dataframe
pdf = pd.DataFrame({'col1':['abc','abc','cde','cde'],'col2':[9,7,4,3],'col3':[11,29,20,25],'col4':['A','A','B','B']})
print(pdf)
## move it to spark
print("sdf")
sdf = spark.createDataFrame(pdf)
sdf.show()
# +----+----+----+----+
# |col1|col2|col3|col4|
# +----+----+----+----+
# | abc| 9| 11| A|
# | abc| 7| 29| A|
# | cde| 4| 20| B|
# | cde| 3| 25| B|
# +----+----+----+----+
pl = sdf.filter('col3 <= 30')\
.groupBy("col1","col4").agg( F.sum('col2').alias('sumC2') )
pr = sdf.filter('col3 > 30')\
.groupBy("col1","col4").agg(F.sum('col2').alias('sumC2'))
print("pl")
pl.show()
# +----+----+-----+
# |col1|col4|sumC2|
# +----+----+-----+
# | abc| A| 16|
# | cde| B| 7|
# +----+----+-----+
print("pr")
pr.show()
# +----+----+-----+
# |col1|col4|sumC2|
# +----+----+-----+
# +----+----+-----+
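Here pr comes back empty. The conditional logic mentioned at the top could look like the sketch below; DataFrame.isEmpty() requires Spark 3.3+, so on older versions substitute df.limit(1).count() == 0.

## only show/write frames that actually contain rows
for name, df in [('pl', pl), ('pr', pr)]:
    if df.isEmpty():   ## Spark 3.3+; older: df.limit(1).count() == 0
        print(name + " is empty - skipping")
    else:
        print(name)
        df.show()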
Filtering by a dynamic mean:
print("pdf - filter by mean")
## create pandas dataframe
pdf = pd.DataFrame({'col1':['abc','abc','cde','cde'],'col2':[9,7,4,3],'col3':[40,50,20,25],'col4':['A','A','B','B']})
print(pdf)
## move it to spark
print("sdf")
sdf = spark.createDataFrame(pdf)
sdf.show()
# +----+----+----+----+
# |col1|col2|col3|col4|
# +----+----+----+----+
# | abc| 9| 40| A|
# | abc| 7| 50| A|
# | cde| 4| 20| B|
# | cde| 3| 25| B|
# +----+----+----+----+
## window over each col1 group; no orderBy, otherwise F.mean would become a
## running mean over the ordered rows instead of the whole-partition mean
w = Window.partitionBy("col1")
## add another column: the mean of col2 within each col1 partition
sdf = sdf.withColumn('mean_c2', F.mean('col2').over(w))
## filter by the dynamic mean
pr = sdf.filter('col2 > mean_c2')
pr.show()
# +----+----+----+----+-------+
# |col1|col2|col3|col4|mean_c2|
# +----+----+----+----+-------+
# | cde| 4| 20| B| 3.5|
# | abc| 9| 40| A| 8.0|
# +----+----+----+----+-------+
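The same per-group mean can be computed without a window by aggregating and joining back. A minimal sketch assuming the sdf defined above; mean_c2 is dropped first since the window version already added it.

## equivalent without a window: aggregate the per-group mean, then join it back
means = sdf.groupBy('col1').agg(F.mean('col2').alias('mean_c2'))
pr2 = sdf.drop('mean_c2') \
    .join(means, on='col1') \
    .filter('col2 > mean_c2')
pr2.show()
## same two rows as pr above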