【问题标题】:Large JSON while exporting causing Memory Issues导出时导致内存问题的大型 JSON
【发布时间】:2019-03-25 21:28:49
【问题描述】:

问题:

我有一个 API,它从弹性多个索引中获取数据并将其组合成一个 JSON 记录,并在调用 API 时返回。此外,从 API 获取的结果通常是巨大的。

所以我有一个包装脚本,可以从 API 获取一天的所有数据。但是在我的代码中,我有一个名为results 的数组,当当天的数据较少时,我没有遇到问题。但是当一天获取的数据很大时,整个数组都在 RAM 中,导致系统变慢。

我创建这个数组的主要目的是在另一个网络中导出一个 mongo,我可以直接从我的网络中复制它。

代码片段:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import division, print_function, absolute_import
import argparse
import sys
import logging
import MySQLdb
import requests
import json
import time



_logger = logging.getLogger(__name__)


def get_samples(date,end):
    """
    Get Samples hashes form Database

    :param date: date of sample arrival
    :return list_of_hashes
    """
    try:
        results = []
        cur_time = time.time()
        with open('config.json','r') as c:
            config = json.load(c)
        _logger.info('Entering into database {}'.format(date))
        connection = MySQLdb.connect(config['malware_mysql'],"root","root","meta")
        cursor = connection.cursor()
        cursor.execute("SELECT MD5 from some where `Last_Seen` BETWEEN '{} 00:00:00' AND '{} 23:59:59'".format(date,end))
        hashes = cursor.fetchall()
        for hash in hashes:
            _logger.info('Hash {}'.format(hash[0]))
            try:
                response = requests.get('http://{}:{}/some/{}'.format(config['a'],config['b'],hash[0]))
                _logger.info('Result from API {}'.format(response))
                if response.status_code == 200:
                    results.append(json.loads(response.text))
                else:
                    _logger.error('Error in Querying API {} for hash {}'.format(response.status_code,hash))
            except Exception as e:
                _logger.error('Error in querying database {} - {}'.format(hash,e))
        connection.close()
        with open('{}_{}.json'.format(date,end),'w') as f:
            f.write(json.dumps(results))
    except KeyboardInterrupt:
        print('Bye')
    except Exception as e:
        _logger.error('Error in querying database final {}'.format(e))
    return '{} completed'.format(date)


def parse_args(args):
    """
    Parse command line parameters

    :param args: command line parameters as list of strings
    :return: command line parameters as :obj:`airgparse.Namespace`
    """
    parser = argparse.ArgumentParser(
        description="Enter date to Export")
    parser.add_argument(
        dest="date",
        help="Date of Sample Arrival in format 2018-08-16",
        )
    parser.add_argument(
        dest="end",
        help="Date of Sample Arrival end",
        )
    return parser.parse_args(args)


def main(args):
    args = parse_args(args)
    print("{} Samples are quiered -- {}".format(args.date, get_samples(args.date,args.end)))
    _logger.info("Script ends here")


def run():
    logging.basicConfig(level=logging.INFO, stream=sys.stdout)
    main(sys.argv[1:])


if __name__ == "__main__":
    run()

我为什么要这样做? 我想从 API 导出一整天的记录,并使用 mongoimport 将此 JSON 文件传输到 mongo。

我需要什么? 防止整个阵列位于 RAM 中并导致系统速度变慢的替代解决方案。使解决方案更高效的其他解决方案。

【问题讨论】:

  • 内存问题是由哪一部分引起的?从数据库加载?保存到 JSON 文件?用 json.loads 解析?

标签: python json mongodb api elasticsearch


【解决方案1】:

根据我收集的信息,您无法直接连接到您的 Mongo DB,对吗?你能在本地启动一个 MongoDB 吗?通过这种方式,您可以使用 Mongo Python 库在获取结果时保存结果,使用mongoexport 将它们提取为 JSON 文件,然后将它们导入您的最终数据库?

现在回到你的问题,这里有几个建议:

  • 在获得所需信息后立即关闭与 MySQL connection.close() 的连接,然后在 hashes = cursor.fetchall() 之后立即关闭
  • json.loads(response.text) 也被称为更好的 API response.json()
  • 您可以直接写入文件,而不是追加到内存中的results 列表中

把它们放在一起,没有键盘中断处理,只改变get_samples函数:

def get_samples(date, end):
    with open('{}_{}.json'.format(date, end), 'w') as out_file:
        out_file.write('[\n')
        with open('config.json','r') as c:
            config = json.load(c)
        _logger.info('Entering into database {}'.format(date))
        connection = MySQLdb.connect(config['malware_mysql'],"root","root","meta")
        cursor = connection.cursor()
        cursor.execute(
            "SELECT MD5 from some where `Last_Seen` BETWEEN '{} 00:00:00' AND '{} 23:59:59'".format(date, end)
        )
        hashes = cursor.fetchall()
        connection.close()
        for hash in hashes:
            _logger.info('Hash {}'.format(hash[0]))
            try:
                response = requests.get('http://{}:{}/some/{}'.format(config['a'],config['b'],hash[0]))
                _logger.info('Result from API {}'.format(response))
                if response.status_code == 200:
                    out_file.write(response.json() + ',\n')
                else:
                    _logger.error('Error in Querying API {} for hash {}'.format(response.status_code,hash))
            except Exception as e:
                _logger.error('Error in querying database {} - {}'.format(hash,e))
        out_file.write(']\n')

我没有尝试过这段代码,所以某处可能存在语法错误。希望这能让你离得足够近。

如果内存使用率仍然很高,请注意请求库有a streaming mode,这可能会进一步提供帮助。

【讨论】:

    猜你喜欢
    • 2015-11-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-04-10
    • 2020-10-18
    • 2017-03-14
    • 1970-01-01
    相关资源
    最近更新 更多