【问题标题】:improving python code in Monte Carlo simulation在蒙特卡罗模拟中改进 python 代码
【发布时间】:2018-01-06 01:54:08
【问题描述】:

我已经为“2d active ising 模型”编写了 Monte Carlo 模拟,并且正在尝试改进运行时间。

我的代码做了什么: 我为粒子数(r)创建一个矩阵,为每个点(rgrid 和 mgrid)创建一个磁化矩阵。粒子的自旋可以是 -1/1,因此磁化强度范围为 [-r, r],步长为 2。

然后选择一个随机点和一个随机粒子(+1 或 -1)。由于概率取决于每个位置的正/负粒子数,我创建了 2 个数组并将它们压缩,因此我可以获得正粒子的拟合数,即 [(-3, 0), (-1, 1), ( 1, 2), (3, 3)]。 使用 3 个粒子,我可以具有 (-3, -1, 1, 3) 的磁化强度,其中具有 (0, 1, 2, 3) +1 个粒子。

之后,我计算该点的概率并选择一个动作:旋转翻转、上/下跳、左/右跳,什么都不做。 现在我必须移动(或不移动)粒子并更改 2 个点的磁体/密度(并检查周期性边界条件)。

这是我的代码:

from __future__ import print_function
from __future__ import division
from datetime import datetime
import numpy as np
import math
import matplotlib.pyplot as plt
import cProfile

pr = cProfile.Profile()
pr.enable()

m = 10  # zeilen, spalten
j = 1000 # finale zeit
r = 3  # platzdichte
b = 1.6  # beta
e = 0.9  # epsilon

M = m * m  # platzanzahl
N = M * r  # teilchenanzahl
dt = 1 / (4 * np.exp(b))  # delta-t
i = 0

rgrid = r * np.ones((m, m)).astype(int)  # dichte-matrix, rho = n(+) + n(-)
magrange = np.arange(-r, r + 1, 2)  # mögliche magnetisierungen [a, b), schrittweite
mgrid = np.random.choice(magrange, (m, m))  # magnetisierungs-matrix m = n(+) - (n-)


def flip():
    mgrid[math.trunc(x / m), x % m] -= 2 * spin

def up():
    y = x - m
    if y < 0:  # periodische randbedingung hoch
        y += m * m
    x1 = math.trunc(x / m)
    x2 = x % m
    y1 = math.trunc(y / m)
    y2 = y % m
    rgrid[x1, x2] -= 1  # [zeile, spalte] masse -1
    rgrid[y1, y2] += 1  # [zeile, spalte] masse +1
    mgrid[x1, x2] -= spin  # [zeile, spalte] spinänderung alter platz
    mgrid[y1, y2] += spin  # [zeile, spalte] spinänderung neuer platz

def down():
    y = x + m
    if y >= m * m:  # periodische randbedingung unten
        y -= m * m
    x1 = math.trunc(x / m)
    x2 = x % m
    y1 = math.trunc(y / m)
    y2 = y % m
    rgrid[x1, x2] -= 1
    rgrid[y1, y2] += 1
    mgrid[x1, x2] -= spin
    mgrid[y1, y2] += spin

def left():
    y = x - 1
    if math.trunc(y / m) < math.trunc(x / m):  # periodische randbedingung links
        y += m
    x1 = math.trunc(x / m)
    x2 = x % m
    y1 = math.trunc(y / m)
    y2 = y % m
    rgrid[x1, x2] -= 1
    rgrid[y1, y2] += 1
    mgrid[x1, x2] -= spin
    mgrid[y1, y2] += spin

def right():
    y = x + 1
    if math.trunc(y / m) > math.trunc(x / m):  # periodische randbedingung rechts
        y -= m
    x1 = math.trunc(x / m)
    x2 = x % m
    y1 = math.trunc(y / m)
    y2 = y % m
    rgrid[x1, x2] -= 1
    rgrid[y1, y2] += 1
    mgrid[x1, x2] -= spin
    mgrid[y1, y2] += spin


