【发布时间】:2013-12-06 09:42:41
【问题描述】:
我有大量数据存储在 sqlite 数据库中。我正在使用 java(jdbc 驱动程序)分批从 sqlite 表中检索数据,然后处理数据。最后,处理后的数据被写回表(数据库)中的一个新列。由于数据的处理相当简单,我尝试在 java 中使用多线程来加快计算速度。
我遵循的步骤是:
- 产生子线程
- 然后每个子线程从 sqlite db 读取数据并处理数据
- 数据处理完成后,使用同步函数(插入并提交)将其写回数据库。
我发现处理速度(计算)没有任何改进。事实上,随着线程数量的增加,速度会降低。
没有多线程:
1000 条记录 ~ 2 分钟
2 个线程:1000 条记录 ~ 2 分 3 秒
4 个线程:1000 条记录 ~ 2 分 30 秒
10 个线程:1000 条记录 ~ 2 分 52 秒
我使用的是 MacBook Pro:Mountain Lion;2.4 GHz Intel Core 2 Duo(4GB 1067 MHz DDR3)。
代码如下:
package org.openscience.jch.diversity;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.openscience.cdk.DefaultChemObjectBuilder;
import org.openscience.cdk.fingerprint.MACCSFingerprinter;
import org.openscience.cdk.interfaces.IAtomContainer;
import org.openscience.jch.utilities.IteratingMolTableReader;
/**
 * Computes MACCS fingerprints for the structures stored in the MOLDATA table of a
 * SQLite database and writes them back into the FP column.
 *
 * <p>The ID range is split into contiguous slices, one per worker thread. All workers
 * share a single JDBC connection; reads happen concurrently, while the write-back is
 * serialized through the synchronized {@link NewThread#writer} method.
 *
 * @author chandu
 */
public class MultiThreadCalculator {

    /**
     * Opens the database, starts the worker threads, waits for all of them to finish
     * and then closes the connection.
     *
     * @param args command-line arguments (unused)
     * @throws SQLException if configuring or closing the connection fails
     */
    public static void main(String args[]) throws SQLException {
        int range = 0;
        int start = 0;
        int stop = 0;
        int a = 0;
        int numberOfThreads = 4;
        int count = 10000;
        Connection connection = connectDb("Zinc.db");
        NewThread[] workers = new NewThread[numberOfThreads];
        boolean interrupted = false;
        try {
            // Workers commit explicitly in writer(), so auto-commit is switched off.
            connection.setAutoCommit(false);
            range = (int) Math.ceil(count / (double) (numberOfThreads));
            // Generate the child threads and assign each one the range of rows to read.
            for (int i = 1; i <= numberOfThreads; i++) {
                stop = range * i;
                System.out.println(start + "," + stop);
                workers[i - 1] = new NewThread(start, stop, i, connection);
                start = stop + 1;
            }
            // Wait for every worker; the shared connection must stay open until they are done.
            for (NewThread worker : workers) {
                try {
                    worker.t.join();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    interrupted = true;
                    break;
                }
            }
        } finally {
            // If the wait was interrupted, workers may still be using the connection,
            // so it is deliberately left open in that case.
            if (!interrupted) {
                connection.close();
            }
        }
        System.out.println("Main thread exiting." + a);
    }

    /**
     * Loads the SQLite JDBC driver and opens a connection to the given database file.
     * On any failure the error is printed and the JVM exits with a non-zero status.
     *
     * @param path path of the SQLite database file
     * @return the open connection
     */
    private static Connection connectDb(String path) {
        Connection c = null;
        try {
            Class.forName("org.sqlite.JDBC");
            c = DriverManager.getConnection("jdbc:sqlite:" + path);
        } catch (Exception e) {
            System.err.println(e.getClass().getName() + ": " + e.getMessage());
            // Non-zero status so that callers/scripts can detect the failure.
            System.exit(1);
        }
        System.out.println("Opened database successfully");
        return c;
    }

    /**
     * Worker that fingerprints the rows whose ID lies in {@code [tStart, tStop]} and
     * then writes the results back in one batch. The thread is created and started by
     * the constructor and is reachable through the {@code t} field.
     */
    public static class NewThread implements Runnable {
        Thread t;
        int ii;
        int tStart = 0;
        int tStop = 0;
        static int ince = 0;
        int a = 0;
        Connection connection = null;

