【问题标题】:Execute beforeExecute(Thread t, Runnable r) in main thread rather than worker thread在主线程而不是工作线程中执行 beforeExecute(Thread t, Runnable r)
【发布时间】:2022-01-13 04:17:06
【问题描述】:

我想要某种方式让beforeExecute() 在每次线程执行之前在主线程上 执行:

我有以下 ThreadPoolExecutor:

public class ContextAwareThreadPool extends ThreadPoolExecutor {

    public ContextAwareThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, @NotNull TimeUnit unit, @NotNull BlockingQueue<Runnable> workQueue, @NotNull ThreadFactory threadFactory, @NotNull RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        ThreadPoolContextHolder.setDatasourceForThread(t, DBContextHolder.getCurrentDb());
    }

}

最大的问题是beforeExecute 在工作线程中被调用,而不是从主线程(请求线程)。 因此DBContextHoldermain 线程中的本地线程,在worker 线程中没有设置。

在线程实际执行之前,beforeExecute 回调是否在 main 线程中执行?

在工作线程中执行此设计决策毫无意义。 如果我想在 Map 中设置某些 ThreadLocals,然后在工作线程中检索它怎么办?

【问题讨论】:

    标签: java multithreading executorservice threadpoolexecutor


    【解决方案1】:

    做你想做的事是不切实际的。

    要在主线程上执行beforeExecute,主线程需要等待“回调对象”队列。但这意味着主线程必须什么都不做。

    解决的办法是安排你需要的信息从主线程的线程本地复制到Runnable,然后再提交给执行器服务。我见过一个例子,他们创建了一个实现 Runnable 子类的类,它执行以下操作:

    • 在构造函数中,捕获线程局部变量的状态。 (构造函数在主线程上执行。)

    • run() 方法中,将捕获的状态复制到当前(工作)线程的线程本地。

    这并不令人愉快……但它确实有效。


    设计决策在工作线程中执行是没有意义的。

    实际上,一旦您了解了实际让回调在主线程上执行所涉及的内容,它就会变得完美。 Java 语言/执行模型不允许您(神奇地)调用另一个线程的调用堆栈上的方法。

    【讨论】:

    • 信息量很大。感谢您的回复。
    • @StephenC 抱歉重复一遍,但我不清楚为什么在工作线程实际启动之前无法执行beforeExecuteworker threads的调度实际上是在main thread中完成的吧? (我的意思是 ExecutorService 在main thread 中运行)所以在main thread 启动工作线程之前,为什么它不能只执行beforeExecute 回调? 它知道要运行的任务以及要运行的线程。
    • @Slaw 是的类似的东西,更具体地说,我希望它是包含 ExecutorService 的同一个线程。
    • @ng.newbie 不,工作线程的调度不在主线程上完成。属于ThreadPoolExecutor 的线程池都在等待BlockingQueue。主线程或您实际调度任务的任何线程,只需将任务放入队列中(并可能导致线程启动,但前提是线程数低于核心池大小,或者它们低于最大池大小并且队列已满)。
    • 其实就是工作线程本身。创建工作线程后,它会立即进入执行器服务队列并尝试接受第一个任务。如果有,它会运行它。如果没有,它会阻塞......等待添加一个......由其他线程向执行程序提交新任务。这是经典的消费者/生产者模式……工人是消费者,服务的客户是生产者。
    【解决方案2】:

    如果我想在 Map 中设置某些 ThreadLocals,然后在工作线程中检索它怎么办

    你可以考虑包装runnable。然后每次在run方法前后设置和清理threadlocal

    public class ContextAwareThreadPool extends ThreadPoolExecutor {
    
        public ContextAwareThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,  RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        }
    
        @Override
        public Future<?> submit(Runnable task) {
            return super.submit(new RunnableWrapper(task, ThreadPoolContextHolder.get()));
        }
        
        // other submit methods...
    
        static class RunnableWrapper implements Runnable {
    
            private final Runnable target;
            private final String dataSource;
    
            public RunnableWrapper(Runnable target, String dataSource) {
                this.dataSource = dataSource;
                this.target = target;
            }
    
            @Override
            public void run() {
                ThreadPoolContextHolder.set(dataSource);
                target.run();
                ThreadPoolContextHolder.remove();
            }
        }
    }
    
    public class ThreadPoolContextHolder {
    
        static ThreadLocal<String> threadLocal = new ThreadLocal<>();
    
        public static void set(String s) {
            threadLocal.set(s);
        }
    
        public static String get() {
            return threadLocal.get();
        }
    
        public static void remove() {
            threadLocal.remove();
        }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-06-19
      • 1970-01-01
      • 2018-04-27
      • 2019-09-10
      • 1970-01-01
      • 2018-10-02
      相关资源
      最近更新 更多