【问题标题】:Mask/replace inner part of string column in Pyspark在 Pyspark 中屏蔽/替换字符串列的内部
【发布时间】:2020-02-25 01:29:41
【问题描述】:

我在数据框中有一个电子邮件列,我想用星号替换其中的一部分。我无法使用 PySpark 函数解决这个问题。

我的电子邮件栏可能是这样的"

email_col
abc123@gmail.com
123abc123@yahoo.com

我想要实现的是:

mod_email_col
ab**23@gmail.com
12*****23@yahoo.com

所以基本上除了前 2 个字符和最后 2 个字符之外,我希望将剩余部分替换为星号。

这是我尝试过的

from pyspark.sql import functions as F

split_email = F.split(df.email_address, "@")
df = df.withColumn('email_part', split_email.getItem(0))
df = df.withColumn('start', df.email_part.substr(0,2))
df = df.withColumn('end', df.email_part.substr(-2,2))

df.withColumn(
    'masked_part', 
     F.expr("regexp_replace(email_part, email_part[email_part.index(start)+len(start):email_part.index(end)], '*')")
).show(n=5)

【问题讨论】:

标签: apache-spark pyspark apache-spark-sql regexp-replace


【解决方案1】:

我认为您可以借助以下正则表达式来实现这一点:(?<=.{2})\w+(?=.{2}@)

  • (?<=.{2}): 正面向后看 2 个字符
  • \w+: 任意单词字符
  • (?=.{2}@):2 个字符的正向前瞻,后跟文字 @

首先使用regexp_extract 从您的字符串中提取此模式。

from pyspark.sql.functions import regexp_extract, regexp_replace

df = df.withColumn(
    "pattern", 
    regexp_extract("email", r"(?<=.{2})\w+(?=.{2}@)", 0)
)
df.show()
#+-------------------+-------+
#|              email|pattern|
#+-------------------+-------+
#|   abc123@gmail.com|     c1|
#|123abc123@yahoo.com|  3abc1|
#|      abcd@test.com|       |
#+-------------------+-------+

然后使用regexp_replace 创建相同长度的* 的替换。

df = df.withColumn(
    "replacement",
    regexp_replace("pattern", r"\w", "*")
)
df.show()
#+-------------------+-------+-----------+
#|              email|pattern|replacement|
#+-------------------+-------+-----------+
#|   abc123@gmail.com|     c1|         **|
#|123abc123@yahoo.com|  3abc1|      *****|
#|      abcd@test.com|       |           |
#+-------------------+-------+-----------+

接下来使用派生的patternreplacement 列在原始email 列上再次使用regexp_replace

为了安全起见,concat 在替换时从原始模式向后/向前看。为此,我们必须使用expr 才能使用pass the column values as parameters

from pyspark.sql.functions import concat, expr, lit

df = df.withColumn(
    "mod_email_col",
    expr("regexp_replace(email, concat('(?<=.{2})', pattern, '(?=.{2}@)'), replacement)")
)
df.show()
#+-------------------+-------+-----------+-------------------+
#|              email|pattern|replacement|      mod_email_col|
#+-------------------+-------+-----------+-------------------+
#|   abc123@gmail.com|     c1|         **|   ab**23@gmail.com|
#|123abc123@yahoo.com|  3abc1|      *****|12*****23@yahoo.com|
#|      abcd@test.com|       |           |      abcd@test.com|
#+-------------------+-------+-----------+-------------------+

最后删除中间列:

df = df.drop("pattern", "replacement")
df.show()
#+-------------------+-------------------+
#|              email|      mod_email_col|
#+-------------------+-------------------+
#|   abc123@gmail.com|   ab**23@gmail.com|
#|123abc123@yahoo.com|12*****23@yahoo.com|
#|      abcd@test.com|      abcd@test.com|
#+-------------------+-------------------+

注意:我添加了一个测试用例来表明如果电子邮件地址部分为 4 个字符或更少,这将不起作用。


更新:这里有一些方法可以处理电子邮件地址部分少于 4 个字符的边缘情况。

我使用的规则:

  • 电子邮件地址长度超过 5:按上述操作
  • 电子邮件地址长度为 3、4 或 5:保留第一个和最后一个字符,用 * 屏蔽其他字符
  • 电子邮件地址长度为 1 或 2:屏蔽 @ 之前的单个字符

代码:

patA = "regexp_replace(email, concat('(?<=.{2})', pattern, '(?=.{2}@)'), replacement)"
patB = "regexp_replace(email, concat('(?<=.{1})', pattern, '(?=.{1}@)'), replacement)"

from pyspark.sql.functions import regexp_extract, regexp_replace
from pyspark.sql.functions import concat, expr, length, lit, split, when

