【问题标题】:Problem with entity manager in Spring WebfluxSpring Webflux 中的实体管理器问题
【发布时间】:2020-11-10 23:11:19
【问题描述】:

在我开始使用 Spring WebFlux 的 Web 客户端中,出现以下异常:

reactor.core.Exceptions$ErrorCallbackNotImplemented: javax.persistence.TransactionRequiredException: No EntityManager with actual transaction available for current thread - cannot reliably process 'persist' call
Caused by: javax.persistence.TransactionRequiredException: No EntityManager with actual transaction available for current thread - cannot reliably process 'persist' call

代码:

@Service
@Transactional
@Slf4j
public class PersonSyncServiceImpl
        extends HotelApiCommunicationService implements PersonSyncService {

    private static final String REST_ENDPOINT_PATH = "/api/sync/person";

    @PersistenceContext
    private EntityManager em;
    @Autowired
    private SynchronizationService syncMgr;


    @Override
    public int initialSyncPersonData(ObiektDTO obiekt) {

        WebClient client = WebClient.create("http://" + obiekt.getAdresIp());
        AtomicInteger count = new AtomicInteger(0);

        Flux<Person> personFlux = client.get()
                .uri(REST_ENDPOINT_PATH)
                .retrieve()
                .bodyToFlux(Person.class);

        personFlux.subscribe(person -> {
            CNPerson cnPerson = syncMgr.convertObject(person, CNPerson.class);
            cnPerson.setObiektLoid(obiekt.getLoid());
            em.persist(cnPerson);
            count.getAndIncrement();
        });

        return count.get();
    }
}

我知道问题出在这条线上,因为 reactor 无法获取实体管理器。

em.persist(cnPerson);

服务器上的控制器方法:

    @GetMapping(value = "/person", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<Person> getPersonList() {

        return Flux.fromStream(
                personDao.findAll().stream());
    }

如何解决?如何使用反应器访问事务并使用 JPA 将记录保存到数据库?

【问题讨论】:

  • 您需要调用外部服务上的方法来进行保存,请注意,如果您有 100 个人,那么您也有 100 个人交易!。
  • @M.Deinum 是否可以在一笔交易中完成?
  • 不,除非您使用 Spring 5.2 并使用整个反应式堆栈,这不适用于 JPA,因为它使用的是阻塞的 JDBC。因此,如果没有完全反应式堆栈是不可能的。
  • @M.Deinum: 有没有其他方法可以在客户端获取流并将每条记录在一个事务中保存到数据库中而不使用 Webflux?
  • 您可以使用 toIterable() 方法,它返回您的人员的 Iterable,但该方法是阻塞的,但是我可以看到您的方法 initialSyncPersonData 返回不正确的计数,因为订阅是异步工作的。

标签: java spring spring-boot spring-webflux entitymanager


【解决方案1】:

首先,Spring Data R2DBC 不适用于 JPA/Hibernate 和阻塞 JDBC 驱动程序。因此,要么,您需要摆脱 webflux 或 JPA。

  1. 使用 JPA

如果假设您决定继续使用 JPA,那么您可以使用 Spring RestTemplate 或 WebClient Blocking 调用来调用 Person Sync API,如下所示:

   List<Person> persons = client.get()
                .uri(REST_ENDPOINT_PATH)
                .retrieve()
                .bodyToFlux(Person.class)
                .collectList()
                .block();

像你正在做的那样改变它们;

List<CNPerson> cnPersons = new ArrayList();
persons.forEach(person -> {
  CNPerson cnPerson = syncMgr.convertObject(person, CNPerson.class);
  cnPerson.setObiektLoid(obiekt.getLoid());
  cnPersons.add(cnPerson);
});

然后使用您的org.springframework.data.jpa.repository.JpaRepository.saveAll() 一次性保存它们[为此,您需要为CNPerson 实体创建一个存储库。]

或者手动管理事务并将每个CNPerson保存在其中。

最后,回答您的问题还有其他方法可以在客户端获取流吗? - 如果您想在一个事务中完成,答案是 。因为您必须将 Person Sync API 的整个响应加载到内存中。

  1. 没有 JPA(使用 Spring Webflux 和 R2DBC)

您仍然可以使用 Spring Data R2DBC 进行事务处理。你可以阅读更多关于它的信息here。请注意,对于 R2DBC,您必须为所使用的任何类型的数据库使用非阻塞驱动程序。给出了一个列表here

【讨论】:

    【解决方案2】:

    如果您在反应式上下文中使用 JPA,并且不需要在事务中执行所有持久性操作,则可以尝试为每个操作手动创建一个新事务。

    1 - 注入一个 PlatformTransactionManager

    @Autowired
    private PlatformTransactionManager transactionManager;
    

    2 - 创建一个新的 TransactionTemplate(这可以在构造函数中完成)

    this.transactionTemplate = new TransactionTemplate(transactionManager);
    

    3 - 在对 transactionTemplate.execute 的调用中执行持久调用:

    personFlux.subscribe(person -> 
      transactionTemplate.execute(transactionStatus -> {
                CNPerson cnPerson = syncMgr.convertObject(person, CNPerson.class);
                cnPerson.setObiektLoid(obiekt.getLoid());
                em.persist(cnPerson);
                count.getAndIncrement();
    });
    

    请注意,这将为每次调用 em.persist 创建一个新事务。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2012-12-20
      • 1970-01-01
      • 2015-01-24
      • 2012-10-01
      • 2020-02-09
      • 1970-01-01
      • 2015-06-22
      相关资源
      最近更新 更多