【Question title】: Python: trying to write a DataFrame to InfluxDB and receiving the message "IndexError: list index out of range"
【Posted】: 2023-11-10 03:28:02
【Question description】:

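I am trying to replicate what was done here (Python, InfluxDB, yfinance (instead of googlefinance), and finally Grafana). I finally figured out how to use a bucket rather than the database approach described in the two tutorials below: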

https://frobots.net/post/2016-03-01/monitoring-stocks-with-grafana

https://medium.com/analytics-vidhya/watch-your-stock-shares-with-grafana-and-influxdb-4df7a99c6d64

I am trying to follow the InfluxDB 2.0 API instructions here:

https://github.com/influxdata/influxdb-client-python#how-to-use-jupyter-pandas-influxdb-2

https://pypi.org/project/influxdb-client/

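I merged my list into a single DataFrame and added Date and Symbol columns. (The tutorials above use a JSON file but give no suitable example, and yfinance does not return JSON; it returns a DataFrame. I have only one bucket, so I was not sure how to write each stock as a separate DataFrame. Instead I merged them into one DataFrame and added a Symbol column. That threw my index off, because the index was the date, so I created a new index.) When I wrote each DataFrame separately (with the resulting dates as the index), the write succeeded, but I am afraid I was just writing the same data again for each stock. So I merged the stocks into one huge DataFrame and added Date and Symbol columns (a somewhat frustrating table layout).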

df = pd.concat(self.stock_prices, ignore_index=True)

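What I want to do is download a number of stocks (10 in this example) and upload them to a bucket so that I can eventually chart them. I am not sure whether I should continue the way I am doing it now or use another approach. I think I want to write a new table for each stock, but the bucket examples I have do not show how to specify a table. I believe I saw such an approach for the 1.8 API in "Write pandas DF with tags to influxdb" and at https://www.influxdata.com/blog/getting-started-with-influxdb-and-pandas/. Right now I seem to be stuck on the index: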

_now = df.index

This seems right, but print(df.index) shows nothing, while len(df.index) reports the correct length. If I try to build the index manually...

_now = list(range(0,len(df.index),1))

I get the same error.
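To make the index question concrete, here is a minimal sketch of what `ignore_index=True` does to date-indexed frames. The prices and the symbols `AAA` and `BBB` are made up for illustration and are not the data from the post.

```python
import pandas as pd

# Two tiny, made-up per-symbol frames indexed by date (stand-ins for yfinance output).
dates = pd.to_datetime(["2020-01-02", "2020-01-03"])
aaa = pd.DataFrame({"Close": [10.0, 11.0], "Symbol": "AAA"}, index=dates)
bbb = pd.DataFrame({"Close": [20.0, 21.0], "Symbol": "BBB"}, index=dates)

# ignore_index=True discards the dates and renumbers the rows 0..n-1.
flat = pd.concat([aaa, bbb], ignore_index=True)
print(type(flat.index).__name__)   # RangeIndex

# Without ignore_index the dates stay in the index; 'Symbol' tells the rows apart.
dated = pd.concat([aaa, bbb])
print(type(dated.index).__name__)  # DatetimeIndex
```

Keeping the dates in the index means the combined frame still carries a timestamp per row, with the `Symbol` column distinguishing the stocks.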

Here is my code:

import concurrent.futures
import json
import time, random
import pandas as pd
import yfinance as yf
import urllib.request
import influxdb_client
import numpy as np
from numpy import inf
from influxdb_client.client.write_api import SYNCHRONOUS
import pandas_market_calendars as mcal

start="2017-01-01"
end="2020-04-30"

nyse = mcal.get_calendar('NYSE')
trading_dates= nyse.schedule(start_date=start, end_date=end)
idx2 = trading_dates.index

influx_bucket = "local"
influx_org = "localdomain"
influx_token = "H60IaJ-9t64Zws296bpkJZNIu7ylUbJxfD6RH-thenW2cvWgxXUpADJN1R4o9PsiUtZAE7KjuK0Wzcu_DypdbQ=="
# Store the URL of your InfluxDB instance
influx_url="http://192.168.3.114:8086"

client = influxdb_client.InfluxDBClient(
    url=influx_url,
    token=influx_token,
    org=influx_org
)

write_api = client.write_api(write_options=SYNCHRONOUS)

url = 'ftp://ftp.nasdaqtrader.com/symboldirectory/nasdaqtraded.txt'
urllib.request.urlretrieve(url, 'nasdaqtraded.txt')
df = pd.read_csv('nasdaqtraded.txt', sep='|')
size=10
symbols = df["Symbol"][0:-1].sample(n=size)

class Stocks():
    def __init__(self, db="stocks", stocks_file="stocks.json"):
        self.stocks=None
        self.db=db
        self.stock_prices = list()
        #column_names = ["Open", "High", "Low", "Close", "Volume", "Dividends", "Stock Splits", "Symbol"]

        #df = pd.DataFrame(columns = column_names)        
        #self.load_stocks_from_json(stocks_file)
        self.stocks=symbols
        with concurrent.futures.ProcessPoolExecutor() as executor:
            for out1 in executor.map(self.dl_stocks, self.stocks):            
                self.stock_prices.append(out1)                
        
    def load_stocks_from_json(self, stocks_file):
        with open(stocks_file) as f:
            data=json.load(f)
            self.stocks=data['stocks'] 
            
    def dl_stocks(self, stock):
        ticker = yf.Ticker(stock)
        _data_frame = ticker.history(start=start, end=end, interval="1d", auto_adjust=True)
        #history = ticker.history()
        #last_quote = (history.tail(1)['Close'].iloc[0])
    
        idx1 = _data_frame.index  
        
        merged = idx1.union(idx2)
        s = _data_frame.reindex(merged)
        #s['New_Price'] = (1/s['Stock Splits']).replace(inf,1).cumprod() * s['Close']
        df = s.interpolate()  
        df['Symbol'] = stock
        df['Date'] = s.index
        
        return (df)
    
    def print_stocks(self):
        print(self.stock_prices) 
        
    def write_db(self):
        df = pd.concat(self.stock_prices, ignore_index=True)
        
        #df.reset_index()
        #_now = list(range(0,len(df.index),1))
        _now = df.index
        print(_now)
        _data_frame = pd.DataFrame(data=df,index=_now,columns=['Date','Open','High','Low','Close','Volume','Symbol'])
        #print(_data_frame)
        #_write_client.write(bucket.name, record=data_frame, data_frame_measurement_name='h2o_feet', data_frame_tag_columns=['location'])
        #return(_data_frame)
        write_api.write(influx_bucket, record=_data_frame, data_frame_measurement_name=['Date','Open','High','Low','Close','Volume','Symbol'],data_frame_tag_columns=['Date','Open','High','Low','Close','Volume','Symbol'])

    def return_stocks(self):
        return(self.stock_prices)

def main():
    s=Stocks()
    #set = s.return_stocks()
    s.write_db()
    
    """
    Prepare data
    """

    """
    Query: using Table structure
    """
    #query_api = client.query_api()
    #tables = query_api.query('from(bucket:"local") |> range(start: -1y)')

    #for table in tables:
        #print(table)
        #for record in table.records:
            #print(record.values)
    
    #result = client.query_api().query(org=influx_org, query=query)
    #results = []
    #for table in result:
        #for record in table.records:
            #results.append((record.get_field(), record.get_value()))

    #print(results)

if __name__ == '__main__':
    main()
    

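Someone suggested that the executor's multiprocessing might be causing the problem, but I get the same error even when I download the stocks with a simple for loop: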

    def __init__(self, db="stocks", stocks_file="stocks.json"):
        self.stocks=None
        self.db=db
        self.stock_prices = list()
        #column_names = ["Open", "High", "Low", "Close", "Volume", "Dividends", "Stock Splits", "Symbol"]

        #df = pd.DataFrame(columns = column_names)        
        #self.load_stocks_from_json(stocks_file)
        self.stocks=symbols
        for i in self.stocks:
            stock = self.dl_stocks(i)
            self.stock_prices.append(stock)
        
        #with concurrent.futures.ProcessPoolExecutor() as executor:
            #for out1 in executor.map(self.dl_stocks, self.stocks):            
                #self.stock_prices.append(out1)  
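For comparison, the commented-out example line in the code above (taken from the client's documentation) passes a single string as `data_frame_measurement_name` and a short list as `data_frame_tag_columns`, whereas `write_db` passes the full column list for both. The sketch below follows the documented shape. It assumes each per-symbol frame keeps its date index; the helper name `write_prices` and the measurement name `stock_prices` are hypothetical, and the post does not confirm that this resolves the IndexError.

```python
import pandas as pd

def write_prices(write_api, bucket, frames):
    """Write per-symbol, date-indexed frames as one measurement tagged by Symbol."""
    # Keep the DatetimeIndex; the client uses the index as the point timestamp.
    df = pd.concat(frames)
    df = df[["Open", "High", "Low", "Close", "Volume", "Symbol"]]
    write_api.write(
        bucket=bucket,
        record=df,
        data_frame_measurement_name="stock_prices",  # hypothetical name; a single string
        data_frame_tag_columns=["Symbol"],           # the remaining columns become fields
    )
    return df
```

With one measurement and `Symbol` as a tag, each stock can be selected by filtering on that tag, so a separate table per stock may not be needed.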

【Discussion】:

    Tags: python indexing influxdb bucket


    【Solution 1】:

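    self.stock_prices is the output of a concurrent pool. The values will not exist until the processes have finished and sent back their results, and you are not waiting for them. Do you really need these to run concurrently? And why separate processes rather than threads?

    In any case, I suggest you start without concurrency. Once you have things working, you can decide whether it runs fast enough. If you do switch back to concurrency, you must use .wait, .as_completed, or .result to wait for the actual results.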
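As an illustration of waiting on results explicitly, here is a minimal sketch using threads and `as_completed`. The `fetch` function is a hypothetical stand-in for a real download such as `dl_stocks`, and `fetch_all` is likewise an illustrative name.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch(symbol):
    # Hypothetical stand-in for a real download; returns the symbol and a dummy value.
    return symbol, len(symbol)

def fetch_all(symbols, max_workers=4):
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(fetch, s) for s in symbols]
        for future in as_completed(futures):
            # result() blocks until the task is done and re-raises any worker exception.
            symbol, value = future.result()
            results[symbol] = value
    return results

print(fetch_all(["AAPL", "MSFT", "GE"]))
```

Iterating over `executor.map(...)`, as the question's code does, also yields each result only once it is ready, in submission order, so either form collects completed results before the loop ends.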

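    【Comments】:

    • I plan to scale up, and parallel processing is a hard requirement. Since I run the write function afterwards, I assumed the parallel work would be finished by then. Is that the problem? Does Python do some sort of (index) inventory check before the parallel processes finish? I can try removing the parallel part to see whether that fixes my problem.
    • I tried a simple for loop and got exactly the same error (I will edit the original post to reflect this).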