【发布时间】:2025-11-29 22:40:02
【问题描述】:
我想构建一个数据管道,对数据系列的行执行一系列操作。
大多数函数将在一行进一出的基础上工作,但其中一些操作会“扩展”系列 - 我的意思是一行将进入函数并且不止一行可能会作为该函数的结果生成。
我想建立一个足够强大的函数链来自己处理这种行为,而不必编写一堆监督代码。
使用yield 就像提供了一个机会——如果每个函数都消耗了前一个函数的产量,并自己充当生成器,那么我可以任意将这些格式良好的函数链接在一起——这会很好从优雅的角度来看。
这是我的设置代码,func_x 充当简单的 1-1 函数,func_y 进行扩展。
from collections import OrderedDict
data_source = [ OrderedDict({"id" : "1", "name" : "Tom", "sync" : "a"}),
OrderedDict({"id" : "2", "name" : "Steve", "sync" : "a"}),
OrderedDict({"id" : "3", "name" : "Ulrich", "sync" : "b"}),
OrderedDict({"id" : "4", "name" : "Victor", "sync" : "b"}),
OrderedDict({"id" : "5", "name" : "Wolfgang", "sync" : "c"}),
OrderedDict({"id" : "6", "name" : "Xavier", "sync" : "c"}),
OrderedDict({"id" : "7", "name" : "Yves", "sync" : "c"}),
OrderedDict({"id" : "8", "name" : "Zaphod", "sync" : "d"})]
def row_getter(source):
for content in source:
yield content.copy()
def func_x(row):
try:
q=next(row)
if q['name']=="Tom":
q['name']="Richard"
yield q.copy()
except StopIteration:
print ("Stop x")
def func_y(row):
try:
q=next(row)
for thingy in range(0,2):
q['thingy']=thingy
yield q.copy()
except StopIteration:
print ("Stop y")
rg = row_getter(data_source)
iter_func = func_y(func_x(rg))
现在,我可以通过迭代 iter_func 对象来获取第一组数据:
print (next(iter_func))
>> OrderedDict([('id', '1'), ('name', 'Richard'), ('sync', 'a'), ('thingy', 0)])
再说一遍:
print (next(iter_func))
>> OrderedDict([('id', '1'), ('name', 'Richard'), ('sync', 'a'), ('thingy', 1)])
再一次,虽然这一次,我没有看到 Steve 的记录(即流中的下一条记录,现在第一条记录上的 func_y 扩展已完成)我收到了 StopIteration 错误。
print (next(iter_func))
>> StopIteration Traceback (most recent call last)
<ipython-input-15-0fd1ed48c61b> in <module>()
----> 1 print (next(iter_func))
StopIteration:
所以我不明白这是从哪里来的,因为我试图在 func_x 和 func_y 中捕获这些。
【问题讨论】:
-
你不需要
rowgetter;它基本上只是重新实现iter。 -
func_x和func_y会不会故意修改data_source的内容? -
不,不是 - 松散地说,起始数据是一次性的 - 除了将其读入管道的开头之外,我不会引用它,但我应该在它之前清理它产生任何不需要的副作用。 [编辑]