【问题标题】:Scan every possible port in a host using Python使用 Python 扫描主机中所有可能的端口
【发布时间】:2018-03-06 04:20:54
【问题描述】:

我正在编写一个程序,它需要扫描主机中的所有 65535 个端口,以搜索打开的端口。这是我目前所拥有的,并且它可以工作,但是每次执行脚本都会产生不同的结果,为什么会发生这种情况?

def check_open_port(host, port):
    s = socket.socket()
    s.settimeout(0.1)
    # the SO_REUSEADDR flag tells the kernel to reuse a local 
    # socket in TIME_WAIT state, without waiting for its natural
    # timeout to expire.
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    try:
        code = s.connect_ex((host, port))
        s.close()

        if code == 0:
            return True
        else:
            return False
    except socket.error:
        return False


def get_open_ports(host, max_port=65535):
    open_ports = []

    def worker(port):
        if check_open_port(host, port):
            open_ports.append(port)


    pool = ThreadPoolExecutor(max_workers=10000)
    [pool.submit(worker, port) for port in range(1, max_port + 1)]
    pool.shutdown(wait=True)

    return open_ports

例如,在打开了 22、80 和 443 端口的主机中,有时我会收到以下响应:

[22, 80]

有时我会得到:

[22, 80, 443]

甚至:

[22]

拥有更多开放端口的主机会产生更多组合。

我玩过max_workerssettimeout() 值,但我无法让它正常工作。它唯一有效的时候是不使用线程,但显然它需要很长时间才能完成,我需要使用它们。

我错过了什么吗?还有其他方法可以实现吗?

【问题讨论】:

  • 您只想使用此代码执行此操作是否愿意使用nmap python 包? github.com/johanlundberg/python-nmap/blob/master/nmap/…
  • 我更喜欢使用这种代码,但如果没有其他选择,我会使用任何可行的代码。
  • 我相信您在添加 open_ports 时会遇到赛车状况。由于多个线程访问此变量,您应该使用 threading.Lock 确保一次只追加一个...但是,我无法在本地重现问题!?
  • 试过锁,问题依旧。顺便说一句,列表是线程安全的。

标签: python tcp ip port python-multithreading


【解决方案1】:

这里有 2 个问题:

  1. 我错过了什么
  2. 还有其他方法可以实现吗?

我错过了什么

我认为值得在这里检查错误代码:

if code == 0: 
    return True
else:
    return False

假设您正在尝试运行 池! 10K 个线程在这里可能会出现各种错误 - 即达到某些系统/您的用户限制(查看ulimit -a),您会将此类错误视为关闭的端口,恕不另行通知。它可能会解释您遇到的不稳定结果。

顺便说一句,在我的 MacBook 上,结果是一致的(检查我在 VPS 主机上的实时服务器)

我也会选择更少的线程数 - 10K 是多余的。例如,这里是pythondocs中建议的默认值:

在 3.5 版更改:如果 max_workers 为 None 或未给出,它将 默认为机器上的处理器数量,乘以 5, 假设 ThreadPoolExecutor 经常用于重叠 I/O 而不是 CPU工作的数量和worker的数量应该高于数量 ProcessPoolExecutor 的工人数

还有其他方法可以实现吗?

首先,没有必要使用threads/processes - 非阻塞套接字+ 事件multiplexorsepoll 已经存在多年,因此您无需额外的线程/处理任何东西就可以逃脱。

连接/关闭的方法也不是最理想的,因为您只需要检查端口是否打开 - 这里不需要完整的 TCP 连接。

在最简单的情况下,您只需要发送一个 SYN 段并检查服务器会响应什么。

这是一篇good 的文章,其中包含使用scapy 的十几种方法

Scapy 是一个强大的交互式数据包处理程序。这是 能够伪造或解码大量协议的数据包,发送 他们在线上,捕获他们,匹配请求和回复,等等 更多的。它可以轻松处理大多数经典任务,例如扫描, 跟踪路由、探测、单元测试、攻击或网络发现(它 可以替代hping、85%的nmap、arpspoof、arp-sk、arping、tcpdump、 tethereal、p0f 等)。

这是其中一种方法的描述(“TCP 连接扫描”):

客户端使用 SYN 标志和端口发送第一次握手到 在 TCP 数据包中连接到服务器。如果服务器响应 RST 而不是 SYN-ACK,则该特定端口在 服务器

