【问题标题】:Python Twisted Conch - How to stop Reactor with multiple Connections?Python Twisted Conch - 如何通过多个连接停止反应器?
【发布时间】:2014-10-09 22:17:56
【问题描述】:

让我从这个开始……我根本不懂 Python;我在兜圈子,我根本不明白。我对替代和更简单的方法完全开放。

我的目标:连接到不同的服务器,在每个服务器上运行相同的命令,然后(如现在/还没有)将输出用于生产性事情。太棒了。

我有什么:在某处找到了一些代码(我会尝试找到一个链接并更新它)。我稍微修改了一下。它连接到不同的服务器,运行相同的命令。

问题:我不知道一旦一切完成后如何停止反应器。我真的想在不按cntrl+c 的情况下停止它。我想我需要推迟一些事情,但我不知道什么或在哪里。我觉得当 SSHChannel 关闭时,需要以某种方式冒泡到 SSHConnection,以停止服务......所以传输可以知道发生了什么?而且我一直想以某种方式将每个 reactor.connectTCP(server, 22, factory) 包装成一个 deferred 。而且我觉得我可能需要一个控制器类。我尝试了这些东西,但没有正确尝试。也许gatherResults 可能会有所帮助,但是,我不知道到底该放什么。

from twisted.conch.ssh import transport, connection, userauth, channel, common
from twisted.internet import defer, protocol, reactor
import sys, struct  

USER = 'username'
PASS = 'thisisforpersonalusesoicanstoreit!' 
CMD  = 'echo "merely this and nothing more"'


from twisted.python import log
import sys
log.startLogging(sys.stdout)


class ClientCommandTransport(transport.SSHClientTransport):
    def __init__(self, username, password, command):
        self.username = username
        self.password = password
        self.command  = command

    def verifyHostKey(self, pubKey, fingerprint):
        print fingerprint 
        return defer.succeed(True)

    def connectionSecure(self):
        self.requestService(
            PasswordAuth(self.username, self.password,
                         ClientConnection(self.command)))    

class PasswordAuth(userauth.SSHUserAuthClient):
    def __init__(self, user, password, connection):
        userauth.SSHUserAuthClient.__init__(self, user, connection)
        self.password = password

    def getPassword(self, prompt=None):
        return defer.succeed(self.password)

class ClientConnection(connection.SSHConnection):
    def __init__(self, cmd, *args, **kwargs):
        connection.SSHConnection.__init__(self)
        self.command = cmd

    def serviceStarted(self):
        self.openChannel(CommandChannel(self.command, conn=self))  

class CommandChannel(channel.SSHChannel):
    name = 'session'

    def __init__(self, command, *args, **kwargs):
        channel.SSHChannel.__init__(self, *args, **kwargs)
        self.command = command
        self.data = ''

    def channelOpen(self, data):
        self.conn.sendRequest(
            self, 'exec', common.NS(self.command), wantReply=True).addCallback(
                                                            self._gotResponse)

    def _gotResponse(self, _):
        self.conn.sendEOF(self) 
        self.loseConnection() 

    def dataReceived(self, data):
        #self.data += data
        print data 

    def request_exit_status(self, data):
        (status,) = struct.unpack('>L', data)
        # print 'exit status = ', status  

class ClientCommandFactory(protocol.ClientFactory):
    def __init__(self, command=CMD):
        self.username = USER
        self.password = PASS
        self.command  = command

    def buildProtocol(self, addr):
        protocol = ClientCommandTransport(
            self.username, self.password, self.command)
        return protocol    


masters = ['server1','server2','server3','server4','server5']

factory = ClientCommandFactory()

for server in masters:
    print server
    reactor.connectTCP(server, 22, factory)

reactor.run()

我确实玩过延迟 getPage 的 http 请求(确实有效),但我似乎无法通过反应器和 ssh 连接重新应用它。

这些是我真正希望我能理解的资源:


有了下面的一个答案...我测试了传递对工厂的引用,如果工厂在其数组中没有更多连接(或任何 python调用数组)。

我将工厂更新为现在也包含此方法:

class ClientCommandFactory(protocol.ClientFactory): 

    def clientConnectionLost(self, connector, reason):
        print reason

