【Question title】: NotImplementedError in Apache Beam (Python)
【Posted】: 2019-09-11 02:34:01
【Question】:

I am using Apache Beam to write JSON to GCS, but I run into the following error:

NotImplementedError: offset: 0, whence: 0, position: 50547, last: 50547 [while running 'Writing new data to gcs/write data gcs/Write/WriteImpl/WriteBundles/WriteBundles']

I don't know why this error occurs. The code is as follows:

import json

import apache_beam as beam


class WriteDataGCS(beam.PTransform):
        """
        To write data to GCS
        """

        def __init__(self, bucket):
            """
            Initiate the bucket as a class field

            :type bucket:string
            :param bucket: query to be run for data
            """
            self.bucket = bucket

        def expand(self, pcoll):
            """
            PTransform Method run when called on Class Name

            :type pcoll: PCollection
            :param pcoll: A pcollection
            """
            (pcoll | "print intermediate" >> beam.Map(print_row))
            return (pcoll | "write data gcs" >> beam.io.WriteToText(
                self.bucket, coder=JsonCoder(), file_name_suffix=".json"))


class JsonCoder:
    """
    This class represents dump and load operations performed on json
    """
    def encode(self,data):
        """
        Encodes the json data.

        :type data: string
        :param data: Data to be encoded
        """
        # logger.info("JSON DATA for encoding - {}".format(data))
        return json.dumps(data,default=str)

    def decode(self,data):
        """
        Decodes the json data.

        :type data: string
        :param data: Data to be decoded
        """
        # logger.info("JSON DATA for decoding - {}".format(data))
        return json.loads(data)

【Question comments】:

  • user@beam.apache.com may be a better place for this question.

Tags: python python-2.7 apache-beam apache-beam-io


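【Solution 1】:

The coder parameter of WriteToText expects an apache_beam.coders.Coder instance. You could try making your JsonCoder inherit from the Coder base class, but I think you can also just use a Map to convert the data to strings before writing: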

        def expand(self, pcoll):
            """
            PTransform Method run when called on Class Name

            :type pcoll: PCollection
            :param pcoll: A pcollection
            """
            # Note: print_row is chained here, so it must return its input row;
            # otherwise the following steps receive None.
            return (pcoll
                    | "print intermediate" >> beam.Map(print_row)
                    | "to_json" >> beam.Map(lambda x: json.dumps(x, default=str))
                    | "write data gcs" >> beam.io.WriteToText(self.bucket, file_name_suffix=".json"))
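
The "to_json" step relies on json.dumps(..., default=str), which falls back to str() for values that JSON cannot represent natively (dates, for example). As a rough sketch of what that step produces for a single element, here is the same conversion as a standalone function using only the standard library. The helper name to_json_line and the sample row are hypothetical and do not come from the question.

```python
import datetime
import json


def to_json_line(row):
    """Serialize one element to a JSON string.

    default=str is called for any value json cannot encode itself,
    so such values end up as their str() representation.
    """
    return json.dumps(row, default=str)


# Hypothetical sample element; the field names are made up for illustration.
row = {"id": 1, "created": datetime.date(2019, 9, 11)}
line = to_json_line(row)
print(line)
```

If this behaves as expected on your data, the same function could be passed to the pipeline as beam.Map(to_json_line) in place of the lambda.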

【Comments】:
