【发布时间】:2020-02-15 20:28:52
【问题描述】:
问题陈述
我的目标是:
- 读取 CSV 文件行 --> 转换为 DTO 对象(此步骤与我的问题无关,为了完整起见包含在内)
- 收集“n”个 DTO 对象 --> 调用 save()/update() 服务方法。 (在相同的事务环境下)
预期:当一次 save()/update() 调用失败时 --> 回滚整个批次。
实际情况:当一次 save()/update() 调用失败时 --> Spring 仍然提交了此前调用中已保存的记录。
实施
下面是我的代码实现。 (为了简洁,省略了辅助方法和不相关的属性)
导入器(Importer)类:
根据操作类型,将 List<PatientDetail> 分批并调用 save()/update()。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import com.github.swapnil.hims.bo.importer.BulkImporter;
import com.github.swapnil.hims.bo.importer.Importer;
import com.github.swapnil.hims.dto.PatientDetail;
import com.github.swapnil.hims.service.PatientService;
/**
 * Bulk importer that converts CSV rows to {@link PatientDetail} DTOs and persists them through
 * {@link PatientService}.
 *
 * <p>All save/update calls of one {@link #saveToDb(List)} invocation are issued from inside a
 * single {@link TransactionTemplate} callback, with the intent that a failure of any call rolls
 * back every record written by that invocation.
 */
@Component
public class PatientImporter extends Importer implements BulkImporter {
    @Autowired
    private PatientService patientSvc;

    private final TransactionTemplate txTmpl;

    /**
     * @param transactionManager transaction manager used to build the programmatic
     *        {@link TransactionTemplate} that wraps each import
     */
    public PatientImporter(PlatformTransactionManager transactionManager) {
        this.txTmpl = new TransactionTemplate(transactionManager);
    }

    @Override
    public PatientDetail rowToObject(CSVRecord record) {
        // Convert row to DTO object.
        return patientDetail;
    }

    /**
     * Casts the given objects to {@link PatientDetail} and persists them in one transaction.
     *
     * @param patients objects to persist; every element must be a {@link PatientDetail}
     * @throws ClassCastException if an element is not a {@link PatientDetail}
     */
    @Override
    public void saveToDb(List<Object> patients) {
        List<PatientDetail> patientList = patients.stream()
                .map(PatientDetail.class::cast)
                .collect(Collectors.toList());
        saveAll(patientList, this.importType, BATCH_SIZE);
    }

    /**
     * Runs {@link #createPatients(List, Importer.Type, Integer)} inside the transaction template.
     */
    private void saveAll(List<PatientDetail> patients, Importer.Type action, Integer batchSize) {
        txTmpl.execute(new TransactionCallbackWithoutResult() {
            @Override
            protected void doInTransactionWithoutResult(TransactionStatus status) {
                createPatients(patients, action, batchSize);
            }
        });
    }

    /**
     * Walks the list in consecutive windows of at most {@code batchSize} elements and calls
     * {@code save} (for {@link Importer.Type#CREATE}) or {@code update} (for any other action)
     * on each element, in list order.
     *
     * @param patients  DTOs to persist
     * @param action    import type selecting between save and update
     * @param batchSize window size; must be a positive number
     * @throws IllegalArgumentException if {@code batchSize} is {@code null}, zero or negative
     */
    private void createPatients(List<PatientDetail> patients, Importer.Type action, Integer batchSize) {
        // A zero batch size used to spin forever (an empty batch has size 0 == batchSize),
        // and a null one failed with a bare NullPointerException on unboxing.
        if (batchSize == null || batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be a positive number, got: " + batchSize);
        }

        final int total = patients.size();
        int startAt = 0;
        while (startAt < total) {
            // subList gives an O(1) window instead of re-streaming and skipping from the start.
            int endAt = startAt + Math.min(batchSize, total - startAt);
            for (PatientDetail patient : patients.subList(startAt, endAt)) {
                if (action == Importer.Type.CREATE) {
                    patientSvc.save(patient);
                } else {
                    patientSvc.update(patient);
                }
            }
            startAt = endAt;
        }
    }
}
PatientService 类:
患者的存储库/服务类。
import javax.transaction.Transactional;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.github.swapnil.hims.dao.PatientDao;
import com.github.swapnil.hims.dto.PatientDetail;
import com.github.swapnil.hims.entities.Patient;
import com.github.swapnil.hims.exception.PatientException;
import com.github.swapnil.hims.service.PatientService;
/**
 * {@link PatientService} implementation that validates {@link PatientDetail} DTOs and persists
 * them as {@link Patient} entities through {@link PatientDao}.
 */
@Service
public class PatientServiceImpl implements PatientService {
    private final static Log logger = LogFactory.getLog(PatientServiceImpl.class);

    @Autowired
    private PatientDao patientDao;

    /**
     * Validates the DTO and stores it as a new patient.
     *
     * @param patientDetail data of the patient to create
     * @throws PatientException if the patient ID, name or e-mail is empty, or if a patient with
     *         the same patient ID is already stored
     */
    @Override
    @Transactional
    public void save(PatientDetail patientDetail) {
        patientDao.save(getPatient(patientDetail));
    }

    /**
     * Applies the non-empty DTO values to the stored patient with the given id and saves it.
     *
     * @param id            id of the stored patient
     * @param patientDetail new values; empty or missing values keep the stored ones
     * @throws PatientException if no patient with the given id is found
     */
    @Override
    @Transactional
    public void update(Long id, PatientDetail patientDetail) {
        patientDao.save(getPatient(id, patientDetail));
    }