还有一种方法(“TCP 隐形扫描”):

这种技术类似于 TCP 连接扫描。客户端发送一个 设置了 SYN 标志和要连接的端口号的 TCP 数据包。如果 端口打开,服务器以 SYN 和 ACK 标志响应 在 TCP 数据包中。但是这一次客户端发送一个 RST 标志 TCP 数据包而不是 RST+ACK,这是 TCP 连接中的情况 扫描。该技术用于避免端口扫描检测 防火墙

当然,如果只是想使用套接字/线程,即使没有 pcap/scapy,你的方法也可以

【讨论】:

    【解决方案2】:

    我在 jupyter notebook 上试过你的代码,我总是得到相同的端口集:

    get_open_ports('127.0.0.1')
    

    输出:

    [133, 200, 144...60700]

    是否有可能在特定时间为被查询的主机打开了不同数量的端口?

    为了验证一小组端口,我将max_port 减少到10000,但每次仍然得到相同的端口集:

    def get_open_ports(host, max_port=10000):
    open_ports = []
    
    def worker(port):
        if check_open_port(host, port):
            open_ports.append(port)
    
    with ThreadPoolExecutor(max_workers=10000) as executor:
        [executor.submit(worker, port) for port in range(1, max_port + 1)]
        executor.shutdown(wait=True)
    return open_ports
    
    get_open_ports('127.0.0.1')
    

    输出:[150, 900, 1035, 7789]

    注意:为了安全起见,我更改了端口号。

    编辑:

    def get_open_ports(host, max_port=65535):
        open_ports = []
    
        def worker(port):
            if check_open_port(host, port):
                open_ports.append(port)
    
    # We can use a with statement to ensure threads are cleaned up promptly
        with ThreadPoolExecutor(max_workers=100) as executor:
            print('main:starting')
            wait_for=[executor.submit(worker,port) for port in range(1, max_port + 1)]
            for f in as_completed(wait_for):
                print('main: result: {}'.format(f.result())) #check result on each thread execution
    
    #         executor.shutdown(wait=True)  #not required when using the 'with' statement
        return len(open_ports)
    
    test = get_open_ports('45.60.112.163') #hostname for www.indracompany.com
    
    #max_workers not defined & max_port=10000
    # len(test)     #test1: 148
    # len(test)     #test 2: 79
    
    #max_workers = 10000 & max_port=65535
    # len(test)      #test1: 1
    # len(test)      #test2:1
    # len(test)      #test3:1
    
    #max_workers = 20000 & max_port=65535
    
    # len(test)  #test1: 14
    # len(test)  #test2:1
    # len(test)  #test3: 1
    # len(test)  #test4:1
    
    #max_workers not defined & max_port=65535 #quite time-consuming
    # len(test)   #test1: 63
    

    编辑 2:更可靠的解决方案

    正如@Tarun 所建议的,Python 的python-nmap 库在扫描主机方面做得更好。

    以下解决方案给出了准确的结果,但是,我观察到随着端口发现范围的增加,性能会显着降低。也许,可以将线程合并到代码中以提高性能。最后我还导入了时间库来获取程序的执行时间。这可用于在测试性能时进行比较。

    # The python-nmap library helps to programmatically manipulate scanned results of nmap to automate port scanning tasks. 
    # To use this library you must have the Nmap software installed. This can be installed from https://nmap.org/download.html.
    # Network Mapper (Nmap) is a free and open-source tool used for network discovery and security auditing. 
    # It runs on all major computer operating systems, and official binary packages are available for Linux, Windows, and Mac OS X.
    # For Windows 7 and later, you must also upgrade 'NCap' from https://nmap.org/npcap/ 
    # For Windows, make sure nmap.exe is added to PATH.
    # When you're ready, pip install python-nmap
    
    import time
    import nmap
    nm = nmap.PortScanner() #initialize PortScanner object
    host = '45.60.112.163'  #specify host
    nm.scan(host, '1-100') #run the scan, specify host and range of ports to scan
    
    #Optional steps for verification:
    
    #Output: nmap -oX - -p 1-100 -sV 45.60.112.163
    print(nm.command_line()) #command_line command to execute on nmap command prompt
    
    #Output: {'tcp': {'method': 'syn', 'services': '1-100'}}
    print(nm.scaninfo())   #nmap scan information
    
    #Now we can scan all hosts
    #From Official documentation at https://xael.org/pages/python-nmap-en.html
    start_time = time.time()   #To get program execution time
    for host in nm.all_hosts(): 
        print('----------------------------------------------------')
        print('Host : %s (%s)' % (host, nm[host].hostname()))
        print('State : %s' % nm[host].state())
        for proto in nm[host].all_protocols():
            print('----------')
            print('Protocol : %s' % proto)
            lport = nm[host][proto].keys()
            for key in sorted(lport):
                for port in lport:
                    print ('port : %s\tstate : %s' % (port, nm[host][proto][port]['state']))
    print('Execution time: %s seconds' % (time.time() - start_time))
    
        #Output:
        ----------------------------------------------------
        Host : 45.60.112.163 ()
        State : up
        ----------
        Protocol : tcp
        port : 25       state : open
        port : 51       state : open
        port : 53       state : open
        port : 80       state : open
        port : 81       state : open
        port : 85       state : open
        port : 91       state : open
        port : 25       state : open
        port : 51       state : open
        port : 53       state : open
        port : 80       state : open
        port : 81       state : open
        port : 85       state : open
        port : 91       state : open
        port : 25       state : open
        port : 51       state : open
        port : 53       state : open
        port : 80       state : open
        port : 81       state : open
        port : 85       state : open
        port : 91       state : open
        port : 25       state : open
        port : 51       state : open
        port : 53       state : open
        port : 80       state : open
        port : 81       state : open
        port : 85       state : open
        port : 91       state : open
        port : 25       state : open
        port : 51       state : open
        port : 53       state : open
        port : 80       state : open
        port : 81       state : open
        port : 85       state : open
        port : 91       state : open
        port : 25       state : open
        port : 51       state : open
        port : 53       state : open
        port : 80       state : open
        port : 81       state : open
        port : 85       state : open
        port : 91       state : open
        port : 25       state : open
        port : 51       state : open
        port : 53       state : open
        port : 80       state : open
        port : 81       state : open
        port : 85       state : open
        port : 91       state : open
        Execution time: 0.015624761581420898 seconds
    

    要将输出转换为 csv,请使用:

    print(nm.csv())
    

    作为这次调查的结果,Nmap 现在已安装在我的计算机上。只是为了好玩,我还使用以下命令在命令提示符下运行了扫描。此扫描运行范围为“1-1000”,耗时超过 15 分钟(我没有坐完整个会话!)。

    【讨论】:

    • 为什么你的答案会奏效?最后还是和我一样。
    • 其实不行:AttributeError: 'set' object has no attribute 'shutdown'
    • 抱歉,我正在处理多项任务,无法编辑答案。我已经尝试了您的代码,它对于我定义的主机完美执行。
    • 尝试一个有很多开放端口的主机,每次扫描的结果都会有所不同。例如,indracompany.com
    • 我测试了indracompany.com,你是对的,结果差异很大。我用 cmets 修改了代码,并注意到变化主要是由于 max_workers 参数。根据这个答案linkmax_workers 没有最佳数字。我在代码中添加了一个额外的检查来验证每个线程执行的结果。例如,当max_workers=100 时,我得到一个很长的main:result:None 列表。这意味着在大量线程提交时不会返回任何端口号。
    【解决方案3】:

    我在 python 中使用套接字时遇到过这个端口扫描器...

    import socket
    import threading
    from queue import *
    
    print_lock = threading.Lock()
    target = input("Enter websit or IP Adress to scan: ")
    minPort = int(input("Enter minimum Port to scan (1 is the smallest): "))
    maxPort = int(input("Enter maximum Port to scan: "))
    threadNo = int(input("Enter No. of threads to use(500 is a good all around number): "))
    
    def portscan(port):
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            con = s.connect((target, port))
            with print_lock:
                print("Port", port, "is open!")
            con.close()
        except:
            pass
    
    def threader():
        while True:
            worker = q.get()
            portscan(worker)
            q.task_done()
    
    q = Queue()
    
    for x in range(threadNo):
        t = threading.Thread(target=threader)
        t.daemon = True
        t.start()
    
    for worker in range (minPort, maxPort):
         q.put(worker)
    
    q.join()
    

    它工作得很好,很容易适应:)

    【讨论】:

      猜你喜欢
      • 2018-06-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-06-03
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多