【问题标题】:dask + luigi: raise ValueError('url type not understood: %s' % urlpath)dask + luigi: raise ValueError('url type not understand: %s' % urlpath)
【发布时间】:2017-04-13 21:01:00
【问题描述】:

我正在尝试将 dask 与 luigi 合并, 虽然业务逻辑本身运行良好,但当我运行 Luigi 任务时代码开始抛出错误:

raise ValueError('url type not understood: %s' % urlpath)
ValueError: url type not understood: <_io.TextIOWrapper name='../data/2017_04_11_oldsource_geocoded.csv-luigi-tmp-1647603946' mode='wb' encoding='UTF-8'>

代码在这里(我删除了业务模型部分以使其更短):

import pandas as pd
import geopandas as gp
from geopandas.tools import sjoin
from dask import dataframe as dd
from shapely.geometry import Point
from os import path
import luigi

class geocode_tweets(luigi.Task):
    boundaries = _load_geoboundaries()
    nyc = boundaries[0].unary_union

    def requires(self):
        return []

    def output(self):
        self.path = '../data/2017_04_11_oldsource_geocoded.csv'
        return luigi.LocalTarget(self.path)

    def run(self):
        df = dd.read_csv(path.join(data_dir, '2017_03_22_oldsource.csv'))
        df['geometry'] = df.apply(_get_point, axis=1)
        meta = _form_meta(df)

        S = df.map_partitions(
            distributed_sjoin, boundaries=self.boundaries,
            nyc_border=self.nyc, meta=meta).drop('geometry', axis=1)

        f = self.output().open('w')
        S.to_csv(f)
        f.close()

问题似乎出在输出部分

据我了解,问题在于 dask 不喜欢将 Luigi 文件对象作为字符串的替代品。

【问题讨论】:

  • 我怀疑如果您能够生成mcve,您会得到更好的答案

标签: dask luigi


【解决方案1】:

Dask 定义了DataFrame.to_csv(filename, **kwargs),并且您正在向它发送文件而不是文件名。将最后三行替换为:

S.to_csv(self.output().path)

【讨论】:

    猜你喜欢
    • 2019-03-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-09-12
    • 1970-01-01
    • 2022-07-28
    相关资源
    最近更新 更多