【Posted】: 2018-03-03 16:32:21
【Question】:
I have a spout that, on every tick, connects to a Postgres database and reads one additional row. The spout code looks like this:
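import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
// Storm 1.x+ package names (an assumption; pre-1.0 releases use backtype.storm.*)
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

class RawDataLevelSpout extends BaseRichSpout implements Serializable {
    // DATABASE_URI is referenced below but its definition is not shown in the post.
    private int counter; // offset of the next row to read
    SpoutOutputCollector collector;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("col1", "col2"));
    }

    @Override
    public void open(Map map, TopologyContext context, SpoutOutputCollector spoutOutputCollector) {
        collector = spoutOutputCollector;
    }

    private Connection initializeDatabaseConnection() {
        try {
            Class.forName("org.postgresql.Driver");
            return DriverManager.getConnection(DATABASE_URI, "root", "root");
        } catch (ClassNotFoundException | SQLException e) {
            e.printStackTrace();
        }
        return null;
    }

    @Override
    public void close() {
    }

    @Override
    public void nextTuple() {
        List<String> values = new ArrayList<>();
        // A new connection is opened on every call; this is what the question is about.
        try (Connection connection = initializeDatabaseConnection()) {
            if (connection == null) {
                return; // connection failed; try again on the next call
            }
            try (PreparedStatement statement = connection.prepareStatement(
                    "SELECT * FROM table1 ORDER BY col1 LIMIT 1 OFFSET ?")) {
                statement.setInt(1, counter);
                try (ResultSet resultSet = statement.executeQuery()) {
                    if (!resultSet.next()) {
                        return; // no row at this offset yet
                    }
                    int totalColumns = resultSet.getMetaData().getColumnCount();
                    for (int i = 1; i <= totalColumns; i++) {
                        values.add(resultSet.getString(i));
                    }
                }
            }
        } catch (SQLException e) {
            e.printStackTrace();
            return;
        }
        counter++; // advance only after a row was actually read
        // One value per column; the column count must match the two declared fields.
        collector.emit(new Values(values.stream().toArray(String[]::new)));
    }
}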
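What is the standard way to handle connection pooling in spouts in Apache Storm? Also, is it possible to somehow synchronize the counter variable across multiple running instances of the spout within the cluster topology?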
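To make the second part of the question concrete: one possible way to avoid a shared counter altogether is to give each spout task its own disjoint slice of offsets. The following is only a minimal sketch, not a standard Storm facility. `OffsetPartitioner` is a hypothetical helper, and it assumes the task index and task count are read in `open()` from `TopologyContext` (for example via `getThisTaskIndex()` and `getComponentTasks(getThisComponentId()).size()`).

```java
/**
 * Hypothetical helper: hands out row offsets for one spout task so that
 * parallel tasks never read the same row. Task i of n uses offsets
 * i, i + n, i + 2n, ...
 */
final class OffsetPartitioner {
    private final int totalTasks;
    private long nextOffset;

    OffsetPartitioner(int taskIndex, int totalTasks) {
        if (totalTasks <= 0 || taskIndex < 0 || taskIndex >= totalTasks) {
            throw new IllegalArgumentException("taskIndex must be in [0, totalTasks)");
        }
        this.totalTasks = totalTasks;
        this.nextOffset = taskIndex;
    }

    /** Offset to bind to the OFFSET parameter of the next query; does not advance. */
    long peek() {
        return nextOffset;
    }

    /** Move to this task's next offset; call only after the current row was emitted. */
    void advance() {
        nextOffset += totalTasks;
    }

    public static void main(String[] args) {
        // Simulate task 1 out of 3 parallel spout tasks reading three rows.
        OffsetPartitioner partitioner = new OffsetPartitioner(1, 3);
        for (int i = 0; i < 3; i++) {
            System.out.println(partitioner.peek()); // prints 1, then 4, then 7
            partitioner.advance();
        }
    }
}
```

In the spout, the value from `peek()` would be bound with `setLong(1, ...)` in place of `counter`, and `advance()` called after a successful emit. The position lives only in memory, so this sketch does not survive task restarts or a change in parallelism; persisting the offset externally would be needed for that.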
【Comments】:
Tags: database connection-pooling apache-storm