【问题标题】:In Python, for loop one file to search another在 Python 中,for 循环一个文件来搜索另一个文件
【发布时间】:2015-03-10 19:33:45
【问题描述】:

希望能提供一些关于如何增强我的 python 脚本以解决问题的线索。我有一个文件,其中列出了数千个移动工作站和 IP 地址,每五分钟更新一次。我使用 Paramiko 将 ssh 连接到每个工作站以验证服务是否正在运行(在此示例中为 crond)。我遇到的问题是,当我启动我的 python 脚本时,它会将大文件读入内存,当它下降 1/3 时,IP 地址已经改变,大部分 IP 地址不再有效。有没有办法让 python 打开然后在工作站每次搜索之前关闭文件?这将确保 IP 是当前 IP。我在下面编写的 python 脚本可以工作,但我又遇到了旧 IP 信息的问题。谢谢。

The contents of WKSIPS.txt are in the format:

WORK  1234  Cell IP: 10.10.10.10
WORK  4567  Cell IP: 10.10.10.11

#!/usr/bin/python
import paramiko, os, string, threading
import getpass
import socket
import sys

FileName=open('WKSIPS.txt', 'r')
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

for line in FileName.readlines():
    WKSid = line.split()
    try:
        if WKSid[0] == 'WORK' :
            WKSip = WKSid[4]   
            ssh.connect(WKSip, username='user', password='password', timeout='3')
            stdin, stdout, stderr = ssh.exec_command('service crond status')
            Out =  stdout.readlines()
            print ("WORK  " + WKSid[1], Out)
            ssh.close()
            FileName.close
   except paramiko.SSHException, e:
       print ('WORK' + WKSid, WKSip, "Invalid Password")

【问题讨论】:

  • 如果您按照您的建议关闭并重新打开文件,您想从“新”文件的开头重新开始吗?还是在您离开的同一行?从同一行开始会导致任何问题(例如行数改变)吗?
  • 您可以使用multiprocessing module 并行执行一些工作。至少,您可以让子进程执行 ssh 工作,这样您就不会阻止文件读取。
  • 因为这个工作站名称是静态的,而且只有 IP 地址发生变化,我需要它从中断的地方继续。
  • 行的顺序有变化吗?行数有变化吗?
  • 如果在无效 IP 地址上运行命令会发生什么?这是您可以识别并选择忽略的事件吗?

标签: python paramiko readlines


【解决方案1】:

我建议使用multiprocessing pool 来创建可以为您处理文件的每一行的工作人员,这样您可以更快地处理文件。

我使用此代码 sn-p 的目标是通过使您的脚本足够快以在 5 分钟文件刷新之前完成,从而完全回避文件重新加载问题

#!/usr/bin/python

import paramiko, os, string, threading
import multiprocessing
import getpass
import socket
import sys

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

def worker(line)
    WKSid = line.split()
    try:
        if WKSid[0] == 'WORK' :
            WKSip = WKSid[4]

            ssh.connect(WKSip, username='user', password='password', timeout='3')
            stdin, stdout, stderr = ssh.exec_command('service crond status')
            Out =  stdout.readlines()
            print ("WORK  " + WKSid[1], Out)
            ssh.close()
    except paramiko.SSHException, e:
        print ('WORK' + WKSid, WKSip, "Invalid Password")

# set up processing pool
pool = multiprocessing.Pool()

with open('WKSIPS.txt') as infile:
    pool.map(worker, infile)

pool.close()
pool.join()

注意事项:

  • 我已将脚本功能的主要部分移到一个接受文件行作为输入的函数中。 pool.map() 将为每个工作人员提供一行文件迭代器,他们将独立处理它。它与您的原始代码执行相同的操作,但工作分配给与您机器上的内核数量相等的多个进程。
  • 我没有 paramiko 模块,无法在我当前的环境中安装它,所以我无法真正为您测试此代码。如果有任何错误,我提前道歉。
  • 我不熟悉 paramiko 库,因此在多个进程中同时使用相同的 paramikio.SSHClient() 对象可能会产生一些隐藏的副作用。如果您看到来自 ssh 对象的奇怪错误,请尝试将其实例化移动到工作函数中。
  • 我已将readlines() 更改为使用Python 文件迭代器。将整个文件读入内存是一项耗时的操作,应该避免。

