【发布时间】:2023-11-10 03:28:02
【问题描述】:
我试图模仿这里所做的事情(python、influxdb、yfinance(与 googlefinance),最后是 graphana))。我终于弄明白了如何使用bucket vs 下面两个教程中描述的数据库方法
https://frobots.net/post/2016-03-01/monitoring-stocks-with-grafana
https://medium.com/analytics-vidhya/watch-your-stock-shares-with-grafana-and-influxdb-4df7a99c6d64
尝试在此处遵循 influxdb 2.0 API 说明
https://github.com/influxdata/influxdb-client-python#how-to-use-jupyter-pandas-influxdb-2
https://pypi.org/project/influxdb-client/
我将我的列表合并到一个数据框并添加了一个日期和符号列(上面的方法使用了 json 文件,但没有给我适当的例子......而且 yfinance 没有给我一个 json。它给了我一个数据框,我只有一个存储桶,所以我不确定我应该如何将每只股票写成一个单独的数据框,所以我将它们合并到一个数据框添加一个符号列。这使我的索引关闭,因为索引是日期,所以我创建了一个新索引)。当我编写每个数据框(以结果日期作为索引)时,我取得了成功……但恐怕我只是在用每只股票写相同的数据。所以我将这些股票合并到一个巨大的数据框中,添加一个日期和符号列(让表格有点沮丧)。
df = pd.concat(self.stock_prices, ignore_index=True)
我要做的是下载 # 个股票(本例中为 10 个),然后将其上传到存储桶中,以便最终绘制图表。我不确定我是否应该按照我现在的方式或其他方法(我想我想为每只股票写一个新表,但是我有 w 桶的例子没有显示给我如何指定一个表。我相信我在 api 1.8 Write pandas DF with tags to influxdb vs https://www.influxdata.com/blog/getting-started-with-influxdb-and-pandas/ 中看到了这样的方法。现在我似乎卡在索引上
_now = df.index
这似乎是正确的,但如果我执行打印(df.index)。什么都没有出现。但如果我做一个 len(df.index) 它显示正确。如果我尝试选择手动索引...
_now = list(range(0,len(df.index),1))
我遇到同样的错误
这是我的代码
import concurrent.futures
import json
import time, random
import pandas as pd
import yfinance as yf
import urllib.request
import influxdb_client
import numpy as np
from numpy import inf
from influxdb_client.client.write_api import SYNCHRONOUS
import pandas_market_calendars as mcal
start="2017-01-01"
end="2020-04-30"
nyse = mcal.get_calendar('NYSE')
trading_dates= nyse.schedule(start_date=start, end_date=end)
idx2 = trading_dates.index
influx_bucket = "local"
influx_org = "localdomain"
influx_token = "H60IaJ-9t64Zws296bpkJZNIu7ylUbJxfD6RH-thenW2cvWgxXUpADJN1R4o9PsiUtZAE7KjuK0Wzcu_DypdbQ=="
# Store the URL of your InfluxDB instance
influx_url="http://192.168.3.114:8086"
client = influxdb_client.InfluxDBClient(
url=influx_url,
token=influx_token,
org=influx_org
)
write_api = client.write_api(write_options=SYNCHRONOUS)
url = 'ftp://ftp.nasdaqtrader.com/symboldirectory/nasdaqtraded.txt'
urllib.request.urlretrieve(url, 'nasdaqtraded.txt')
df = pd.read_csv('nasdaqtraded.txt', sep='|')
size=10
symbols = df["Symbol"][0:-1].sample(n=size)
class Stocks():
def __init__(self, db="stocks", stocks_file="stocks.json"):
self.stocks=None
self.db=db
self.stock_prices = list()
#column_names = ["Open", "High", "Low", "Close", "Volume", "Dividends", "Stock Splits", "Symbol"]
#df = pd.DataFrame(columns = column_names)
#self.load_stocks_from_json(stocks_file)
self.stocks=symbols
with concurrent.futures.ProcessPoolExecutor() as executor:
for out1 in executor.map(self.dl_stocks, self.stocks):
self.stock_prices.append(out1)
def load_stocks_from_json(self, stocks_file):
with open(stocks_file) as f:
data=json.load(f)
self.stocks=data['stocks']
def dl_stocks(self, stock):
ticker = yf.Ticker(stock)
_data_frame = ticker.history(start=start, end=end, interval="1d", auto_adjust=True)
#history = ticker.history()
#last_quote = (history.tail(1)['Close'].iloc[0])
idx1 = _data_frame.index
merged = idx1.union(idx2)
s = _data_frame.reindex(merged)
#s['New_Price'] = (1/s['Stock Splits']).replace(inf,1).cumprod() * s['Close']
df = s.interpolate()
df['Symbol'] = stock
df['Date'] = s.index
return (df)
def print_stocks(self):
print(self.stock_prices)
def write_db(self):
df = pd.concat(self.stock_prices, ignore_index=True)
#df.reset_index()
#_now = list(range(0,len(df.index),1))
_now = df.index
print(_now)
_data_frame = pd.DataFrame(data=df,index=_now,columns=['Date','Open','High','Low','Close','Volume','Symbol'])
#print(_data_frame)
#_write_client.write(bucket.name, record=data_frame, data_frame_measurement_name='h2o_feet', data_frame_tag_columns=['location'])
#return(_data_frame)
write_api.write(influx_bucket, record=_data_frame, data_frame_measurement_name=['Date','Open','High','Low','Close','Volume','Symbol'],data_frame_tag_columns=['Date','Open','High','Low','Close','Volume','Symbol'])
def return_stocks(self):
return(self.stock_prices)
def main():
s=Stocks()
#set = s.return_stocks()
s.write_db()
"""
Prepare data
"""
"""
Query: using Table structure
"""
#query_api = client.query_api()
#tables = query_api.query('from(bucket:"local") |> range(start: -1y)')
#for table in tables:
#print(table)
#for record in table.records:
#print(record.values)
#result = client.query_api().query(org=influx_org, query=query)
#results = []
#for table in result:
#for record in table.records:
#results.append((record.get_field(), record.get_value()))
#print(results)
if __name__ == '__main__':
main()
有人建议执行器多处理可能导致问题......但即使我使用简单的 for 循环下载股票,我也会遇到同样的错误
def __init__(self, db="stocks", stocks_file="stocks.json"):
self.stocks=None
self.db=db
self.stock_prices = list()
#column_names = ["Open", "High", "Low", "Close", "Volume", "Dividends", "Stock Splits", "Symbol"]
#df = pd.DataFrame(columns = column_names)
#self.load_stocks_from_json(stocks_file)
self.stocks=symbols
for i in self.stocks:
stock = self.dl_stocks(i)
self.stock_prices.append(stock)
#with concurrent.futures.ProcessPoolExecutor() as executor:
#for out1 in executor.map(self.dl_stocks, self.stocks):
#self.stock_prices.append(out1)
【问题讨论】:
标签: python indexing influxdb bucket