【问题标题】:Reading the data written to s3 by Amazon Kinesis Firehose stream读取 Amazon Kinesis Firehose 流写入 s3 的数据
【发布时间】:2016-03-31 19:33:55
【问题描述】:

我正在将记录写入 Kinesis Firehose 流,该流最终由 Amazon Kinesis Firehose 写入 S3 文件。

我的记录对象看起来像

ItemPurchase {
    String personId,
    String itemId
}

写入S3的数据如下:

{"personId":"p-111","itemId":"i-111"}{"personId":"p-222","itemId":"i-222"}{"personId":"p-333","itemId":"i-333"}

没有逗号分隔。

Json 数组中没有起始括号

[

Json 数组中没有结束括号

]

我想读取这个数据获取 ItemPurchase 对象的列表。

List<ItemPurchase> purchases = getPurchasesFromS3(IOUtils.toString(s3ObjectContent))

读取这些数据的正确方法是什么?

【问题讨论】:

    标签: json amazon-s3 amazon-kinesis amazon-kinesis-firehose


    【解决方案1】:

    令我惊讶的是,Amazon Firehose 以这种方式将 JSON 消息转储到 S3,并且不允许您设置分隔符或任何东西。

    最终,我发现解决问题的技巧是使用 JSON raw_decode 方法处理文本文件

    这将允许您读取一堆连接的 JSON 记录,它们之间没有任何分隔符。

    Python 代码:

    import json
    
    decoder = json.JSONDecoder()
    
    with open('giant_kinesis_s3_text_file_with_concatenated_json_blobs.txt', 'r') as content_file:
    
        content = content_file.read()
    
        content_length = len(content)
        decode_index = 0
    
        while decode_index < content_length:
            try:
                obj, decode_index = decoder.raw_decode(content, decode_index)
                print("File index:", decode_index)
                print(obj)
            except JSONDecodeError as e:
                print("JSONDecodeError:", e)
                # Scan forward and keep trying to decode
                decode_index += 1
    

    【讨论】:

      【解决方案2】:

      我也遇到了同样的问题,我是这样解决的。

      1. 将“}{”替换为“}\n{”
      2. 行被“\n”分割。

        input_json_rdd.map(lambda x : re.sub("}{", "}\n{", x, flags=re.UNICODE))
                      .flatMap(lambda line: line.split("\n"))
        

      一个嵌套的 json 对象有几个“}”,所以用“}”分割行并不能解决问题。

      【讨论】:

      • 我考虑过做这样的事情,但我认为如果 JSON 对象中的一个字符串碰巧包含一个 }{ 那么这种技术就会失败。也许如果你遍历每个字符,如果你点击一个“”(表示输入或离开一个字符串)切换一个布尔值,计算你所在的对象的级别(在看到 { 字符串之外增加,在看到 } 之外减少string),然后将对象的结尾视为您的级别计数器再次达到 0 时。
      • 分隔符}{ 是有问题的,因为内部字符串可以在其中包含像这样的json:}{\"(带有转义引号),因此使用}{"作为分隔符会更好一些,因为内部字符串不能有引号
      • 为了以 Eran 的回答为基础,我使用否定的前瞻来解释}{ 出现在字符串末尾的情况:re.sub('}{"(?![,}])', '}\n{"', string)
      【解决方案3】:

      我也遇到了同样的问题。

      如果AWS 允许我们设置分隔符会更好,但我们可以自己做。

      在我的用例中,我一直在收听推文流,一旦收到一条新推文,我立即将其发送至Firehose

      这当然会导致无法解析的 1 行文件。

      所以,为了解决这个问题,我将推文的 JSON 与 \n 连接起来。 这反过来又让我使用一些可以在读取流内容时输出行的包,并轻松解析文件。

      希望对你有所帮助。

      【讨论】:

        【解决方案4】:

        我认为解决这个问题的最佳方法是首先创建一个格式正确的 json 文件,其中包含良好分离的 json 对象。就我而言,我在被推入消防软管的事件中添加了“,”。然后在s3中保存一个文件后,所有文件都将包含由一些分隔符分隔的json对象(在我们的例子中是逗号)。必须添加的另一件事是文件开头和结尾的“[”和“]”。然后你有一个包含多个 json 对象的正确 json 文件。现在可以解析它们了。

        【讨论】:

        • 这适用于 JSON,但不适用于更复杂的标记,例如 XML。如果每条记录都是 XML 文档,则需要对其进行解析并将根元素包装到新的 XML 文档和某种封闭元素中(我使用过&lt;array&gt;&lt;/array&gt;)。我目前正在尝试弄清楚如何以这种方式从 S3 中读取数据。
        【解决方案5】:

        如果 firehose 的输入源是 Analytics 应用程序,则此不带分隔符的串联 JSON 是引用 here 的已知问题。你应该有一个 here 的 lambda 函数,它在多行中输出 JSON 对象。

        【讨论】:

          【解决方案6】:

          使用这个简单的 Python 代码。

          input_str = '''{"personId":"p-111","itemId":"i-111"}{"personId":"p-222","itemId":"i-222"}{"personId":"p-333","itemId":"i-333"}'''
          
          data_str = "[{}]".format(input_str.replace("}{","},{"))
          data_json = json.loads(data_str)
          

          然后(如果需要)转换为 Pandas。

          import pandas as pd   
          df = pd.DataFrame().from_records(data_json)
          print(df)
          

          这就是结果

          itemId personId
          0  i-111    p-111
          1  i-222    p-222
          2  i-333    p-333
          

          【讨论】:

            【解决方案7】:

            我使用转换 Lambda 在每条记录的末尾添加换行符

            def lambda_handler(event, context):
                output = []
            
                for record in event['records']:
            
                    # Decode from base64 (Firehose records are base64 encoded)
                    payload = base64.b64decode(record['data'])
            
                    # Read json as utf-8    
                    json_string = payload.decode("utf-8")
            
                    # Add a line break
                    output_json_with_line_break = json_string + "\n"
            
                    # Encode the data
                    encoded_bytes = base64.b64encode(bytearray(output_json_with_line_break, 'utf-8'))
                    encoded_string = str(encoded_bytes, 'utf-8')
            
                    # Create a deep copy of the record and append to output with transformed data
                    output_record = copy.deepcopy(record)
                    output_record['data'] = encoded_string
                    output_record['result'] = 'Ok'
            
                    output.append(output_record)
            
                print('Successfully processed {} records.'.format(len(event['records'])))
            
                return {'records': output}
            

            【讨论】:

              【解决方案8】:

              您可以通过计算括号找到每个有效的 JSON。假设文件以{ 开头,这个 python sn-p 应该可以工作:

              import json
              
              def read_block(stream):
                  open_brackets = 0
                  block = ''
                  while True:
                      c = stream.read(1)
                      if not c:
                          break
              
                      if c == '{':
                          open_brackets += 1
                      elif c == '}':
                          open_brackets -= 1
              
                      block += c
              
                      if open_brackets == 0:
                          yield block
                          block = ''
              
              
              if __name__ == "__main__":
                  c = 0
                  with open('firehose_json_blob', 'r') as f:
                      for block in read_block(f):
                          record = json.loads(block)
                          print(record)
              

              【讨论】:

              • 警告:这只是一个盲流阅读器,因此如果任何 JSON blob 包含恰好在其中包含转义括号的字符串,它将中断。
              【解决方案9】:

              如果有办法改变数据的写入方式,请用一行分隔所有记录。这样您就可以简单地逐行读取数据。如果没有,那么只需构建一个以“}”作为分隔符的扫描仪对象并使用扫描仪进行读取。这样就可以了。

              【讨论】:

                【解决方案10】:

                在 Spark 中,我们遇到了同样的问题。我们正在使用以下内容:

                from pyspark.sql.functions import *
                
                @udf
                def concatenated_json_to_array(text):
                  final = "["
                  separator = ""
                  
                  for part in text.split("}{"):
                    final += separator + part
                    separator = "}{" if re.search(r':\s*"([^"]|(\\"))*$', final) else "},{"
                      
                  return final + "]"
                
                
                def read_concatenated_json(path, schema):
                  return (spark.read
                          .option("lineSep", None)
                          .text(path)
                          .withColumn("value", concatenated_json_to_array("value"))
                          .withColumn("value", from_json("value", schema))
                          .withColumn("value", explode("value"))
                          .select("value.*"))  
                

                它的工作原理如下:

                1. 将数据读取为每个文件一个字符串(无分隔符!)
                2. 使用 UDF 引入 JSON 数组,并通过引入逗号分割 JSON 对象。 注意:注意不要破坏任何带有}{ 的字符串!
                3. 将带有架构的 JSON 解析为 DataFrame 字段。
                4. 将数组分解为单独的行
                5. 将值对象展开到列中。

                像这样使用它:

                from pyspark.sql.types import *
                
                schema = ArrayType(
                  StructType([
                    StructField("type", StringType(), True),
                    StructField("value", StructType([
                      StructField("id", IntegerType(), True),
                      StructField("joke", StringType(), True),
                      StructField("categories", ArrayType(StringType()), True)  
                    ]), True)
                  ])
                )
                
                path = '/mnt/my_bucket_name/messages/*/*/*/*/'
                df = read_concatenated_json(path, schema)
                

                我在这里写了更多细节和注意事项:Parsing JSON data from S3 (Kinesis) with Spark。不要只用}{ 分割,因为它会弄乱你的字符串数据!例如:{ "line": "a\"r}{t" }

                【讨论】:

                  【解决方案11】:

                  你可以使用下面的脚本。

                  如果流数据大小不超过您设置的缓冲区大小,则 s3 的每个文件都有一对括号([])和逗号。

                  import base64
                  
                  print('Loading function')
                  
                  
                  def lambda_handler(event, context):
                      output = []
                  
                      for record in event['records']:
                          print(record['recordId'])
                          payload = base64.b64decode(record['data']).decode('utf-8')+',\n'
                  
                          # Do custom processing on the payload here
                  
                          output_record = {
                              'recordId': record['recordId'],
                              'result': 'Ok',
                              'data': base64.b64encode(payload.encode('utf-8'))
                          }
                          output.append(output_record)
                  
                      last = len(event['records'])-1
                      print('Successfully processed {} records.'.format(len(event['records'])))
                      
                      start = '['+base64.b64decode(output[0]['data']).decode('utf-8')
                      end = base64.b64decode(output[last]['data']).decode('utf-8')+']'
                      
                      output[0]['data'] = base64.b64encode(start.encode('utf-8'))
                      output[last]['data'] = base64.b64encode(end.encode('utf-8'))
                      return {'records': output}
                  
                  

                  【讨论】:

                    【解决方案12】:

                    使用 JavaScript 正则表达式。

                    JSON.parse(`[${item.replace(/}\s*{/g, '},{')}]`);
                    

                    【讨论】:

                      猜你喜欢
                      • 2018-04-02
                      • 2016-02-03
                      • 2021-05-11
                      • 2019-09-06
                      • 1970-01-01
                      • 2020-07-26
                      • 1970-01-01
                      • 1970-01-01
                      • 1970-01-01
                      相关资源
                      最近更新 更多