【问题标题】:Create a CSV file from marklogic using Java Client Api(DMSDK)使用 Java Client Api(DMSDK) 从 marklogic 创建 CSV 文件
【发布时间】:2018-02-05 15:46:55
【问题描述】:

我想为我的 marklogic db 中的 130 万条记录创建一个 csv 文件。我尝试为此使用 CORB,但它花费的时间比我预期的要多。 我的数据是这样的

{
"One": {
"Name": "One",
"Country": "US"
}, 
"Two": {
"State": "kentucky"
}, 
"Three": {
"Element1": "value1", 
"Element2": "value2", 
"Element3": "value3", 
"Element4": "value4",
so on ...
}
}

以下是我的 Corb 模块

选择器.xqy

var total = cts.uris("", null, cts.collectionQuery("data"));
fn.insertBefore(total,0,fn.count(total))

Transform.xqy(我将所有元素保存在一个数组中)

var name = fn.tokenize(URI, ";");
const node = cts.doc(name);
var a= node.xpath("/One/*");
var b= node.xpath("/Two/*");
var c= node.xpath("/Three/*");
fn.stringJoin([a, b, c,name], " , ")

我的属性文件

THREAD-COUNT=16
BATCH-SIZE=1000
URIS-MODULE=selector.sjs|ADHOC
PROCESS-MODULE=transform.sjs|ADHOC
PROCESS-TASK=com.marklogic.developer.corb.ExportBatchToFileTask
EXPORT-FILE-NAME=Report.csv
PRE-BATCH-TASK=com.marklogic.developer.corb.PreBatchUpdateFileTask
EXPORT-FILE-TOP-CONTENT=Col1,col2,....col16 -- i have 16 columns 

创建 csv 文件需要 1 个多小时。而且为了在集群中尝试,我需要先配置一个负载均衡器。而 Java Client api 将在没有任何负载均衡器的情况下在所有节点之间分配工作。

如何在 Java Client APi 中实现相同的功能,我知道我可以使用 ServerTransformApplyTransformListener 触发转换模块。

public static void main(String[] args) {
  // TODO Auto-generated method stub

  DatabaseClient client = DatabaseClientFactory.newClient
            ("localhost", pwd, "x", "x",  DatabaseClientFactory.Authentication.DIGEST);

  ServerTransform txform = new ServerTransform("tsm"); -- Here i am implementing same logic of above `tranform module` .

  QueryManager qm = client.newQueryManager();
  StructuredQueryBuilder query = qm.newStructuredQueryBuilder();
  query.collection();

  DataMovementManager dmm = client.newDataMovementManager();
  QueryBatcher batcher = dmm.newQueryBatcher(query.collections("data"));
  batcher.withBatchSize(2000)
         .withThreadCount(16)
         .withConsistentSnapshot()
         .onUrisReady(
           new ApplyTransformListener().withTransform(txform))
         .onBatchSuccess(batch-> {
                   System.out.println(
                       batch.getTimestamp().getTime() +
                       " documents written: " +
                       batch.getJobWritesSoFar());
         })
         .onBatchFailure((batch,throwable) -> {
           throwable.printStackTrace();
         });

  // start the job and feed input to the batcher
  dmm.startJob(batcher);

  batcher.awaitCompletion();
  dmm.stopJob(batcher);
  client.release();
}

但是我怎样才能在 CORB 中发送 csv 文件头(即 EXPORT-FILE-TOP-CONTENT)。是否有实施CSV 文件的文档?哪个类会实现它?

感谢任何帮助

谢谢

【问题讨论】:

  • 请注意,ApplyTransformListener 通常不用于导出内容,而是用于就地转换内容。

标签: csv marklogic java marklogic-corb


【解决方案1】:

可能最简单的选择是 ml-gradle Exporting data to CSV,它在后台使用 Java 客户端 API 和 DMSDK。

请注意,您可能希望安装服务器端 REST 转换以仅在 CSV 输出中提取所需的数据,而不是下载整个文档内容然后在 Java 端进行提取。

有关使用 DMSDK 和创建聚合 CSV(所有记录一个 CSV)所需代码的工作示例,请参阅ExporToWriterListenerTest.testMassExportToWriter。为方便起见,这里是关键代码 sn-p(进行了一些小的简化更改,包括编写列标题(未经测试的代码)):

try (FileWriter writer = new FileWriter(outputFile)) {
  writer.write("uri,collection,contents");
  writer.flush();
  ExportToWriterListener exportListener = new ExportToWriterListener(writer)
    .withRecordSuffix("\n")
    .withMetadataCategory(DocumentManager.Metadata.COLLECTIONS)
    .onGenerateOutput(
      record -> {
        String uri = record.getUri();
        String collection = record.getMetadata(new DocumentMetadataHandle()).getCollections().iterator().next();
        String contents = record.getContentAs(String.class);
        return uri + "," + collection + "," + contents;
      }
    );

  QueryBatcher queryJob =
    moveMgr.newQueryBatcher(query)
      .withThreadCount(5)
      .withBatchSize(10)
      .onUrisReady(exportListener)
      .onQueryFailure( throwable -> throwable.printStackTrace() );
  moveMgr.startJob( queryJob );
  queryJob.awaitCompletion();
  moveMgr.stopJob( queryJob );
}

但是,除非您知道自己的内容没有双引号、换行符或非 ascii 字符,否则建议使用 CSV 库来确保正确转义您的输出。要使用 CSV 库,您当然可以为您的库使用任何教程。您无需担心线程安全,因为 ExportToWriterListener 在同步块中运行您的侦听器以防止对写入器的重叠写入。这是an example of using one CSV library, Jackson CsvMapper

请注意,您不必使用 ExportToWriterListener 。 . .您可以使用它作为编写自己的侦听器的起点。特别是,由于您主要关心的是性能,您可能希望让您的侦听器每个线程写入一个文件,然后进行后处理以将它们组合在一起。这取决于你。

【讨论】:

  • 正如你所说的 Note that you'll probably want to install a server-side REST transform to extract only the data you want in the CSV output, rather than download the entire doc contents then extract on the Java side 。您的意思不是加载 document(i.e. const node = cts.doc(name)) 并提取内容(i.e.var a= node.xpath("/One/*")) 。你想试试这样var value = cts.doc(name)/One/*
猜你喜欢
  • 2020-03-17
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-10-01
  • 1970-01-01
  • 1970-01-01
  • 2021-11-25
  • 1970-01-01
相关资源
最近更新 更多