【问题标题】:Ingest Data in batches to RDBMS from Marklogic with DMSDK使用 DMSDK 从 Marklogic 批量摄取数据到 RDBMS
【发布时间】:2019-10-01 01:11:04
【问题描述】:

我需要使用 DMSDK 将大量数据从 MarkLogic 插入到 RDBMS

下面是我的代码示例

ArrayList<ArrayList<String>> batch = new ArrayList<ArrayList<String>>();
DatabaseClient client = DatabaseClientFactory.newClient(config.getmlHost(), config.getmlPort(), new DatabaseClientFactory.BasicAuthContext(dbConfig.getuser(), dbConfig.getpassword()));
QueryManager queryMgr = client.newQueryManager();
StructuredQueryBuilder sb = queryMgr.newStructuredQueryBuilder();
StructuredQueryDefinition criteria = sb.and(sb.collection("collection1"),sb.collection("collection2"))
DataMovementManager dmm = client.newDataMovementManager();
QueryBatcher batcher = dmm.newQueryBatcher(criteria)
        .withBatchSize(10)
        .withThreadCount(12)
        .onUrisReady(
                        new ExportListener()
                        .onDocumentReady(doc -> {
                    logger.info("URI received : " + doc.getUri());
                    try {
                        //Getting data From xml and adding it into a arraylist for batch creation
                        ArrayList<String> getDataXml = new GetDataXml().GetDatafromXml(doc.getContent(new DOMHandle()),
                                dbuilder, xPath, ColumnNames);
                        batch.add(getDataXml);

                    } catch (Exception e) {
                        logger.error("Error in the Code", e);
                    }
                })).onQueryFailure(exception -> {
                    logger.error(exception);
                });
        dmm.startJob(batcher);
        batcher.awaitCompletion();
        dmm.stopJob(batcher);
        Class.forName("Driver Name");

        //connecting to RDBMS
        Connection conn = DriverManager.getConnection(DB_URL, USER, PASS)
        PreparedStatement pstmt = conn.prepareStatement("INSERT INTO DBNAME VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)");

        //Creating Batches PreparedStatement.addBatch()
        for(ArrayList<String> eachObject : batch) {             
            createPreparedStatement(pstmt, eachObject).addBatch();
        }

        //
        int[] result = pstmt.executeBatch();
        logger.info("Total Records Inserted " + result.length);
        oracle.closeConnect(oracleConn);

public PreparedStatement createPreparedStatement(PreparedStatement pstmt, ArrayList<String> eachObject)
            throws SQLException {
        for (int i = 0; i < eachObject.size(); i++) {
            pstmt.setString(i + 1, eachObject.get(i));
        }
        return pstmt;
    }

此代码仅从 MarkLogic 获取数据,并且在 1 批完成后不会插入到 RDBMS 数据库中,任何人都可以指出我在代码中的错误。 提前致谢。

【问题讨论】:

标签: java marklogic marklogic-9


【解决方案1】:

考虑在开始工作之前创建一个准备好的语句,并在 onDocumentReady() 监听器中:

  1. 从文档中提取一个或多个值,
  2. 将准备好的语句上的占位符设置为值,并
  3. 执行准备好的语句。

在数组中累积所有文档的策略的缺点是数组可能会耗尽所有可用内存,如果数据库操作交错,吞吐量应该会更好。

希望对您有所帮助,

【讨论】:

  • 谢谢您的回复,您上面说的这些步骤是在RDBMS中逐个插入数据而不是批量插入数据?
  • 可以在一个准备好的语句中插入多行——参见stackoverflow.com/questions/1176352/…——但最好的方法通常是一次插入一批行,而不是尝试插入所有行一次通过的行数。
  • 正是我需要分批插入,但在 DMSDK 中没有发生
  • 我不太明白。根据您所说,DMSDK 代码正在检索数据。问题在于准备好的声明。对于一批中的每个检索到的文档,准备好的语句应提供一个括号行,如上面链接的 SO 答案中所述。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2015-01-15
  • 1970-01-01
  • 1970-01-01
  • 2022-07-19
  • 1970-01-01
  • 2016-10-21
  • 1970-01-01
相关资源
最近更新 更多