【发布时间】:2016-12-29 18:16:51
【问题描述】:
我想使用 RxPy 打开一个 (csv) 文件并逐行处理该文件。我确切地设想有以下步骤
- 为流提供文件名
- 打开文件
- 逐行读取文件
- 删除以注释开头的行(例如 # ...)
- 应用 csv 阅读器
- 过滤符合某些条件的记录
到目前为止我有:
def to_file(filename):
f = open(filename)
return Observable.using(
lambda: AnonymousDisposable(lambda: f.close()),
lambda d: Observable.just(f)
)
def to_reader(f):
return csv.reader(f)
def print_rows(reader):
for row in reader:
print(row)
这行得通
Observable.from_(["filename.csv", "filename2.csv"])
.flat_map(to_file).**map**(to_reader).subscribe(print_rows)
这不是:ValueError: I/O operation on closed file
Observable.from_(["filename.csv", "filename2.csv"])
.flat_map(to_file).**flat_map**(to_rows).subscribe(print)
第二个不起作用,因为(请参阅https://github.com/ReactiveX/RxPY/issues/69)
当来自第一个平面图的 observables 被第二个平面图合并时,内部订阅将在它们完成时被释放。因此文件将被关闭,即使文件句柄被 on_next'ed 到由第二个平面图设置的新 observable 中。
知道如何实现: 比如:
Observable.from_(["filename.csv", "filename2.csv"]
).flat_map(to_file
).filter(comment_lines
).filter(empty_lines
).map(to_csv_reader
).filter(filter_by.. )
).do whatever
非常感谢您的帮助
于尔根
【问题讨论】: