【问题标题】:Apache Storm - Accessing database from SPOUT - connection poolingApache Storm - 从 SPOUT 访问数据库 - 连接池
【发布时间】:2018-03-03 16:32:21
【问题描述】:

有一个喷口,每次滴答都会进入 Postgre 数据库并读取额外的一行。 spout 代码如下所示:

class RawDataLevelSpout extends BaseRichSpout implements Serializable {


private int counter;

SpoutOutputCollector collector;


@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("col1", "col2"));
}

@Override
public void open(Map map, TopologyContext context, SpoutOutputCollector spoutOutputCollector) {
    collector = spoutOutputCollector;
}

private Connection initializeDatabaseConnection() {

    try {
        Class.forName("org.postgresql.Driver");
        Connection connection = null;
        connection = DriverManager.getConnection(
                DATABASE_URI,"root", "root");
        return connection;
    } catch (ClassNotFoundException e) {
        e.printStackTrace();
    } catch (SQLException e) {
        e.printStackTrace();
    }
    return null;
}

@Override
public void close() {

}

@Override
public void nextTuple() {
    List<String> values = new ArrayList<>();

    PreparedStatement statement = null;
    try {
        Connection connection = initializeDatabaseConnection();
        statement = connection.prepareStatement("SELECT * FROM table1 ORDER BY col1 LIMIT 1 OFFSET ?");
        statement.setInt(1, counter++);
        ResultSet resultSet = statement.executeQuery();
        resultSet.next();
        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
        int totalColumns = resultSetMetaData.getColumnCount();
        for (int i = 1; i <= totalColumns; i++) {
            String value = resultSet.getString(i);
            values.add(value);
        }


        connection.close();
    } catch (SQLException e) {
        e.printStackTrace();
    }
    collector.emit(new Values(values.stream().toArray(String[]::new)));
}

}

在 Apache Storm 中如何处理 Spouts 中的连接池的标准方法是什么?此外,是否有可能以某种方式在集群拓扑内的多个运行实例之间同步 coutner 变量?

【问题讨论】:

    标签: database connection-pooling apache-storm


    【解决方案1】:

    关于连接池,如果需要,您可以通过静态变量来池连接,但由于不能保证所有 spout 实例都在同一个 JVM 中运行,我认为没有任何意义。

    不,没有办法同步计数器。 spout 实例可能在不同的 JVM 上运行,并且您不希望它们都阻塞,而 spout 同意计数器值是什么。不过,我认为您的 spout 实现没有意义。如果您只想一次读取一行,为什么不只运行一个 spout 实例而不是尝试同步多个 spout?

    您似乎正在尝试将关系数据库用作队列系统,这可能不合适。考虑例如取而代之的是卡夫卡。我认为您应该能够使用https://www.confluent.io/product/connectors/http://debezium.io/ 之一将数据从 Postgres 流式传输到 Kafka。

    【讨论】:

      猜你喜欢
      • 2018-08-14
      • 2019-03-14
      • 1970-01-01
      • 2020-05-01
      • 1970-01-01
      • 1970-01-01
      • 2015-04-22
      • 2018-09-19
      • 2014-07-01
      相关资源
      最近更新 更多