【问题标题】:Stop renaming file if data processing fails while streaming remote directory file如果在流式传输远程目录文件时数据处理失败,则停止重命名文件
【发布时间】:2019-05-21 04:27:02
【问题描述】:

我正在使用 SFTP 从远程目录读取文件。我可以使用出站网关通过流获取文件,甚至将其移动到存档文件夹。

我正在处理文件中的数据,但如果数据中有问题,那么我会抛出错误。 我不想重命名文件如果在处理数据时抛出任何错误,我该如何实现。如果我能在使用 Spring 集成时获得一些错误处理程序的良好做法,那将非常有帮助。

.handle(Sftp.outboundGateway(sftpSessionFactory(), GET, "payload.remoteDirectory + payload.filename").options(STREAM).temporaryFileSuffix("_reading"))
.handle(readData(),c->c.advice(afterReading()))
.enrichHeaders(h -> h
        .headerExpression(FileHeaders.RENAME_TO, "headers[file_remoteDirectory] + 'archive/' + headers[file_remoteFile]")
        .headerExpression(FileHeaders.REMOTE_FILE, "headers[file_remoteFile]")
        .header(FileHeaders.REMOTE_DIRECTORY, "headers[file_remoteDirectory]"))
.handle(Sftp.outboundGateway(sftpSessionFactory(), MV, "headers[file_remoteDirectory]+headers[file_remoteFile]").renameExpression("headers['file_renameTo']"))
.get();

@Bean
    public ExpressionEvaluatingRequestHandlerAdvice afterReading() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setSuccessChannelName("successReading.input");
        advice.setOnSuccessExpressionString("payload + ' was successful streamed'");
        advice.setFailureChannelName("failureReading.input");
        advice.setOnFailureExpressionString("payload + ' was bad, with reason: ' + #exception.cause.message");
        advice.setTrapException(true);
        advice.setPropagateEvaluationFailures(true);
        return advice;
    }

    @Bean
    public IntegrationFlow successReading() {
        return f -> f.log();
    }

    @Bean
    public IntegrationFlow failureReading() {
        return f -> f.log(ERROR);
    }


public GenericHandler readData() {
return new GenericHandler() {
    @Override
    public Object handle(Object o, Map map) {
        InputStream file = (InputStream) o;
        String fileName = (String) map.get(REMOTE_FILE);
        try {
            // processing data
        } catch (Exception e) {
            return new SftpException(500, String.format("Error while processing the file %s because of  Error: %s and reason %s", fileName, e.getMessage(), e.getCause()));
        }
        Closeable closeable = (Closeable) map.get(CLOSEABLE_RESOURCE);
        if (closeable != null) {
            try {
                closeable.close();
                file.close();
            } catch (Exception e) {
                logger.error(String.format("Session didn`t get closed after reading the stream data for file %s and error %s"), fileName, e.getMessage());
            }
        }
        return map;
    }
    };
}

更新

【问题讨论】:

    标签: spring-integration spring-integration-dsl spring-integration-sftp


    【解决方案1】:

    ExpressionEvaluatingRequestHandlerAdvice 添加到.handler() 端点.handle(readData(), e -> e.advice(...))

    最后提供的建议类是o.s.i.handler.advice.ExpressionEvaluatingRequestHandlerAdvice。此建议比其他两个建议更通用。它提供了一种机制来评估发送到端点的原始入站消息的表达式。在成功或失败之后,可以评估单独的表达式。可选地,可以将包含评估结果的消息与输入消息一起发送到消息通道。

    【讨论】:

    • 我已按照建议添加了建议(已更新)并提到了link,但我无法理解在哪里可以停止重命名,因为流程将进入成功通道如果我在服务中抛出错误。抱歉,由于我是 Spring 集成的新手,所以我问了一些琐碎的问题。
    • 这是不可能的;如果建议的处理程序抛出异常,则采用备用路径;我建议您在调试器中运行以找出问题所在。
    • 我希望您已经参考了有问题的更新代码。我也进行了调试,我可以看到当我从 readData() 方法抛出错误时,它总是在建议中进入成功通道。我在 .handle(readData(),c->c.advice(afterReading())) 之后添加了一个过滤器来检查消息是否包含错误实例,这对我有用,我认为这不是正确的方法。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-04-22
    • 1970-01-01
    相关资源
    最近更新 更多