【发布时间】:2014-10-22 02:42:03
【问题描述】:
目标 - 解密 .pgp 加密文件,以流形式读取数据,根据供应商要求执行转换,以流形式加密并写入文件。
Logic - 自定义 Reader、Writer 和 Tasklet,将解密/加密的数据存储到 ExecutionContext 并传递到不同的步骤。
适用于 - 小文件 (~1MB)
面临的问题 - 尝试使用(~10MB - 10K 记录) - 读取步骤成功,但是当开始将数据作为加密文件写入时 - 内存问题 - java.lang.OutOfMemoryError: Java heap space
代码 sn-p -
<job id="testJob" xmlns="http://www.springframework.org/schema/batch">
<!-- Read Encrypted file and Decrypt -->
<batch:step id="decryptFile" next="readAndWriteData">
<batch:tasklet ref="fileDecryptionTasklet">
<batch:listeners>
<batch:listener ref="decryptFileListener" />
</batch:listeners>
</batch:tasklet>
</batch:step>
<!-- Read data from decryption step and write to Stream -->
<batch:step id="readAndWriteData" next="encryptFile">
<batch:tasklet>
<batch:chunk reader="hrdsCustomReader" processor="Processor"
writer="CustomWriter" commit-interval="${.ftp.comit.interval}" />
<batch:listeners>
<batch:listener ref="encryptFileListener" />
</batch:listeners>
</batch:tasklet>
</batch:step>
<!-- Write to vendor specific file -->
<batch:step id="encryptFile">
<batch:tasklet ref="fileEncryptionTasklet" />
</batch:step>
</job>
Tasklet 和自定义编写器代码 sn-ps -
@Override
public String read() throws Exception, UnexpectedInputException,
ParseException {
decryptedData = (String) stepExecution.getJobExecution()
.getExecutionContext().get("DecryptedData");
if (decryptedData != null)
//logger.info("decryptedData in Custom Reader - \n" + decryptedData);
stepExecution.getJobExecution().getExecutionContext()
.put("DecryptedData", null);
return decryptedData;
}
public void write(List items) throws Exception {
logger.info("Begin writing data as an Encrypted File");
Iterator itr = items.iterator();
while(itr.hasNext()) {
String element = itr.next();
lineBuffer.append(element+LINE_SEPARATOR);
}
ExecutionContext stepContext = this.stepExecution.getExecutionContext();
stepContext.put("EncryptedData", lineBuffer);
}
public RepeatStatus execute(StepContribution step, ChunkContext chunk)
throws Exception {
InputStream inputstream = new FileInputStream(inputdirectory);
Message encryptMessage = MessageBuilder
.withPayload(inputstream)
.setHeader(
FileEncryptionTransformer.ENCRYPTION_OPERATION_HEADER,
"decryptAndVerify")
.setHeader(
FileEncryptionTransformer.ENCRYPTION_OPERATION_HEADER,
EncryptionUtil.DECRYPT_STREAM_OPERATION)
.setHeader(FileEncryptionTransformer.SOURCE_FILE_NAME_HEADER,
filename).build();
InputStream inputStream = pgptransformer
.doTransformStream(encryptMessage);
String strData = IOUtils.toString(inputStream, "UTF-8");
inputstream.close();
chunk.getStepContext().getStepExecution().getExecutionContext().put("DecryptedData", strData);
return null;
}
公共重复状态执行(StepContribution 步骤,ChunkContext 块)
抛出异常 {
lineBuffer = (StringBuffer) chunk.getStepContext()
.getJobExecutionContext().get("EncryptedData");
byte[] bytes = lineBuffer.toString().getBytes();
InputStream inputStream = new ByteArrayInputStream(bytes);
消息 encryptMessage = MessageBuilder
.withPayload(inputStream)
.setHeader(PGPFileTransformer.OUTPUT_FILE_FOLDER,
输出目录)
.setHeader(
FileEncryptionTransformer.ENCRYPTION_OPERATION_HEADER,
“签名和加密”)
.setHeader(
FileEncryptionTransformer.ENCRYPTION_OPERATION_HEADER,
EncryptionUtil.ENCRYPT_STREAM_OPERATION)
.setHeader(FileEncryptionTransformer.SOURCE_FILE_NAME_HEADER,
文件名).build();
pgptransformer.doTransform(encryptMessage);
inputStream.close();
chunk.getStepContext().getStepExecution().getExecutionContext().put("EncryptedData", null);
返回空值;
}
感谢有人可以帮助解决问题。
【问题讨论】:
-
您将内容存储在内存中,这对小对象而不是对大对象来说很好。不要将其存储在内存中。您应该创建一个可以解密/加密文件的 InputStream,加密/解密是透明处理的,您可以使用 flatfileitemreader/writer 来处理项目。
-
您好,Deinum,感谢您的回复。由于数据的敏感性,我不应该将解密的数据写入文件。因此,我将数据解密为 InputStream 并转换为字符串以存储在 ExecutionContext 中。后来我在下一步中获取字符串 - 处理并再次将其作为 StringBuffer 添加到 ExecutionContext 以写入加密文件。
-
我在哪里说您需要存储解密的数据……您读取加密的数据,再次处理和写入/加密。您不想将整个文件保存在内存中。更糟糕的是,您在内存中保留了大约 4 到 5 次。 Read、byte[]、StringBuffer、String……这些都是副本,不断增加你的内存使用量。
-
您没有传递
InputStream,您的项目阅读器需要将 InputStream 视为任何其他输入流读取行(或行取决于文件中的内容),传递处理器将其交给作者。您基本上以 1 步而不是 3 步结束。
标签: spring encryption batch-processing