我查看了日志记录,因为我通常对正在发生的事情感兴趣并且...(其中一些是我自己的陈述,一些是默认的)

014-10-16 13:42:58-0500 [SSHChannel session (0) on SSHService ssh-connection on ClientCommandTransport,client] closed last TCP connection
2014-10-16 13:42:58-0500 [ClientCommandTransport,client] service stopped 
2014-10-16 13:42:58-0500 [ClientCommandTransport,client] connection lost
2014-10-16 13:42:58-0500 [ClientCommandTransport,client] [Failure instance: Traceback (failure with no frames): <class 'twisted.internet.error.ConnectionLost'>: Connection to the other side was lost in a non-clean fashion: Connection lost.
2014-10-16 13:42:58-0500 [ClientCommandTransport,client] ]
2014-10-16 13:42:58-0500 [ClientCommandTransport,client] connection lost
2014-10-16 13:42:58-0500 [ClientCommandTransport,client] [Failure instance: Traceback (failure with no frames): <class 'twisted.internet.error.ConnectionLost'>: Connection to the other side was lost in a non-clean fashion: Connection lost.
2014-10-16 13:42:58-0500 [ClientCommandTransport,client] ]
2014-10-16 13:42:58-0500 [ClientCommandTransport,client] Stopping factory <__main__.ClientCommandFactory instance at 0x02323030>
2014-10-16 13:42:58-0500 [-] Main loop terminated.

所以...它说连接以不干净的方式丢失。有没有更好的方法让我停下来..?

【问题讨论】:

  • 我大约一周前写了这个问题......决定放弃它,因为 paramiko。刚刚意识到我需要使用相同的连接运行多个命令(&amp;&amp; 连接还不够)。我仍然真的不知道该怎么办。最终,我想根据命令的响应来处理断开连接,但这些都不重要。
  • 你检查this了吗?可能你必须自己实现一些东西才能优雅地关闭连接。无论如何,这与工作完成后停止反应堆无关。
  • @koleS - 谢谢,我没看到。我尝试在下面有人给我的答案中应用这些东西,但反应堆偶尔会过早停止,因为在第一个或两个完成后并非所有连接都已添加。到处都是这么多事情,我不确定在哪里或如何捕获或管理事情。

标签: python ssh twisted


【解决方案1】:

所以首先这是行不通的,因为connectTCP 接受带有IP address 作为第一个参数的字符串,并且您正在传递此列表中的元素:

masters = ['server1','server2','server3','server4','server5']

在所有任务完成后停止反应器是扭曲的非常常见的用例。一种方法是存储要在工厂中执行的任务计数器。每次实例化这个工厂的协议实例时,将该数字加一,每次协议实例(任务)返回结果时,减少计数器,当计数器达到 0 时停止反应器。示例代码:

from twisted.conch.ssh import transport, connection, userauth, channel, common
from twisted.internet import defer, protocol, reactor
import sys, struct  

USER = 'username'
PASS = 'thisisforpersonalusesoicanstoreit!' 
CMD  = 'echo "merely this and nothing more"'


from twisted.python import log
import sys
log.startLogging(sys.stdout)


class ClientCommandTransport(transport.SSHClientTransport):
    def __init__(self, username, password, command, factory):
        self.username = username
        self.password = password
        self.command  = command
        self.factory = factory

    def verifyHostKey(self, pubKey, fingerprint):
        print fingerprint 
        return defer.succeed(True)

    def connectionSecure(self):
        self.requestService(
            PasswordAuth(self.username, self.password,
                         ClientConnection(self.command, self.factory)))

class PasswordAuth(userauth.SSHUserAuthClient):
    def __init__(self, user, password, connection):
        userauth.SSHUserAuthClient.__init__(self, user, connection)
        self.password = password

    def getPassword(self, prompt=None):
        return defer.succeed(self.password)

class ClientConnection(connection.SSHConnection):
    def __init__(self, cmd, *args, **kwargs):
        connection.SSHConnection.__init__(self)
        self.command = cmd
        self.factory = factory

    def serviceStarted(self):
        self.openChannel(CommandChannel(self.command, self.factory, conn=self))

