【问题标题】:How do I improve the speed of this parser using python?如何使用 python 提高这个解析器的速度?
【发布时间】:2020-03-10 10:44:41
【问题描述】:

我目前正在解析瑞典公共交通网络的历史延误数据。从 1 月 27 日开始,我有大约 5700 个文件(每 15 秒一个),其中包含网络中正在运行的车辆的瞬时延迟数据。不幸的是,这是很多开销/重复数据,所以我想解析出相关的东西来对其进行可视化。

但是,当我尝试使用下面的脚本解析和过滤掉行程级别的相关延迟数据时,它的执行速度非常慢。它现在已经运行了超过 1.5 小时(在我的 2019 Macbook Pro 15' 上)并且还没有完成。

  • 如何优化/改进这个 python 解析器?
  • 或者我应该减少此任务的文件数量,即数据收集频率?

提前非常感谢您。 ????

from google.transit import gtfs_realtime_pb2
import gzip
import os
import datetime
import csv
import numpy as np

directory = '../data/tripu/27/'
datapoints = np.zeros((0,3), int)
read_trips = set()

# Loop through all files in directory
for filename in os.listdir(directory)[::3]:

    try:
        # Uncompress and parse protobuff-file using gtfs_realtime_pb2
        with gzip.open(directory + filename, 'rb') as file:
            response = file.read()
            feed = gtfs_realtime_pb2.FeedMessage()
            feed.ParseFromString(response)

            print("Filename: " + filename, "Total entities: " + str(len(feed.entity)))

            for trip in feed.entity:
                if trip.trip_update.trip.trip_id not in read_trips:

                    try:
                        if len(trip.trip_update.stop_time_update) == len(stopsOnTrip[trip.trip_update.trip.trip_id]):
                            print("\t","Adding delays for",len(trip.trip_update.stop_time_update),"stops, on trip_id",trip.trip_update.trip.trip_id)

                            for i, stop_time_update in enumerate(trip.trip_update.stop_time_update[:-1]):

                                # Store the delay data point (arrival difference of two ascending nodes)
                                delay = int(trip.trip_update.stop_time_update[i+1].arrival.delay-trip.trip_update.stop_time_update[i].arrival.delay)

                                # Store contextual metadata (timestamp and edgeID) for the unique delay data point
                                ts = int(trip.trip_update.stop_time_update[i+1].arrival.time)
                                key = int(str(trip.trip_update.stop_time_update[i].stop_id) + str(trip.trip_update.stop_time_update[i+1].stop_id))

                                # Append data to numpy array
                                datapoints = np.append(datapoints, np.array([[key,ts,delay]]), axis=0)

                            read_trips.add(trip.trip_update.trip.trip_id)
                    except KeyError:
                        continue
                else:
                    continue
    except OSError:
        continue

【问题讨论】:

  • 很难说,我强烈怀疑大部分时间都花在了ParseFromString,但无法仅从这段代码中知道。此外,readTrips 永远不会更新,因此您的"if ... not in readTrips:" 代码没有任何帮助。 (可能还想让read_trips 成为一个集合而不是一个列表以进行更优化的搜索,但我 99-44/100% 确定这不是您的性能瓶颈所在。)为了获得更好的响应,请发布一个小样本数据文件,以及ParseFromString 的代码。加上实际的分析也会很好。
  • @PaulMcG 感谢您的回复!我的错,我现在将 read_trip 添加为一组。上面编辑。看起来脚本现在开始时读取数据的速度非常快,然后速度变慢了很多。这有什么具体的线索吗? ParseFromString 来自google.transit。我也会上传一些数据。
  • 数据总大小是多少,可用内存是多少?

标签: python numpy parsing data-science gtfs


【解决方案1】:

我怀疑这里的问题是重复调用 np.append 向 numpy 数组添加新行。因为 numpy 数组在创建时的大小是固定的,所以np.append() 必须创建一个新数组,这意味着它必须复制之前的数组。在每个循环中,数组都更大,因此所有这些副本都会为您的执行时间增加一个二次因子。当数组很大时(显然它在您的应用程序中),这变得很重要。

作为替代方案,您可以创建一个普通的 Python 元组列表,然后在必要时将其转换为最后的完整 numpy 数组。

即(仅修改后的行):

datapoints = []
# ...
                            datapoints.append((key,ts,delay))
# ...
npdata = np.array(datapoints, dtype=int)

【讨论】:

  • 很可能是您速度变慢的真正原因。
  • 非常感谢!这显着提高了性能。祝你有美好的一天。
  • 要确认此答案为解决方案,请单击答案左侧的复选标记以表示,并奖励回答者一些甜蜜,甜蜜,代表!
【解决方案2】:

我仍然认为解析例程是您的瓶颈(即使它确实来自 Google),但是所有那些 '.' 都让我死了! (而且它们确实会降低性能。)另外,我将您的 i, i+1 迭代转换为使用两个迭代器来压缩更新列表,这是一种更高级的列表工作方式。加上cur/next_update 的名字帮助我在你想引用一个与另一个时保持直截了当。最后,我删除了结尾的“else: continue”,因为无论如何你都处于 for 循环的末尾。

for trip in feed.entity:
    this_trip_update = trip.trip_update 
    this_trip_id = this_trip_update.trip.trip_id
    if this_trip_id not in read_trips:

        try:
            if len(this_trip_update.stop_time_update) == len(stopsOnTrip[this_trip_id]):
                print("\t", "Adding delays for", len(this_trip_update.stop_time_update), "stops, on trip_id",
                      this_trip_id)

                # create two iterators to walk through the list of updates
                cur_updates = iter(this_trip_update.stop_time_update)
                nxt_updates = iter(this_trip_update.stop_time_update)
                # advance the nxt_updates iter so it is one ahead of cur_updates
                next(nxt_updates)

                for cur_update, next_update in zip(cur_updates, nxt_updates):
                    # Store the delay data point (arrival difference of two ascending nodes)
                    delay = int(nxt_update.arrival.delay - cur_update.arrival.delay)

                    # Store contextual metadata (timestamp and edgeID) for the unique delay data point
                    ts = int(next_update.arrival.time)
                    key = "{}/{}".format(cur_update.stop_id, next_update.stop_id)

                    # Append data to numpy array
                    datapoints = np.append(datapoints, np.array([[key, ts, delay]]), axis=0)

                read_trips.add(this_trip_id)
        except KeyError:
            continue

此代码应该与您发布的代码相同,而且我也不真正期望获得重大的性能提升,但是当您在 6 个月后再次查看它时,这可能会更易于维护.

(这可能更适合 CodeReview,但我几乎从不去那里。)

【讨论】:

  • 原始数据显然在 Google Protobuf 中,解析例程是 protobuf 实现的一部分。它应该得到很好的优化,虽然我不清楚应用程序是使用本机 Python 解析器还是到 C++ 解析器的桥接器(这会更快,但只是线性的)。
  • 非常感谢@PaulMcG,明天早上我会把我的代码改成这个,我更喜欢它。祝你有美好的一天!
猜你喜欢
  • 1970-01-01
  • 2020-04-21
  • 1970-01-01
  • 2021-12-02
  • 1970-01-01
  • 2011-08-03
  • 1970-01-01
  • 2014-05-05
  • 1970-01-01
相关资源
最近更新 更多