【问题标题】:JPA/EclipseLink Concurrency issue with MySQLMySQL 的 JPA/EclipseLink 并发问题
【发布时间】:2014-07-11 07:31:32
【问题描述】:

我的应用程序使用 JPA/EclipseLink 和 MySQL 作为数据库时遇到了并发问题。我在这里需要一个建议。

我有一个在文件中生成字符串块的客户端。每个块可能包含几行到 100 或 200 行。生成数据后,客户端立即开始将一组块发送到服务器,服务器对每个块进行相应的解析并存储在数据库中。服务器使用 JPA/EclipseLink 与 MySQL 通信。

当客户端以同步方式(通过 REST)发送一组块时,就解析数据并将它们存储在数据库中而言,一切都按预期工作。但是,当客户端并行发送数据(在我的情况下为 15 个并发线程)时,一些数据(几行)只是在第一次存储在数据库中时被跳过。当再次并行发送相同的数据(第二次、第三次等)时,它会按预期工作(存储期间不会跳过任何行)。

在并行发送的情况下仅出于测试目的,我将所有行存储在并发映射中。我没有在地图中看到任何跳过的行,所以这纯粹是 JPA 和/或 MySQL 问题。我尝试在 JPA 中使用乐观/悲观锁,但它们没有帮助。

你能告诉我,如果有人遇到这种情况怎么办?

这是我的三个相关实体类:

@Entity
@Table(name = "PROJECT")
@NamedQueries({
        @NamedQuery(name = "Project.findByKey", query = "SELECT p FROM Project p WHERE p.key = :P_KEY",
                hints = {
                        @QueryHint(name = QueryHints.QUERY_RESULTS_CACHE, value = HintValues.TRUE),
                        @QueryHint(name = QueryHints.QUERY_RESULTS_CACHE_SIZE, value = "1000"),
                        @QueryHint(name = QueryHints.QUERY_RESULTS_CACHE_IGNORE_NULL, value = HintValues.TRUE),
                        @QueryHint(name = QueryHints.BIND_PARAMETERS, value = HintValues.TRUE)
                }
        )
})
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
@XmlType
@CascadeOnDelete
@OptimisticLocking
public class Project implements Serializable {

    @Id
    @TableGenerator(name = "p-table-gen", table = "SEQ", pkColumnName = "SEQ_NAME", valueColumnName = "SEQ_COUNT", pkColumnValue = "PROJECT_SEQ", allocationSize = 50)
    @GeneratedValue(strategy = GenerationType.TABLE, generator = "p-table-gen")
    private Long id;

    @Column(name = "DURATION")
    private Long duration;

    @Column(name = "PROGRAM")
    private String program;

    @Embedded
    @AttributeOverrides(
        {
            @AttributeOverride(name = "parentHash", column = @Column(name = "PARENT_CMD_HASH")),
            @AttributeOverride(name = "hash", column = @Column(name = "CMD_HASH")),
            @AttributeOverride(name = "pathHash", column = @Column(name = "PATH_HASH"))
        }
    )
    ProjectKey key = new ProjectKey();

    @BatchFetch(BatchFetchType.EXISTS)
    @OneToMany(mappedBy="project", cascade = CascadeType.ALL, fetch = FetchType.LAZY, orphanRemoval = true )
    @XmlTransient
    @CascadeOnDelete
    private List<Task> tasks = new Vector<Task>();

    @BatchFetch(BatchFetchType.EXISTS)
    @OneToOne(mappedBy="project", cascade = CascadeType.ALL, fetch = FetchType.LAZY, orphanRemoval = true )
    @XmlTransient
    @CascadeOnDelete
    private Environment environment;

    @Version
    @Column(name = "VERSION")
    private Long version;
}



@Entity
@Table(name = "TASK")
@NamedQueries({
        @NamedQuery(name = "Task.findTaskByBuildAndPathName",
                query = "SELECT t FROM Task t WHERE t.operation = 'W' AND t.key.pathName = :PATH_NAME ORDER BY t.startTime",
                hints = {
                        @QueryHint(name = QueryHints.BIND_PARAMETERS, value = HintValues.TRUE)
                }
        )
})
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
@XmlType
@CascadeOnDelete
@OptimisticLocking
public class Task implements Serializable, Constants {
    private static final long serialVersionUID = 1L;

    @Id
    @TableGenerator(name = "t-table-gen", table = "SEQ", pkColumnName = "SEQ_NAME", valueColumnName = "SEQ_COUNT", pkColumnValue = "TASK_SEQ", allocationSize = 200)
    @GeneratedValue(strategy = GenerationType.TABLE, generator = "t-table-gen")
    private Long id;

    @Column(name = "HASH")
    private String hash;

    @Column(name = "OPERATION")
    private String operation;

    @Column(name = "PROCESS_ID")
    private BigInteger processId;

    @Embedded
    @AttributeOverride(name = "nanoseconds", column = @Column(name = "START_TIME"))
    private Moment startTime;

    @Column(name = "THREAD_ID")
    private BigInteger threadId;

    @Column(name = "THROUGHPUT")
    private Float throughput;

