【问题标题】:How to read huge CSV file in Mule如何在 Mule 中读取巨大的 CSV 文件
【发布时间】:2013-04-30 00:38:19
【问题描述】:

我正在使用 Mule Studio 3.4.0 社区版。 关于如何解析使用 File Endpoint 传入的大型 CSV 文件,我有一个大问题。场景是我有 3 个 CSV 文件,我会将文件的内容放入数据库。 但是当我尝试加载一个大文件(大约 144MB)时,我得到了“OutOfMemory”异常。我认为将我的大 CSV 划分/拆分为更小的 CSV 的解决方案(我不知道这个解决方案是否是最好的)o 尝试找到一种方法来处理 CSV 而不会引发异常。

<file:connector name="File" autoDelete="true" streaming="true" validateConnections="true" doc:name="File"/>

<flow name="CsvToFile" doc:name="CsvToFile">
        <file:inbound-endpoint path="src/main/resources/inbox" moveToDirectory="src/main/resources/processed"  responseTimeout="10000" doc:name="CSV" connector-ref="File">
            <file:filename-wildcard-filter pattern="*.csv" caseSensitive="true"/>
        </file:inbound-endpoint>
        <component class="it.aizoon.grpBuyer.AddMessageProperty" doc:name="Add Message Property"/>
        <choice doc:name="Choice">
            <when expression="INVOCATION:nome_file=azienda" evaluator="header">
                <jdbc-ee:csv-to-maps-transformer delimiter="," mappingFile="src/main/resources/companies-csv-format.xml" ignoreFirstRecord="true" doc:name="CSV2Azienda"/>
                <jdbc-ee:outbound-endpoint exchange-pattern="one-way" queryKey="InsertAziende" queryTimeout="-1" connector-ref="jdbcConnector" doc:name="Database Azienda">
                    <jdbc-ee:query key="InsertAziende" value="INSERT INTO aw006_azienda VALUES (#[map-payload:AW006_ID], #[map-payload:AW006_ID_CLIENTE], #[map-payload:AW006_RAGIONE_SOCIALE])"/>
                </jdbc-ee:outbound-endpoint>
            </when>
            <when expression="INVOCATION:nome_file=servizi" evaluator="header">
                <jdbc-ee:csv-to-maps-transformer delimiter="," mappingFile="src/main/resources/services-csv-format.xml" ignoreFirstRecord="true" doc:name="CSV2Servizi"/>
                <jdbc-ee:outbound-endpoint exchange-pattern="one-way" queryKey="InsertServizi" queryTimeout="-1" connector-ref="jdbcConnector" doc:name="Database Servizi">
                    <jdbc-ee:query key="InsertServizi" value="INSERT INTO ctrl_aemd_unb_servizi VALUES (#[map-payload:CTRL_ID_TIPO_OPERAZIONE], #[map-payload:CTRL_DESCRIZIONE], #[map-payload:CTRL_COD_SERVIZIO])"/>
                </jdbc-ee:outbound-endpoint>
            </when>
            <when expression="INVOCATION:nome_file=richiesta" evaluator="header">
                <jdbc-ee:csv-to-maps-transformer delimiter="," mappingFile="src/main/resources/requests-csv-format.xml" ignoreFirstRecord="true" doc:name="CSV2Richiesta"/>
                <jdbc-ee:outbound-endpoint exchange-pattern="one-way" queryKey="InsertRichieste" queryTimeout="-1" connector-ref="jdbcConnector" doc:name="Database Richiesta">
                    <jdbc-ee:query key="InsertRichieste" value="INSERT INTO ctrl_aemd_unb_richiesta VALUES (#[map-payload:CTRL_ID_CONTROLLER], #[map-payload:CTRL_NUM_RICH_VENDITORE], #[map-payload:CTRL_VENDITORE], #[map-payload:CTRL_CANALE_VENDITORE], #[map-payload:CTRL_CODICE_SERVIZIO], #[map-payload:CTRL_STATO_AVANZ_SERVIZIO], #[map-payload:CTRL_DATA_INSERIMENTO])"/>
                </jdbc-ee:outbound-endpoint>
            </when>
        </choice>   
    </flow>

拜托,我不知道如何解决这个问题。 在此先感谢您的任何帮助