        /**
         * Stores the slice boundaries and the shared connection, then starts the thread.
         *
         * @param start    first ID (inclusive) handled by this worker
         * @param stop     last ID (inclusive) handled by this worker
         * @param threadID number used only for the start-up log line
         * @param c        connection shared by all workers
         */
        NewThread(int start, int stop, int threadID, Connection c) {
            tStart = start;
            tStop = stop;
            ii = threadID;
            // Assign all state before the thread object is created and started.
            connection = c;
            System.out.println("child thread" + ii);
            t = new Thread(this, "Demo Thread");
            t.setPriority(Thread.NORM_PRIORITY + 1);
            t.start();
        }

        /**
         * Reads this worker's slice of MOLDATA, computes one fingerprint per row and
         * hands the collected results to {@link #writer}. Rows without structure data
         * or without a parsable molecule are reported on stderr and skipped. Any other
         * failure while reading terminates the JVM with a non-zero status.
         */
        @Override
        public void run() {
            Map<Integer, byte[]> map = new HashMap<>();
            // try-with-resources closes both the statement and the result set.
            try (Statement stmt = connection.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT * FROM MOLDATA WHERE ID>=" + tStart + " and ID<=" + tStop + ";")) {
                MACCSFingerprinter mp = new MACCSFingerprinter();
                while (rs.next()) {
                    int id = rs.getInt("ID");
                    // NOTE(review): column name is spelled "STUCTURE" in the query code;
                    // presumably this matches the table schema - confirm before renaming.
                    String structure = rs.getString("STUCTURE");
                    if (structure == null) {
                        System.err.println("Skipping ID " + id + ": no structure data");
                        continue;
                    }
                    InputStream is = new ByteArrayInputStream(structure.getBytes());
                    IteratingMolTableReader reader = new IteratingMolTableReader(is, DefaultChemObjectBuilder.getInstance(), true);
                    // Only the first molecule of each record is used.
                    IAtomContainer molecule = reader.hasNext() ? reader.next() : null;
                    if (molecule == null) {
                        System.err.println("Skipping ID " + id + ": no molecule could be read");
                        continue;
                    }
                    byte[] bi = mp.getBitFingerprint(molecule).asBitSet().toByteArray();
                    map.put(id, bi);
                    System.out.println(id);
                }
            } catch (Exception e) {
                System.err.println(e.getClass().getName() + ": " + e.getMessage());
                // Non-zero status so that callers/scripts can detect the failure.
                System.exit(1);
            }
            try {
                writer(connection, map);
            } catch (SQLException ex) {
                Logger.getLogger(MultiThreadCalculator.class.getName()).log(Level.SEVERE, null, ex);
            }
            System.out.println("Exiting child thread." + a);
        }

        /**
         * Writes the given fingerprints into the FP column as one batch and commits.
         * The method is static and synchronized, so only one worker writes at a time.
         *
         * @param connection connection to update through
         * @param mp         fingerprint bytes keyed by row ID
         * @throws SQLException if preparing, executing or committing the batch fails
         */
        public synchronized static void writer(Connection connection, Map<Integer, byte[]> mp) throws SQLException {
            String sql = "UPDATE MOLDATA SET FP = ? WHERE ID = ?";
            // try-with-resources releases the prepared statement even if the batch fails.
            try (PreparedStatement psUpdateRecord = connection.prepareStatement(sql)) {
                for (Map.Entry<Integer, byte[]> entry : mp.entrySet()) {
                    psUpdateRecord.setBytes(1, entry.getValue());
                    psUpdateRecord.setInt(2, entry.getKey());
                    psUpdateRecord.addBatch();
                }
                psUpdateRecord.executeBatch();
                connection.commit();
            }
            System.out.println("Commit Done");
        }
    }
}
【问题讨论】:
-
你做过性能分析吗?时间主要花在哪里:数据库操作还是计算?一般来说,您应该避免在多个线程之间共享同一个
Connection,而应让每个线程各自使用一个连接。 -
不,我还没有做过性能分析。我尝试过每个线程使用一个连接,但它报错:[SQLITE_BUSY] 数据库文件被锁定(数据库被锁定)。我认为 Sqlite 不允许多个线程同时持有多个连接。 stackoverflow.com/a/10707791/2995634 。我认为问题不在于数据库查询调用,而在于处理数据的多个线程..
-
计算耗时较长(例如:为数据库中的化学结构生成 MACCS 结构键)...
标签: java multithreading sqlite jdbc