[Posted]: 2021-10-10 16:54:10
[Question]:
I have an LDIF export file from an LDAP system. I can parse it easily in Python, extract the fields I need, and insert them into SQL Server. My sample Python looks like this:
import os
from ldif3 import LDIFParser
import pymssql

parser = LDIFParser(open('temp.ldiff', 'rb'))

def return_dictionary_element_if_present(dict_entry, element):
    if dict_entry.get(element):
        return dict_entry.get(element)[0]
    return ''

def add_new_user():
    for dn, entry in parser.parse():
        dict_entry = dict(entry)
        email = return_dictionary_element_if_present(dict_entry, 'email')
        password = return_dictionary_element_if_present(dict_entry, 'password')
        # some code to insert into SQL Server

add_new_user()
But when I try to turn this into a Dataflow pipeline, I can't work out what to change, or where. My Dataflow code looks like this:
import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions

class sqlserverwriteDoFn(beam.DoFn):
    # insert statement
    ...

class CreateEntities(beam.DoFn):
    def process(self, element):
        # figure out how to return dictionary if parsed correctly
        return [{"email": email, "password": password}]

def dataflow(input_file, pipeline_options):
    print("starting")
    options = GoogleCloudOptions.from_dictionary(pipeline_options)
    with beam.Pipeline(options=options) as p:
        (p | 'Reading Ldif data from GCS' >> beam.io.ReadFromText(input_file)
           | 'Create entities' >> beam.ParDo(CreateEntities())
           | 'Insert data to SQLSERVER' >> beam.ParDo(sqlserverwriteDoFn(pipeline_options['project']))
        )
I think ReadFromText, which turns each line of the file into a PCollection element, won't work in my case. A sample LDIF file looks like this:
dn: uid=12345,ab=users,xy=random
phone: 111
address: someaddress
email: true
username:abc
password:abc

dn: uid=12345,ab=users,xy=random
objectClass: inetOrgPerson
objectClass: top
phone: 111
address: someaddress
email: true
username:abcd
password:abcd
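Standard LDIF separates entries with a blank line, which suggests one way around line-by-line ReadFromText: read each matched file whole and split it into records before building the dictionaries. A minimal, dependency-free splitter (my own sketch, not from the original post; it keeps the first value of each attribute and ignores LDIF line folding and base64 values, which a real parser such as ldif3 must handle):

```python
def split_ldif_records(text):
    """Split raw LDIF text into one dict per blank-line-separated record."""
    records = []
    for block in text.strip().split('\n\n'):
        entry = {}
        for line in block.splitlines():
            # "key: value" (the sample also has "key:value" with no space)
            key, _, value = line.partition(':')
            # setdefault keeps the first value of repeated attributes
            # such as objectClass
            entry.setdefault(key.strip(), value.strip())
        records.append(entry)
    return records
```

Applied to the sample above, this yields two dicts, one per `dn:` block, which is the shape the rest of the pipeline wants.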
Any ideas are much appreciated, because I need to import 50 million usernames and passwords from the LDIF file, and a simple Python for loop definitely won't scale.
[Edit1] Per the comments, I modified the code and got a different error:
import apache_beam as beam
from apache_beam.io.fileio import MatchFiles, ReadMatches
from apache_beam.options.pipeline_options import GoogleCloudOptions
from ldif3 import LDIFParser

def return_dictionary_element_if_present(dict_entry, element):
    if dict_entry.get(element):
        return dict_entry.get(element)[0]
    return ''

class CreateEntities(beam.DoFn):
    def process(self, file):
        parser = LDIFParser(open(file, 'rb'))
        arr = []
        for dn, entry in parser.parse():
            dict1 = {}
            dict_entry = dict(entry)
            email = return_dictionary_element_if_present(dict_entry, 'email')
            password = return_dictionary_element_if_present(dict_entry, 'password')
            dict1['email'] = email
            dict1['password'] = password
            arr.append(dict1)
        return arr

def dataflow(pipeline_options):
    print("starting")
    options = GoogleCloudOptions.from_dictionary(pipeline_options)
    with beam.Pipeline(options=options) as p:
        (p | 'Reading data from GCS' >> MatchFiles(file_pattern="temp.ldiff")
           | 'file match' >> ReadMatches()
           | 'Create entities' >> beam.ParDo(CreateEntities())
           | 'print to screen' >> beam.Map(print)
        )
which raises the following error:
File "dataflow.py", line 26, in process
parser = LDIFParser(open(file, 'rb'))
TypeError: expected str, bytes or os.PathLike object, not ReadableFile [while running 'Create entities']
[Edit2] I changed one line of the Python code as follows:
parser = LDIFParser(file)
and hit this error:
File "dataflow.py", line 28, in process
for dn, entry in parser.parse():
File "C:\Users\sande\anaconda3\envs\saopaulo\lib\site-packages\ldif3.py", line 383, in parse
for block in self._iter_blocks():
File "C:\Users\sande\anaconda3\envs\saopaulo\lib\site-packages\ldif3.py", line 282, in _iter_blocks
for line in self._iter_unfolded_lines():
File "C:\Users\sande\anaconda3\envs\saopaulo\lib\site-packages\ldif3.py", line 263, in _iter_unfolded_lines
line = self._input_file.readline()
AttributeError: 'ReadableFile' object has no attribute 'readline' [while running 'Create entities']
How should I change my code to fix the error?
[Discussion]:
Tags: google-cloud-platform google-cloud-dataflow apache-beam