【问题标题】:How can I compose resources in Scala while still closing them correctly with scala-arm?如何在 Scala 中编写资源,同时仍然使用 scala-arm 正确关闭它们?
【发布时间】:2023-04-02 14:33:01
【问题描述】:

我有一个类,它获取一个本地文件,对其进行转换,并将其存储在 GCS 中:

import java.nio.channels.Channels
import java.nio.file.{ Files, Path }
import java.util.zip.{ GZIPOutputStream, ZipInputStream }

import com.google.cloud.storage.{ BlobInfo, Storage }
import com.google.common.io.ByteStreams
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream
import org.apache.commons.io.IOUtils
import resource._


class GcsService(gcsStorage: Storage) {

  def storeFileInGcs(localPath: Path, destination: FileDestination): Unit = {
    val blobInfo = BlobInfo.newBuilder(destination.bucket, destination.path).build

    if (destination.unzipGzip) {
      for (input ← managed(new ZipInputStream(Files.newInputStream(localPath)));
           output ← managed(new GZIPOutputStream(Channels.newOutputStream(gcsStorage.writer(blobInfo))))) {
        ByteStreams.copy(input, output)
      }
    } else if (destination.decompressBzip2) {
      for (input <- managed(new BZip2CompressorInputStream(Files.newInputStream(localPath)));
           output <- managed(Channels.newOutputStream(gcsStorage.writer(blobInfo)))) {
        ByteStreams.copy(input, output)
      }
    } else {
      for (input <- managed(Files.newInputStream(localPath));
           output <- managed(Channels.newOutputStream(gcsStorage.writer(blobInfo)))) {
        IOUtils.copy(input, output)
      }
    }
  }

}

case class FileDestination(unzipGzip: Boolean, decompressBzip2: Boolean, bucket: String, path: String)

我正在尝试删除一些代码重复,特别是 fileInputStreamgcsOutputStream 的创建。但我不能简单地在方法顶部提取这些变量,因为它会在 scala-arm managed 块之外创建资源:

import java.io.{ InputStream, OutputStream }
import java.nio.channels.Channels
import java.nio.file.{ Files, Path }
import java.util.zip.{ GZIPOutputStream, ZipInputStream }

import com.google.cloud.storage.{ BlobInfo, Storage }
import com.google.common.io.ByteStreams
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream
import org.apache.commons.io.IOUtils
import resource._


class GcsService(gcsStorage: Storage) {

  def storeFileInGcs(localPath: Path, destination: FileDestination): Unit = {
    val blobInfo = BlobInfo.newBuilder(destination.bucket, destination.path).build

    // FIXME: creates a resource outside of the ARM block
    val fileInputStream = Files.newInputStream(localPath)
    val gcsOutputStream = Channels.newOutputStream(gcsStorage.writer(blobInfo))

    if (destination.unzipGzip) {
      unzipGzip(fileInputStream, gcsOutputStream)
    } else if (destination.decompressBzip2) {
      decompressBzip2(fileInputStream, gcsOutputStream)
    } else {
      copy(fileInputStream, gcsOutputStream)
    }
  }

  private def unzipGzip(inputStream: InputStream, outputStream: OutputStream): Unit = {
    for (input ← managed(new ZipInputStream(inputStream));
         output ← managed(new GZIPOutputStream(outputStream))) {
      ByteStreams.copy(input, output)
    }
  }

  private def decompressBzip2(inputStream: InputStream, outputStream: OutputStream): Unit = {
    for (input <- managed(new BZip2CompressorInputStream(inputStream));
         output <- managed(outputStream)) {
      ByteStreams.copy(input, output)
    }
  }

  private def copy(inputStream: InputStream, outputStream: OutputStream): Unit = {
    for (input <- managed(inputStream);
         output <- managed(outputStream)) {
      IOUtils.copy(input, output)
    }
  }
}

case class FileDestination(unzipGzip: Boolean, decompressBzip2: Boolean, bucket: String, path: String)

如您所见,代码更清晰,更可测试,但资源没有被正确处理,因为它们不是“托管”的。例如,如果在创建gcsOutputStream 时抛出异常,fileInputStream 将不会被关闭。

