【问题标题】:Apache Camel Splitter EIP group by
【发布时间】:2018-05-18 22:41:28
【问题描述】:

我有一个 CSV 文件的任务。我需要限制 CSV 的大小,因为后端引擎对有效负载大小有限制。

问题是提取标题行(即第一条记录/第一行),将其保存并添加回其余拆分后的数据,从而生成多个都带有相同标题行的文件。我希望找到一种优雅的方式来处理这个问题。我现有的做法可以工作,但代码不够理想。

另外,我需要 tokenize 的 group 参数是可配置的,我现在正试图弄清楚它是否可以通过 camelContext 中的属性来设置。

这就是我所拥有的,它可以工作,但是......我无法让 groupBy 接受参数。

我的路线

<!--
  Weekend route: reads a file from the AsciiGatewayBackfillmt1 endpoint, splits it into
  groups of 50 lines and writes each group to {{target.folder}}.
  autoStartup is false; presumably the referenced route policy (startPolicyWkEnd) starts it. Confirm.
-->
    <route id="inRouteWkEndBfmt1" routePolicyRef="startPolicyWkEnd" autoStartup="false" >
    <from id="mainProcessingRouteWkEnd" ref="AsciiGatewayBackfillmt1" />
    <!-- Work on the whole file content as a String. -->
    <convertBodyTo type="java.lang.String" />
    <log message="File ${file:name} was received."/>
    <!-- Stamp the message with the current date/time. -->
    <setHeader headerName="messageDateTime">
      <simple>${date:now:MM-dd-yyyy-HH:mm:ss}</simple>
    </setHeader>
    <!-- Tokenize on newline and hand the lines on in groups of 50 (the group size is hard-coded here). -->
    <split streaming="true" >
    <tokenize token="\n" group="50"/>
        <log message="Split line Body: ${body}"/>
        <!-- Renames the chunk and adds the CSV header line to it. -->
        <process ref="asciiSplitterProcessor" />
        <!-- NOTE(review): this success message is logged before the chunk is actually written below. -->
        <log loggingLevel="INFO" message="Successfully sent ${file:name} to MT1 Core for Analytics Observation." />
        <to id="windowsShareTargetWkEnd" uri="file://{{target.folder}}" />   
    </split>
    <!-- Runs once after all chunks have been processed. -->
    <process ref="asciiCleanUp" />
 </route>

代码

