为了让您开始,这里有一个递归分支和绑定回溯 - 并且可能是详尽的 - 搜索。排序启发式可以对这些方法的效率产生巨大影响,但是如果没有大量的“现实生活”数据来进行测试,那么选择一个比另一个的基础很少。这包含了可能是最明显的排序规则。
因为它是一项正在进行中的工作,所以它会在执行过程中打印内容:找到的所有解决方案,无论何时遇到或超过当前最佳解决方案;以及在发生这种情况时提前中断搜索的索引(因为很明显,此时的部分解决方案无法扩展以满足或击败迄今为止已知的最佳完整解决方案)。
例如,
>>> crunch([[5, 6, 7], [8, 0, 3], [2, 8, 7], [8, 2, 3]])
展示
new best
L2[0:1] = [2] 2
L1[1:2] = [0] 2
L3[2:3] = [3] 5
sum 5
cut at 2
L2[0:1] = [2] 2
L1[1:3] = [0, 3] 5
sum 5
cut at 2
cut at 2
cut at 2
cut at 1
cut at 1
cut at 2
cut at 2
cut at 2
cut at 1
cut at 1
cut at 1
cut at 0
cut at 0
因此它找到了两种获得最小和 5 的方法,并且简单的排序启发式足够有效,以至于所有其他通向完整解决方案的路径都被提前切断。
def disp(lists, ixs):
from itertools import groupby
total = 0
i = 0
for k, g in groupby(ixs):
j = i + len(list(g))
chunk = lists[k][i:j]
total += sum(chunk)
print(f"L{k}[{i}:{j}] = {chunk} {total}")
i = j
def crunch(lists):
n = len(lists[0])
assert all(len(L) == n for L in lists)
# Start with a sum we know can be beat.
smallest_sum = sum(lists[0]) + 1
smallest_ixs = [None] * n
ixsofar = [None] * n
def inner(i, sumsofar, freelists):
nonlocal smallest_sum
assert sumsofar <= smallest_sum
if i == n:
print()
if sumsofar < smallest_sum:
smallest_sum = sumsofar
smallest_ixs[:] = ixsofar
print("new best")
disp(lists, ixsofar)
print("sum", sumsofar)
return
# Simple greedy heuristic: try available lists in the order
# of smallest-to-largest at index i.
for lix in sorted(freelists, key=lambda lix: lists[lix][i]):
L = lists[lix]
newsum = sumsofar
freelists.remove(lix)
# Try all slices in L starting at i.
for j in range(i, n):
newsum += L[j]
# ">" to find all smallest answers;
# ">=" to find just one (potentially faster)
if newsum > smallest_sum:
print("cut at", j)
break
ixsofar[j] = lix
inner(j + 1, newsum, freelists)
freelists.add(lix)
inner(0, 0, set(range(len(lists))))
蛮力有多糟糕?
不好。一种蛮力的计算方法:假设有 n 列表,每个列表都有 p 元素。代码的ixsofar 向量在range(n) 中包含p 整数。唯一的限制是出现在其中的任何整数的所有出现都必须是连续的。因此,计算此类向量总数的蛮力方法是生成 all p-tuples 并计算满足约束的数量。这是非常低效的,花费O(n**p) 时间,但真的很容易,很难出错:
def countb(n, p):
from itertools import product, groupby
result = 0
seen = set()
for t in product(range(n), repeat=p):
seen.clear()
for k, g in groupby(t):
if k in seen:
break
seen.add(k)
else:
#print(t)
result += 1
return result
对于小参数,我们可以将其用作对下一个函数的健全性检查,这是有效的。这建立在常见的“星形和条形”组合参数的基础上来推断结果:
def count(n, p):
# n lists of length p
# for r regions, r from 1 through min(p, n)
# number of ways to split up: comb((p - r) + r - 1, r - 1)
# for each, ff(n, r) ways to spray in list indices = comb(n, r) * r!
from math import comb, prod
total = 0
for r in range(1, min(n, p) + 1):
total += comb(p-1, r-1) * prod(range(n, n-r, -1))
return total
更快
以下是迄今为止我最好的代码。它为我之前发布的代码构建了更多“智能”。从某种意义上说,它非常有效。例如,对于随机的p = n = 20 输入,它通常在一秒钟内完成。这没什么好打喷嚏的,因为:
>>> count(20, 20)
1399496554158060983080
>>> _.bit_length()
71
也就是说,尝试所有可能的方法实际上会花费很长时间。要尝试的案例数甚至不适合 64 位 int。
另一方面,将n(列表数量)提升到 30,这可能需要一个小时。在 50 岁的时候,我还没有看到一个非人为的案例完成,即使让它在一夜之间运行。组合爆炸最终变得势不可挡。
OTOH,我正在寻找 最小的总和,句号。如果您需要在现实生活中解决此类问题,您要么需要更智能的方法,要么满足于迭代逼近算法。
注意:这仍然是一项正在进行的工作,因此尚未完善,并且会在进行过程中打印一些内容。大多数情况下,这已简化为运行一个“看门狗”线程,该线程每 10 分钟唤醒一次以显示 ixsofar 向量的当前状态。
def crunch(lists):
import datetime
now = datetime.datetime.now
start = now()
n = len(lists[0])
assert all(len(L) == n for L in lists)
# Start with a sum we know can be beat.
smallest_sum = min(map(sum, lists)) + 1
smallest_ixs = [None] * n
ixsofar = [None] * n
import threading
def watcher(stop):
if stop.wait(60):
return
lix = ixsofar[:]
while not stop.wait(timeout=600):
print("watch", now() - start, smallest_sum)
nlix = ixsofar[:]
for i, (a, b) in enumerate(zip(lix, nlix)):
if a != b:
nlix.insert(i,"--- " + str(i) + " -->")
print(nlix)
del nlix[i]
break
lix = nlix
stop = threading.Event()
w = threading.Thread(target=watcher, args=[stop])
w.start()
def inner(i, sumsofar, freelists):
nonlocal smallest_sum
assert sumsofar <= smallest_sum
if i == n:
print()
if sumsofar < smallest_sum:
smallest_sum = sumsofar
smallest_ixs[:] = ixsofar
print("new best")
disp(lists, ixsofar)
print("sum", sumsofar, now() - start)
return
# If only one input list is still free, we have to take all
# of its tail. This code block isn't necessary, but gives a
# minor speedup (skips layers of do-nothing calls),
# especially when the length of the lists is greater than
# the number of lists.
if len(freelists) == 1:
lix = freelists.pop()
L = lists[lix]
for j in range(i, n):
ixsofar[j] = lix
sumsofar += L[j]
if sumsofar >= smallest_sum:
break
else:
inner(n, sumsofar, freelists)
freelists.add(lix)
return
# Peek ahead. The smallest completion we could possibly get
# would come from picking the smallest element in each
# remaining column (restricted to the lists - rows - still
# available). This probably isn't achievable, but is an
# absolute lower bound on what's possible, so can be used to
# cut off searches early.
newsum = sumsofar
for j in range(i, n): # pick smallest from column j
newsum += min(lists[lix][j] for lix in freelists)
if newsum >= smallest_sum:
return
# Simple greedy heuristic: try available lists in the order
# of smallest-to-largest at index i.
sortedlix = sorted(freelists, key=lambda lix: lists[lix][i])
# What's the next int in the previous slice? As soon as we
# hit an int at least that large, we can do at least as well
# by just returning, to let the caller extend the previous
# slice instead.
if i:
prev = lists[ixsofar[i-1]][i]
else:
prev = lists[sortedlix[-1]][i] + 1
for lix in sortedlix:
L = lists[lix]
if prev <= L[i]:
return
freelists.remove(lix)
newsum = sumsofar
# Try all non-empty slices in L starting at i.
for j in range(i, n):
newsum += L[j]
if newsum >= smallest_sum:
break
ixsofar[j] = lix
inner(j + 1, newsum, freelists)
freelists.add(lix)
inner(0, 0, set(range(len(lists))))
stop.set()
w.join()
以 DP 为界
我从中获得了很多乐趣 :-) 这是他们可能正在寻找的方法,使用动态编程 (DP)。我有几个程序在“小”情况下运行得更快,但没有一个程序可以真正在非人为的 20x50 情况下竞争。运行时是O(2**n * n**2 * p)。是的,这不仅仅是n 的指数!但它仍然只是蛮力所需的一小部分(见上文),并且是一个硬上限。
注意:这只是一个循环嵌套机器大小的整数,并且没有使用“花哨”的 Python 功能。在 C 中重新编码很容易,它运行得更快。事实上,这段代码在 PyPy 下的运行速度提高了 10 倍以上(与标准 CPython 解释器相反)。
关键见解:假设我们从左到右,到达第 j 列,我们从中选择的最后一个列表是 D,在此之前我们从列表 A、B 和 C 中选择列。我们该如何继续?好吧,我们也可以从 D 中选择下一列,并且“已使用”集合 {A, B, C} 不会改变。或者我们可以选择其他列表 E,“已使用”集合更改为 {A, B, C, D},E 成为我们选择的最后一个列表。
现在,在所有这些情况下,我们如何达到状态“使用集合 {A,B,C} 和列 j 的最后一个列表 D”的详细信息对可能完成的集合没有影响.我们从每列中选择了多少列,或者使用 A、B、C 的顺序无关紧要:对未来选择重要的是 A、B 和 C 不能再次使用,而 D 可以be but - 如果是的话 - 必须立即使用。
由于达到此状态的所有方式都具有相同的可能完成,因此最便宜的完整解决方案必须具有达到此状态的最便宜的方式。
所以我们只是从左到右,一次一列,并为列中的每个状态记住到达该状态的最小总和。
这并不便宜,但它是有限的 ;-) 由于状态是行索引的子集,结合最后使用的列表(的索引),有2**n * n 可能的状态需要跟踪。事实上,只有一半,因为上面勾勒的方式从不包括已使用集合中最后使用列表的索引,但满足这一点可能会花费更多而不是节省。
按原样,这里的状态没有明确表示。取而代之的是,到目前为止,只有一个长度为2**n * n 的总和列表。状态由列表索引隐含:索引i 表示状态:
-
i >> n 是最后使用列表的索引。
-
i 的最后一个 n 位是一个位集,其中当且仅当列表索引 j 位于已用列表索引的集合中时,才会设置位 2**j。
例如,您可以通过 dicts 将 (frozenset, index) 对映射到总和来表示这些,但随后内存使用量激增,运行时缩放,并且 PyPy 在加速它方面变得不那么有效了。
可悲但真实:与大多数 DP 算法一样,这会找到“最佳”答案,但对如何达到它的记忆很少。添加代码以实现这一点比这里更难,并且通常会增加内存需求。这里可能最简单:在每次外循环迭代结束时将new 写入磁盘,每列一个文件。然后内存使用不受影响。完成后,可以以相反的顺序再次读回这些文件,并且稍微乏味的代码可以重建达到获胜状态所必须经过的路径,从最后一次向后工作一列。
def dumbdp(lists):
import datetime
_min = min
now = datetime.datetime.now
start = now()
n = len(lists)
p = len(lists[0])
assert all(len(L) == p for L in lists)
rangen = range(n)
USEDMASK = (1 << n) - 1
HUGE = sum(sum(L) for L in lists) + 1
new = [HUGE] * (2**n * n)
for i in rangen:
new[i << n] = lists[i][0]
for j in range(1, p):
print("working on", j, now() - start)
old = new
new = [HUGE] * (2**n * n)
for key, g in enumerate(old):
if g == HUGE:
continue
i = key >> n
new[key] = _min(new[key], g + lists[i][j])
newused = (key & USEDMASK) | (1 << i)
for i in rangen:
mask = 1 << i
if newused & mask == 0:
newkey = newused | (i << n)
new[newkey] = _min(new[newkey],
g + lists[i][j])
result = min(new)
print("DONE", result, now() - start)
return result