【问题标题】:How to speed up monte carlo simulation in python如何在python中加速蒙特卡罗模拟
【发布时间】:2015-10-23 22:37:45
【问题描述】:

我编写了一个扑克模拟器,它通过运行游戏模拟(即蒙特卡罗模拟)来计算在德州扑克中获胜的概率。目前它在 10 秒内运行大约 10000 次模拟,这通常足够好。但是,iPhone 上的某些应用程序的运行速度提高了 100 倍左右。我想知道是否有什么办法可以显着加快程序的速度。我已经用列表替换了字符串,并找到了许多将程序加速大约 2-3 倍的方法。但是我能做些什么来加速它 50x-100x 呢?我用探查器进行了检查,但找不到任何明显的瓶颈。我也将它遵守到 cython (没有做任何改变),但这对速度也没有影响。任何建议表示赞赏。完整列表如下:

__author__ = 'Nicolas Dickreuter'
import time
import numpy as np
from collections import Counter

class MonteCarlo(object):

    def EvalBestHand(self, hands):
        scores = [(i, self.score(hand)) for i, hand in enumerate(hands)]
        winner = sorted(scores, key=lambda x: x[1], reverse=True)[0][0]
        return hands[winner],scores[winner][1][-1]

    def score(self, hand):
        crdRanksOriginal = '23456789TJQKA'
        originalSuits='CDHS'
        rcounts = {crdRanksOriginal.find(r): ''.join(hand).count(r) for r, _ in hand}.items()
        score, crdRanks = zip(*sorted((cnt, rank) for rank, cnt in rcounts)[::-1])

        potentialThreeOfAKind = score[0] == 3
        potentialTwoPair = score == (2, 2, 1, 1, 1)
        potentialPair = score == (2, 1, 1, 1, 1, 1)

        if score[0:2]==(3,2) or score[0:2]==(3,3): # fullhouse (three of a kind and pair, or two three of a kind)
            crdRanks = (crdRanks[0],crdRanks[1])
            score = (3,2)
        elif score[0:4]==(2,2,2,1):
            score=(2,2,1) # three pair are not worth more than two pair
            sortedCrdRanks = sorted(crdRanks,reverse=True) # avoid for example 11,8,6,7
            crdRanks=(sortedCrdRanks[0],sortedCrdRanks[1],sortedCrdRanks[2],sortedCrdRanks[3])

        elif len(score) >= 5:  # high card, flush, straight and straight flush
            # straight
            if 12 in crdRanks:  # adjust if 5 high straight
                crdRanks += (-1,)
            sortedCrdRanks = sorted(crdRanks,reverse=True)  # sort again as if pairs the first rank matches the pair
            for i in range(len(sortedCrdRanks) - 4):
                straight = sortedCrdRanks[i] - sortedCrdRanks[i + 4] == 4
                if straight:
                    crdRanks=(sortedCrdRanks[i],sortedCrdRanks[i+1],sortedCrdRanks[i+2],sortedCrdRanks[i+3],sortedCrdRanks[i+4])
                    break

            # flush
            suits = [s for _, s in hand]
            flush = max(suits.count(s) for s in suits) >= 5
            if flush:
                for flushSuit in originalSuits: # get the suit of the flush
                    if suits.count(flushSuit)>=5:
                        break

                flushHand = [k for k in hand if flushSuit in k]
                rcountsFlush = {crdRanksOriginal.find(r): ''.join(flushHand).count(r) for r, _ in flushHand}.items()
                score, crdRanks = zip(*sorted((cnt, rank) for rank, cnt in rcountsFlush)[::-1])
                crdRanks = tuple(sorted(crdRanks,reverse=True)) # ignore original sorting where pairs had influence

                # check for straight in flush
                if 12 in crdRanks:  # adjust if 5 high straight
                    crdRanks += (-1,)
                for i in range(len(crdRanks) - 4):
                    straight = crdRanks[i] - crdRanks[i + 4] == 4

            # no pair, straight, flush, or straight flush
            score = ([(5,), (2, 1, 2)], [(3, 1, 3), (5,)])[flush][straight]

        if score == (1,) and potentialThreeOfAKind: score = (3, 1)
        elif score == (1,) and potentialTwoPair: score = (2, 2, 1)
        elif score == (1,) and potentialPair: score = (2, 1, 1)

        if score[0]==5:
            handType="StraightFlush"
            #crdRanks=crdRanks[:5] # five card rule makes no difference {:5] would be incorrect
        elif score[0]==4:
            handType="FoufOfAKind"
            crdRanks=crdRanks[:2]
        elif score[0:2]==(3,2):
            handType="FullHouse"
            # crdRanks=crdRanks[:2] # already implmeneted above
        elif score[0:3]==(3,1,3):
            handType="Flush"
            crdRanks=crdRanks[:5] # !! to be verified !!
        elif score[0:3]==(3,1,2):
            handType="Straight"
            crdRanks=crdRanks[:5] # !! to be verified !!
        elif score[0:2]==(3,1):
            handType="ThreeOfAKind"
            crdRanks=crdRanks[:3]
        elif score[0:2]==(2,2):
            handType="TwoPair"
            crdRanks=crdRanks[:3]
        elif score[0]==2:
            handType="Pair"
            crdRanks=crdRanks[:4]
        elif score[0]==1:
            handType="HighCard"
            crdRanks=crdRanks[:5]
        else: raise Exception('Card Type error!')

        return score, crdRanks, handType

    def createCardDeck(self):
        values = "23456789TJQKA"
        suites = "CDHS"
        Deck=[]
        [Deck.append(x+y) for x in values for y in suites]
        return Deck

    def distributeToPlayers(self, Deck, PlayerAmount, PlayerCardList, TableCardsList):
        Players =[]
        CardsOnTable = []
        knownPlayers = 0

        for PlayerCards in PlayerCardList:
            FirstPlayer=[]
            FirstPlayer.append(Deck.pop(Deck.index(PlayerCards[0])))
            FirstPlayer.append(Deck.pop(Deck.index(PlayerCards[1])))
            Players.append(FirstPlayer)

            knownPlayers += 1

        for c in TableCardsList:
            CardsOnTable.append(Deck.pop(Deck.index(c)))  # remove cards that are on the table from the deck

        for n in range(0, PlayerAmount - knownPlayers):
            plr=[]
            plr.append(Deck.pop(np.random.random_integers(0,len(Deck)-1)))
            plr.append(Deck.pop(np.random.random_integers(0,len(Deck)-1)))
            Players.append(plr)

        return Players, Deck

    def distributeToTable(self, Deck, TableCardsList):
        remaningRandoms = 5 - len(TableCardsList)
        for n in range(0, remaningRandoms):
            TableCardsList.append(Deck.pop(np.random.random_integers(0,len(Deck)-1)))

        return TableCardsList

    def RunMonteCarlo(self, originalPlayerCardList, originalTableCardsList, PlayerAmount, gui, maxRuns=6000,maxSecs=5):
        winnerlist = []
        EquityList = []
        winnerCardTypeList=[]
        wins = 0
        runs=0
        timeout_start=time.time()
        for m in range(maxRuns):
            runs+=1
            try:
                if gui.active==True:
                    gui.progress["value"] = int(round(m*100/maxRuns))
            except:
                pass
            Deck = self.createCardDeck()
            PlayerCardList = originalPlayerCardList[:]
            TableCardsList = originalTableCardsList[:]
            Players, Deck = self.distributeToPlayers(Deck, PlayerAmount, PlayerCardList, TableCardsList)
            Deck5Cards = self.distributeToTable(Deck, TableCardsList)
            PlayerFinalCardsWithTableCards = []
            for o in range(0, PlayerAmount):
                PlayerFinalCardsWithTableCards.append(Players[o]+Deck5Cards)

            bestHand,winnerCardType=self.EvalBestHand(PlayerFinalCardsWithTableCards)
            winner = (PlayerFinalCardsWithTableCards.index(bestHand))


            #print (winnerCardType)

            CollusionPlayers = 0
            if winner < CollusionPlayers + 1:
                wins += 1
                winnerCardTypeList.append(winnerCardType)
                # winnerlist.append(winner)
                # self.equity=wins/m
                # if self.equity>0.99: self.equity=0.99
                # EquityList.append(self.equity)
            if time.time()>timeout_start+maxSecs:
                break

        self.equity = wins / runs
        self.winnerCardTypeList = Counter(winnerCardTypeList)
        for key, value in self.winnerCardTypeList.items():
            self.winnerCardTypeList[key] = value / runs

        self.winTypesDict=self.winnerCardTypeList.items()

        # show how the montecarlo converges
        # xaxis=range(500,monteCarloRuns)
        # plt.plot(xaxis,EquityList[499:monteCarloRuns])
        # plt.show()
        return self.equity,self.winTypesDict

