[Question Title]: Identify current thread in concurrent.futures.ThreadPoolExecutor
[Posted]: 2021-10-09 08:43:34
[Question Description]:

The following code uses 5 workers; each worker runs its own worker_task():

import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    future_to_url = {executor.submit(worker_task, command_, site_): site_ for site_ in URLS}

    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print(f"{url} generated an exception: {exc}")

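But inside each worker_task() I can't tell which of the 5 workers is currently running it (its Worker_ID).

If I want to call print('worker 3 has finished') inside worker_task(), I can't, because executor.submit() gives me no way to pass the worker's ID.

Is there any solution?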

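[Question Discussion]:

  • stackoverflow.com/questions/65480812/… This should help.
  • Thanks, but that doesn't assign workers to my URLS list. I would need to create a done/pending variable for each URL, and that can't even be put into the executor.submit() call. The link gives a way to track thread IDs, but that is useless unless each task's URL is linked to the worker that handles it.
  • Any thoughts?

Tags: python python-3.x multithreading threadpoolexecutor concurrent.futures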


[Solution 1]:

You can get the name of the worker thread with the threading.current_thread() function. Here are some examples:

from concurrent.futures import ThreadPoolExecutor, Future
from threading import current_thread
from time import sleep
from random import randint

# imagine these are urls
URLS = list(range(100))


def do_some_work(url, a, b):
    """Simulates some work"""
    sleep(2)
    rand_num = randint(a, b)
    if rand_num == 5:
        raise ValueError("No! 5 found!")
    # Thread.getName() is deprecated since Python 3.10; use the .name attribute
    r = f"{current_thread().name}||: {url}_{rand_num}\n"
    return r


def show_fut_results(fut: Future):
    """Callback for future shows results or shows error"""
    if not fut.exception():
        print(fut.result())
    else:
        # the callback runs in the worker thread that completed the future
        print(f"{current_thread().name}|  Error: {fut.exception()}\n")


if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=10) as pool:
        for i in URLS:
            _fut = pool.submit(do_some_work, i, 1, 10)
            _fut.add_done_callback(show_fut_results)
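A related option not used in the answer above: ThreadPoolExecutor accepts a thread_name_prefix argument (Python 3.6+). In CPython its worker threads are then named "<prefix>_<n>", with n counting from 0. The sketch below assumes that naming scheme; worker_task and the example URLs are hypothetical stand-ins for the question's code. Each task returns its worker's name together with its URL, so the caller can link every URL to the worker that processed it.

```python
import concurrent.futures
import threading
import time

URLS = [f"https://example.com/page{i}" for i in range(10)]  # hypothetical URLs


def worker_task(url: str):
    """Hypothetical task: pretend to fetch `url` and report which worker ran it."""
    time.sleep(0.1)  # simulate I/O
    worker_name = threading.current_thread().name  # e.g. "Worker_2" in CPython
    print(f"{worker_name} has finished {url}")
    return worker_name, url


results = {}  # url -> name of the worker thread that processed it
with concurrent.futures.ThreadPoolExecutor(
    max_workers=5, thread_name_prefix="Worker"
) as executor:
    future_to_url = {executor.submit(worker_task, url): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        worker_name, url = future.result()
        results[url] = worker_name
```

The prefix only controls the first part of each name; the numeric suffix is still assigned by the executor, and idle threads may be reused, so not every URL necessarily maps to a different worker.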

If you want more control over the threads, use the threading module directly:

from threading import Thread
from queue import Queue
from time import sleep
from random import randint
import logging

# imagine these are urls
URLS = [f"URL-{i}" for i in range(100)]

# number of worker threads
WORKER_NUM = 10


def do_some_work(url: str, a: int, b: int) -> str:
    """Simulates some work"""
    sleep(2)
    rand_num = randint(a, b)
    if rand_num == 5:
        raise ValueError(f"No! 5 found in URL: {url}")
    r = f"{url} = {rand_num}"
    return r


def thread_worker_func(q: Queue, a: int, b: int) -> None:
    """Target function for Worker threads"""
    logging.info("Started working")
    while True:
        try:
            url = q.get()

            # if poison pill - stop worker thread
            if url is None:
                break

            r = do_some_work(url, a, b)
            logging.info(f"Result: {r}")
        except ValueError as ex:
            logging.error(ex)
        except Exception as ex:
            logging.error(f"Unexpected error: {ex}")

    logging.info("Finished working")


if __name__ == '__main__':
    logging.basicConfig(
        level=logging.INFO,
        format="%(levelname)s | %(threadName)s | %(asctime)s | %(message)s",
    )
    in_q = Queue(50)
    workers = [
        Thread(target=thread_worker_func, args=(in_q, 1, 10, ), name=f"MyWorkerThread-{i+1}")
        for i in range(WORKER_NUM)
    ]
    for w in workers:
        w.start()

    # start distributing tasks
    for _url in URLS:
        in_q.put(_url)

    # send one poison pill per worker thread
    for _ in workers:
        in_q.put(None)

    # wait for the worker threads to finish
    logging.info("Main Thread waiting for Worker Threads")
    for w in workers:
        w.join()

    logging.info("Workers joined")
    sleep(10)
    logging.info("App finished")
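To print something like "worker 3 has finished" and to link each URL back to its worker, as the question asks, the threading version can also pass an explicit worker ID into the target function and send (worker_id, url, result) tuples back on a second queue. This is a minimal sketch under assumptions: the results queue, the worker_id parameter and the simplified do_some_work are illustrative additions, not part of the answer above.

```python
import queue
import threading
import time

URLS = [f"URL-{i}" for i in range(20)]  # placeholder URLs, as in the answer
WORKER_NUM = 5


def do_some_work(url: str) -> str:
    """Simplified stand-in for the answer's do_some_work()."""
    time.sleep(0.05)  # simulate I/O
    return f"{url} done"


def worker(worker_id: int, tasks: queue.Queue, results: queue.Queue) -> None:
    """Process URLs until a None poison pill arrives, tagging results with worker_id."""
    while True:
        url = tasks.get()
        if url is None:  # poison pill: stop this worker
            break
        result = do_some_work(url)
        print(f"worker {worker_id} has finished {url}")
        results.put((worker_id, url, result))


tasks: queue.Queue = queue.Queue()
results: queue.Queue = queue.Queue()
threads = [
    threading.Thread(target=worker, args=(i, tasks, results), name=f"Worker-{i}")
    for i in range(1, WORKER_NUM + 1)
]
for t in threads:
    t.start()
for url in URLS:
    tasks.put(url)
for _ in threads:
    tasks.put(None)  # one poison pill per worker, queued after all URLs
for t in threads:
    t.join()

# All workers have exited, so the results queue is complete.
url_to_worker = {}
while not results.empty():
    worker_id, url, _result = results.get()
    url_to_worker[url] = worker_id
```

Because the poison pills are queued after every URL, a worker only receives None once all URLs have been taken, so no task is skipped.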

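[Discussion]:

  • I want to pass the thread name (e.g. thread1) to do_some_work(). I'm trying to understand what the name is and how to retrieve it.
  • However, I now realize I could create a list of [1,10] idle threads, then randomly select an idle thread and assign it in the submit() call.
  • Your use of random numbers may be the key to the answer. I'd like to see a selection of an idle thread, e.g. thread1 [busy], thread2 [busy], thread3 [selected], instead of if rand_num == 5:, with the thread assignment used to print [thread1 loaded texture] inside do_some_work().
  • @Rhys If you want to control the thread names, you have to use threading.Thread instead of concurrent.futures.ThreadPoolExecutor. If you look at the ThreadPoolExecutor source code, you'll quickly see that it is a wrapper around threading.Thread: github.com/python/cpython/blob/3.9/Lib/concurrent/futures/… If you really need more control, such as over the thread names, choose the threading module. I can give you some examples if you like.
  • Thanks, Artiom. If you provide a threading.Thread example, I'll mark this as the accepted answer. Otherwise, I do have working examples; I've worked with threading.Thread before.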
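If you would rather keep ThreadPoolExecutor than switch to threading.Thread, one possible approach (a sketch, not taken from the discussion above) is the executor's initializer argument (Python 3.7+), which runs once in each worker thread when it starts. The hypothetical helper assign_worker_id gives each thread a sequential ID from 1 to max_workers and stores it in threading.local(), so the task function can report "worker 3" without any ID being passed through submit(). The module-level counter, the simplified do_some_work and the URLs are assumptions for illustration.

```python
import concurrent.futures
import itertools
import threading
import time

_local = threading.local()        # per-thread storage for the worker ID
_id_counter = itertools.count(1)  # hands out 1, 2, 3, ...
_id_lock = threading.Lock()       # guards the shared counter


def assign_worker_id() -> None:
    """Initializer (hypothetical helper): runs once in each new worker thread."""
    with _id_lock:
        _local.worker_id = next(_id_counter)


def do_some_work(url: str):
    """Hypothetical task that reports which worker ran it."""
    time.sleep(0.05)  # simulate I/O
    worker_id = _local.worker_id  # set by assign_worker_id in this thread
    print(f"worker {worker_id} has finished {url}")
    return worker_id, url


URLS = [f"URL-{i}" for i in range(20)]  # placeholder URLs

url_to_worker = {}
with concurrent.futures.ThreadPoolExecutor(
    max_workers=5, initializer=assign_worker_id
) as executor:
    futures = [executor.submit(do_some_work, url) for url in URLS]
    for future in concurrent.futures.as_completed(futures):
        worker_id, url = future.result()
        url_to_worker[url] = worker_id
```

The counter lives at module level, so a second executor created in the same process would continue numbering from 6; create a fresh counter per executor if IDs must restart at 1.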