【问题标题】:python time sliding window variationpython时间滑动窗口变化
【发布时间】:2017-07-01 02:15:30
【问题描述】:

我遇到了滑动窗口问题的变体!

通常我们设置要滑动的元素数量,但在我的情况下,我想滑动时间!

我想达到的目标是一个函数(在这种情况下是线程) 能够在几秒钟内创建一个“时间”窗口(由用户给出)。

在这种情况下从队列的第一个元素开始:

[datetime.time(7, 6, 14, 537370), 584 add 5 seconds -> 7:6:19.537370 (end point) 并将此区间内的所有元素相加:

 [datetime.time(7, 6, 14, 537370), 584]
 [datetime.time(7, 6, 18, 542798), 761]

总数:584+761=1345

然后用第二个元素创建另一个“窗口”并继续。 重要提示:一个项目可以是多个窗口的一部分。同时生成项目,具有睡眠 n 秒然后刷新队列的功能的 naif 解决方案不适合我的问题。

我认为它是这篇文章的变体: Flexible sliding window (in Python)

但还是不能解决问题!任何帮助或建议将不胜感激。 谢谢!

元素列表示例:

 [datetime.time(7, 6, 14, 537370), 584]
 [datetime.time(7, 6, 18, 542798), 761]
 [datetime.time(7, 6, 20, 546007), 848]
 [datetime.time(7, 6, 24, 550969), 20]
 [datetime.time(7, 6, 27, 554370), 478]
 [datetime.time(7, 6, 27, 554628), 12]
 [datetime.time(7, 6, 31, 558919), 29]
 [datetime.time(7, 6, 31, 559562), 227]
 [datetime.time(7, 6, 32, 560863), 379]
 [datetime.time(7, 6, 35, 564863), 132]
 [datetime.time(7, 6, 37, 567276), 651]
 [datetime.time(7, 6, 38, 568652), 68]
 [datetime.time(7, 6, 40, 569861), 100]
 [datetime.time(7, 6, 41, 571459), 722]
 [datetime.time(7, 6, 44, 574802), 560]

...

代码:

 import random
 import time
 import threading
 import datetime
 from multiprocessing import Queue

 q = Queue()

 #this is a producer that put elements in queue

 def t1():
     element = [0,0]
     while True:
         time.sleep(random.randint(0, 5))
         element[0] = datetime.datetime.now().time()
         element[1] = random.randint(0, 1000)
         q.put(element)


 #this is a consumer that sum elements inside a window of n seconds
 #Ineed something a sliding window time of ten seconds that sum all elements for n seconds

 def t2():
     windowsize = 5 #size of the window 5 seconds
     while not queue.empty():
         e = q.get()
         start = e[0] #the first element is the beginning point
         end = start + datetime.timedelta(seconds=windowsize) #ending point
         sum += e[1]
         #some code that solve the problem :)



 a = threading.Thread(target=t1)
 a.start()

 b = threading.Thread(target=t2)
 b.start()

 while True:
     time.sleep(1)

【问题讨论】:

  • 一个元素可以成为你的几个“窗口”的一部分,还是你想在队列中的一个元素被汇总到一个窗口后立即使用它?如果一个元素可以是多个窗口的一部分,那么最终从队列中删除元素的机制是什么(“这个元素肯定不再需要了,让我们删除它以避免填满内存”)?无论如何,这绝对是可以解决的,但只需要知道你想要实现的究竟是什么。
  • 一个元素可以是多个窗口的一部分,添加到原始帖子描述中,谢谢。
  • 所以我是否理解正确,只要您的“队列”的第一个和最后一个元素的时间差超过 10 秒,那么可以过期最旧的条目,直到一切都适合 10秒帧(或我们决定用作最大值的任何内容?)然后您的“t2”可用于从第一个元素开始查询大小为 0-10 秒的窗口?
  • t0是物品到达的时间,t0+5秒是windows的结束时间。当 t0+5 过去时,元素熄灭,窗口滑动到下一个元素!
  • 最大值基于从第一个元素日期时间开始的窗口大小。

标签: python multithreading iterator producer-consumer sliding-window


【解决方案1】:

这样可以吗?这就是我理解你的问题的方式。它的作用是创建一个跟踪事物的类。您可以通过 tw.insert() 添加或使用 tw.sum_window(seconds) 求和。

当你初始化 TimeWindow 时,你可以给它一个最大尺寸参数,默认是 10 秒。当您添加元素或计算总和时,它会进行清理,以便在每次插入或求和操作之前,第一个元素时间 e[0][0] 和最后一个元素时间 e[n][0] 彼此在 10 秒内.旧条目被删除。有一个“轮询”线程来跟踪您的请求。

我添加了两个队列,因为我不知道您打算如何处理结果。现在,如果您想从现在开始到未来 5 秒请求数据,您可以创建一个请求并将其放入队列中。该请求有一个随机 ID,以便您可以将其与结果相匹配。您的主线程需要监视结果队列,五秒钟后,发送到队列的每个请求都返回相同的 id 和总和。

如果这不是您想要做的,那么我就是不明白您在这里试图实现什么。即使这已经相当复杂,并且可能有一种更简单的方法来实现您打算做的事情。

import random
import time
import threading
import datetime
import Queue
import uuid

from collections import deque

q_lock = threading.RLock()


class TimeWindow(object):
    def __init__(self, max_size=10):
        self.max_size = max_size
        self.q = deque()

    def expire(self):
        time_now = datetime.datetime.now()
        while True:
            try:
                oldest_element = self.q.popleft()
                oe_time = oldest_element[0]
                if oe_time + datetime.timedelta(seconds=self.max_size) > time_now:
                    self.q.appendleft(oldest_element)
                    break

            except IndexError:
                break

    def insert(self,elm):
        self.expire()
        self.q.append(elm)

    def sum_window(self, start, end):
        self.expire()
        try:
            _ = self.q[0]
        except IndexError:
            return 0
        result=0
        for f in self.q:
            if start < f[0] < end:
                result += f[1]
            else:
                pass
        return result


tw = TimeWindow()


def t1():
    while True:
        time.sleep(random.randint(0, 3))
        element = [datetime.datetime.now(), random.randint(0,1000)]
        with q_lock:
            tw.insert(element)


def poller(in_q, out_q):
    pending = []
    while True:
        try:
            new_request = in_q.get(0.1)
            new_request["end"] = new_request["start"] + datetime.timedelta(seconds=new_request["frame"])
            pending.append(new_request)
        except Queue.Empty:
            pass

        new_pending = []
        for a in pending:
            if a["end"] < datetime.datetime.now():
                with q_lock:
                    r_sum = tw.sum_window(a["start"], a["end"])
                r_structure = {"id": a["id"], "result": r_sum}
                out_q.put(r_structure)
            else:
                new_pending.append(a)
        pending = new_pending


a = threading.Thread(target=t1)
a.daemon = True
a.start()
in_queue = Queue.Queue()
result_queue = Queue.Queue()

po = threading.Thread(target=poller, args=(in_queue, result_queue,))
po.daemon = True
po.start()

while True:
    time.sleep(1)
    newr = {"id": uuid.uuid4(), "frame": 5, "start": datetime.datetime.now()}
    in_queue.put(newr)
    try:
        ready = result_queue.get(0)
        print ready
    except Queue.Empty:
        pass

【讨论】:

  • 首先非常感谢您的帮助!嗯,问题是 tw.sum_window(5) 应该在时间窗口结束时的 5 秒结束时执行!
  • 我有点不明白您希望如何计算它。你的意思是你只是想“窥视历史”,这样如果你现在执行 sum_window ,它会选择最近插入的时间并从那里倒数五秒并总结它们?所以基本上我做了同样的事情,但反过来了?
  • 一个例子可能更不言自明)第一个元素到达 12:02:50.164829 值 8 比其他元素......函数“sum”应该在 12:02:55.164829 执行(t0+5 秒) 然后根据后继元素的日期时间继续迭代。
  • sum_window 和 max_size 在您的示例中应该相同。在我的情况下,触发器应该是 (t0+n seconds) :)
  • 好的,我想我现在明白了。让我考虑一下。
