【问题标题】:java multithreading and sqlitejava多线程和sqlite
【发布时间】:2013-12-06 09:42:41
【问题描述】:

我有大量数据存储在 sqlite 数据库中。我正在使用java(jdbc驱动程序)批量从sqlite表中检索数据,然后处理数据。最后,处理后的数据被重写为表(数据库)中的新列。由于数据的处理相当简单,我尝试在 java 中使用多线程来加快计算速度。

我遵循的步骤是:

  1. 产生子线程
  2. 然后每个孩子从 sqlite db 读取数据并处理数据
  3. 数据处理完成后,将使用同步函数(插入和提交)将其重写到数据库中。

我发现处理速度(计算)没有任何改进。事实上,随着线程数量的增加,速度会降低。

没有多线程:

1000 条记录 ~ 2 分钟

2 个线程:1000 条记录 ~ 2 分钟:3 秒

4 个线程:1000 条记录 ~ 2 分钟:30 秒

10 个线程:1000 条记录 ~ 2 分钟:52 秒

我正在使用 Mac book pro:Mountain Lion; 2.4 GHz Intel core 2 Duo (4GB 1067 MHz DDR3)。

代码如下:

package org.openscience.jch.diversity;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.openscience.cdk.DefaultChemObjectBuilder;
import org.openscience.cdk.fingerprint.MACCSFingerprinter;
import org.openscience.cdk.interfaces.IAtomContainer;
import org.openscience.jch.utilities.IteratingMolTableReader;

/**
 *
 * @author chandu
 */
public class MultiThreadCalculator {
    // Main Class
    public static void main(String args[]) throws SQLException {
        int range = 0;
        int start = 0;
        int stop = 0;
        int a = 0;
        int numberOfThreads = 4;
        int count = 10000;
        Connection connection = connectDb("Zinc.db");
        connection.setAutoCommit(false);
        range = (int) Math.ceil(count /(double)(numberOfThreads));

        // generate the child threads and assigns them the range of rows to read from the db

        for (int i = 1; i <= numberOfThreads; i++) {
            stop = range * i;
            System.out.println(start + "," + stop);
            new NewThread(start, stop, i,connection);
            start = stop + 1;
        }
        System.out.println("Main thread exiting." + a);
    }

    // method to connect to db
    private static Connection connectDb(String path) {
         Connection c = null;
        try {
            Class.forName("org.sqlite.JDBC");
            c = DriverManager.getConnection("jdbc:sqlite:" + path);
        } catch (Exception e) {
            System.err.println(e.getClass().getName() + ": " + e.getMessage());
            System.exit(0);
        }
        System.out.println("Opened database successfully");
        return c;
    }

    // Child thread
    public static class NewThread implements Runnable {
        Thread t;
        int ii;
        int tStart = 0;
        int tStop = 0;
        static int ince = 0;
        int a = 0;
        Connection connection = null;

        NewThread(int start, int stop, int threadID, Connection c) {
            tStart = start;
            tStop = stop;
            ii = threadID;
            System.out.println("child thread"+ii);
            t = new Thread(this, "Demo Thread");
            connection = c;
            t.setPriority( Thread.NORM_PRIORITY + 1 ); 
            t.start(); 
        }

        // This is the data processing part
        public void run() {
            Map< Integer, byte[]> map = new HashMap< Integer, byte[]>();

            try (Statement stmt = connection.createStatement();
                    ResultSet rs = stmt.executeQuery("SELECT * FROM MOLDATA WHERE ID>=" + tStart + " and ID<=" + tStop + ";")) {
                //SmilesGenerator sg = new SmilesGenerator(true);
                MACCSFingerprinter mp = new MACCSFingerprinter();
                while (rs.next()) {
                    IAtomContainer molecule = null;
                    int id = rs.getInt("ID");
                    InputStream is = new ByteArrayInputStream(rs.getString("STUCTURE").getBytes());
                    IteratingMolTableReader reader = new IteratingMolTableReader(is, DefaultChemObjectBuilder.getInstance(), true);
                    while (reader.hasNext()) {
                        molecule = reader.next();
                        break;
                    }

                    byte[] bi = mp.getBitFingerprint(molecule).asBitSet().toByteArray();

                    //System.out.println(bi.length);
                    //String smiles = sg.createSMILES(molecule);
                    map.put(id, bi);
                    System.out.println(id);
                }
                stmt.close();
            } catch (Exception e) {
                System.err.println(e.getClass().getName() + ": " + e.getMessage());
                System.exit(0);
            }
            try {
                writer(connection, map);
            } catch (SQLException ex) {
                Logger.getLogger(MultiThreadCalculator.class.getName()).log(Level.SEVERE, null, ex);
            }
            System.out.println("Exiting child thread." + a);
        }

