【Question title】: Convert JSON file to AVRO
【Posted】: 2020-11-05 03:54:37
【Question】:

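I'm new to Avro and Python. I have a use case where I want to convert a JSON file to an Avro file. I have stored my schema in .avsc format and my JSON data in .json format. Now I want to use the JSON file and the .avsc file together and serialize the JSON data to Avro. Below is my code. I get the error "avro.io.AvroTypeException: The datum file.json is not an example of schema", and I'm not sure what I'm doing wrong. In my writer.append statement, I want the script to take the data from the JSON file and append it to the .avro file in serialized form. I don't know how to approach this, so any help would be greatly appreciated.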

Here is my code:

import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
from avro import schema, datafile, io
import json


def json_to_avro():
        fo = open("laird.txt", "r")
        data = fo.readlines()
        final_header = []
        final_rec = []
        for header in data[0:1]:
            header = header.strip("\n")
            header = header.split(",")
            final_header = header
        for rec in data[1:]:
            rec = rec.strip("\n")
            rec = rec.split(" ")
            rec = ' '.join(rec).split()
            final_rec = rec
        final_dict = dict(zip(final_header,final_rec))
        # print(final_dict)
        json_dumps = json.dumps(final_dict, ensure_ascii=False)
        # print(json_dumps)
        schema = avro.schema.parse(open("laird.avsc", "rb").read())
        # print(schema)

        writer = DataFileWriter(open("laird.avro", "wb"), DatumWriter(), schema)

        with open("laird.json") as fp:
            contents = json.load(fp)
            print(contents)

        writer.append(contents)

        writer.close()

json_to_avro()

Contents of laird.avsc:

{
  "name": "MyClass",
  "type": "record",
  "namespace": "com.acme.avro",
  "fields": [
    {
      "name": "event_type",
      "type": "string"
    },
    {
      "name": "event_data",
      "type": {
        "name": "event_data",
        "type": "record",
        "fields": [
          {
            "name": "device_id",
            "type": "string"
          },
          {
            "name": "user_id",
            "type": "string"
          },
          {
            "name": "payload",
            "type": {
              "type": "array",
              "items": {
                "name": "payload_record",
                "type": "record",
                "fields": [
                  {
                    "name": "name",
                    "type": "string"
                  },
                  {
                    "name": "sensor_id",
                    "type": "string"
                  },
                  {
                    "name": "type",
                    "type": "string"
                  },
                  {
                    "name": "unit",
                    "type": "string"
                  },
                  {
                    "name": "value",
                    "type": "float"
                  },
                  {
                    "name": "channel",
                    "type": "int"
                  },
                  {
                    "name": "timestamp",
                    "type": "long"
                  }
                ]
              }
            }
          },
          {
            "name": "client_id",
            "type": "string"
          },
          {
            "name": "hardware_id",
            "type": "string"
          },
          {
            "name": "timestamp",
            "type": "long"
          },
          {
            "name": "application_id",
            "type": "string"
          },
          {
            "name": "device_type_id",
            "type": "string"
          }
        ]
      }
    },
    {
      "name": "company",
      "type": {
        "name": "company",
        "type": "record",
        "fields": [
          {
            "name": "id",
            "type": "int"
          },
          {
            "name": "address",
            "type": "string"
          },
          {
            "name": "city",
            "type": "string"
          },
          {
            "name": "country",
            "type": "string"
          },
          {
            "name": "created_at",
            "type": "int",
            "logicalType": "date"
          },
          {
            "name": "industry",
            "type": "string"
          },
          {
            "name": "latitude",
            "type": "float"
          },
          {
            "name": "longitude",
            "type": "float"
          },
          {
            "name": "name",
            "type": "string"
          },
          {
            "name": "state",
            "type": "string"
          },
          {
            "name": "status",
            "type": "int"
          },
          {
            "name": "timezone",
            "type": "string"
          },
          {
            "name": "updated_at",
            "type": "int",
            "logicalType": "date"
          },
          {
            "name": "user_id",
            "type": "string"
          },
          {
            "name": "zip",
            "type": "string"
          }
        ]
      }
    },
    {
      "name": "location",
      "type": {
        "name": "location",
        "type": "record",
        "fields": [
          {
            "name": "id",
            "type": "int"
          },
          {
            "name": "address",
            "type": "string"
          },
          {
            "name": "city",
            "type": "string"
          },
          {
            "name": "country",
            "type": "string"
          },
          {
            "name": "created_at",
            "type": "int",
            "logicalType": "date"
          },
          {
            "name": "industry",
            "type": "string"
          },
          {
            "name": "latitude",
            "type": "float"
          },
          {
            "name": "longitude",
            "type": "float"
          },
          {
            "name": "name",
            "type": "string"
          },
          {
            "name": "state",
            "type": "string"
          },
          {
            "name": "status",
            "type": "int"
          },
          {
            "name": "timezone",
            "type": "string"
          },
          {
            "name": "updated_at",
            "type": "int",
            "logicalType": "date"
          },
          {
            "name": "user_id",
            "type": "string"
          },
          {
            "name": "zip",
            "type": "string"
          },
          {
            "name": "company_id",
            "type": "int"
          }
        ]
      }
    },
    {
      "name": "device_type",
      "type": {
        "name": "device_type",
        "type": "record",
        "fields": [
          {
            "name": "id",
            "type": "string"
          },
          {
            "name": "application_id",
            "type": "string"
          },
          {
            "name": "category",
            "type": "string"
          },
          {
            "name": "codec",
            "type": "string"
          },
          {
            "name": "data_type",
            "type": "string"
          },
          {
            "name": "description",
            "type": "string"
          },
          {
            "name": "manufacturer",
            "type": "string"
          },
          {
            "name": "model",
            "type": "string"
          },
          {
            "name": "name",
            "type": "string"
          },
          {
            "name": "parent_constraint",
            "type": "string"
          },
          {
            "name": "proxy_handler",
            "type": "string"
          },
          {
            "name": "subcategory",
            "type": "string"
          },
          {
            "name": "transport_protocol",
            "type": "string"
          },
          {
            "name": "version",
            "type": "string"
          },
          {
            "name": "created_at",
            "type": "int",
            "logicalType": "date"
          },
          {
            "name": "updated_at",
            "type": "int",
            "logicalType": "date"
          }
        ]
      }
    },
    {
      "name": "device",
      "type": {
        "name": "device",
        "type": "record",
        "fields": [
          {
            "name": "id",
            "type": "int"
          },
          {
            "name": "thing_name",
            "type": "string"
          },
          {
            "name": "created_at",
            "type": "int",
            "logicalType": "date"
          },
          {
            "name": "updated_at",
            "type": "int",
            "logicalType": "date"
          },
          {
            "name": "status",
            "type": "int"
          }
        ]
      }
    }
  ]
}

Contents of laird.json:

{
  "event_type": "uplink",
  "event_data": {
    "device_id": "fec5d310-88bf-11ea-aadf-2bd85a1038fa",
    "user_id": "ba5e224b-fcc2-4e1b-b2b3-97ef57b9ba60",
    "payload": [
      {
        "name": "Temperature",
        "sensor_id": "fef87bd0-88bf-11ea-ae96-6fae9a4d5562",
        "type": "temp",
        "unit": "c",
        "value": 21.9,
        "channel": 3,
        "timestamp": 1603300033446
      },
      {
        "name": "Humidity",
        "sensor_id": "fef399d0-88bf-11ea-a424-59b141c8d9bf",
        "type": "rel_hum",
        "unit": "p",
        "value": 74,
        "channel": 4,
        "timestamp": 1603300033446
      },
      {
        "name": "Battery",
        "sensor_id": "feef2d00-88bf-11ea-aadf-2bd85a1038fa",
        "type": "batt",
        "unit": "p",
        "value": 100,
        "channel": 5,
        "timestamp": 1603300033446
      },
      {
        "name": "RSSI",
        "sensor_id": "fef658f0-88bf-11ea-ae96-6fae9a4d5562",
        "type": "rssi",
        "unit": "dbm",
        "value": -42,
        "channel": 100,
        "timestamp": 1603300033446
      },
      {
        "name": "SNR",
        "sensor_id": "",
        "type": "snr",
        "unit": "db",
        "value": 10.2,
        "channel": 101,
        "timestamp": 1603300033446
      }
    ],
    "client_id": "1c46a2e0-88b9-11ea-90ae-77c0364d6e36",
    "hardware_id": "0025ca0a00008612",
    "timestamp": 1603300033446,
    "application_id": "shipcomwireless",
    "device_type_id": "16a4d0c0-3edf-11e9-bf5e-a180edbfa9bb"
  },
  "company": {
    "id": 6854,
    "address": "10500 University Center Dr",
    "city": "Tampa",
    "country": "United States",
    "created_at": "2020-04-27T19:44:46Z",
    "industry": "[\"Health Care\"]",
    "latitude": 28.045853,
    "longitude": -82.421,
    "name": "Dermpath Diagnostics Bay Area",
    "state": "FL",
    "status": 0,
    "timezone": "America/New_York",
    "updated_at": "2020-04-27T19:44:46Z",
    "user_id": "ba5e224b-fcc2-4e1b-b2b3-97ef57b9ba60",
    "zip": "33612"
  },
  "location": {
    "id": 8138,
    "address": "10500 University Center Dr",
    "city": "Tampa",
    "country": "United States",
    "created_at": "2020-04-27T19:44:46Z",
    "industry": "[\"Health Care\"]",
    "latitude": 28.045853,
    "longitude": -82.421,
    "name": "Dermpath Diagnostics Bay Area",
    "state": "FL",
    "status": 0,
    "timezone": "America/New_York",
    "updated_at": "2020-10-19T18:22:19Z",
    "user_id": "ba5e224b-fcc2-4e1b-b2b3-97ef57b9ba60",
    "zip": "33612",
    "company_id": 6854
  },
  "device_type": {
    "id": "16a4d0c0-3edf-11e9-bf5e-a180edbfa9bb",
    "application_id": "",
    "category": "module",
    "codec": "lorawan.laird.rs1xx",
    "data_type": "",
    "description": "Temp Sensor",
    "manufacturer": "Laird",
    "model": "RS1xx",
    "name": "Laird Temp & Humidity RS1xx Sensor - IoT in a Box",
    "parent_constraint": "NOT_ALLOWED",
    "proxy_handler": "PrometheusClient",
    "subcategory": "lora",
    "transport_protocol": "lorawan",
    "version": "",
    "created_at": "2019-03-05T00:39:09Z",
    "updated_at": "2019-03-05T00:39:09Z"
  },
  "device": {
    "id": 217200,
    "thing_name": "Slide/Block Warehouse STE250",
    "created_at": "2020-04-27T19:47:58Z",
    "updated_at": "2020-04-28T12:58:08Z",
    "status": 0
  }
}

【Discussion】:

    Tags: python json avro


    【Answer 1】:

    You can try the fastavro and rec_avro modules. Here is an example:

    from fastavro import writer, reader, schema
    from rec_avro import to_rec_avro_destructive, from_rec_avro_destructive, rec_avro_schema
    
    def json_objects():
        return [{'a': 'a'}, {'b':'b'}]
    
    # For efficiency, to_rec_avro_destructive() destroys rec and reuses its
    # data structures to construct the Avro objects
    avroObjects = (to_rec_avro_destructive(rec) for rec in json_objects())
    
    # store records in avro
    with open('json_in_avro.avro', 'wb') as x:
        writer(x, schema.parse_schema(rec_avro_schema()), avroObjects)
    
    #load records from avro
    with open('json_in_avro.avro', 'rb') as f_in:
        # For efficiency, from_rec_avro_destructive(rec) destroys rec and
        # reuses its data structures to construct its output
        loadedJson = [from_rec_avro_destructive(rec) for rec in reader(f_in)]
    
    assert loadedJson == json_objects()
    

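    【Comments】:

    • Thanks @batman. I'll give it a try. So, if I understand correctly, I should use the functions above after getting the schema from the JSON file? Sorry if this sounds silly, but my knowledge of Python and Avro is limited.
    【Answer 2】:

    Update:

    There are two problems. The first is in the schema. Wherever you currently have an item like this: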

          {
            "name": "created_at",
            "type": "int",
            "logicalType": "date"
          },
    

    it should look like this:

          {
            "name": "created_at",
            "type": {"type": "int", "logicalType": "date"}
          },
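Because the misplaced logicalType appears on every created_at and updated_at field in laird.avsc, the schema fix can also be applied mechanically. The sketch below is a hypothetical helper (hoist_logical_types is an illustrative name, not part of the avro package). It assumes the schema has been loaded as a plain dict with json.load, and it treats any dict that has a name, a primitive type string and a logicalType key as a field whose logical type should move into a nested type object.

```python
import json

# Avro primitive type names. Named types (record, enum, fixed) are excluded, so a
# dict with a "name" and one of these "type" strings must be a field, not a type.
PRIMITIVES = {"null", "boolean", "int", "long", "float", "double", "bytes", "string"}


def hoist_logical_types(node):
    """Recursively move field-level "logicalType" keys into the field's type.

    Turns {"name": "created_at", "type": "int", "logicalType": "date"}
    into  {"name": "created_at", "type": {"type": "int", "logicalType": "date"}}.
    The input is modified in place and also returned.
    """
    if isinstance(node, dict):
        field_type = node.get("type")
        # Only plain strings are checked against PRIMITIVES (dicts/lists are unhashable).
        if (
            "name" in node
            and "logicalType" in node
            and isinstance(field_type, str)
            and field_type in PRIMITIVES
        ):
            node["type"] = {"type": field_type, "logicalType": node.pop("logicalType")}
        for value in node.values():
            hoist_logical_types(value)
    elif isinstance(node, list):
        for item in node:
            hoist_logical_types(item)
    return node


# Small in-memory example in the shape of the "device" record from laird.avsc.
sample = {
    "type": "record",
    "name": "device",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "created_at", "type": "int", "logicalType": "date"},
    ],
}
print(json.dumps(hoist_logical_types(sample)["fields"][1]))
# {"name": "created_at", "type": {"type": "int", "logicalType": "date"}}
```

To use it on the real file, load laird.avsc with json.load, pass the result through the helper, and then either write it back with json.dump or give json.dumps(...) of the result to avro.schema.parse.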
    

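    Once that is fixed, the other problem is in your data. When you read in the JSON file, the created_at and updated_at fields are strings, but they must be datetime.date objects. You can convert each of them like this: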

    from datetime import date
    contents["company"]["created_at"] = date.fromisoformat(contents["company"]["created_at"].split("T")[0])
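Doing this by hand is repetitive, since created_at and updated_at appear in company, location, device_type and device. Below is a sketch of a hypothetical helper (convert_dates and DATE_FIELDS are illustrative names, not part of any library) that walks the loaded JSON and converts every string under those two keys, keeping only the date part as the line above does. The second function shows why a plain string cannot work: per the Avro specification, the date logical type is stored as an int counting days since 1970-01-01, so the value has to be something that can be turned into that number, such as a datetime.date.

```python
from datetime import date

# Keys that laird.avsc declares with the "date" logical type (assumption:
# these two names are used only for date fields in the JSON).
DATE_FIELDS = {"created_at", "updated_at"}


def convert_dates(obj):
    """Replace ISO-8601 strings under DATE_FIELDS with datetime.date, in place."""
    if isinstance(obj, dict):
        for key, value in list(obj.items()):
            if key in DATE_FIELDS and isinstance(value, str):
                # "2020-04-27T19:44:46Z" -> date(2020, 4, 27); the time of day is dropped.
                obj[key] = date.fromisoformat(value.split("T")[0])
            else:
                convert_dates(value)
    elif isinstance(obj, list):
        for item in obj:
            convert_dates(item)
    return obj


def days_since_epoch(d):
    """The int that Avro's "date" logical type stores for the date d."""
    return (d - date(1970, 1, 1)).days
```

In the question's code, calling convert_dates(contents) right after json.load(fp) and before writer.append(contents) would take the place of the per-field line. For example, days_since_epoch(date(2020, 4, 27)) is 18379. Note that date.fromisoformat requires Python 3.7 or later.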
    

    Original answer:

    The writer.append function does not accept the name of a JSON file. Instead, it accepts the JSON object itself. So instead of:

    writer.append("laird.json") 
    

    you'll want something like this:

    with open("laird.json") as fp:
        contents = json.load(fp)
    
    writer.append(contents)
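If writer.append(contents) still raises AvroTypeException, the message dumps the whole datum without saying which field is wrong, which is why the truncated traceback in the comments is hard to act on. A small stdlib-only checker can point at the offending path. This is a simplified, hypothetical sketch (find_mismatches is not an avro API and only approximates its validation rules): it handles primitives, records, arrays, maps, unions and the date logical type, treats a missing field as null, ignores field defaults, and raises NotImplementedError for anything else (enums, fixed, named-type references).

```python
from datetime import date


def _is_int(v):
    # bool is a subclass of int in Python, but Avro treats booleans separately.
    return isinstance(v, int) and not isinstance(v, bool)


# Simplified per-primitive checks (float/double also accept ints).
PRIMITIVE_CHECKS = {
    "null": lambda v: v is None,
    "boolean": lambda v: isinstance(v, bool),
    "int": lambda v: _is_int(v) and -2**31 <= v < 2**31,
    "long": lambda v: _is_int(v) and -2**63 <= v < 2**63,
    "float": lambda v: _is_int(v) or isinstance(v, float),
    "double": lambda v: _is_int(v) or isinstance(v, float),
    "bytes": lambda v: isinstance(v, bytes),
    "string": lambda v: isinstance(v, str),
}


def find_mismatches(schema, datum, path="$"):
    """Return (path, expected, actual) tuples for every place datum breaks schema.

    schema is the plain dict/list/str produced by json.load on an .avsc file.
    """
    if isinstance(schema, list):  # union: at least one branch must match
        if any(not find_mismatches(branch, datum, path) for branch in schema):
            return []
        return [(path, "union", datum)]
    if isinstance(schema, str):
        schema = {"type": schema}
    kind = schema["type"]

    if kind == "int" and schema.get("logicalType") == "date":
        # Accept a datetime.date or the raw day count it is encoded as.
        ok = isinstance(datum, date) or PRIMITIVE_CHECKS["int"](datum)
        return [] if ok else [(path, "date", datum)]
    if kind == "record":
        if not isinstance(datum, dict):
            return [(path, "record", datum)]
        problems = []
        for field in schema["fields"]:
            name = field["name"]
            problems += find_mismatches(field["type"], datum.get(name), f"{path}.{name}")
        return problems
    if kind == "array":
        if not isinstance(datum, list):
            return [(path, "array", datum)]
        problems = []
        for i, item in enumerate(datum):
            problems += find_mismatches(schema["items"], item, f"{path}[{i}]")
        return problems
    if kind == "map":
        if not isinstance(datum, dict):
            return [(path, "map", datum)]
        problems = []
        for key, value in datum.items():
            problems += find_mismatches(schema["values"], value, f"{path}[{key!r}]")
        return problems
    if isinstance(kind, str) and kind in PRIMITIVE_CHECKS:
        return [] if PRIMITIVE_CHECKS[kind](datum) else [(path, kind, datum)]
    raise NotImplementedError(f"{path}: type {kind!r} is not handled by this sketch")
```

Run against the original laird.avsc and the unmodified laird.json, it should report only the created_at and updated_at paths (for example $.company.created_at with expected type int, because a field-level logicalType is not part of the type), which lines up with the two problems described in the update.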
    

    【Comments】:

    • Thanks for the reply. I tried the code snippet you suggested in my code, as shown below, but I get an error: schema = avro.schema.parse(open("laird.avsc", "rb").read()) print(schema) writer = DataFileWriter(open("laird.avro", "wb"), DatumWriter(), schema) with open("laird.json") as fp: contents = json.load(fp) writer.append(contents) writer.close() json_to_avro()
    • Error: File "C:\main.py", line 39, in json_to_avro() File "C:\main.py", line 35, in json_to_avro writer.append(contents ) File "C:Python\Python39\lib\site-packages\avro\datafile.py", line 227, in append self.datum_writer.write(datum, self.buffer_encoder) File "C:\Python\Python39\lib\site-packages\avro\io.py", line 979, in write raise AvroTypeException(self.writers_schema, datum) avro.io.AvroTypeException: The datum {'event_type': 'uplink', 'event_data': {'device_id' :
    • You should update your question with the actual contents of laird.avsc and laird.json so the example can be run. The exception you're seeing now seems to say that your data doesn't match the schema, but the traceback has been cut off, so there's no way to tell what the problem is.
    • I've updated the question with the code and files. Can you help me debug this? I really don't know what I'm doing wrong. Thanks!!
    • Thanks a lot, Scott. I'll try your suggestion.