【解决方案2】:
garim@wof:~$ python solution.py
1 t1 produce element:  16:09:30.472497   1
2 t1 produce element:  16:09:33.475714   9
3 t1 produce element:  16:09:34.476922   10
4 t1 produce element:  16:09:37.480100   7
solution:  16:09:37.481171   {'id': UUID('adff334f-a97a-459d-8dcc-f28309e25574'), 'result': 19}
5 t1 produce element:  16:09:38.481352   10
solution:  16:09:38.482687   {'id': UUID('0a7481e5-e993-439a-9f7e-2c5aeef86155'), 'result': 19}

它仍然有效:(我使用函数 t1 为它插入的每个元素添加一个计数器。目标是此时求和 (result_queue.get):

16:09:35.472497 ---> 16:09:30.472497 + 5 秒

以前没有。只有这样元素才会熄灭。下次总和将在:

16:09:35.475714 ---> 16:09:33.475714 + 5 秒

我知道这很难解释.. 使用您的两个解决方案,时间窗口都会滑动,因此我可以认为问题已解决:) 我将尝试改进函数 sum 何时执行,时间触发器很重要。我获得了很多有用的知识。感谢您的帮助。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-08-04
    • 2013-01-17
    • 1970-01-01
    • 2016-08-01
    • 2014-01-22
    • 1970-01-01
    • 1970-01-01
    • 2015-02-20
    相关资源
    最近更新 更多