        // Synchronised method to insert processed data and commit changes.

        public synchronized static void writer(Connection connection, Map<Integer, byte[]> mp) throws SQLException {
            String sql = "UPDATE MOLDATA SET FP = ? WHERE ID = ?";
            PreparedStatement psUpdateRecord = connection.prepareStatement(sql);
            int[] iNoRows = null;
            for (int a : mp.keySet()) {
                byte[] bi = mp.get(a);
                psUpdateRecord.setBytes(1, bi);
                psUpdateRecord.setInt(2, a);

                psUpdateRecord.addBatch();
            }
            iNoRows = psUpdateRecord.executeBatch();
            connection.commit();
            System.out.println("Commit Done");
        }
    }
}

【问题讨论】:

  • 你做过一些分析吗?什么占用了时间;数据库活动或计算?一般来说,您应该避免在多个线程之间共享相同的Connection,并且每个线程只有一个连接。
  • 不,我还没有完成分析。我尝试每个线程一个连接,但它给出了错误:[SQLITE_BUSY] 数据库文件被锁定(数据库被锁定)。我认为 Sqlite 不允许在多个线程之间有多个连接。 stackoverflow.com/a/10707791/2995634 。我认为问题不在于数据库查询调用,而在于处理数据的多个线程..
  • 计算耗时较长(例如:为数据库中的化学结构生成 MACCS 结构键)...

标签: java multithreading sqlite jdbc


【解决方案1】:

请记住,sqlite 是一个非常小的数据库实现,针对大小和单用户/单线程使用进行了优化。您需要详细检查分析,但我期望以下行为。

  1. 每个线程读取一个数据块,同时读取其他块。大多数甚至所有数据都必须从磁盘读取,因为每个线程都读取另一个数据块。在这种情况下,sqlite 中的缓存没有帮助,因为没有数据被读取两次。线程已经在此时有效地串行运行,因为它们在访问磁盘时被序列化。
  2. 每个线程都进行一些复杂的计算。无论多么复杂,它都是在内存中完成的,而 sqlite 在磁盘上工作(读取和写入),磁盘速度要慢很多倍(1000 倍)。
  3. 最后的insert/updatecommit 完成其余的序列化:提交必须写入磁盘并且必须等到写入完成。在该步骤之后,下一个线程可以开始插入/更新其结果。

甚至可以解释更多线程的速度下降:您使用的线程越多,sqlite 必须处理的开销就越大 - 它并没有针对许多用户或线程进行优化。

这就是为什么一些专业数据库变得如此昂贵的原因。他们处理 10000 个用户,并拥有非常聪明的算法,可以让下一个内容已经在内存中读取(95% 的时间)。

但是你现在可以做得更好吗?

  • 最实用的方法是重写代码:预先从数据库中读取所有数据,然后在线程中进行处理,最后让一个线程执行所有更新/插入,只提交一次结束
  • 您可以更改数据库,这是一个相当昂贵的解决方案。好吧,对于这种应用程序,即使是 mySql 也比 sqlite 好得多。一些数据库(Oracle、Teradata 等)可以直接在数据库中运行 Java 代码,这样您就无需在处理前后传输数据(这是常见的性能瓶颈,例如在 SAS 中)

【讨论】:

    猜你喜欢
    • 2012-05-29
    • 1970-01-01
    • 2016-08-02
    • 1970-01-01
    • 2012-05-23
    • 1970-01-01
    • 1970-01-01
    • 2012-02-15
    • 1970-01-01
    相关资源
    最近更新 更多