【Question title】: Implement an LRU cache using @functools.lru_cache in Python
【Posted】: 2021-06-02 10:18:49
【Question】:

I have been trying to implement an LRU cache for my project using Python's functools lru_cache. As a reference, I used this. Below is the code from that reference.

from datetime import datetime, timedelta
from functools import lru_cache, wraps


def timed_lru_cache(maxsize, seconds):
    def wrapper_cache(func):
        func = lru_cache(maxsize=maxsize)(func)
        func.lifetime = timedelta(seconds=seconds)
        func.expiration = datetime.utcnow() + func.lifetime

        @wraps(func)
        def wrapped_func(*args, **kwargs):
            if datetime.utcnow() >= func.expiration:
                func.cache_clear()
                func.expiration = datetime.utcnow() + func.lifetime

            return func(*args, **kwargs)

        return wrapped_func

    return wrapper_cache

# `config` is assumed to be defined elsewhere in the project
@timed_lru_cache(maxsize=config.cache_size, seconds=config.ttl)
def load_into_cache(id):
    return object

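In wrapped_func, func.cache_clear() clears the whole cache, including every item in it. I need help removing only the elements that have passed their expiration time since they were inserted. Is there any workaround?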

【Comments】:

    Tags: python functools lru


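    【Solution 1】:

    I don't think it is that easy to adapt the existing lru_cache, and I don't think the linked approach is very clear.

    Instead, I have implemented a timed LRU cache from scratch. See the docstring at the top for usage.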

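    It stores a key based on the args and kwargs of the input, and manages two structures:

    • a mapping of key => (expiry, result)
    • a list of recently used keys, where the first item is the least recently used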

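    Each time you try to get an item, the key is looked up in the "recently used" list. If it isn't there, it is added to both the list and the mapping. If it is there, we check whether the expiry is in the past. If it is, we recompute the result and update the entry. Otherwise we can return whatever is in the mapping.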
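
The lookup flow described above can be sketched on its own, separately from the class. This is only a minimal illustration, assuming a plain dict for the mapping and a plain list for recency; `store`, `recent`, `ttl` and `lookup` are hypothetical names, and the `now` parameter exists only so the behaviour can be checked without waiting.

```python
from datetime import datetime, timedelta

store = {}    # key -> (expiry, result)
recent = []   # keys, least recently used first
ttl = timedelta(seconds=5)  # assumed lifetime for this sketch


def lookup(func, key, now=None):
    """Return the cached result for key, recomputing it if missing or expired."""
    if now is None:
        now = datetime.now()
    if key in store:
        expiry, result = store[key]
        if expiry >= now:
            # still valid: mark the key as most recently used and reuse the result
            recent.remove(key)
            recent.append(key)
            return result
        # expired: forget the old position, the key is re-added below
        recent.remove(key)
    # missing or expired: recompute and store with a fresh expiry
    result = func(key)
    store[key] = (now + ttl, result)
    recent.append(key)
    return result
```

Only the entry being looked up is refreshed here; other entries keep their own expiry times, which is the per-item behaviour the question asks for.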

    from datetime import datetime, timedelta
    from functools import wraps
    from typing import Any, Dict, List, Optional, Tuple
    
    
    class TimedLRUCache:
        """ Cache that caches results based on an expiry time, and on least recently used.
        
            Items are eliminated first if they expire, and then if too many "recent" items are being
            stored. 
            
            There are two ways of using this cache: either the `get` method, or calling this as a
            decorator. The `get` method accepts any arbitrary function, but only the parameters are
            considered in the key, so it is advisable not to mix functions.
            
            >>> cache = TimedLRUCache(5)
            >>> def foo(i):
            ...     return i + 1
            
            >>> cache.get(foo, (1,), {})  # runs foo
            >>> cache.get(foo, (1,), {})  # returns the previously calculated result
            
            Using it as a decorator is more familiar:
            
            >>> @TimedLRUCache(5)
            ... def foo(i):
            ...     return i + 1
            
            >>> foo(1)  # runs foo
            >>> foo(1)  # returns the previously calculated result
            
            
            Either method can allow for fine-grained control of the cache:
            
            >>> five_second_cache = TimedLRUCache(5)
            >>> @five_second_cache
            ... def foo(i):
            ...     return i + 1
            
            >>> five_second_cache.clear_cache()  # resets the cache (clear every item)
            >>> five_second_cache.prune()  # clear invalid items
        """
        _items: Dict[int, Tuple[datetime, Any]]
        _recently_added: List[int]
    
        delta: Optional[timedelta]
        max_size: Optional[int]
    
        def __init__(self, seconds: Optional[int] = None, max_size: Optional[int] = None):
            self.delta = timedelta(seconds=seconds) if seconds else None
            self.max_size = max_size
    
            self._items = {}
            self._recently_added = []
            
        def __call__(self, func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                return self.get(func, args, kwargs)
            return wrapper
    
        @staticmethod
        def _get_key(args, kwargs) -> int:
            """ Get the thing we're going to use to lookup items in the cache. """
            key = (args, tuple(sorted(kwargs.items())))
            return hash(key)
    
        def _update(self, key: int, item: Any) -> None:
            """ Make sure an item is up to date. """
            if key in self._recently_added:
                self._recently_added.remove(key)
            # the first item in the list is the least recently used
            self._recently_added.append(key)
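            # with no lifetime configured (delta is None), the item never expires
            if self.delta is not None:
                expiry = datetime.now() + self.delta
            else:
                expiry = datetime.max
            self._items[key] = (expiry, item)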
    
            # when this function is called, something has changed, so we can also sort out the cache
            self.prune()
    
        def prune(self):
            """ Clear out everything that no longer belongs in the cache
    
                First delete everything that has expired. Then delete everything that isn't recent (only
                if there is a `max_size`).
            """
            current_time = datetime.now()
            # first get rid of things which have expired; iterate over a copy, because
            # deleting from a dict while iterating over it raises a RuntimeError
            for key, (expiry, _item) in list(self._items.items()):
                if expiry < current_time:
                    del self._items[key]
                    self._recently_added.remove(key)
            # then make sure there aren't too many recent items, dropping the least
            # recently used keys from both structures
            if self.max_size:
                for key in self._recently_added[:-self.max_size]:
                    del self._items[key]
                self._recently_added[:-self.max_size] = []
    
        def clear_cache(self):
            """ Clear everything from the cache """
            self._items = {}
            self._recently_added = []
    
        def get(self, func, args, kwargs):
            """ Given a function and its arguments, get the result using the cache
    
                Get the key from the arguments of the function. If the key is in the cache, and the
                expiry time of that key hasn't passed, return the result from the cache.
    
                If the key *has* expired, or there are too many "recent" items, recalculate the result,
                add it to the cache, and then return the result.
            """
            key = self._get_key(args, kwargs)
            current_time = datetime.now()
            if key in self._recently_added:
                # there is something in the cache
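                expiry, item = self._items[key]
                if expiry < current_time:
                    # the item has expired, so we need to get the new value
                    new_item = func(*args, **kwargs)
                    self._update(key, new_item)
                    return new_item
                else:
                    # we can use the existing value; move the key to the end of the
                    # list so that it counts as the most recently used
                    self._recently_added.remove(key)
                    self._recently_added.append(key)
                    return item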
            else:
                # never seen this before, so add it
                new_item = func(*args, **kwargs)
                self._update(key, new_item)
                return new_item
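
For comparison, the same idea can be written more compactly. This is a sketch under stated assumptions, not a replacement for the class above: it uses `collections.OrderedDict` in place of the separate dict and list, so that marking a key as recently used does not need a linear `list.remove`. The names `timed_lru` and `wrapper.cache` are hypothetical, and the `clock` parameter is there only to make the expiry behaviour testable.

```python
import time
from collections import OrderedDict
from functools import wraps


def timed_lru(seconds, max_size=128, clock=time.monotonic):
    """Decorator sketch: per-entry expiry plus least-recently-used eviction."""
    def decorator(func):
        entries = OrderedDict()  # key -> (expiry, result), least recently used first

        @wraps(func)
        def wrapper(*args, **kwargs):
            key = (args, tuple(sorted(kwargs.items())))
            now = clock()
            hit = entries.get(key)
            if hit is not None and hit[0] > now:
                entries.move_to_end(key)  # mark as most recently used
                return hit[1]
            # missing or expired: recompute and store with a fresh expiry
            result = func(*args, **kwargs)
            entries[key] = (now + seconds, result)
            entries.move_to_end(key)
            # drop only the entries whose own expiry has passed
            for stale in [k for k, (exp, _) in entries.items() if exp <= now]:
                del entries[stale]
            # then evict least recently used entries beyond max_size
            while len(entries) > max_size:
                entries.popitem(last=False)
            return result

        wrapper.cache = entries  # exposed for inspection in this sketch
        return wrapper
    return decorator
```

Unlike `cache_clear()` in the question, an expired entry here is removed on its own, and entries inserted later keep their remaining lifetime.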
    

