【发布时间】:2023-04-02 14:33:01
【问题描述】:
我有一个类,它获取一个本地文件,对其进行转换,并将其存储在 GCS 中:
import java.nio.channels.Channels
import java.nio.file.{ Files, Path }
import java.util.zip.{ GZIPOutputStream, ZipInputStream }
import com.google.cloud.storage.{ BlobInfo, Storage }
import com.google.common.io.ByteStreams
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream
import org.apache.commons.io.IOUtils
import resource._
class GcsService(gcsStorage: Storage) {
def storeFileInGcs(localPath: Path, destination: FileDestination): Unit = {
val blobInfo = BlobInfo.newBuilder(destination.bucket, destination.path).build
if (destination.unzipGzip) {
for (input ← managed(new ZipInputStream(Files.newInputStream(localPath)));
output ← managed(new GZIPOutputStream(Channels.newOutputStream(gcsStorage.writer(blobInfo))))) {
ByteStreams.copy(input, output)
}
} else if (destination.decompressBzip2) {
for (input <- managed(new BZip2CompressorInputStream(Files.newInputStream(localPath)));
output <- managed(Channels.newOutputStream(gcsStorage.writer(blobInfo)))) {
ByteStreams.copy(input, output)
}
} else {
for (input <- managed(Files.newInputStream(localPath));
output <- managed(Channels.newOutputStream(gcsStorage.writer(blobInfo)))) {
IOUtils.copy(input, output)
}
}
}
}
case class FileDestination(unzipGzip: Boolean, decompressBzip2: Boolean, bucket: String, path: String)
我正在尝试删除一些代码重复,特别是 fileInputStream 和 gcsOutputStream 的创建。但我不能简单地在方法顶部提取这些变量,因为它会在 scala-arm managed 块之外创建资源:
import java.io.{ InputStream, OutputStream }
import java.nio.channels.Channels
import java.nio.file.{ Files, Path }
import java.util.zip.{ GZIPOutputStream, ZipInputStream }
import com.google.cloud.storage.{ BlobInfo, Storage }
import com.google.common.io.ByteStreams
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream
import org.apache.commons.io.IOUtils
import resource._
class GcsService(gcsStorage: Storage) {
def storeFileInGcs(localPath: Path, destination: FileDestination): Unit = {
val blobInfo = BlobInfo.newBuilder(destination.bucket, destination.path).build
// FIXME: creates a resource outside of the ARM block
val fileInputStream = Files.newInputStream(localPath)
val gcsOutputStream = Channels.newOutputStream(gcsStorage.writer(blobInfo))
if (destination.unzipGzip) {
unzipGzip(fileInputStream, gcsOutputStream)
} else if (destination.decompressBzip2) {
decompressBzip2(fileInputStream, gcsOutputStream)
} else {
copy(fileInputStream, gcsOutputStream)
}
}
private def unzipGzip(inputStream: InputStream, outputStream: OutputStream): Unit = {
for (input ← managed(new ZipInputStream(inputStream));
output ← managed(new GZIPOutputStream(outputStream))) {
ByteStreams.copy(input, output)
}
}
private def decompressBzip2(inputStream: InputStream, outputStream: OutputStream): Unit = {
for (input <- managed(new BZip2CompressorInputStream(inputStream));
output <- managed(outputStream)) {
ByteStreams.copy(input, output)
}
}
private def copy(inputStream: InputStream, outputStream: OutputStream): Unit = {
for (input <- managed(inputStream);
output <- managed(outputStream)) {
IOUtils.copy(input, output)
}
}
}
case class FileDestination(unzipGzip: Boolean, decompressBzip2: Boolean, bucket: String, path: String)
如您所见,代码更清晰,更可测试,但资源没有被正确处理,因为它们不是“托管”的。例如,如果在创建gcsOutputStream 时抛出异常,fileInputStream 将不会被关闭。
我可能可以使用Google Guava sources and sinks 解决这个问题,但我想知道在 Scala 中是否有更好的方法来解决这个问题,而无需引入 Guava。理想情况下使用标准库,或者 scala-arm 功能,或者甚至在Cats?
- 是否应该将
fileInputStream和gcsOutputStream定义为不接受任何内容并返回流的函数?似乎到处都有() => InputStream和() => OutputStream的代码会更冗长? - 我是否应该使用多个 scala-arm “托管” 进行理解(一个定义
fileInputStream和gcsOutputStream,另一个在每个子函数中定义)?如果我这样做,“内部”输入流将被关闭两次不是问题吗? - 是否有我没有看到的干净且“scalaish”的方法来执行此操作?
【问题讨论】:
-
你在使用 Scala 2.13 吗?您是否考虑过将scala.util.Using 用于您的部分(全部?)资源管理?
-
很有趣,我不知道这个新功能!可悲的是,我们仍在使用 Scala 2.12,由于技术债务,迁移到 Scala 2.13 将是一项巨大的努力,所以可能在几个月后......当我们迁移我们的
scala-arm代码到Using做。有没有一种干净的方法来解决我试图用Using做的事情,而scala-arm不可用?如果是这样,这可能是这个问题的一个有趣的答案,我想我可以使用 Guava 源/接收器,直到我们迁移。 -
我会推荐使用提供完整资源管理的效果系统。喜欢cats-effect
Resource或zioManaged。 -
或者您可以查看一个用于 GCS 的 Scala DSL,它提供流和资源管理,例如 Benji(我是其中的贡献者)
-
@LuisMiguelMejíaSuárez :确实,使用
effect system可能是干净地解决这个问题的最佳方法。我想这意味着从我们当前的“java 风格”转向更“函数式编程风格”。我将研究猫效应/ zio。我真的很想看到一个答案,该答案显示了该方法的代码外观。 @cchantep:非常感谢 Benji 的建议,我不知道它看起来很有趣。可悲的是,我不能像 Play / Akka 这样添加太多依赖项,我们实际上是在尝试减少这个项目中的依赖项数量。
标签: scala resource-management autocloseable scala-arm