while i < j:

    # 1. platz aussuchen
    x = np.random.randint(M)  # wähle zufälligen platz aus

    if rgrid.item(x) != 0:

        i += dt / N

        # 2. teilchen aussuchen
        li1 = np.arange(-abs(rgrid.item(x)), abs(rgrid.item(x)) + 1, 2)
        li2 = np.arange(0, abs(rgrid.item(x)) + 1)
        li3 = zip(li1, li2)  # list1 und list2 als tupel in list3

        results = [item[1] for item in li3 if item[0] == mgrid.item(x)]  # gebe 2. element von tupel aus für passende magnetisierung
        num = int(''.join(map(str, results)))  # wandle listeneintrag in int um
        spin = 1.0 if np.random.random() < num / rgrid.item(x) else -1.0

        # 3. ereignis aussuchen
        p = np.random.random()
        p1 = np.exp(- spin * b * mgrid.item(x) / rgrid.item(x)) * dt  # flip
        p2 = dt  # hoch
        p3 = dt  # runter
        p4 = (1 - spin * e) * dt  # links
        p5 = (1 + spin * e) * dt  # rechts
        p6 = 1 - (4 + p1) * dt  # nichts


        if p < p6:
            continue
        elif p < p6 + p1:
            flip()
            continue
        elif p < p6 + p1 + p2:
            up()
            continue
        elif p < p6 + p1 + p2 + p3:
            down()
            continue
        elif p < p6 + p1 + p2 + p3 + p4:
            left()
            continue
        else:
            right()
            continue

pr.disable()
pr.print_stats(sort='cumtime')

我已经做了什么来加快速度:

  • 使用 cProfile 发现我使用的 random.choice 非常慢,并将其更改为自旋和概率 p 的 if 条件
  • 安装了 cython 并使用import pyximport; pyximport.install() 创建了一个已编译的 cython 文件。这并没有改善,在检查了cython -a script.py 之后,我发现我需要更多的静态变量来获得一些改进。

运行 cProfile 现在向我显示:

188939207 function calls in 151.834 seconds

   Ordered by: cumulative time

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
  5943639   10.582    0.000   40.678    0.000 {method 'join' of 'str' objects}
  5943639   32.543    0.000   37.503    0.000 script.py:107(<listcomp>)
  5943639    4.722    0.000   30.096    0.000 numeric.py:1905(array_str)
  8276949   25.852    0.000   25.852    0.000 {method 'randint' of 'mtrand.RandomState' objects}
  5943639   11.855    0.000   25.374    0.000 arrayprint.py:381(wrapper)
 11887279   14.403    0.000   14.403    0.000 {built-in method numpy.core.multiarray.arange}
 80651998   13.559    0.000   13.559    0.000 {method 'item' of 'numpy.ndarray' objects}
  5943639    8.427    0.000    9.364    0.000 arrayprint.py:399(array2string)
 11887278    8.817    0.000    8.817    0.000 {method 'random_sample' of 'mtrand.RandomState' objects}
   579016    7.351    0.000    7.866    0.000 script.py:79(right)
   300021    3.669    0.000    3.840    0.000 script.py:40(up)
   152838    1.950    0.000    2.086    0.000 script.py:66(left)
 17830917    1.910    0.000    1.910    0.000 {built-in method builtins.abs}
   176346    1.147    0.000    1.217    0.000 script.py:37(flip)
  5943639    1.131    0.000    1.131    0.000 {method 'discard' of 'set' objects}
  5943639    1.054    0.000    1.054    0.000 {built-in method _thread.get_ident}
  5943639    1.010    0.000    1.010    0.000 {method 'add' of 'set' objects}
  5943639    0.961    0.000    0.961    0.000 {built-in method builtins.id}
  3703804    0.892    0.000    0.892    0.000 {built-in method math.trunc}

我使用join 来获取该点上+1 粒子数的整数值,因为我的概率需要它。

