【问题标题】:Python extract data from files on ftp server and append to pandas data frame in offline and real timePython 从 ftp 服务器上的文件中提取数据并离线和实时附加到 pandas 数据帧
【发布时间】:2021-04-04 03:48:58
【问题描述】:

我正在尝试编写一个脚本,该脚本可以从 ftp 服务器上已经存在的文件中提取数据(如果有的话),并继续监视 ftp 目录中是否有任何新传入文件也从新出现的文件中提取数据并附加到数据中框架。更详细:

  • 登录到给定文件夹中的 FTP 服务器
  • 如果文件夹中存在任何文件,使用某些函数从每个文件中提取数据并附加到 pandas 数据框
  • 继续查看目录中出现的任何新文件,从新出现的文件中提取数据并附加到 pandas 数据框
  • 如果等待超过时间限制退出,则等待新文件出现

到目前为止我所写的:

import pandas as pd
import numpy as np
import os
from ftplib import FTP
from time import sleep
import time

# Here I define my empty data frame to which I will append my extracted data
cols = [  'Channel' , 'Voltage' , 'Amplitude',  'Time_(ms)', 'Bubble_period_(ms)']
all_results =  pd.DataFrame(columns = cols)

# Here I define my empty list which will be used to append the 
data = []


#Function to monitor FTP and extract data
def extract_ftp_results(ftp_folder_path):
    global all_results

    ftp = FTP()
    ftp.connect('10.199.44.240', 21)
    ftp.login('display')

  
    ftp.cwd(str(ftp_folder_path))
    print("Connection Established {}".format(ftp.getwelcome()))
    

    #Local directory where I copy each file for extracting attributes/data
    direct = 'C:\\Users\\QC\\Desktop\\ftp_local\\'

    #Create a list with one element to compare with the contents of ftp directory
    # it will to be used in running for loop later in the function  
    old_files = ['1']

    #Start a while loop to monitor the ftp directory 
    while True:                       #Start a while loop to monitor FTP directory 
        new_files = ftp.nlst()        #List the filenames of FTP directory and store in variable 
        if len(old_files) != 0 and new_files != old_files:    # Check if filenames match with old_files
            changes = [i for i in new_files if i not in old_files] # store the contents which don't match
        
            for x in changes: #for each filename that was not in the old_files
                filename = str(direct + x) # Define a filename where it will be written 
                localfile = open(filename, 'wb') #Open that filename in write mode
                ftp.retrbinary('RETR'+' ' + x , localfile.write, 1024) #Fetch from FTP and write
                localfile.close() # Close file
                print("updating data ***************************************************")
                print("found new file---> {}".format(str(filename).split('\\')[-1]))
                print("")
                print("Calculating  Attributes")
                print("*****************************************************************")
                sensor_arr , nfh_arr, gcs, mask, chan  = extract_data(filename) #extract data in np.array
                i=0
                num_cluster = 18
                sequence = gcs[11:19]
                shot = gcs[25:29]
                while i < num_cluster:  #loop through the numpy array extracted from file
                    poa, pot, bp = bubble_attributes(apply_filter(nfh_arr[15:500, i]))#extract attributes
                    values = [shot, chan, poa, pot, bp]
                    zipped = zip(cols, values) # zip attributes with column name 
                    a_dictionary = dict(zipped) # convert to dictionary 
                    data.append(a_dictionary) # Append dictionary to data list 
                    chan = chan +1
                    i += 1 
               os.remove(filename) # remove file from local machine 
               all_results = all_results.append(data, True) # append list of dictionaries to dataframe
            old_files = new_files
            a = time.perf_counter() #start time counter
            
            
            if time.perf_counter() > a + 100:
               print("Done Waiting") # break if wait for new file appearing exceeds
               break

问题是我得到了一个数据框,其中包含来自文件的重复值一次又一次地附加到数据框中,就像对于 new_file 列表的每个元素,循环每次都从开始运行。 有人可以帮忙吗