class CommandChannel(channel.SSHChannel):
    name = 'session'

    def __init__(self, command, factory, *args, **kwargs):
        channel.SSHChannel.__init__(self, *args, **kwargs)
        self.command = command
        self.data = ''
        self.factory = factory
        self.factory.num_connections += 1
        self.factory.connections.append(self)

    def channelOpen(self, data):
        self.conn.sendRequest(
            self, 'exec', common.NS(self.command), wantReply=True).addCallback(
                                                            self._gotResponse)

    def _gotResponse(self, _):
        self.conn.sendEOF(self) 
        self.loseConnection()
        self.factory.num_connections -= 1
        self.factory.connections.remove(self)
        if self.factory.num_connections == 0:
            reactor.stop()

    def dataReceived(self, data):
        #self.data += data
        print data 

    def request_exit_status(self, data):
        (status,) = struct.unpack('>L', data)
        # print 'exit status = ', status  

class ClientCommandFactory(protocol.ClientFactory):
    def __init__(self, command=CMD):
        self.username = USER
        self.password = PASS
        self.command  = command
        self.connections = []
        self.num_connections = 0

    def buildProtocol(self, addr):
        protocol = ClientCommandTransport(
            self.username, self.password, self.command, self)
        return protocol    


masters = ['server1','server2','server3','server4','server5']

factory = ClientCommandFactory()

for server in masters:
    print server
    reactor.connectTCP(server, 22, factory)

reactor.run()

我在这里所做的是向工厂添加了两个变量self.connectionsself.num_connections 来存储对工厂中连接的引用并计算连接数。然后在工厂的buildProtocol 中,工厂将自己传递给ClientCommandTransport,后者又将工厂的引用传递给ClientConnection,最后将工厂的引用传递给需要它的工厂——CommandChannel。每次实例化CommandChannel 的实例时,它都会引用工厂,因此它会将连接数增加一并将自己添加到存储在工厂中的连接列表中。我假设在完成任务/命令时会触发 _gotResponse 回调。因此,每当它被触发时,它都会像以前一样失去连接,但现在,它还会减少连接计数器并从工厂中删除对自身的引用。它还会检查是否有任何其他打开的连接,如果没有,它会停止反应器。

我没有测试过这段代码,但它是 Twisted 中的一种常见模式,工厂保留了对其创建的协议实例的引用列表,因此每个实例都可以通过工厂访问其他实例并且能够停止反应堆一旦所有实例都完成了他们的工作。

请注意,这个层次结构也有点深,Factory -> ClientCommandTransport -> ClientConnection -> CommandChannel 我不确定它是否是一路传递对工厂的引用的最佳解决方案下来。

其中一个变量实际上是多余的 - 您可以仅存储 self.num_connections 并增加/减少它,也可以存储 self.connections,从列表中添加/删除实例并使用 len(self.connections) 查看是否仍有任何打开的连接.

【讨论】:

  • 哦...所以我发布的内容确实有效。一旦我想完成事情,我就无法停止反应堆。我现在没有时间查看您的全部回复,但我稍后会回来。
  • 您的答案看起来确实不错,虽然...有点像我在想的,但由于某种原因我无法做到。我想我需要将事情向下传递,而不是“冒泡”断开连接。只要您确定您提出的想法,您无需为我测试代码。
  • 如果您认为我的回答does look pretty good,那么您可以接受。我不明白您将事情向下传递(什么事情,向下传递到什么点等)是什么意思。我认为必须将连接数存储在工厂中,因为每次调用buildProtocol 都会实例化所有其他类一次,因此工厂是唯一可以跟踪连接数的实体。 Here 你可以找到一个很棒的扭曲教程。它的一部分,即this one 提出了计算任务的想法。
  • 我要等... bc I am not sure if it was the optimal solution to pass the reference to the factory all way down.
  • 您始终可以将引用存储在全局变量中。 ^^
猜你喜欢
  • 1970-01-01
  • 2013-06-21
  • 2012-07-12
  • 1970-01-01
  • 2013-07-30
  • 2021-10-29
  • 1970-01-01
  • 2012-12-04
  • 1970-01-01
相关资源
最近更新 更多