如果我想运行像m=400r=3j=300000(j:最后一次)这样严肃的事情,我正在考虑以我目前的速度运行大约 4 年。

非常感谢任何帮助。

【问题讨论】:

  • 我在物理本科时做过这个。它太酷了。我建议的一件事是在任何循环之外生成所有随机数。如果您需要 1000 个随机数,请在开始时间步之前生成 1000 个数字元组。
  • This thread 讨论快速随机数生成。
  • 工作代码的改进也可以作为codereview.SE 的主题,但如果您在那里询问,请确保it's welcome there
  • 至于您的问题,将 listcomps 和 zips 以及字符串操作与 numpy 混合在一起是一个好兆头,表明您的代码可能会得到很大改进。
  • @wrkyle 在已接受答案的 cmets 中,我看到了我已经担心的情况,内存不足。对于我在底部提到的大运行,它将是大约 135 亿次迭代,每次迭代我需要 2 个随机数。

标签: python performance


【解决方案1】:

蒙特卡洛模拟

起初我摆脱了列表,后来我使用了即时编译器 (numba)。没有编译得到 196s(你的版本),编译得到 0.44s。因此,通过使用 jit 编译器和一些简单的优化,可以改善因子 435

还有一个主要优点是,这里也释放了 GIL(全局解释器锁)。如果代码受处理器限制且不受内存带宽限制,则可以在另一个线程中计算随机数,同时在另一个线程中运行模拟(可以使用多个内核)。这也可以进一步提高性能,工作原理如下:

  1. 计算第一块随机数(足够小,整个问题很容易放入处理器缓存(至少是 L3 缓存)。
  2. 使用该随机数开始迭代。在运行迭代时,会计算另一块随机数。
  3. 重复 (2) 直到完成。

代码

import numba as nb
import numpy as np

def calc (m,j,e,r,dt,b,rgrid,mgrid):
    M=m*m
    N = M * r
    i=0
    while i < j:
        # 1. platz aussuchen
        x = np.random.randint(M)  # wähle zufälligen platz aus

        if rgrid[x] != 0:
            i += dt / N

            ########
            # 2. teilchen aussuchen
            #li1 = np.arange(-abs(rgrid[x]), abs(rgrid[x]) + 1, 2)
            #li2 = np.arange(0, abs(rgrid[x]) + 1)

            #li3 = zip(li1, li2)  # list1 und list2 als tupel in list3
            #results = [item[1] for item in li3 if item[0] == mgrid[x]]  # gebe 2. element von tupel aus für passende magnetisierung
            #num = int(''.join(map(str, results)))  # wandle listeneintrag in int um
            #######

            # This should be equivalent
            jj=0 #li2 starts with 0 and has a increment of 1

            for ii in range(-abs(rgrid[x]),abs(rgrid[x])+1, 2):
                if (ii==mgrid[x]):
                    num=jj
                    break

                jj+=1

            spin = 1.0 if np.random.random() < num / rgrid[x] else -1.0

            # 3. ereignis aussuchen
            p = np.random.random()
            p1 = np.exp(- spin * b * mgrid[x] / rgrid[x]) * dt  # flip
            p2 = dt  # hoch
            p3 = dt  # runter
            p4 = (1 - spin * e) * dt  # links
            p5 = (1 + spin * e) * dt  # rechts
            p6 = 1 - (4 + p1) * dt  # nichts


            if p < p6:
                continue
            elif p < p6 + p1:
                #flip()
                mgrid[x] -= 2 * spin 
                continue
            elif p < p6 + p1 + p2:
                #up()
                y = x - m
                if y < 0:  # periodische randbedingung hoch
                    y += m * m
                rgrid[x] -= 1  # [zeile, spalte] masse -1
                rgrid[y] += 1  # [zeile, spalte] masse +1
                mgrid[x] -= spin  # [zeile, spalte] spinänderung alter platz
                mgrid[y] += spin  # [zeile, spalte] spinänderung neuer platz
                continue
            elif p < p6 + p1 + p2:
                #down()
                y = x + m
                if y >= m * m:  # periodische randbedingung unten
                    y -= m * m
                rgrid[x] -= 1
                rgrid[y] += 1
                mgrid[x] -= spin
                mgrid[y] += spin
                continue
            elif p < p6 + p2 + p3:
                #left()
                y = x - 1
                if (y // m) < (x // m):  # periodische randbedingung links
                    y += m
                rgrid[x] -= 1
                rgrid[y] += 1
                mgrid[x] -= spin
                mgrid[y] += spin
                continue
            else:
                #right()
                y = x + 1
                if (y // m) > (x // m):  # periodische randbedingung rechts
                    y -= m
                rgrid[x] -= 1
                rgrid[y] += 1
                mgrid[x] -= spin
                mgrid[y] += spin
                continue
    return (mgrid,rgrid)

现在是测试的主要功能:

def main():
    m = 10  # zeilen, spalten
    j = 1000 # finale zeit
    r = 3  # platzdichte
    b = 1.6  # beta
    e = 0.9  # epsilon

    M = m * m  # platzanzahl
    N = M * r  # teilchenanzahl
    dt = 1 / (4 * np.exp(b))  # delta-t
    i = 0

    rgrid = r * np.ones((m, m),dtype=np.int) #don't convert the array build it up with the right datatype  # dichte-matrix, rho = n(+) + n(-)
    magrange = np.arange(-r, r + 1, 2)  # mögliche magnetisierungen [a, b), schrittweite
    mgrid = np.random.choice(magrange, (m, m))  # magnetisierungs-matrix m = n(+) - (n-)

    #Compile the function
    nb_calc = nb.njit(nb.types.Tuple((nb.int32[:], nb.int32[:]))(nb.int32, nb.int32,nb.float32,nb.int32,nb.float32,nb.float32,nb.int32[:], nb.int32[:]),nogil=True)(calc)

    Results=nb_calc(m,j,e,r,dt,b,rgrid.flatten(),mgrid.flatten())

    #Get the results
    mgrid_new=Results[0].reshape(mgrid.shape)
    rgrid_new=Results[1].reshape(rgrid.shape)

编辑 我已经重写了代码“2.Teilchen aussuchen”并重新编写了代码,以便它可以与标量索引一起使用。这样可以将速度提高 4 倍。

【讨论】:

  • num 的大小应始终为 1,因为它是该点上自旋 1 粒子的数量。我正在尝试安装 numba 进行测试。
  • 哦,好的。这就说得通了。我建议使用 Anaconda。 continuum.io/downloads 。对于许多其他 python 包,安装过程会更容易。 (conda install numba) 就是这样 ;) 如果你不想使用 anaconda 并且你在 Windows 上,这个网站将帮助lfd.uci.edu/~gohlke/pythonlibs/#numba
  • 现在在 pycharm 上使用 anaconda 和 virtenv 安装它:D。有几个问题: 1) 在 nb_calc 行中,你最后有 (calc)。 pycharm 将此标记为错误,并且在删除它后工作正常。 2) nb_calc 行是做什么的? 3) 为什么 b 不是 in calc 函数?我现在添加它,因为它被用于自旋计算 4)我从 numba 导入 jit 并将“@jit”放在“def calc”之前。这就是你编译的意思吗?
  • nb_calc 行进行编译。你应该把那行放在calc的函数定义之后。有一些方法可以完成计算。 numba 主页上也有一些很好的例子。我忘记了b。 Njit 或带有 noPython 的 jit 您可以获得一些速度。装饰类型也是有益的,就像在 nb_calc 行中所做的那样。
  • 我需要调查一下,看起来很神奇。 nb_calc 行中的 (calc) 是错误的吗?最后一个问题:p6 是什么都不做的机会,它的发生率最高。我先检查一下,如果是这样的话,下一个周期可以立即开始。如果我进行数万亿次或迭代,这对节省时间有重要意义吗?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-01-02
  • 2019-01-21
  • 2012-04-26
  • 2015-02-10
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多