【Question title】:Spring Integration Inbound-Channel-Adapter to read large files line-by-line
【Posted】:2014-11-21 15:26:39
【Question】:

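I'm currently using Spring Integration 4.1.0 with Spring 4.1.2. I need to be able to read a file line by line and use each line read as a message. Basically I want to allow "replay" for one of our message sources, but the messages are not saved in individual files; they are all saved in a single file. I have no transactional requirements for this use case. My requirements are similar to this post, except that the file resides on the same server the JVM is running on: spring integration - read a remote file line by line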

As I see it, I have the following options:

1. Use int-file:inbound-channel-adapter to read the file and then "split" it, so that 1 message becomes many messages. Sample configuration file:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task"
        xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
            http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
            http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

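        <int-file:inbound-channel-adapter id="filereader" directory="/tmp" filename-pattern="myfile.txt" channel="channel1">
            <int:poller fixed-delay="5000"/>
        </int-file:inbound-channel-adapter>
        <int-file:file-to-string-transformer input-channel="channel1" output-channel="channel2"/>
        <int:channel id="channel1"/>
        <!-- the whole file is already one String at this point; split it into lines -->
        <int:splitter input-channel="channel2" output-channel="nullChannel" expression="payload.split('\r?\n')"/>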
        <int:channel id="channel2"/>
    </beans>

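The problem is that the files are very large: with the technique above, the entire file is first read into memory and only then split, and the JVM runs out of heap space. The steps actually needed are: read one line and convert it into a message, send the message, drop the message from memory, repeat.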

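2. Use int-file:tail-inbound-channel-adapter with end="false" (which basically means read from the beginning of the file). Start and stop this adapter as needed for each file (changing the file name before each start). Sample configuration file: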

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:task="http://www.springframework.org/schema/task"
        xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
            http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
            http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
    
        <int-file:tail-inbound-channel-adapter id="apache"
            channel="exchangeSpringQueueChannel"
            task-executor="exchangeFileReplayTaskExecutor"
            file="C:\p2-test.txt"
            delay="1"
            end="false"
            reopen="true"
            file-delay="10000" />
    
        <int:channel id="exchangeSpringQueueChannel" />
        <task:executor id="exchangeFileReplayTaskExecutor" pool-size="1" />
    </beans>
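With end="false" the tailer starts at the beginning of the file instead of at its current end, so "replaying" a file comes down to the starting byte offset. The plain-Java sketch below (a hypothetical TailFromOffset class, not the Apache Commons IO Tailer that this configuration uses) reads whatever lines exist from a given offset and returns the offset to resume from: offset 0 corresponds to end="false", file.length() to end="true". RandomAccessFile.readLine() maps each byte to one char (ISO-8859-1), so the sketch is only suitable for single-byte text.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;

public class TailFromOffset {

    /** Lines read in one pass, plus the byte offset where the next pass should start. */
    public static final class Result {
        public final List<String> lines;
        public final long nextOffset;

        Result(List<String> lines, long nextOffset) {
            this.lines = lines;
            this.nextOffset = nextOffset;
        }
    }

    /**
     * Reads every line between the given byte offset and the current end of the file.
     * Offset 0 replays the whole file (like end="false");
     * offset file.length() only sees data appended later (like end="true").
     */
    public static Result readFrom(File file, long offset) throws IOException {
        List<String> lines = new ArrayList<>();
        try (RandomAccessFile raf = new RandomAccessFile(file, "r")) {
            raf.seek(offset);
            String line;
            // readLine() maps each byte to one char (ISO-8859-1): fine for ASCII only
            while ((line = raf.readLine()) != null) {
                lines.add(line);
            }
            return new Result(lines, raf.getFilePointer());
        }
    }

    public static void main(String[] args) throws IOException {
        File f = File.createTempFile("tail", ".txt");
        f.deleteOnExit();
        Files.write(f.toPath(), "one\ntwo\n".getBytes(StandardCharsets.US_ASCII));

        System.out.println(readFrom(f, 0).lines);          // replay from the start
        System.out.println(readFrom(f, f.length()).lines); // start at the end: nothing yet
    }
}
```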
    
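3. Have Spring Integration invoke Spring Batch and process the file with an ItemReader. This certainly gives finer-grained control over the whole process, but it takes a fair amount of work to set up the job repository and so on (and I don't care about job history, so I would either tell the job not to record state and/or use the in-memory MapJobRepository).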

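4. Create my own FileLineByLineInboundChannelAdapter by extending MessageProducerSupport. Much of the code can be borrowed from ApacheCommonsFileTailingMessageProducer (see also http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter). A sample is below; the key point is that the reading has to run on its own thread, so that the stop() command is honored while reading line by line.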

    package com.xxx.exchgateway.common.util.springintegration;

    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStreamReader;

    import org.springframework.core.task.SimpleAsyncTaskExecutor;
    import org.springframework.core.task.TaskExecutor;
    import org.springframework.integration.endpoint.MessageProducerSupport;
    import org.springframework.integration.file.FileHeaders;
    import org.springframework.messaging.Message;
    import org.springframework.util.Assert;

    /**
     * Emits one message per line of a file. A lot of the logic for this class came from
     * ApacheCommonsFileTailingMessageProducer; see also
     * http://forum.spring.io/forum/spring-projects/integration/119897-custom-upd-inbound-channel-adapter
     */
    public class FileLineByLineInboundChannelAdapter extends MessageProducerSupport {

        private volatile File file;

        private volatile TaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();

        // Our own flag: isRunning() is still false while doStart() executes,
        // so the reading loop cannot rely on it
        private volatile boolean reading;

        /**
         * The file to read line by line.
         * @param file The absolute path of the file.
         */
        public void setFile(File file) {
            Assert.notNull(file, "'file' cannot be null");
            this.file = file;
        }

        public void setTaskExecutor(TaskExecutor taskExecutor) {
            Assert.notNull(taskExecutor, "'taskExecutor' cannot be null");
            this.taskExecutor = taskExecutor;
        }

        protected File getFile() {
            if (this.file == null) {
                throw new IllegalStateException("No 'file' has been provided");
            }
            return this.file;
        }

        @Override
        public String getComponentType() {
            return "file:line-by-line-inbound-channel-adapter";
        }

        private void readFile() {
            // try-with-resources closes the reader (and the underlying stream) even on failure
            try (BufferedReader br = new BufferedReader(
                    new InputStreamReader(new FileInputStream(getFile())))) {
                String line;
                // Stop early if stop() is called (e.g. via JMX) while we are reading
                while (this.reading && (line = br.readLine()) != null) {
                    send(line);
                }
            }
            catch (IOException e) {
                logger.error("Failed to read " + this.file, e);
            }
            finally {
                this.reading = false;
            }
        }

        @Override
        protected void doStart() {
            super.doStart();
            this.reading = true;
            // Read on a separate thread so start() returns and stop() can end the loop
            this.taskExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    readFile();
                }
            });
        }

        @Override
        protected void doStop() {
            this.reading = false;
            super.doStop();
        }

        protected void send(String line) {
            Message<String> message = getMessageBuilderFactory().withPayload(line)
                    .setHeader(FileHeaders.FILENAME, this.file.getAbsolutePath())
                    .build();
            sendMessage(message);
        }
    }
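The threading issue the adapter has to solve can be shown without Spring. In the plain-Java sketch below (the StoppableLineReader class is hypothetical and assumes Java 8), start() hands the read loop to an executor and returns at once, and stop() clears a volatile flag that the loop checks before every line. In a Spring adapter, doStart() and doStop() would play those two roles; isRunning() is unsuitable for the check because it only becomes true after doStart() returns.

```java
import java.io.BufferedReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

public class StoppableLineReader {

    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    // Checked by the reading thread before every line; cleared by stop()
    private volatile boolean running;

    /** Starts reading on a background thread; the Future completes with the number of lines sent. */
    public Future<Long> start(Path file, Consumer<String> sink) {
        running = true;
        return executor.submit(() -> {
            long count = 0;
            try (BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
                String line;
                while (running && (line = reader.readLine()) != null) {
                    sink.accept(line); // in an adapter this would be sendMessage(...)
                    count++;
                }
            } finally {
                running = false;
            }
            return count;
        });
    }

    /** Asks the loop to finish after the line it is currently handling. */
    public void stop() {
        running = false;
    }

    /** Releases the background thread once reading is no longer needed. */
    public void shutdown() {
        executor.shutdown();
    }
}
```

Because only the current line is referenced inside the loop, memory use stays flat regardless of file size, and a stop() request takes effect at the next line boundary.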

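It seems to me that my use case is not outside the range of typical things people want to do, so I'm surprised I can't find an out-of-the-box solution. However, I have searched a lot and looked at many examples, and unfortunately have not yet found anything that fits my needs.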

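I assume I may have missed something obvious that the framework already provides (although this may fall on the blurry line between Spring Integration and Spring Batch). Can someone let me know whether I'm completely off base with my ideas or have missed a simple solution, or offer alternative suggestions?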

【Question discussion】:

    Tags: java spring spring-batch spring-integration


    【Solution 1】:

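    Spring Integration 4.x has a nice new feature: a splitter can return an Iterator instead of a fully built collection.

    Spring Integration Reference

    Starting with version 4.1, AbstractMessageSplitter supports the Iterator type for the value to split.

    This means the splitter can return an Iterator that yields the file one line at a time, instead of reading the whole file into memory first.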
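On Java 8 or later (newer than the setup in the question, so treat this as an assumption), most of a hand-written line iterator can be delegated to BufferedReader.lines(), whose iterator only reads a line when asked for one. The hypothetical ClosingLineIterator below adds the one thing that iterator does not do on its own: closing the reader after the last line. A splitter's splitMessage() could return it, wrapping a BufferedReader opened on the File payload.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Yields one line per next() call and closes the reader after the last line. */
public class ClosingLineIterator implements Iterator<String> {

    private final BufferedReader reader;
    private final Iterator<String> lines;
    private boolean closed;

    public ClosingLineIterator(BufferedReader reader) {
        this.reader = reader;
        // lines() is lazy: nothing is read until hasNext()/next() is called
        this.lines = reader.lines().iterator();
    }

    @Override
    public boolean hasNext() {
        if (closed) {
            return false;
        }
        boolean more = lines.hasNext();
        if (!more) {
            close(); // end of input reached: release the file handle
        }
        return more;
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return lines.next();
    }

    private void close() {
        closed = true;
        try {
            reader.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Like any iterator that closes itself on exhaustion, it leaves the reader open if iteration stops early.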

    Here is a simple example of a Spring context that splits a CSV file into one message per line:

    <int-file:inbound-channel-adapter 
            directory="${inputFileDirectory:/tmp}"
            channel="inputFiles"/>
    
    <int:channel id="inputFiles">
        <int:dispatcher task-executor="executor"/>
    </int:channel>
    
    <int:splitter 
        input-channel="inputFiles" 
        output-channel="output">
        <bean 
            class="FileSplitter" 
            p:commentPrefix="${commentPrefix:#}" />
    </int:splitter>
    
    <task:executor 
        id="executor" 
        pool-size="${poolSize:8}" 
        queue-capacity="${queueCapacity:0}" 
        rejection-policy="CALLER_RUNS" />
    
    <int:channel id="output"/>
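The executor settings are what keep this flow from piling up files. When the queue capacity is 0 (the placeholder's default here), Spring's ThreadPoolTaskExecutor uses a SynchronousQueue, i.e. a direct hand-off with no backlog, and CALLER_RUNS makes the submitting thread (here the poller) run the task itself whenever every pool thread is busy, which naturally slows polling down. A rough plain-JDK equivalent, using a hypothetical CallerRunsPool class:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsPool {

    /** Fixed-size pool with no queue; when every thread is busy the caller runs the task. */
    public static ThreadPoolExecutor create(int poolSize) {
        return new ThreadPoolExecutor(
                poolSize, poolSize,                          // pool-size: core = max
                60, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),            // queue-capacity="0": direct hand-off
                new ThreadPoolExecutor.CallerRunsPolicy());  // rejection-policy="CALLER_RUNS"
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = create(1);
        CountDownLatch release = new CountDownLatch(1);

        // Occupy the only pool thread until we release it
        pool.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // No idle thread and no queue slot: this task is rejected and run on the caller
        pool.execute(() -> System.out.println("second task ran on: " + Thread.currentThread().getName()));

        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Run as a program, the second task reports the calling thread (main) rather than a pool thread, because the only worker is still blocked and there is no queue to park the task in.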
    

    Here is the splitter implementation:

    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.Iterator;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.integration.splitter.AbstractMessageSplitter;
    import org.springframework.integration.transformer.MessageTransformationException;
    import org.springframework.messaging.Message;
    import org.springframework.util.Assert;
    
    public class FileSplitter extends AbstractMessageSplitter {
        private static final Logger log = LoggerFactory.getLogger(FileSplitter.class);
    
        private String commentPrefix = "#";
    
        @Override
        public Object splitMessage(Message<?> message) {
            if(log.isDebugEnabled()) {
                log.debug(message.toString());
            }
            try {
    
                Object payload = message.getPayload();
                Assert.isInstanceOf(File.class, payload, "Expected java.io.File in the message payload"); 
    
                return new BufferedReaderFileIterator((File) payload);
            } 
            catch (IOException e) {
                String msg = "Unable to transform file: " + e.getMessage();
                log.error(msg);
                throw new MessageTransformationException(msg, e);
            }
        }
    
        public void setCommentPrefix(String commentPrefix) {
            this.commentPrefix = commentPrefix;
        }
    
        public class BufferedReaderFileIterator implements Iterator<String> {
    
            private File file;
            private BufferedReader bufferedReader;
            private String line;
    
            public BufferedReaderFileIterator(File file) throws IOException {
                this.file = file;
                this.bufferedReader = new BufferedReader(new FileReader(file));
                readNextLine();
            }
    
            @Override
            public boolean hasNext() {
                return line != null;
            }
    
            @Override
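            public String next() {
                // Honor the Iterator contract instead of returning null past the end
                if (line == null) {
                    throw new java.util.NoSuchElementException();
                }
                try {
                    String res = this.line;
                    readNextLine();
                    return res;
                } 
                catch (IOException e) {
                    log.error("Error reading file", e);
                    throw new RuntimeException(e);
                }   
            }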
    
            void readNextLine() throws IOException {
                do {
                    line = bufferedReader.readLine();
                }
                while(line != null && line.trim().startsWith(commentPrefix));
    
                if(log.isTraceEnabled()) {
                    log.trace("Read next line: {}", line);
                }
    
                if(line == null) {
                    close();
                }
            }
    
            void close() throws IOException {
                bufferedReader.close();
                // Deletes the input file once it has been fully read; remove this line to keep it
                file.delete();
            }
    
            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
    
        }
    
    }
    

    Note the Iterator object returned from the splitMessage() handler method: each line is read only when the splitter asks for the next element.

    【Comments】:

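    • Thanks Smollet for the reply, this was really helpful. I tried your solution (even the second, updated version) and it had a small glitch: it read the first line of the file and then kept reading the same line. So I edited your original post with slight modifications to the next() and readNextLine() methods. I assumed (perhaps wrongly) that most people would want to stop reading the file once all of its lines have been read. If that is indeed what people need, my update also adjusted the hasNext() method to close the file after the last line has been read.
    • Interesting point! I've spent the past few hours debugging a similar problem (the same files being read in a loop). The reader does need to be closed, but your change didn't fix my current problem. I suspect that file:inbound-channel-adapter keeps sending messages with the same file over and over (unless prevent-duplicates="true" is set on the inbound channel adapter). Also, it would be nice to delete the file once it has been processed.
    • I took the liberty of updating the code snippet to help me finish file processing in a Spring XD environment. @TonyFalabella it should also be compatible with your needs (just comment out file.delete() if you don't need it).
    • There is a better Splitter implementation here: stackoverflow.com/questions/27171978/…
    • I looked at Artem's reply and would rather do as much as possible in SpEL without writing code, so I'll go with his suggestion. Thanks again, Smollet.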
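Re-reading the same file on every poll is what an accept-once filter prevents; Spring Integration provides one (AcceptOnceFileListFilter), and the prevent-duplicates attribute mentioned above is what switches it on. The idea is small enough to show in plain Java; the AcceptOnceFilter class below is a hypothetical illustration (Java 8), not the library class, and like any purely in-memory filter it forgets everything on restart.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Passes each file through only the first time it is seen (in memory only). */
public class AcceptOnceFilter {

    // Thread-safe set, since pollers may call filter() from different threads
    private final Set<File> seen = ConcurrentHashMap.newKeySet();

    public List<File> filter(File[] candidates) {
        List<File> accepted = new ArrayList<>();
        for (File f : candidates) {
            if (seen.add(f)) { // add() returns false if the file was accepted before
                accepted.add(f);
            }
        }
        return accepted;
    }
}
```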
    【Solution 2】:

    I have this too: I copy the file to another folder and also read the data from the file.

    fileCopyApplicationContext.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
        xmlns:file="http://www.springframework.org/schema/integration/file"
        xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
                http://www.springframework.org/schema/beans/spring-beans.xsd
                http://www.springframework.org/schema/integration
                http://www.springframework.org/schema/integration/spring-integration.xsd
                http://www.springframework.org/schema/integration/file
                http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
                http://www.springframework.org/schema/context 
                http://www.springframework.org/schema/context/spring-context.xsd">
    
        <context:property-placeholder />
    
        <file:inbound-channel-adapter id="filesIn"
            directory="E:/usmandata/logs/input/" filter="onlyPropertyFiles"
            auto-startup="true">
            <int:poller id="poller" fixed-delay="500" />
        </file:inbound-channel-adapter>
    
    
    
        <int:service-activator input-channel="filesIn"
            output-channel="filesOut" ref="handler" />
    
        <file:outbound-channel-adapter id="filesOut"
            directory="E:/usmandata/logs/output/" />
    
    
    
    
        <bean id="handler" class="com.javarticles.spring.integration.file.FileHandler" />
        <bean id="onlyPropertyFiles"
            class="org.springframework.integration.file.config.FileListFilterFactoryBean"
            p:filenamePattern="*.log" />
    </beans>
    

    FileHandler.java

    package com.javarticles.spring.integration.file;
    
    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    
    public class FileHandler {
        public File handleFile(File input) throws IOException {
            // try-with-resources closes the channel and the file even if reading fails
            try (RandomAccessFile file = new RandomAccessFile(input, "r");
                 FileChannel channel = file.getChannel()) {
    
                // Note: this loads the whole file into memory (and only works for files < 2 GB)
                ByteBuffer buffer = ByteBuffer.allocate((int) channel.size());
    
                // A single read() may return fewer bytes than requested, so loop until full or EOF
                while (buffer.hasRemaining() && channel.read(buffer) != -1) {
                    // keep reading
                }
    
                buffer.flip(); // switch the buffer from writing to reading, starting at position 0
    
                System.out.println("Reading content and printing ... ");
                while (buffer.hasRemaining()) {
                    // Each byte is printed as a char, which is only correct for single-byte encodings
                    System.out.print((char) buffer.get());
                }
            }
            return input;
        }
    }
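This handler reads the entire file into a ByteBuffer, which is exactly what the question is trying to avoid for large files. A line-by-line variant of the same service-activator POJO could look like the sketch below; the class name and the Consumer-based constructor are hypothetical, and it assumes Java 8 and UTF-8 input.

```java
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.function.Consumer;

public class LineByLineFileHandler {

    private final Consumer<String> lineConsumer;

    /** Prints each line, like the original FileHandler. */
    public LineByLineFileHandler() {
        this(System.out::println);
    }

    public LineByLineFileHandler(Consumer<String> lineConsumer) {
        this.lineConsumer = lineConsumer;
    }

    /** Same signature as FileHandler.handleFile, but only one line is held in memory at a time. */
    public File handleFile(File input) throws IOException {
        try (BufferedReader reader = Files.newBufferedReader(input.toPath(), StandardCharsets.UTF_8)) {
            String line;
            while ((line = reader.readLine()) != null) {
                lineConsumer.accept(line);
            }
        }
        return input; // returned so the outbound adapter can still copy the file
    }
}
```

Pointing the handler bean in fileCopyApplicationContext.xml at such a class would keep the rest of the flow unchanged, since handleFile still takes and returns a File.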
    

    SpringIntegrationFileCopyExample.java

    package com.javarticles.spring.integration.file;
    
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    public class SpringIntegrationFileCopyExample {
    
        public static void main(String[] args) {
            // Loading the context starts the polling file adapter; its poller threads keep
            // the JVM running, so the context is intentionally not closed here
            ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                    "fileCopyApplicationContext.xml");
        }
    
    }
    

    【Comments】:
