【问题标题】:Decrypt File onto Stream , Transform and Encrypt as a file using Spring Batch使用 Spring Batch 将文件解密到流、转换和加密为文件
【发布时间】:2014-10-22 02:42:03
【问题描述】:

目标 - 解密 .pgp 加密文件,以流形式读取数据,根据供应商要求执行转换,以流形式加密并写入文件。

Logic - 自定义 Reader、Writer 和 Tasklet,将解密/加密的数据存储到 ExecutionContext 并传递到不同的步骤。

适用于 - 小文件 (~1MB)

面临的问题 - 尝试使用(~10MB - 10K 记录) - 读取步骤成功,但是当开始将数据作为加密文件写入时 - 内存问题 - java.lang.OutOfMemoryError: Java heap space

代码 sn-p -

<job id="testJob" xmlns="http://www.springframework.org/schema/batch">

    <!-- Read Encrypted file and Decrypt -->
    <batch:step id="decryptFile" next="readAndWriteData">
        <batch:tasklet ref="fileDecryptionTasklet">
            <batch:listeners>
                <batch:listener ref="decryptFileListener" />
            </batch:listeners>
        </batch:tasklet>
    </batch:step>

    <!-- Read data from decryption step and write to Stream -->
    <batch:step id="readAndWriteData" next="encryptFile">
        <batch:tasklet>
            <batch:chunk reader="hrdsCustomReader" processor="Processor"
                writer="CustomWriter" commit-interval="${.ftp.comit.interval}" />
            <batch:listeners>
                <batch:listener ref="encryptFileListener" />
            </batch:listeners>
        </batch:tasklet>
    </batch:step>

    <!-- Write to vendor specific file -->
    <batch:step id="encryptFile">
        <batch:tasklet ref="fileEncryptionTasklet" />
    </batch:step>

</job>

Tasklet 和自定义编写器代码 sn-ps -