df.withColumn("address_part", split("email", "@").getItem(0))\
.withColumn(
    "pattern", 
    when(
        length("address_part") > 5, 
        regexp_extract("email", r"(?<=.{2})\w+(?=.{2}@)", 0)
    ).otherwise(
        regexp_extract("email", r"(?<=.{1})\w+(?=.{1}@)", 0)
    )
).withColumn(
    "replacement", regexp_replace("pattern", r"\w", "*")
).withColumn(
    "mod_email_col",
    when(
        length("address_part") > 5, expr(patA)
    ).when(
        length("address_part") > 3, expr(patB)
    ).otherwise(regexp_replace('email', '\w(?=@)', '*'))
).drop("pattern", "replacement", "address_part").show()
#+-------------------+-------------------+
#|              email|      mod_email_col|
#+-------------------+-------------------+
#|   abc123@gmail.com|   ab**23@gmail.com|
#|123abc123@yahoo.com|12*****23@yahoo.com|
#|     abcde@test.com|     a***e@test.com|
#|      abcd@test.com|      a**d@test.com|
#|        ab@test.com|        a*@test.com|
#|         a@test.com|         *@test.com|
#+-------------------+-------------------+

【讨论】:

  • 哇,这太漂亮了。太感谢了!如果我想包含一个只有 4 个字符的字符,比如取第一个字符和最后一个字符并替换两者之间的字符怎么办?
  • @ManasJani 在这种情况下使用 when 检查替换字符串的长度。如果它 > 0 这样做,否则使用不同的模式(未经测试,我认为这就像将 {2}s 更改为 {1}s 一样简单。您还必须处理电子邮件只有 2 个字符长的情况..
  • 所以如果我想保持动态,那么我将不得不使用 when 条件之类的东西?
  • @ManasJani 我发布了一个更新来处理你的边缘情况
  • 非常感谢你,真的帮助我更好地理解了正则表达式。如何将函数参数传递给上面的表达式?喜欢def mask_email(df, email_col):?将值传递给 email_col arg 时出现错误。
【解决方案2】:

您的问题可以使用一些字符串操作来简化(Spark SQL 函数:instr、concat、leftrepeatsubstr):

首先找到@在邮件字符串中的位置:pos_at = instr(email_col, '@'),那么用户名部分的长度就是pos_at - 1。如果我们将N=2作为要保留的字符数,那么要屏蔽的字符数应该是pos_at - 1 - 2*N,在代码中,我们有:

from pyspark.sql.functions import instr, expr

df = spark.createDataFrame(
        [(e,) for e in ['abc123@gmail.com', '123abc123@yahoo.com', 'abd@gmail.com']]
      , ['email_col']
)

# set N=2 as a parameter in the SQL expression
N = 2

df.withColumn('pos_at', instr('email_col', '@')) \
  .withColumn('new_col', expr("""
        CONCAT(LEFT(email_col,{0}), REPEAT('*', pos_at-1-2*{0}), SUBSTR(email_col, pos_at-{0}))
   """.format(N))).show(truncate=False)
#+-------------------+------+-------------------+
#|email_col          |pos_at|new_col            |
#+-------------------+------+-------------------+
#|abc123@gmail.com   |7     |ab**23@gmail.com   |
#|123abc123@yahoo.com|10    |12*****23@yahoo.com|
#|abd@gmail.com      |4     |abbd@gmail.com     |
#+-------------------+------+-------------------+

注意pos_at - 1 &lt;= 2*N时最后一行的问题,必须单独处理。如果我定义以下逻辑:

if `pos_at - 1 <= 2*N`:   keep the first char and mask the rest
otherwise: keep the original processing routine

整个处理过程可以包含在一个带有两个参数(column_nameN)的 lambda 函数中

# in the SQL expression, {0} is column_name and {1} is N
mask_email = lambda col_name, N: expr("""

  IF(INSTR({0}, '@') <= {1}*2+1
    , CONCAT(LEFT({0},1), REPEAT('*', INSTR({0}, '@')-2), SUBSTR({0}, INSTR({0}, '@')))
    , CONCAT(LEFT({0},{1}), REPEAT('*', INSTR({0}, '@')-1-2*{1}), SUBSTR({0}, INSTR({0}, '@')-{1}))
  ) as `{0}_masked`

""".format(col_name, N))

df.select('*', mask_email('email_col', 2)).show()
#+-------------------+-------------------+
#|          email_col|   email_col_masked|
#+-------------------+-------------------+
#|   abc123@gmail.com|   ab**23@gmail.com|
#|123abc123@yahoo.com|12*****23@yahoo.com|
#|      abd@gmail.com|      a**@gmail.com|
#+-------------------+-------------------+

【讨论】:

    猜你喜欢
    • 2019-04-04
    • 2021-12-13
    • 2017-12-15
    • 2014-09-07
    • 2016-08-30
    • 2019-11-20
    • 1970-01-01
    • 1970-01-01
    • 2012-02-20
    相关资源
    最近更新 更多