【问题标题】:Read custom input file(ldif) type/format in GCP DatafLow在 GCP DatafLow 中读取自定义输入文件(ldif)类型/格式
【发布时间】:2021-10-10 16:54:10
【问题描述】:

我有来自 LDAP 系统的 ldif 扩展文件。我能够轻松地在 python 中解析它并从文件中提取所需的数据并插入到 SQL 服务器中。我的示例 python 如下所示。

import os
from ldif3 import LDIFParser
import pymssql

parser = LDIFParser(open('temp.ldiff', 'rb'))

def return_dictionary_element_if_present(dict_entry, element):
    if dict_entry.get(element):
        return dict_entry.get(element)[0]
    return ''   

def add_new_user():
    for dn, entry in parser.parse():
        dict_entry = dict(entry)
        email = return_dictionary_element_if_present(dict_entry,'email')
        password = return_dictionary_element_if_present(dict_entry,'password')
        #some code to insert into SQL server
add_new_user()

但是当我希望将其转换为数据流时,我无法理解要修改的内容和位置。我的数据流代码如下所示

class sqlserverwriteDoFn(beam.DoFn):
    #insert statement 


class CreateEntities(beam.DoFn):
    def process(self, element):
        #figure out how to return dictionary if parsed correctly
        return [{"email": email, "password": password}]


def dataflow(input_file, pipeline_options):
    print("starting")
    options = GoogleCloudOptions.from_dictionary(pipeline_options)
    with beam.Pipeline(options=options) as p:
        (p | 'Reading Ldif data from GCS' >> beam.io.ReadFromText(input_file)
           | 'Create entities' >> beam.ParDo(CreateEntities())
           | 'Insert data to SQLSERVER' >> beam.ParDo(sqlserverwriteDoFn(pipeline_options['project']))
         )

我认为 ReadFromText 将每一行转换为 pcollection 在我的情况下不起作用。示例 ldif 文件如下所示

dn: uid=12345,ab=users,xy=random
phone: 111
address: someaddress
email: true
username:abc
password:abc


dn: uid=12345,ab=users,xy=random
objectClass: inetOrgPerson
objectClass: top
phone: 111
address: someaddress
email: true
username:abcd
password:abcd

任何想法都非常感谢,因为我希望从 LDIF 文件中导入 5000 万个用户名和密码,并且绝对无法扩展简单的 python for 循环。

[Edit1] 根据 cmets,修改代码并出现其他错误

def return_dictionary_element_if_present(dict_entry, element):
    if dict_entry.get(element):
        return dict_entry.get(element)[0]
    return ''

class CreateEntities(beam.DoFn):
    def process(self, file):
        parser = LDIFParser(open(file, 'rb'))
        arr=[]
        for dn, entry in parser.parse():
            dict1 ={}
            dict_entry = dict(entry)
            email = return_dictionary_element_if_present(dict_entry,'email')
            password = return_dictionary_element_if_present(dict_entry,'password')
            dict1['email'] = email
            dict1['password'] = password
            arr.append(dict1)
        return arr


def dataflow(pipeline_options):
    print("starting")
    options = GoogleCloudOptions.from_dictionary(pipeline_options)
    with beam.Pipeline(options=options) as p:
        (p | 'Reading data from GCS' >> MatchFiles(file_pattern="temp.ldiff")
           | 'file match' >> ReadMatches()
           | 'Create entities' >> beam.ParDo(CreateEntities())
           | 'print to screen' >> beam.Map(print)
         )

出现以下错误

  File "dataflow.py", line 26, in process
    parser = LDIFParser(open(file, 'rb'))
TypeError: expected str, bytes or os.PathLike object, not ReadableFile [while running 'Create entities']

编辑2 改了一行python代码如下

parser = LDIFParser(file)

遇到这个错误

  File "dataflow.py", line 28, in process
    for dn, entry in parser.parse():
  File "C:\Users\sande\anaconda3\envs\saopaulo\lib\site-packages\ldif3.py", line 383, in parse
    for block in self._iter_blocks():
  File "C:\Users\sande\anaconda3\envs\saopaulo\lib\site-packages\ldif3.py", line 282, in _iter_blocks
    for line in self._iter_unfolded_lines():
  File "C:\Users\sande\anaconda3\envs\saopaulo\lib\site-packages\ldif3.py", line 263, in _iter_unfolded_lines
    line = self._input_file.readline()
AttributeError: 'ReadableFile' object has no attribute 'readline' [while running 'Create entities']

我应该如何更改我的代码以解决错误?

【问题讨论】:

    标签: google-cloud-platform google-cloud-dataflow apache-beam


    【解决方案1】:

    你是对的,Python SDK 中的 TextIO 使用换行符作为分隔元素的分隔符。所以生成的每个元素都是输入文件的一行。

    在您的原始代码中,您已经有一个可以读取 LDIF 文件的解析器。您可以通过ParDo transform 在您的管道中使用它。我建议从 FileIO 开始创建 LDIF 文件的 PCollection,然后将它们用作您自己的 ParDo 的输入,ParDo 解析这些文件并输出您的记录。请注意,如果您想在 Dataflow 上使用现有解析器,您可能需要阅读 managing Beam Python dependencies,因为您的 Dataflow 工作人员需要访问该依赖项。

    【讨论】:

    • 感谢您回答问题。我已经根据您的建议更新了问题,现在我遇到了不同的错误。你能帮我调试一下吗?
    • @sandeep007 我建议阅读ReadableFile 的代码(ReadMatches 的输出类型)。我想你可能需要的是LDIFParser(file.open())
    • 感谢丹尼尔的时间和帮助。它现在工作。我可以在屏幕上打印。
    猜你喜欢
    • 1970-01-01
    • 2017-04-30
    • 2022-01-14
    • 2011-11-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-10-24
    相关资源
    最近更新 更多