@Override public String read() throws Exception, UnexpectedInputException, ParseException { decryptedData = (String) stepExecution.getJobExecution() .getExecutionContext().get("DecryptedData"); if (decryptedData != null) //logger.info("decryptedData in Custom Reader - \n" + decryptedData); stepExecution.getJobExecution().getExecutionContext() .put("DecryptedData", null); return decryptedData; } public void write(List items) throws Exception { logger.info("Begin writing data as an Encrypted File"); Iterator itr = items.iterator(); while(itr.hasNext()) { String element = itr.next(); lineBuffer.append(element+LINE_SEPARATOR); } ExecutionContext stepContext = this.stepExecution.getExecutionContext(); stepContext.put("EncryptedData", lineBuffer); } public RepeatStatus execute(StepContribution step, ChunkContext chunk) throws Exception { InputStream inputstream = new FileInputStream(inputdirectory); Message encryptMessage = MessageBuilder .withPayload(inputstream) .setHeader( FileEncryptionTransformer.ENCRYPTION_OPERATION_HEADER, "decryptAndVerify") .setHeader( FileEncryptionTransformer.ENCRYPTION_OPERATION_HEADER, EncryptionUtil.DECRYPT_STREAM_OPERATION) .setHeader(FileEncryptionTransformer.SOURCE_FILE_NAME_HEADER, filename).build(); InputStream inputStream = pgptransformer .doTransformStream(encryptMessage); String strData = IOUtils.toString(inputStream, "UTF-8"); inputstream.close(); chunk.getStepContext().getStepExecution().getExecutionContext().put("DecryptedData", strData); return null; } 公共重复状态执行(StepContribution 步骤,ChunkContext 块) 抛出异常 { lineBuffer = (StringBuffer) chunk.getStepContext() .getJobExecutionContext().get("EncryptedData"); byte[] bytes = lineBuffer.toString().getBytes(); InputStream inputStream = new ByteArrayInputStream(bytes); 消息 encryptMessage = MessageBuilder .withPayload(inputStream) .setHeader(PGPFileTransformer.OUTPUT_FILE_FOLDER, 输出目录) .setHeader( FileEncryptionTransformer.ENCRYPTION_OPERATION_HEADER, “签名和加密”) .setHeader( FileEncryptionTransformer.ENCRYPTION_OPERATION_HEADER, EncryptionUtil.ENCRYPT_STREAM_OPERATION) .setHeader(FileEncryptionTransformer.SOURCE_FILE_NAME_HEADER, 文件名).build(); pgptransformer.doTransform(encryptMessage); inputStream.close(); chunk.getStepContext().getStepExecution().getExecutionContext().put("EncryptedData", null); 返回空值; }

感谢有人可以帮助解决问题。

【问题讨论】:

  • 您将内容存储在内存中,这对小对象而不是对大对象来说很好。不要将其存储在内存中。您应该创建一个可以解密/加密文件的 InputStream,加密/解密是透明处理的,您可以使用 flatfileitemreader/writer 来处理项目。
  • 您好,Deinum,感谢您的回复。由于数据的敏感性,我不应该将解密的数据写入文件。因此,我将数据解密为 InputStream 并转换为字符串以存储在 ExecutionContext 中。后来我在下一步中获取字符串 - 处理并再次将其作为 StringBuffer 添加到 ExecutionContext 以写入加密文件。
  • 我在哪里说您需要存储解密的数据……您读取加密的数据,再次处理和写入/加密。您不想将整个文件保存在内存中。更糟糕的是,您在内存中保留了大约 4 到 5 次。 Read、byte[]、StringBuffer、String……这些都是副本,不断增加你的内存使用量。
  • stackoverflow.com/questions/6827725/… 可能会有所帮助。
  • 您没有传递InputStream,您的项目阅读器需要将 InputStream 视为任何其他输入流读取行(或行取决于文件中的内容),传递处理器将其交给作者。您基本上以 1 步而不是 3 步结束。

标签: spring encryption batch-processing


【解决方案1】:

能够在

逻辑 - 分块处理 - 200 条记录到 1 行。

在下面发布代码 -

批量配置-

<job id="aic-batch-xxx-ftp" xmlns="http://www.springframework.org/schema/batch">


    <!-- Read data , decrypt , process and write to encrypted file -->
    <batch:step id="readAndWriteData">
        <batch:tasklet>
            <batch:chunk reader="xxxCustomReader" processor="xxxFileProccessor"
                writer="xxxCustomWriter" commit-interval="${aic.batch.xxx.ftp.comit.interval}" />
        </batch:tasklet>
    </batch:step>


</job>

阅读器逻辑 -

StringBuffer decryptedData = new StringBuffer();
    String strLine = "";

    PGPLib pgp = new PGPLib();
    KeyStore keyStore = new KeyStore("xxx.keystore", "xxx");

    long startTime = System.currentTimeMillis();
    // Read & decrypt File Line By Line
    if ((strLine = bufferedReader.readLine()) != null) {
        strLine = strLine.replace("NEW_LINE", "\r\n");
        decryptedData.append((pgp.decryptString(strLine, keyStore,
                "xxx")));
        long endTime = System.currentTimeMillis();
        logger.debug("Total time taken = " + (endTime - startTime) + " msec");
        return decryptedData;
    }
    else
        return null;

作家逻辑 -

public void write(List<? extends StringBuffer> items) throws Exception {
    logger.debug("Begin writing data as an Encrypted File");

    @SuppressWarnings("unchecked")
    Iterator<StringBuffer> itr = (Iterator<StringBuffer>) items.iterator();
    while (itr.hasNext()) {
        StringBuffer element = itr.next();
        encrypt(element);
        count++;
    }
}

public void encrypt(StringBuffer element) throws PGPException, IOException {

    PGPLib pgp = new PGPLib();
    KeyStore keyStore = new KeyStore("xxx.keystore", "xxx");

    String strLine = element.toString();
    StringBuffer buffer = new StringBuffer("");
    int i = 0;
    long startTime = System.currentTimeMillis();

    if (i % 200 == 0) {
        if (i != 0) {
            String encryptString = pgp.encryptString(buffer.toString(),
                    keyStore,
                    "xxx");
            encryptString = encryptString.replace("\r\n", "NEW_LINE");
            bufferedWriter.write(encryptString);
            bufferedWriter.newLine();
        }
        buffer = new StringBuffer(strLine);
    } else {
        buffer.append("\r\n").append(strLine);
    }
    i++;
    if (buffer != null && buffer.length() > 0) {
        String encryptString = pgp.encryptString(buffer.toString(),
                keyStore, "xxx");
        encryptString = encryptString.replace("\r\n", "NEW_LINE");
        bufferedWriter.write(encryptString);
        bufferedWriter.newLine();
    }

    long endTime = System.currentTimeMillis();
    logger.debug("Total time taken = " + (endTime - startTime) + " msec");

}

【讨论】:

  • 看起来还有改进的余地。看起来您在 encrypt 方法中有一些重复,并且您每次需要它们时都在重新创建 PGPLibKeyStore 对象(对于读者和作者)也许您可以创建一个步骤范围的依赖项? .我还怀疑i % 200 应该以某种方式与提交间隔相关联。恕我直言,一个快速的胜利是使用StringBuilder 而不是StringBuffer。后者是同步的,第一个不是。
  • 只是一个想法,难道不可能创建一个读取器,它只逐行读取文件(加密),然后将它传递给解密-处理-加密(或在单个处理器)。这样你的读者只需要阅读,而你的作者只需要写作。您将限制创建/传递的对象数量(尤其是 PGP 的东西),这对于 GC 来说应该更容易。
  • 我面临的挑战是——使用普通加密,它会将整个文件作为一个整体进行加密,所以我无法逐行读取,它所有的字符都是不可读的。因此将 200/n 条记录加密为 1 行并将 \r\n 替换为 NEW_LINE,这样可以识别 1 个块(200 或 N 条记录)。 **提交间隔为1,读取器一次读取1个加密行,处理并写入文件。
  • 啊,真是太可惜了。然而,由于整个文件是加密的,因此我建议能够创建一个 InputStreamOutputStream 来透明地处理它。我无法想象周围没有这样的东西。它将大大提高性能并简化您的代码。
猜你喜欢
  • 1970-01-01
  • 2014-05-27
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-10-20
  • 2015-12-29
相关资源
最近更新 更多