我可能可以使用Google Guava sources and sinks 解决这个问题,但我想知道在 Scala 中是否有更好的方法来解决这个问题,而无需引入 Guava。理想情况下使用标准库,或者 scala-arm 功能,或者甚至在Cats

  • 是否应该将fileInputStreamgcsOutputStream 定义为不接受任何内容并返回流的函数?似乎到处都有() =&gt; InputStream() =&gt; OutputStream 的代码会更冗长?
  • 我是否应该使用多个 scala-arm “托管” 进行理解(一个定义 fileInputStreamgcsOutputStream,另一个在每个子函数中定义)?如果我这样做,“内部”输入流将被关闭两次不是问题吗?
  • 是否有我没有看到的干净且“scalaish”的方法来执行此操作?

【问题讨论】:

  • 你在使用 Scala 2.13 吗?您是否考虑过将scala.util.Using 用于您的部分(全部?)资源管理?
  • 很有趣,我不知道这个新功能!可悲的是,我们仍在使用 Scala 2.12,由于技术债务,迁移到 Scala 2.13 将是一项巨大的努力,所以可能在几个月后......当我们迁移我们的 scala-arm 代码到 Using做。有没有一种干净的方法来解决我试图用Using 做的事情,而scala-arm 不可用?如果是这样,这可能是这个问题的一个有趣的答案,我想我可以使用 Guava 源/接收器,直到我们迁移。
  • 我会推荐使用提供完整资源管理的效果系统。喜欢cats-effect Resourcezio Managed
  • 或者您可以查看一个用于 GCS 的 Scala DSL,它提供流和资源管理,例如 Benji(我是其中的贡献者)
  • @LuisMiguelMejíaSuárez :确实,使用effect system 可能是干净地解决这个问题的最佳方法。我想这意味着从我们当前的“java 风格”转向更“函数式编程风格”。我将研究猫效应/ zio。我真的很想看到一个答案,该答案显示了该方法的代码外观。 @cchantep:非常感谢 Benji 的建议,我不知道它看起来很有趣。可悲的是,我不能像 Play / Akka 这样添加太多依赖项,我们实际上是在尝试减少这个项目中的依赖项数量。

标签: scala resource-management autocloseable scala-arm


【解决方案1】:

你可以这样重构它:

首先,声明托管资源:

val fileInputStream: ManagedResource[InputStream] = managed(Files.newInputStream(localPath))
val gcsOutputStream: ManagedResource[OutputStream] = managed(Channels.newOutputStream(gcsStorage.writer(blobInfo)))

它不会打开这些资源,它只是声明您希望管理这些资源。

然后您可以使用map 将它们包装在所需的装饰器中(例如ZipInputStream):

if (destination.unzipGzip) {
  for (input ← fileInputStream.map(s => new ZipInputStream(s));
       output ← gcsOutputStream.map(s => new GZIPOutputStream(s))) {
    ByteStreams.copy(input, output)
  }
} else if (destination.decompressBzip2) {
  for (input <- fileInputStream.map(s => new BZip2CompressorInputStream(s));
       output <- gcsOutputStream) {
    ByteStreams.copy(input, output)
  }
} else {
  for (input <- fileInputStream;
       output <- gcsOutputStream) {
    IOUtils.copy(input, output)
  }
}

当然ManagedResource[A]只是值,所以你甚至可以将它作为参数传递给方法:

private def unzipGzip(inputStream: Managed[InputStream], outputStream: Managed[OutputStream]): Unit = {
  for (input ← inputStream.map(s => new ZipInputStream(s));
       output ← outputStream.map(s => new GZIPOutputStream(s))) {
    ByteStreams.copy(input, output)
  }
}

【讨论】:

  • 这是一个有趣的方法,但是“包装”ZipInputStream/GZIPOutputStream/BZip2CompressorInputStream 很遗憾不会关闭,只有内部的fileInputStream/gcsOutputStream 。这是因为 scala-arm map() 方法不会“管理”映射操作的结果(可能是任何东西,并不总是包装资源)。如果包装资源是无状态的,这不是问题,但遗憾的是,情况并非如此(例如,我们需要关闭ZipInputStream's inflater 以关闭关联的本机 ZLIB 资源)。
  • 因此行为类似于在顶部有一个“管理”fileInputStream / gcsOutputStream 的单一理解,然后将这些流包装在子方法中而不使用额外的 @ 987654336@ for-comprehensions.
  • @EtienneNeveu 也许可以用 flatMap 完成,我会检查并更新我的答案。
猜你喜欢
  • 2014-01-12
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-11-27
  • 2011-08-12
  • 1970-01-01
  • 2017-09-18
相关资源
最近更新 更多