    @Column(name = "PROCESSED_BYTES")
    private Float processedBytes;

    @BatchFetch(BatchFetchType.JOIN)
    @ManyToOne(cascade = { CascadeType.MERGE, CascadeType.REMOVE}, fetch=FetchType.EAGER)
    @JoinColumn(name = "PATH_STATE_ID", insertable = false, updatable = true)
    @CascadeOnDelete
    private TaskKey key;

    @ManyToOne
    @JoinColumn(name = "PROJECT_ID")
    @XmlTransient
    @CascadeOnDelete
    private Project project;

    @Version
    @Column(name = "VERSION")
    private Long version;
}



@Entity
@Index(name="IDX_TS_SIZE_TIME_INDEX", columnNames={"SIZE","TIME"})
@Table(name = "TASK_STATE")
@NamedQueries({
        @NamedQuery(name = "TaskState.findByKey", query = "SELECT ts FROM TaskState ts WHERE ts.key = :TS_KEY",
                hints = {
                        @QueryHint(name = QueryHints.QUERY_RESULTS_CACHE, value = HintValues.TRUE),
                        @QueryHint(name = QueryHints.QUERY_RESULTS_CACHE_SIZE, value = "2000"),
                        @QueryHint(name = QueryHints.QUERY_RESULTS_CACHE_IGNORE_NULL, value="true"),
                        @QueryHint(name = QueryHints.BIND_PARAMETERS, value = HintValues.TRUE)
                }
        )
})
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
@XmlType
public class TaskState implements Serializable {

    @Id
    @TableGenerator(name = "ts-table-gen", table = "SEQ", pkColumnName = "SEQ_NAME", valueColumnName = "SEQ_COUNT", pkColumnValue = "TASK_STATE_SEQ", allocationSize = 50)
    @GeneratedValue(strategy = GenerationType.TABLE, generator = "ts-table-gen")
    private Long id;

    @Lob
    @Basic(fetch = FetchType.LAZY)
    private byte[] artifact;

    @Column(name = "MODE")
    private String mode;

    @Embedded
    @AttributeOverrides({
            @AttributeOverride(name = "nanoseconds", column = @Column(name = "TIME")),
            @AttributeOverride(name = "size", column = @Column(name = "SIZE")),
            @AttributeOverride(name = "pathName", column = @Column(name = "NAME")),
            @AttributeOverride(name = "fileHash", column = @Column(name = "HASH"))
    })
    TaskStateKey key = new TaskStateKey();

    @BatchFetch(BatchFetchType.EXISTS)
    @OneToMany(mappedBy = "taskState", orphanRemoval = true, cascade = { CascadeType.REMOVE})
    @XmlTransient
    @CascadeOnDelete
    private List<Task> tasks = new Vector<Task>();
}

更新
现在我想我找到了根本问题,但我还没有找到解决方案。 最初,我在数据库上创建一个项目,然后尝试查找相关的任务状态。最初没有任务状态,所以我一个一个地创建。所以这里是关于 taskState 的操作的结果:

1.尝试通过key查找taskState,如果有则返回,否则返回

2.持久化/插入新的taskState(当有多个线程尝试进行插入操作时,我收到重复条目错误,因此我只是重试插入操作直到成功。)并返回新创建的pathState。
每当我在 pathState 的当前插入操作中收到重复的条目错误时,我都会看到跳过的行,即使在第二次或第三次我成功持久化 pathState 时也是如此。我可以在数据库上启用 SERIALIZABLE 隔离级别,但这将是低效的,我无法承受,而且我怀疑 SERIALIZABLE 隔离能否解决我的问题。

【问题讨论】:

  • 如果您向我们展示了您的实体是什么样子,将会有所帮助。你有@Version 字段吗?
  • 你能定义什么是“跳过”吗?什么是一个块/行?你是如何发送它的,你的日志对跳过的数据显示了什么?数据是否重复或重复实体?
  • 正如我所说,客户端正在向服务器发送一组块,每个块包含几行字符串。在将它们保存到数据库期间,某些行没有保存在某些块中。将每一行视为一个可能包含几百个字符的长字符串。

标签: java mysql jpa concurrency eclipselink


【解决方案1】:

将TaskState的ID生成策略从TABLE改为IDENTITY;这将允许 MySQL 自行设置插入行的 ID。

【讨论】:

  • 不,它没有帮助。实际上我在表中使用了 AUTO_INCREMENT。
  • @GeneratedValue(strategy = GenerationType.TABLE, generator = "p-table-gen") 导致 JPA 为 id 属性分配一个值,从而跳过 MySQL 的 AUTO_INCREMENT 行为。
【解决方案2】:

我找到了解决上述问题的方法。我对 TASK_STATE ORM 类做了些微改动。具体来说,我从课堂上删除了任务列表,它开始按预期工作。似乎这种关系导致在高度并发的线程中跳过几行。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-09-15
    • 1970-01-01
    • 2011-11-15
    • 2014-01-22
    • 2016-08-21
    • 1970-01-01
    • 2011-05-08
    • 2013-01-29
    相关资源
    最近更新 更多