【问题标题】:Cache Streaming Treat large file缓存流处理大文件
【发布时间】:2016-08-01 06:21:54
【问题描述】:

我正在研究“在 Apache Camel 中使用缓存以及如何处理大文件”的主题。

目的是用骆驼处理大文件而不将文件加载到内存中,因为它是一个超过 5 GO 的大文件。

我们找到了几个轨道,第一个轨道是使用拆分器组件,让我们可以逐行或逐块读取文件,但是如果我们使用拆分器,我们将无法再次读取文件从一开始,功能需求就是即使在拆分完成时也能够读取文件的某些部分。

所以我们必须使用缓存系统,将块放入缓存中以重复使用它们。

所以我们认为必须使用 CachedOutputStream 类将分割器后的部分文件写入磁盘,该类还提供了对磁盘上的数据进行加密的能力。

下面的例子:

<camelContext xmlns="http://camel.apache.org/schema/spring" trace="false" streamCache="true">

    <streamCaching id="myCacheConfig"  spoolDirectory="target/cachedir" spoolThreshold="16"/>

    <route id="SPLIT-FLOW" streamCache="true">
        <from uri="file:src/data/forSplitCaching\SimpleRecord?noop=true"/>
        <split streaming="true">
            <tokenize token="\n"/>
            <to uri="direct:PROCESS-BUSINESS"/>
        </split>
    </route>

    <route id="PROCESS-BUSINESS" streamCache="true">
        <from uri="direct:PROCESS-BUSINESS"/>
        <bean ref="ProcessBusiness" method="dealRecord"/>
        <choice>
            <when>
                <simple>${in.header.CamelSplitComplete} == "true"</simple>
                <to uri="direct:STREAM-CACHING"/>
            </when>
        </choice>
    </route>

    <route id="STREAM-CACHING">
        <from uri="direct:STREAM-CACHING"/>
        <bean ref="ProcessStreamCaching" method="usingStream"/>
        <setHeader headerName="CamelFileName">
            <simple>${header.CamelFileName}.${header.CamelSplitIndex}</simple>
        </setHeader>
        <to uri="file:src/out"/>
    </route>

</camelContext>

dealRecord 方法将分割后的每一行放入一个缓存中:

public void dealRecord(Exchange exchange) throws Exception { 

   String body; 
   File file; 
   String[] files; 
   boolean isSplitComplete; 

   body = (String) exchange.getIn().getBody(); 
   isSplitComplete = (boolean) exchange.getProperties().get("CamelSplitComplete"); 

   CachedOutputStream cos = new CachedOutputStream(exchange, false); 
   cos.write(body.getBytes("UTF-8")); 

   file = new File("target/cachedir"); 
   files = file.list(); 
   for (String nameTmpfile : files) { 
      LOG.info("Genered File [" + nameTmpfile + "]"); 
   } 

   lstCache.add(cos); 

   if(isSplitComplete){ 
      exchange.getIn().setHeader("Cached",lstCache); 
   } 
} 

使用Stream的方法,可以使用header中存在的每一个缓存

public byte[] usingStream(Exchange exchange) throws InputStreamException { 

   final ArrayList<CachedOutputStream> lstcache; 
   byte[] bytesMessage; 
   StringBuilder messageCompleteOut = new StringBuilder(); 
   InputStream is = null; 

   lstcache = (ArrayList<CachedOutputStream>) exchange.getIn().getHeader("Cached"); 
   for (CachedOutputStream oneCache : lstcache) { 
      try { 
         is = oneCache.getWrappedInputStream(); 
         String messageInputstream = toString(is); 
         LOG.info("Message of Cache ["+ messageInputstream +"]"); 
         messageCompleteOut.append(messageInputstream); 
         messageCompleteOut.append(System.lineSeparator()); 
      } catch (IOException e) { 
         LOG.error(InputStreamException.ERROR_MANIPULATING_INPUT_STREAM_CHANNEL); 
         throw new InputStreamException(InputStreamException.ERROR_MANIPULATING_INPUT_STREAM_CHANNEL,e); 
      } 
      // On ferme le flux 
      IOHelper.close(is); 
   } 
   bytesMessage = messageCompleteOut.toString().getBytes(Charset.forName("UTF-8")); 
   return bytesMessage; 
} 

这个解决方案看起来不错吗?或者也许有更好的方法?

谢谢

【问题讨论】:

    标签: java xml caching apache-camel


    【解决方案1】:

    GenericFileMessage(文件组件使用的消息实现)不会将文件内容加载到内存中,除非需要。所以实际上你只需要确保你不会以强制它转换它的方式访问身体。您还可以编写自己的消息(继承自 GenericFileMessage)并阻止此类转换,或返回不同的内容(某种“摘要”)。

    一路上的处理器可以(从消息头)获取文件在文件系统中的位置并直接打开它,可能会用其他消息替换文件消息。

    【讨论】:

    • 这个类GenericFileMessage允许你管理消息,默认情况下,文件被加载到内存中,任何可用的方法指定建立在特定的缓存中。
    • 看到这个答案:stackoverflow.com/a/9389465/2956532文件的内容没有加载到内存中。 Camel 实际上在消息体中传递 WrappedFile 实例。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-07-18
    • 1970-01-01
    • 2014-10-28
    • 2012-01-31
    • 1970-01-01
    • 2015-02-07
    相关资源
    最近更新 更多