if __name__ == '__main__':
    Simulation = MonteCarlo()
    mycards=[['AS', 'KS']]
    cardsOnTable = []
    players = 3
    start_time = time.time()
    Simulation.RunMonteCarlo(mycards, cardsOnTable, players, 1, maxRuns=200000, maxSecs=120)
    print("--- %s seconds ---" % (time.time() - start_time))
    equity = Simulation.equity # considering draws as wins
    print (equity)

【问题讨论】:

  • 这可能是您的算法的运行时问题。
  • 是的,我已经检查过它们并且输出匹配。我创建了单元测试以确保所有结果都是正确的。我的问题确实是一个技术问题。为什么我的 python 程序比做同样事情的其他程序慢得多?当然,他们可能正在使用查找表,但我想知道是否有任何方法可以加快我编写的 python 程序。
  • 您引用的其他程序使用编译代码,而您使用动态解释语言。在 python 中,每个操作的成本都比在目标 C 或 Java 中高得多。因此,加快速度的唯一方法是使用在编译后的 C 中在 scrne 后面有效实现的表达式,这或多或少意味着在您的情况下是 numpy
  • 由于您有一个工作代码并希望提高性能,这个问题是Code Review 的一个很好的候选者。也许您可以尝试将这个问题迁移到那里。
  • 我不是任何相关领域的专家,但我记得 2+2 论坛上关于扑克模拟的讨论。一些张贴者正在比较通过卡片迭代来排列手牌的算法。然后另一个人带着温和的嘲讽评论说,手牌太少了,很明显要把它们全部缓存起来,把牌局排名变成一个查找问题,而不是逻辑问题。并且速度急剧增加。但下面的其他答案可能更重要,我不知道。

标签: performance python-3.x cython


【解决方案1】:

对于除通用建议之外的任何明智建议而言,该算法太长且太复杂。所以你开始了:尽可能矢量化所有东西,并使用数据向量而不是循环、列表、插入、删除。蒙特卡罗模拟很好地实现了这一点,因为所有样本都是独立的。

例如,您想要 100 万次重试,在一行中生成一个由 100 万个随机值组成的 numpy 数组。您想测试结果,将 100 万个结果放入 numpy 数组中,并针对给定条件一次测试所有结果,生成 numpy 布尔数组。

如果您设法将其矢量化,您将获得巨大的性能提升。

【讨论】:

  • 意思是一切都表示为一个numpy数组?
  • 是的,并且您尽可能使用数组操作而不是遍历它们。
  • 这里不是这样,但是如果您必须在每个循环中生成大量随机数(比如说 200)并进行 1M 模拟,您会遇到内存问题吗?在这种情况下,有哪些选择可以加快速度?
【解决方案2】:

低效的数组操作可能是导致速度缓慢的主要原因。特别是,我看到很多这样的行:

Deck.pop(some_index)

每次执行此操作时,列表都必须移动该索引之后的所有元素。您可以使用以下方法消除这种情况(以及重复的随机函数):

from random import shuffle

# Setup
originalDeck = createCardDeck()

# Individual run
Deck = originalDeck[:]
shuffle(Deck) # shuffles in place

# Draw a card from the end so nothing needs to shift
card = Deck.pop()

# Concise and efficient!

您还可以做一些其他事情,例如将长 if-else 链更改为使用 dict 或自定义类的恒定时间查找。现在它每手牌都先检查同花顺,然后再检查高牌,这似乎相当低效。不过,我怀疑这会产生很大的不同。

【讨论】:

  • 刚刚尝试按照您的建议用 shuffle 替换随机数生成器,但这会使整个过程减慢 25% 左右。我假设 shuffle 也必须重新排列整个数组。
  • @Nicolas: 如果len(Deck) 很大,那么优化Deck.pop()Deck.index() 是有意义的,它们是O(n) 操作例如,您可以对Deck 进行排序并使用@987654328 @ 模块在 O(log n) 时间内查找元素(如果 n=1000000log n ~ 10 和 ~10 步比一百万步好很多)或者你可以使用 set() 来表示 @987654333 @ 然后搜索/删除是O(1)(恒定时间)。对于小的 n (~10) - 常数因素可能很重要,即对于小的 n,更优化的算法可能会更慢。
  • 有趣,我认为洗牌会更有效,因为它不需要多次重新分配或调整数组大小。你试过 numpy.random.shuffle 吗?
  • @Nicolas: 明确一点:你应该在每副牌上调用一次random.shuffle(Deck)(而不是每张牌一次)。之后,您只需为每张卡拨打Deck.pop()
  • 是的,我试过了。但由于某种原因,这似乎更慢。改组似乎需要很长时间。
猜你喜欢
  • 2020-03-26
  • 1970-01-01
  • 1970-01-01
  • 2013-01-02
  • 2019-01-21
  • 1970-01-01
  • 2012-04-26
  • 2015-02-10
  • 1970-01-01
相关资源
最近更新 更多