【问题讨论】:

    标签: csv mule mule-studio


    【解决方案1】:

    正如 SteveS 所说,csv-to-maps-transformer 可能会在处理之前尝试将整个文件加载到内存中。您可以尝试做的是将 csv 文件拆分为较小的部分,然后将这些部分发送到 VM 以单独处理。 首先,创建一个组件来实现这第一步:

    public class CSVReader implements Callable{
        @Override
        public Object onCall(MuleEventContext eventContext) throws Exception {
    
            InputStream fileStream = (InputStream) eventContext.getMessage().getPayload();
            DataInputStream ds = new DataInputStream(fileStream);
            BufferedReader br = new BufferedReader(new InputStreamReader(ds));
    
            MuleClient muleClient = eventContext.getMuleContext().getClient();
    
            String line;
            while ((line = br.readLine()) != null) {
                muleClient.dispatch("vm://in", line, null);
            }
    
            fileStream.close();
            return null;
        }
    }
    

    然后,将您的主要流程一分为二

    <file:connector name="File" 
        workDirectory="yourWorkDirPath" autoDelete="false" streaming="true"/>
    
    <flow name="CsvToFile" doc:name="Split and dispatch">
        <file:inbound-endpoint path="inboxPath"
            moveToDirectory="processedPath" pollingFrequency="60000"
            doc:name="CSV" connector-ref="File">
            <file:filename-wildcard-filter pattern="*.csv"
                caseSensitive="true" />
        </file:inbound-endpoint>
        <component class="it.aizoon.grpBuyer.AddMessageProperty" doc:name="Add Message Property" />
        <component class="com.dgonza.CSVReader" doc:name="Split the file and dispatch every line to VM" />
    </flow>
    
    <flow name="storeInDatabase" doc:name="receive lines and store in database">
        <vm:inbound-endpoint exchange-pattern="one-way"
            path="in" doc:name="VM" />
        <Choice>
            .
            .
            Your JDBC Stuff
            .
            .
        <Choice />
    </flow>
    

    保持您当前的file-connector 配置以启用流式传输。使用此解决方案,可以处理 csv 数据,而无需先将整个文件加载到内存中。 高温

    【讨论】:

    • 非常感谢史蒂夫斯和丹尼尔,我会试试这个解决方案。
    • 嗨,我尝试使用您的架构,但是,尽管我能够插入数百行,但有时我会收到以下消息:
    • INFO 2013-05-07 18:23:18,379 [[splitmultithread].FileSplitter.receiver.02] org.mule.transport.file.FileMessageReceiver:在文件中获得锁:C:\workspace_3。 4\splitmultithread\src\main\resources\inbox\richiesta.csv 错误 2013-05-07 18:24:00,144 [[splitmultithread].storeInDatabase.stage1.04] org.mule.processor.AsyncWorkListener:工作导致 ' 异常工作完成”。正在执行的工作是:org.mule.processor.AsyncInterceptingMessageProcessor$AsyncMessageProcessorWorker@3bc752
    • 您的file:inbound-endpointpollingFrequency 值是多少?尝试增加此值。也许这个端点正在尝试读取仍在处理的文件。
    • 是的,丹尼尔,这就是问题所在。您知道如何禁用轮询,或者应该更好的是,仅当目录中有文件时如何触发文件端点?而不是每 x 秒?
    【解决方案2】:

    我相信 csv-to-maps-transformer 会强制整个文件进入内存。由于您正在处理一个大文件,就个人而言,我倾向于只编写一个 Java 类来处理它。 File 端点会将文件流传递给您的自定义转换器。然后,您可以建立 JDBC 连接并一次提取一行信息,而无需加载整个文件。我使用 OpenCSV 为我解析 CSV。因此,您的 java 类将包含以下内容:

    protected Object doTransform(Object src, String enc) throws TransformerException {  
    
        try {
            //Make a JDBC connection here
    
            //Now read and parse the CSV
    
            FileReader csvFileData = (FileReader) src;
    
    
            BufferedReader br = new BufferedReader(csvFileData);
            CSVReader reader = new CSVReader(br);
    
            //Read the CSV file and add the row to the appropriate List(s)
            String[] nextLine;
            while ((nextLine = reader.readNext()) != null) {
                //Push your data into the database through your JDBC connection
            }
            //Close connection.
    
                   }catch (Exception e){
        }
    

    【讨论】:

    • 首先,我非常感谢 SteveS 和 Daniel 的帮助。我会尝试你的解决方案,如果我有问题,我希望你会在这里。谢谢
    猜你喜欢
    • 1970-01-01
    • 2015-06-02
    • 1970-01-01
    • 2021-05-29
    • 2017-08-11
    • 2020-08-05
    • 1970-01-01
    相关资源
    最近更新 更多