    /**
     * Updates the stored patient identified by {@code patientDetail.getIdentifier()}.
     *
     * @param patientDetail new values, including the identifier of the patient to update
     * @throws PatientException if no patient with that identifier is found
     */
    @Override
    // Annotated here as well: the call below is a self-invocation, which does not pass through
    // the Spring proxy, so the annotation on update(Long, PatientDetail) is not applied to it.
    @Transactional
    public void update(PatientDetail patientDetail) {
        update(patientDetail.getIdentifier(), patientDetail);
    }

    /**
     * Validates the required fields and uniqueness of the patient ID, then converts the DTO
     * into a new entity.
     */
    private Patient getPatient(PatientDetail patientDetail) {
        if (StringUtils.isEmpty(patientDetail.getPatientId())) {
            throw new PatientException("Patient ID is required!");
        }

        if (StringUtils.isEmpty(patientDetail.getName())) {
            throw new PatientException("Patient name is required!");
        }

        if (StringUtils.isEmpty(patientDetail.getEmail())) {
            throw new PatientException("Patient e-mail is required!");
        }

        Patient dbPatient = patientDao.findByPatientId(patientDetail.getPatientId());
        if (dbPatient != null) {
            throw new PatientException(String.format("Patient with the same pid '%s' already exists!", dbPatient.getPatientId()));
        }

        return patientDetail.toPatient();
    }

    /**
     * Loads the stored patient and overlays every DTO value that is present; a missing DTO
     * value leaves the stored value unchanged.
     */
    private Patient getPatient(Long id, PatientDetail patientDetail) {
        Patient patient = patientDao.findById(id);
        if (patient == null) {
            throw new PatientException("Patient with id " + id + " does not exists.");
        }

        patient.setName(StringUtils.isEmpty(patientDetail.getName()) ? patient.getName() : patientDetail.getName());
        patient.setCity(StringUtils.isEmpty(patientDetail.getCity()) ? patient.getCity() : patientDetail.getCity());
        patient.setAddress(StringUtils.isEmpty(patientDetail.getAddress()) ? patient.getAddress() : patientDetail.getAddress());
        patient.setEmail(StringUtils.isEmpty(patientDetail.getEmail()) ? patient.getEmail() : patientDetail.getEmail());
        // The check was inverted: a supplied contact number was ignored and a missing one
        // overwrote the stored number with null. Keep the stored value only when none is supplied.
        patient.setContactNumber(patientDetail.getContactNumber() == null ? patient.getContactNumber() : patientDetail.getContactNumber());
        patient.setPatientId(StringUtils.isEmpty(patientDetail.getPatientId()) ? patient.getPatientId() : patientDetail.getPatientId());
        return patient;
    }
}
PatientDao.java
import org.springframework.data.jpa.repository.JpaRepository;
import com.github.swapnil.hims.entities.Patient;
/**
 * Spring Data JPA repository for {@link Patient} entities.
 *
 * <p>NOTE(review): the repository is declared with a {@code String} ID type while the finder and
 * delete methods below take a {@code Long} id — confirm this mismatch is intentional.
 */
public interface PatientDao extends JpaRepository<Patient, String> {

    /**
     * Finds a patient by id.
     *
     * @param id id to look up
     * @return the matching patient; callers in the service treat {@code null} as "not found"
     */
    Patient findById(Long id);

    /**
     * Deletes the patient with the given id.
     *
     * @param id id of the patient to delete
     */
    void deleteById(Long id);

    /**
     * Finds a patient by its patient ID.
     *
     * @param patientId patient ID to look up
     * @return the matching patient; callers in the service treat {@code null} as "not found"
     */
    Patient findByPatientId(String patientId);
}
我试过了:
- try { createPatients(patients, action, batchSize); } catch (...) { status.setRollbackOnly(); }
- 将传播行为(PropagationBehaviour)设置为 PROPAGATION_REQUIRED。
我无法理解我在这里缺少什么,我已经投入了大量时间来解决这个问题,并且非常非常感谢任何帮助。我正在使用 MySQL v8.0.16。
(GitHub 仓库URL)
编辑
在调试时,我发现 Patient dbPatient = patientDao.findByPatientId(patientDetail.getPatientId()); 这一调用会导致 Hibernate 刷新(flush)会话,并将之前的对象提交到数据库。我不知道如何防止这种情况。我尝试了一些方法,但没有成功,我会继续排查并更新。
【问题讨论】:
-
如果 save()/update() 抛出异常,是否记录了任何异常? -
是的,抛出并记录了运行时异常。
Error while bulk-importing patients...
com.github.swapnil.hims.exception.PatientException: Patient with the same pid 'PID_HINIC_1' already exists!
    at com.github.swapnil.hims.service.impl.PatientServiceImpl.getPatient(PatientServiceImpl.java:85)
    at com.github.swapnil.hims.service.impl.PatientServiceImpl.save(PatientServiceImpl.java:43)
    at com.github.swapnil.hims.service.impl.PatientServiceImpl$$FastClassBySpringCGLIB$$c348e5ac.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) -
“调用导致 Hibernate 刷新会话并将前一个对象提交到数据库。”刷新(flush)和提交(commit)不是一回事。刷新时,SQL 会被发送到数据库,但在事务完成之前不会提交。
-
为什么要将手动事务与 Spring 的事务注解混合在一起?使用 @Transactional 注解 saveToDb 并删除 txTmpl.execute(...),如下面的答案所示。 -
是的,理想情况下它应该只是被刷新而没有被提交。但是,从日志消息来看,它先刷新并插入了对象,然后才调用 findByPatientId()。请查看日志消息(messages)。
标签: java spring hibernate spring-mvc transactions