【问题标题】:Mulesoft Batch aggregator ImmutableRecordAwareList payloadMulesoft 批处理聚合器 ImmutableRecordAwareList 有效负载
【发布时间】:2022-10-05 10:20:17
【问题描述】:

在使用 Anypoint studio 7.12 和 mule Runtime 4.4 批处理流时,当数据转换完成并由 Batch Aggregator 聚合记录时,记录的集合是类型不可变记录感知列表,当我试图将聚合器中的数据写入文件时,我遇到了错误,这表明 mule 运行时正在尝试将不可变集合转换为导致错误的 InputStream。

想知道是否有其他人遇到过类似的问题,并且您知道如何解决它。

错误:

com.mulesoft.mule.runtime.module.batch.internal.commit.ImmutableRecordAwareList could not be transformed to the desired type java.io.InputStream"

请分享您的评论以帮助我解决它。 以下是导致此错误的示例 mule 配置文件

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:batch="http://www.mulesoft.org/schema/mule/batch" xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
    xmlns:file="http://www.mulesoft.org/schema/mule/file"
    xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/batch http://www.mulesoft.org/schema/mule/batch/current/mule-batch.xsd">
    <http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config" doc:id="3c39b3e8-228c-4dcf-a145-2204df0a7ba6" >
        <http:listener-connection host="0.0.0.0" port="8081" />
    </http:listener-config>
    <file:config name="File_Config" doc:name="File Config" doc:id="5e47593d-5967-4dc6-ad57-6da7e2530779" >
        <file:connection workingDir="C:\workspace\TestData\Mulesoft" />
    </file:config>
    <flow name="batchfilewriterFlow" doc:id="83e23d37-3594-40a8-a8cd-805b8242d6f9" >
        <http:listener doc:name="Listener" doc:id="df99d40a-2826-4d33-9565-5a15c0a49c05" config-ref="HTTP_Listener_config" path="/writer"/>
        <file:read doc:name="Read csv" doc:id="34172ee1-f238-433d-b712-a56f79517e50" config-ref="File_Config" path="Contact.csv" outputMimeType="application/csv; header=true; separator=|"/>
        <ee:transform doc:name="TO CVS" doc:id="3e36402d-7c54-464d-a1a5-519e7bd96fac" >
            <ee:message >
                <ee:set-payload ><![CDATA[%dw 2.0
output application/csv header=true, separator=","
---
payload]]></ee:set-payload>
            </ee:message>
        </ee:transform>
        <batch:job jobName="batchfilewriterBatch_Job" doc:id="c9695d46-b534-4202-8b88-0e1142617c2a" >
            <batch:process-records >
                <batch:step name="Batch_Step" doc:id="577a43b4-262e-4dcd-aa79-8c24a466fdd2" >
                    <ee:transform doc:name="Transform Message" doc:id="4e61f76f-299d-4049-89f4-0dc3b4cc0e91" >
                        <ee:message >
                            <ee:set-payload ><![CDATA[%dw 2.0
output application/java
---
payload]]></ee:set-payload>
                        </ee:message>
                    </ee:transform>
                    <batch:aggregator doc:name="Batch Aggregator" doc:id="201a1803-91a2-46aa-bffd-998b5a03f53f" size="200">
                        <file:write doc:name="Write" doc:id="7e7318ec-e5a8-425c-adae-bc960731357a" config-ref="File_Config" path="Error.csv" />
                    </batch:aggregator>
                </batch:step>
            </batch:process-records>
        </batch:job>
    </flow>
</mule>

【问题讨论】:

    标签: dataweave mulesoft mule4 mule-component


    【解决方案1】:

    批处理的这种用法对我来说毫无意义。

    1. 批次的输入应该是记录。该流程将从文件读取的 CSV 转换为另一个 CSV,只是使用不同的分隔符。这是没有意义的,因为批处理的目的是处理记录,将文件转换为 Java 记录会更有效。
    2. 在批处理中写入文件不是一个好主意。 Mule 中的批处理作业在多个线程中执行。这意味着该文件可能会被覆盖和/或损坏。
    3. 仅在批处理步骤中转换为 Java 意味着文件写入操作将不知道如何处理生成的 Java 对象。您需要将该有效负载转换为可以写入文件的内容。
    4. 但比前面的所有要点更重要的是,根本没有理由为此流程使用批处理。它没有任何可以利用批处理作业的面向记录的处理。

      相反,您可以完全删除批次,只需将转换后的 CSV 的输出直接写入文件。如果文件很大,您可能想尝试在文件读取和转换中使用streaming writer 属性以减少内存使用。

      例子:

      • outputMimeType="application/csv; header=true; separator='|'; streaming=true"
      • output application/csv header=true, separator=",", streaming=true

    【讨论】:

    • 感谢您的快速回复,我同意您的评论,即此批处理流程没有任何商业意义,因为它正在读取 PIPE 分隔的 CSV 文件并用逗号写入另一个 CSV 文件。
    • 我同意你的 cmets ,我把这个流程放在一起是为了理解为什么批处理聚合器创建不可变对象,这在我写入文件或任何需要 inputStream 的目标时导致异常。如果我考虑您的第二点,我不应该批量发送数据,因为它是多线程和异步的,并且这些线程可能会相互踩踏以覆盖和损坏数据。就您的观点而言,我认为将转换器输出保持为 CSV 将导致有效负载成为将被馈送到批处理的记录数组,这是不正确的方法。
    • 它可能会被转换,但将它再次转换为 CSV 然后解析 yo 记录将是多余的。该错误可能是因为 3) 在聚合器中。您没有提供错误消息中的所有上下文行以确保,但我认为这并不重要,因为用例不正确。
    • 对某些人来说可能没有意义,但是在 MuleSoft 文档的批处理主题下描述了将数据写入 CSV 文件;此错误也不限于此特定用例。我在聚合器将数据写入 Salesforce 时遇到了同样的错误。在这种情况下,使用确实有意义。
    • 我相信你参考了这个例子:docs.mulesoft.com/mule-runtime/4.3/…。聚合器正在使用流式传输。在这种情况下,我不确定这是否意味着其他批处理线程不会相互干扰。该示例也可能是部分示例。这需要更多的分析才能理解。
    【解决方案2】:

    我为上面提到的问题找到了一些解决方法,所以我想与您分享。不确定这是否是问题的完美解决方案,但确实解决了问题。

    1. 在批处理聚合器中,如果我将有效负载转换为可序列化的自定义 java 对象,我不会收到错误消息。在我的情况下,我将有效负载发送到 Salesforce Bulk API 2,并且通过上述更改,我没有看到上述错误

    2. 如果您不想使用自定义可序列化对象,那么您可以添加一个转换器,如

      有效载荷映射(值,索引)-> 值 不确定使用上述转换但上述错误消失的有效载荷会发生什么。

      稍后我将更新这篇文章并添加更多发现

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-10-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多