不确定这是否是最快的方法,但是如果将其转换为 pandas 索引,则可以使用其交集方法。由于它在底层使用低级 c 代码,因此交集步骤可能非常快,但将其转换为 pandas 索引可能需要一些时间
import numpy as np
import pandas as pd
a = np.array([[2, 5], [6, 3], [4, 2], [1, 4]])
b = np.array([[2, 7], [4, 2], [1, 5], [6, 3]])
df_a = pd.DataFrame(a).set_index([0, 1])
df_b = pd.DataFrame(b).set_index([0, 1])
intersection = df_a.index.intersection(df_b.index)
结果如下所示
print(intersection.values)
[(6, 3) (4, 2)]
EDIT2:
出于好奇,我对这些方法进行了比较。现在有一个更大的索引列表。我将我的第一个索引方法与一个稍微改进的方法进行了比较,该方法不需要先创建数据帧,而是立即创建索引,然后再与提出的数据帧合并方法进行比较。
这是代码
from random import randint, seed
import time
import numpy as np
import pandas as pd
seed(0)
n_tuple = 100000
i_min = 0
i_max = 10
a = [[randint(i_min, i_max), randint(i_min, i_max)] for _ in range(n_tuple)]
b = [[randint(i_min, i_max), randint(i_min, i_max)] for _ in range(n_tuple)]
np_a = np.array(a)
np_b = np.array(b)
def method0(a_array, b_array):
index_a = pd.DataFrame(a_array).set_index([0, 1]).index
index_b = pd.DataFrame(b_array).set_index([0, 1]).index
return index_a.intersection(index_b).to_numpy()
def method1(a_array, b_array):
index_a = pd.MultiIndex.from_arrays(a_array.T)
index_b = pd.MultiIndex.from_arrays(b_array.T)
return index_a.intersection(index_b).to_numpy()
def method2(a_array, b_array):
df_a = pd.DataFrame(a_array)
df_b = pd.DataFrame(b_array)
return df_a.merge(df_b).to_numpy()
def method3(a_array, b_array):
set_a = {(_[0], _[1]) for _ in a_array}
set_b = {(_[0], _[1]) for _ in b_array}
return set_a.intersection(set_b)
for cnt, intersect in enumerate([method0, method1, method2, method3]):
t0 = time.time()
if cnt < 3:
intersection = intersect(np_a, np_b)
else:
intersection = intersect(a, b)
print(f"method{cnt}: {time.time() - t0}")
输出如下:
method0: 0.1439347267150879
method1: 0.14012742042541504
method2: 4.740894317626953
method3: 0.05933070182800293
结论:dataframes的merge方法(method2)比在索引上使用交集要慢50倍左右。基于multiindex(method1)的版本只比method0快一点(我的第一个提议)
EDIT2:正如@AKX 的评论所建议的那样:如果你不使用 numpy 而使用纯列表和集合,你可以再次获得大约 3 倍的加速。但很明显你不应该使用合并方法.