【问题标题】:How to use Connection Pooling with Hazelcast jet JDBC?如何在 Hazelcast jet JDBC 中使用连接池?
【发布时间】:2021-10-30 06:00:57
【问题描述】:

我正在尝试将连接池与 hazelcast jet jdbc 一起使用,但它不允许我这样做。 我有从中获取连接的数据源 bean,但它不工作。 这是我的代码:

Connection conn = ((DataSource)Ds.getBean("dataSourceName").getConnection();
BatchSource<Object> jdbcSource = Sources
                .jdbc(() -> conn,
                    (con, parallelism, index) -> {
                       // query execution
               }, r -> this.mapResultSet1(r, metaData));

但是当我执行它给我下面的错误:

java.lang.IllegalArgumentException: "newConnectionFn" must be serializable
at com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:203)
at com.hazelcast.jet.impl.connector.ReadJdbcP.supplier(ReadJdbcP.java:77)
at com.hazelcast.jet.core.processor.SourceProcessors.readJdbcP(SourceProcessors.java:433)
at com.hazelcast.jet.pipeline.Sources.jdbc(Sources.java:1327)
at com.aivdata.impl.JDBCDataSource.readSource(JDBCDataSource.java:70)
at com.aivdata.services.AivDataFactory.lambda$1(AivDataFactory.java:123)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647)
Caused by: java.io.NotSerializableException: org.apache.tomcat.dbcp.dbcp2.PoolingDataSource$PoolGuardConnectionWrapper
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:201)

如何通过 Hazelcast jet 使用连接池来实现这一点?

【问题讨论】:

    标签: java java-8 hazelcast hazelcast-jet


    【解决方案1】:

    如果我在 Source Jdbc 中建立连接但不确定它是使用池化还是创建新的,这对我有用。

    BatchSource<Object> jdbcSource = Sources
                    .jdbc(() -> {
                          Connection conn = ((DataSource)Ds.getBean("dataSourceName").getConnection();
                          return conn;
                      },
                        (con, parallelism, index) -> {
                           // query execution
                   }, r -> this.mapResultSet1(r, metaData));
    

    【讨论】:

    • 是的,如果 Ds.getBean 每次调用它时都返回相同的实例,那么这或多或少是“使连接池实例可从可序列化的 newConnectionFn 函数中获得”的意思
    • @FrantišekHartman 感谢您的确认 :)
    【解决方案2】:

    异常告诉你以下函数不可序列化:

    () -> conn
    

    相反,您应该使用类似于:

    () -> DriverManager.getConnection(dbConnectionUrl)
    

    JDBC 源使用提供的函数为每个处理器实例创建一个连接——这意味着它将在每个集群成员上创建n 连接,其中n 等于 JDBC 源的本地并行度。本地并行度的默认值为 1。您可以通过在 JDBC 源上调用 com.hazelcast.jet.pipeline.BatchStage#setLocalParallelism 来修改此值。

    如果您的作业是一次性批处理作业或长时间运行的批处理作业,那么使用连接池没有意义。

    如果您执行一个作业的许多小实例,则使用连接池可能是有意义的,其中为每个作业执行创建一个连接会导致过多的开销。在这种情况下,您需要使您的连接池实例可从可序列化的newConnectionFn 函数中使用,例如通过使其成为静态字段中的单例,延迟初始化。

    【讨论】:

    • 我有 Spring Boot 应用程序,它们可以是多个请求,因为我需要连接池我无法在每个请求上创建新连接。回答抱歉,我没有明白这一点“在这种情况下,您需要使您的连接池实例可从可序列化的 newConnectionFn 函数中使用,例如,通过使其成为静态字段中的单例,延迟初始化。”
    猜你喜欢
    • 2011-01-11
    • 2012-10-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-11-27
    • 2016-07-06
    • 2011-02-19
    • 2011-03-06
    相关资源
    最近更新 更多