【问题讨论】:

  • changes 中的文件是否包含old_files 文件中的数据?
  • 我实际上将 old_files 定义为一个 hack 来监视目录中出现的任何新文件,ftp 目录的内容将一次又一次地存储在 new_files(Python list) 变量中,它们将随后检查它们是否匹配 old_files (Python 列表)的内容,如果它们不匹配,则不匹配的元素将向下移动到 for 循环。一旦 for 循环完成 new_files 将等于 old_files,我忘了在代码中添加它,现在编辑
  • values = [shot, chan, poa, pot, bp] - 每个字典包含多个数据点?有没有办法区分唯一的数据点 - 你怎么知道是否有重复?每个唯一数据点是否会有唯一的(shot,chan) 值?您需要在保存之前或之后进行过滤,但您需要知道如何确定独特点。
  • 是的,每个 (shot,chan) 值都是唯一的
  • 我尝试使用all_results = all_results.drop_duplicates() 但这不值得,因为循环中的值总是从第一个值或更改元素(Python 列表)变量开始,这使得循环需要很长时间

标签: python python-3.x pandas loops ftp


【解决方案1】:

每个 (shot,chan) 值都是唯一的

保留一组已经看过的(shot,chan) 元组;在保留数据点之前对照该集合检查每个新点。

...
seen = set()

#Function to monitor FTP and extract data
def extract_ftp_results(ftp_folder_path):
    ...

    #Start a while loop to monitor the ftp directory 
    while True:                       #Start a while loop to monitor FTP directory 
       ...

            for x in changes: #for each filename that was not in the old_files
                ...
                while i < num_cluster:  #loop through the numpy array extracted from file
                    poa, pot, bp = bubble_attributes(apply_filter(nfh_arr[15:500, i]))#extract attributes
                    values = [shot, chan, poa, pot, bp]
                    if (shot,chan) in seen:
                        continue
                    seen.add((shot,chan))
                    zipped = zip(cols, values) # zip attributes with column name 
                    a_dictionary = dict(zipped) # convert to dictionary 
                    data.append(a_dictionary) # Append dictionary to data list 
                    chan = chan +1
                    i += 1 
               ...
            

集合非常适合成员资格测试,您可以使用集合改进查找新文件的方式。

...
    old_files = set()

    #Start a while loop to monitor the ftp directory 
    while True:                       #Start a while loop to monitor FTP directory 
        new_files = set(ftp.nlst())        #List the filenames of FTP directory and store in variable 
        changes = new_files - old_files    # get new filenames
        if not changes:
            continue
        old_files.update(changes)
        for x in changes: #for each filename that was not in the old_files:
            ...
            os.remove(filename) # remove file from local machine 
            all_results = all_results.append(data, True) # append list of dictionaries to dataframe
        #old_files = new_files
        a = time.perf_counter() #start time counter
        ...

【讨论】:

  • 感谢@wwii 我会实施并检查,也感谢改进建议
  • 我在哪里定义我的 for 循环,for x in changes,在哪里捕获更改的每个元素,在 continue put else 之后?在 True 或 False boolean 检查集合是否为空之后?
  • @abhishake - 见编辑 - 整个 for 循环可以去凹一级。如果没有任何新文件,if not changes: continue 将返回到 while 循环的顶部。 https://docs.python.org/3/reference/simple_stmts.html#the-continue-statement。您需要删除while True 循环底部附近的old_files = new_files
  • 我仍然遇到一些问题,因为数据集中的行被重复复制,让我创建一个可重现的示例,我的代码中的示例,您可以将手机用作 ftp 服务器和这个实验的客户端,我将创建一些模拟我拥有的文件的虚拟 numpy 文件
猜你喜欢
  • 2015-07-18
  • 2021-10-05
  • 2020-11-10
  • 1970-01-01
  • 1970-01-01
  • 2020-10-28
  • 2023-04-10
  • 1970-01-01
相关资源
最近更新 更多