【问题标题】:How to pluck data every 30s from a stream in python如何从python中的流中每30秒提取一次数据
【发布时间】:2022-03-02 10:33:41
【问题描述】:

我的问题: 如何从 python 的流中每 30 次提取一次数据。我正在从股票经纪人 Alpaca Api 流式传输盘中数据。 alpaca api 每隔几秒就提供数百条消息,但我只想要每 30 秒一个数据点(以加快分析速度)。

我的方法: 我的方法是让我的数据处理程序休眠 30 秒,然后当它唤醒时,for range 语句会阻止它在再次进入休眠状态之前收集多个数据点。

实际结果与预期结果: 消息处理程序在只记录一条消息时效果很好但是,而不是从它唤醒的第二条开始记录一条消息,它正在记录似乎是队列中的下一条消息,即使我收到了30 秒前的那条消息,从那时起已经有 1000 条消息。

可能性:也许有办法只访问流中的最后一项?

#Create data handler for stream 
def on__message(message):

    #for range ensures only one message is recoreded when function is awake
    for i in range(1):

        with open("C:/Users/micha/github/trade_bot/intraday_data.csv", "a") as outfile:

            #pull variables from message
            timestamp = q.timestamp
            bid_price = q.bid_price
            ask_price = q.ask_price
        
            #create writer object & point at outfile
            writer = csv.writer(outfile)

            #write/append variables to outfile
            writer.writerow([timestamp,bid_price, ask_price])

    #put recorder to sleep for 30s
    time.sleep(30)


#Create stream object 
stream = Stream(APCA_API_KEY_ID,
            APCA_API_SECRET_KEY,
            APCA_API_DATA_URL,
            data_feed='iex')  


#Subscribe to stream and point stream to my handler - 'on_message' 
stream.subscribe_quotes(on_message, 'AAPL')
stream.run()

【问题讨论】:

    标签: stream event-handling handler event-listener


    【解决方案1】:

    一种方法是仅在新消息和最后处理消息的经过时间大于您指定的“采样间隔”时才处理消息。否则,流处理程序应忽略更新。

    示例代码如下:

    from datetime import time, datetime
    
    class SamplingStreamReader:
    
        def __init__(self, sampling_interval):
            self.sampling_interval = sampling_interval
            self.last_processed_timestamp = datetime.now()
    
        async def on_message(self, message):
            time_delta = datetime.now() - self.last_processed_timestamp
            if time_delta.total_seconds() > self.sampling_interval:
                print(f"{datetime.now()} - {message}")
                self.last_processed_timestamp = datetime.now()
    
    if __name__ == '__main__':
        # Create stream object
        stream = Stream(APCA_API_KEY_ID,
                        APCA_API_SECRET_KEY,
                        APCA_API_DATA_URL,
                        data_feed='iex')
    
        # Subscribe to stream and point stream to my handler - 'on_message'
        stream_reader = SamplingStreamReader(30)
        stream.subscribe_quotes(stream_reader.on_message, 'AAPL')
        stream.run()
    

    其他选项包括:

    • 轮询(例如每 n 秒发出一次请求)而不是流式传输数据
    • 每隔 n 秒订阅和取消订阅流(我不建议这样做)

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2023-04-11
      • 1970-01-01
      • 2011-11-06
      • 1970-01-01
      • 2014-05-05
      • 1970-01-01
      • 1970-01-01
      • 2012-06-13
      相关资源
      最近更新 更多