// Prepares one split chunk for writing: gives it an indexed file name and makes sure
// the chunk body carries the CSV header line.
// NOTE(review): cntr, hdrFlag, HEADER, record and log are not declared in this snippet;
// presumably they are fields of the enclosing processor class. Confirm.
public void process(Exchange exchange) throws Exception {

    log.info("Ascii Splitter Processor :: start");

    String inBody = exchange.getIn().getBody(String.class);

    // Insert "_<cntr>" between the base name and the extension of the incoming file name.
    // NOTE(review): lastIndexOf(".") returns -1 for a name without a dot, which makes substring throw.
    String fileName = (String) exchange.getIn().getHeader("CamelFileName");
    String fileSuffix = fileName.substring(fileName.lastIndexOf("."), fileName.length());
    String filePrefix = fileName.substring(0, fileName.lastIndexOf("."));
    fileName = filePrefix + "_" + cntr + fileSuffix;
    exchange.getIn().setHeader("CamelFileName",fileName);
    cntr++;
    // Reads back the header that was just set; the value equals the name built above.
    fileName = (String) exchange.getIn().getHeader("CamelFileName");
    log.info("File being processed: " + fileName );
    log.debug("Message record: " + inBody);
    StringBuilder sb = new StringBuilder();
    Scanner sc = new Scanner(inBody);
    if ( ! hdrFlag ) {
        // No header captured yet: walk the chunk line by line. The first line is stored in
        // HEADER (with "\r\n" appended) and every line, the header included, is copied to
        // the output with "\r\n" line endings.
        while ( sc.hasNextLine() ) {
            record = sc.nextLine();
            log.debug("record: " + record);
            log.debug("HEADER FLAG: " + hdrFlag);
            if ( !hdrFlag ){
                    HEADER = record + "\r\n";
                    hdrFlag = true;
                    log.debug("HEADER: " + HEADER);
            }
            sb.append(record).append("\r\n");
        }
      } else {
                // Header already captured: put it in front of the chunk body, which is kept as received.
                sb.append(HEADER).append(inBody); 
            }
    sc.close();
    exchange.getIn().setBody(sb.toString());
    // Only the local builder is reassigned here; the exchange body was already set above.
    sb = new StringBuilder();
    // NOTE(review): the snippet ends here; the closing brace of the method is not shown.

【问题讨论】:

    标签: apache-camel splitter


    【解决方案1】:

    我认为这比上面的要优雅一点。不幸的是,我用的不是 Camel 2.9。不过这个方案适用于我的场景:把大型 CSV 有效负载拆成多个子工作单元,转换为 JSON 后发送到服务器,再由服务器端重新合并。

    谢谢大家。希望这有助于其他人的用例。

        /**
         * Gives each split chunk an indexed file name and makes sure every chunk after
         * the first one starts with the CSV header captured from the first chunk.
         *
         * <p>The method reads and writes the fields {@code HEADER} and
         * {@code fileCounter}, which are declared outside this method; they carry state
         * from one chunk to the next until {@code cleanHeader()} resets them.
         *
         * @param exchange exchange carrying one chunk; its {@code CamelFileName} header
         *                 and message body are rewritten in place
         * @throws Exception kept from the original signature; this method does not throw
         *                   a checked exception itself
         */
        public void process(Exchange exchange) throws Exception {

            log.info("Entering Extract Header Processor ...");

            // Tag the file name with a running index so the chunks do not overwrite each other.
            String fileName = (String) exchange.getIn().getHeader("CamelFileName");
            int dot = fileName.lastIndexOf('.');
            // A name without an extension used to fail in substring(-1, ...); treat it as "no suffix".
            String filePrefix = dot < 0 ? fileName : fileName.substring(0, dot);
            String fileSuffix = dot < 0 ? "" : fileName.substring(dot);
            fileName = filePrefix + "_" + fileCounter + fileSuffix;
            fileCounter++;
            exchange.getIn().setHeader("CamelFileName", fileName);
            log.info(" FILE NAME: " + exchange.getIn().getHeader("CamelFileName", fileName));
            log.info("File Counter: " + fileCounter );

            String body = exchange.getIn().getBody(String.class);

            // A missing split index means the message did not come from a splitter; such a
            // message is treated like the first chunk instead of failing on a null unboxing.
            Object splitIndex = exchange.getProperty("CamelSplitIndex");
            boolean firstChunk = splitIndex == null || Integer.valueOf(0).equals(splitIndex);

            if (firstChunk) {
                // The route tokenizes on "\n", so split the same way (tolerating "\r\n")
                // instead of relying on the platform line separator.
                List<String> serviceRecords = new ArrayList<String>(Arrays.asList(body.split("\\r?\\n")));
                HEADER = getHeader(serviceRecords).toString();
                // The first chunk already contains its header line; store it unchanged.
                exchange.getIn().setBody(body);
            } else {
                // Later chunks get the remembered header put in front of their records.
                exchange.getIn().setBody(HEADER + System.lineSeparator() + body);
            }

            log.debug("HEADER: " + HEADER);
            log.info("Exiting Extract Header Processor ... :: Finish");
      }
    
        /**
         * Builds the header text from the first record of a chunk.
         *
         * <p>The first record is split on {@code ","} and all columns except the first
         * one are joined again with {@code ","}.
         * NOTE(review): dropping the first column is kept from the original code; confirm
         * it is intended, because the data rows of later chunks are not trimmed the same way.
         *
         * @param serviceRecords the lines of the chunk; only the first element is read
         * @return the joined header columns; empty when the list is {@code null} or empty,
         *         or when the first record has fewer than two columns
         */
        public StringBuilder getHeader(List<String> serviceRecords) {
            StringBuilder sb = new StringBuilder();
            // Nothing to read: return an empty header instead of failing on get(0).
            if (serviceRecords == null || serviceRecords.isEmpty()) {
                return sb;
            }
            String firstRecord = serviceRecords.get(0);
            log.debug("record: : " + firstRecord);
            String[] columns = firstRecord.split(",");
            // Start at 1 to skip the first column. Writing the separator before each later
            // element avoids deleting a trailing comma, which failed on an empty builder.
            for (int j = 1; j < columns.length; j++) {
                if (j > 1) {
                    sb.append(",");
                }
                sb.append(columns[j]);
            }
            return sb;
        }
    
        /**
         * Resets the state kept between chunks: the file counter goes back to zero and
         * the remembered header becomes the empty string.
         */
        public void cleanHeader() {
            fileCounter = 0;
            HEADER = "";
        }
    
    }
    

    路线

    <!--
      Entry route: accepts CSV files from {{fileEntranceEndpoint}}, splits them into groups
      of 600 lines, lets the extractHeader bean rename each group and add the header line,
      and forwards each group to the pConsumer endpoint. Any other file goes to {{unhandledArchive}}.
    -->
    <route 
        id="core.accept.file.type.route"
        autoStartup="true" >
        <from uri="{{fileEntranceEndpoint}}" />
        <choice>
            <when>
                <!-- Accept file names ending in .csv or .CSV. -->
                <simple>${header.CamelFileName} regex '^.*\.(csv|CSV)$'</simple>
                <log message="${file:name} accepted for processing..." />
                <choice>
                  <when>
                    <!-- Normalize an upper-case .CSV extension to lower-case .csv. -->
                    <simple>${header.CamelFileName} regex '^.*\.(CSV)$'</simple>
                    <setHeader headerName="CamelFileName">
                        <simple>${file:name.noext}.csv</simple>
                    </setHeader>                    
                  </when>
                </choice>
    
                <!-- Tokenize on newline and hand the lines on in groups of 600. -->
                <split streaming="true" >
                <tokenize token="\n" group="600" />
                <log message="Split Group Body: ${body}"/>
                    <!-- Renames the group and puts the CSV header in front of every group after the first. -->
                    <to uri="bean:extractHeader" />
                    <to id="acceptedFileType" ref="pConsumer" /> 
                </split>
                <!-- After the split: reset the header and file counter held by the bean. -->
                <to uri="bean:extractHeader?method=cleanHeader"/>
                <!-- <to id="acceptedFileType" ref="pConsumer" />  -->
            </when>
            <otherwise>  
                <log message="${file:name} is an unknown file type, sending to unhandled repo." loggingLevel="INFO" />
                <to uri="{{unhandledArchive}}" />
            </otherwise>
        </choice>
    </route>
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2018-04-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-10-21
      相关资源
      最近更新 更多