【问题标题】:Parallel inserts into database using Java threads使用 Java 线程并行插入数据库
【发布时间】:2012-03-13 00:01:33
【问题描述】:

我有一个 Java 程序需要将大量较大的行插入 SQL Server 数据库。行数为 800k,每行大小约为 200 字节。

目前它们被分成 50 个批次,然后使用单个语句插入每个批次。 (我们已经通过 JTDS 记录确认,每个批次都使用一个 sp_exec 调用。)将批次大小调整为 25 到 250 之间似乎没有任何显着效果,50 大约是最佳值。

我已经尝试将批次分成(比如说)5 个组,并使用线程并行处理每个组。这明显更快 - 5 个线程的速度是前者的两倍多。

我的问题是关于使线程使用更加健壮。特别是,如果任何批次失败,都会抛出异常。我希望捕获该异常并将其传递给调用者,并且我希望在传递它之前 100% 确定其他线程已完成(中止或完成)。因为在程序稍后从异常中恢复时,我们不希望意外的行继续到达表中。

这是我所做的:

/** Method to insert a single batch. */
private void insertBatchPostings(Collection<Posting> postings) throws PostingUpdateException
{
    // insert the batch using a single INSERT invokation
    // throw a PostingUpdateException if anything goes wrong
}

private static final int insertionThreads = 5;

/** Method to insert a collection of batches in parallel, using the above. */
protected void insertBatchPostingsThreaded(Collection<Collection<Posting>> batches) throws PostingUpdateException
{
    ExecutorService pool = Executors.newFixedThreadPool(insertionThreads);
    Collection<Future> futures = new ArrayList<Future>(batches.size());

    for (final Collection<Posting> batch : batches) {
        Callable c = new Callable() {
            public Object call() throws PostingUpdateException {
                insertBatchPostings(batch);
                return null;
            }            
        };
        /* So we submit each batch to the pool, and keep a note of its Future so we can check it later. */
        futures.add(pool.submit(c));
    }

    /* Pool is running, indicate that no further work will be submitted to it. */
    pool.shutdown();

    /* Check all the futures for problems. */
    for (Future f : futures) {
        try {
            f.get();
        } catch (InterruptedException ex) {
            throw new PostingUpdateException("Interrupted while processing insert results: " + ex.getMessage(), ex);
        } catch (ExecutionException ex) {
            pool.shutdownNow();
            throw (PostingUpdateException) ex.getCause();
        }
    }
}

当它返回时,我想保证所有线程都处于休眠状态。

问题

(我试图澄清我到底在问什么。)

  1. 上述代码是否完全健壮,在insertBatchPostingsThreaded 返回后不会继续进行任何线程插入操作?
  2. 是否有更好、更简单的方法来使用 Java 并发特性来实现这一目标?我的代码在我看来过于复杂了(令人怀疑错过了边缘情况)。
  3. 一旦任何一个线程失败,最好的方法是什么?

我不是一个天生的 Java 程序员,所以我希望最终能得到一些不宣传这一事实的东西。 :)

【问题讨论】:

  • 啊。你可以使用泛型让你的代码更具可读性吗?
  • @Edmund 为批量插入禁用表索引可以提高速度。您必须触发索引重新计算。
  • @Louis - 我从工作程序中逐字复制它以确保它是准确的;这是一个遗留应用程序。但我试图将它翻译成现代 Java。我认为 for 循环最冒犯你,但我也翻译了集合类型。
  • @hidralisk - 这个问题只是对性能进行更大调查的一部分。目标表很大(大约 10 亿行),重建所有索引不会需要很长时间吗?它也被其他进程使用,这些进程在选择现有行时受益于索引。我们的一个想法是插入一个临时表,然后在一个语句中从该表复制到目标。您认为这会导致更有效的索引更新吗?
  • @Edmund 在插入时更新索引效率不高。如果你有一个大更新(10k+),通常最好禁用索引、插入数据、重建索引。这是数据库 101。我从来没有一个 10 亿行的表,所以你应该做一些基准测试。如果您在插入时有其他进程在读取表,那么禁用索引可能不是一个选项。在我工作的项目中,这些批量插入是在非高峰时间完成的,因此我们暂停了受影响表上的所有活动(您可以通过在单独的表中设置一个标志来做到这一点,感兴趣的进程首先检查它)。

标签: java multithreading jdbc


【解决方案1】:

Guava 的Futures.successfulAsList 将一个期货列表作为输入并返回一个未来“其值是一个包含所有成功输入期货的值的列表”。您可以在生成的Future 上调用get(),然后遍历您原来的未来列表以检查是否有任何故障。

【讨论】:

  • 我的另一个要求(我已经添加到问题中)是,如果有任何失败,可以取消或中止池中的剩余任务,以便它快速失败。番石榴中有什么可以帮助解决这个问题的吗?
  • 啊。我没有看到您希望所有其他线程都失败。不过,使用ListenableFuture... 为每个未来添加回调以取消所有其他未来并不难
  • 那么 ListenableFuture 会调用监听器,而监听器又会调用池上的 shutdownNow 吗?查看 Java 源代码,shutdownNow 似乎在努力取消所有排队的任务,所以它可能已经在我的代码中这样做了,但是如果我可以使用 Guava 中的某些东西使代码更清晰,那么我完全赞成。跨度>
  • 我在想你会在池上调用 shutdownNow。不过,我不太肯定这代表了一种进步。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-10-31
  • 1970-01-01
  • 2021-02-09
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多