为了清楚起见,这段代码 sn-p 处理文件运行时发生的更改。我做了两个重要的假设:

  1. 此脚本的执行可以与刷新文件的任何操作同步,以便在文件刷新后立即执行。
  2. 它可以在 5 分钟内执行 - 因为我没有 paramiko、访问 ssh 目标或访问 WKSIPS.txt,所以我无法计时。由于这个问题似乎符合embarassingly parallel的定义,我觉得值得一试。如果不符合时间规范,则由 OP 进一步优化。

【讨论】:

  • 多处理池方法似乎是个好主意,但是在运行中对文件的更改不会反映在当前编写的代码中。
  • 确实如此。我假设这个脚本的运行可以与文件刷新同步。我将进行编辑以澄清这一点。
  • 我很欣赏多处理建议,但由于其中一些工作站是通过无线电连接的,因此它能否在五分钟内完成值得怀疑。目前,按照我写的方式,由于建立 ssh 连接的延迟,需要 4 个多小时。
  • @gineraso 哦。该死的。有了这个想法。
  • @gineraso 对于它的价值,我仍然建议您在理顺处理文件刷新之后实现某种并行性。由于多个进程处理 SSH 连接,您仍然可以看到运行时间显着减少。
【解决方案2】:

这是非常昂贵的,但我认为它仍然是了解这种方法是否适合您的第一步:

import re
import paramiko

def verify_service(ssh, work, ip):
    print("Verifying workstation %d at %s" % (work, ip))
    ssh.connect(ip, username='user', password='password', timeout='3')
    stdin, stdout, stderr = ssh.exec_command('service crond status')
    print ("WORK  " + work, stdout.readlines())
    ssh.close()

IP_LIST = 'WKSIPS.txt'
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

# Get number of lines in WKSIPS -- if you know this, you can just define NUM_LINES
NUM_LINES = None
with open(IP_LIST) as f:
    NUM_LINES = len(f.readlines())

for i in xrange(NUM_LINES):
    with open(IP_LIST) as f:
        #line = f.readlines()[i]
        line = next(islice(f, i, None))
        mat = re.match(r'WORK\s*(\d+)\s*Cell IP: (.*)', line)
        if mat:
            verify_service(ssh, int(mat.group(1)), mat.group(2))

代码首先打开您的文件并计算行数,并将其存储在NUM_LINES 中。如果你知道这个数字,你可以去掉NUM_LINES = None这一行和它下面的两个,直接用NUM_LINES = <number of lines>代替。

然后,对于0到NUM_LINES之间的每个行号i(不包括),它打开文件,将整个文件读入一个列表,拉出对应的行to line number i 遍历行,直到到达 ith 行,对其进行解析并将其传递给 verify_service() 函数 - 您必须使用 paramiko 代码更新该函数。

如果可行,您应该考虑更好的方法来做到这一点。也许您不必每次迭代都重新读取文件,也许每个 n 次迭代都可以正常工作。也许您对文件进行哈希处理并在重新读取之前检查哈希是否更改。也许您检查文件的修改时间等。也许您使用子进程并尝试同时处理多个连接而不是连续处理。

无论如何,如果这可行,您应该考虑对其进行优化 - 因为目前编写的它非常昂贵,但它会按照您所说的去做。

【讨论】:

  • 我认为你是对的,必须有更好的方法来做到这一点。让我给你一个解决方案,看看是否符合我的需要。谢谢。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-10-23
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-11-02
相关资源
最近更新 更多