【问题标题】:How to copy ThreadLocal from parent thread to multiple child threads?如何将 ThreadLocal 从父线程复制到多个子线程?
【发布时间】:2019-07-23 16:33:51
【问题描述】:

我正在开发一个通过注解声明并发聚合的支持库。

但是我有一个很难解决的问题。

当项目中大量使用 ThreadLocal 时,并发聚合将不起作用,因为 ThreadLocal 的值在多线程中丢失。

举例

public class RequestContext {
    private static ThreadLocal<Long> TENANT_ID = new ThreadLocal<>();

    public static Long getTenantId() {
        return TENANT_ID.get();
    }

    public static void setTenantId(Long tenantId) {
        TENANT_ID.set(tenantId);
    }

    public static void removeTenantId() {
        TENANT_ID.remove();
    }
}
@Service
public class HomepageServiceImpl implements HomepageService {
    @DataProvider("topMenu")
    @Override
    public List<Category> topMenu() {
        /* will be null */
        Long tenantId = RequestContext.getTenantId();
        Assert.notNull(tenantId,"tenantId must be not null");
        // ... The content hereafter will be omitted.
    }

    @DataProvider("postList")
    @Override
    public List<Post> postList() {
        /* will be null */
        Long tenantId = RequestContext.getTenantId();
        Assert.notNull(tenantId,"tenantId must be not null");
        // ... The content hereafter will be omitted.
    }

    @DataProvider("allFollowers")
    @Override
    public List<User> allFollowers() {
        /* will be null */
        Long tenantId = RequestContext.getTenantId();
        Assert.notNull(tenantId,"tenantId must be not null");
        // ... The content hereafter will be omitted.
    }
}

并发聚合查询

@Test
public void testThreadLocal() throws Exception {
    try {
        RequestContext.setTenantId(10000L);
        Object result = dataBeanAggregateQueryFacade.get(null,
                new Function3<List<Category>, List<Post>, List<User>, Object>() {
            @Override
            public Object apply(
                    @DataConsumer("topMenu") List<Category> categories,
                    @DataConsumer("postList") List<Post> posts,
                    @DataConsumer("allFollowers") List<User> users) {
                return new Object[] {
                        categories,posts,users
                };
            }
        });
    } finally {
        RequestContext.removeTenantId();
    }
}

以下方法会在不同的线程中被调用。

  • topMenu()
  • postList()
  • allFollowers()

有什么问题?

问题是上面三个方法都没有得到tenantId

java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at io.github.lvyahui8.spring.aggregate.service.impl.DataBeanAggregateQueryServiceImpl.get(DataBeanAggregateQueryServiceImpl.java:85)
    at io.github.lvyahui8.spring.aggregate.service.impl.DataBeanAggregateQueryServiceImpl.get(DataBeanAggregateQueryServiceImpl.java:47)
    at io.github.lvyahui8.spring.aggregate.service.impl.DataBeanAggregateQueryServiceImpl.lambda$getDependObjectMap$0(DataBeanAggregateQueryServiceImpl.java:112)
    at io.github.lvyahui8.spring.aggregate.service.impl.DataBeanAggregateQueryServiceImpl$$Lambda$197/1921553024.call(Unknown Source)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: tenantId must be not null
    at org.springframework.util.Assert.notNull(Assert.java:198)
    at io.github.lvyahui8.spring.example.service.impl.HomepageServiceImpl.postList(HomepageServiceImpl.java:36)
    ... 12 more

我有几个解决方案,我想知道有没有更好的解决方案?

  • 按参数传递
  • ThreadLocal 替换为InheritableThreadLocal
  • 通过反射复制ThreadLocal

这里是所有可以执行的代码。 https://github.com/lvyahui8/spring-boot-data-aggregator/blob/master/spring-boot-data-aggregator-example/src/test/java/io/github/lvyahui8/spring/example/DataBeanAggregateQueryFacadeTest.java

【问题讨论】:

  • 我目前的情况是ThreadLocal在一个项目中被大量使用,导致并行处理请求比较困难。我无法完全删除ThreadLocal,这需要太多工作。

标签: java spring spring-boot concurrency java.util.concurrent


【解决方案1】:

您的问题是您将HomepageServiceImpl 声明为单例。但要发挥作用,它需要 tenantId 可以采用不同的值。所以实际上,您需要 HomepageServiceImpl 的不同实例,并在创建时设置字段 tenantId。然后,可以方便地简化dataBeanAggregateQueryFacade.get() 的调用:传递数据数组而不是 lambda。像这样的:

public class HomepageServiceImpl implements HomepageService {
    final Long tenantId;
    public HomepageServiceImpl(Long tenantId) {
       Assert.notNull(tenantId,"tenantId must be not null");
       this.tenantId=tenantId;
    }

    @Override
    public List<Category> topMenu() {
       /* will not be null */
       ... this.tenantId...
    }

    // declare also postList and allFollowers

    public Object[] getLists() {
       return new Object[]{topMenu(), postList(), allFollowers()};
    }
}

@Test
public void testThreadLocal() throws Exception {
    HomepageServiceImpl homepage = new HomepageServiceImpl(10000L);
    Object[] lists = homepage.getLists();
    Object result = dataBeanAggregateQueryFacade.get(null, lists);
}

【讨论】:

    【解决方案2】:

    我注意到您正在 aggregateExecutorService bean 中创建 ThreadPoolExecutor 的实例。使用 InheritableThreadLocal 对您没有帮助,因为 ThreadPoolExecutor 使用它自己的线程池。这些线程不会从调用者线程继承任何东西。 How to use MDC with thread pools? 展示了如何将 Threadlocal 值从调用者线程复制到线程池线程。 MDC 是一种 ThreadLocal 对象。 基本上你想扩展 ThreadPoolExecutor 并在 aggregateExecutorService bean 中使用它

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-12-17
      • 2021-01-16
      • 1970-01-01
      • 2017-11-11
      • 2011-09-13
      • 2012-11-01
      相关资源
      最近更新 更多