【问题标题】:Read a file from google storage in dataproc从 dataproc 中的谷歌存储中读取文件
【发布时间】:2021-02-24 18:34:55
【问题描述】:

我正在尝试将 scala spark 作业从 hadoop 集群迁移到 GCP,我有这段代码可以读取文件并创建 ArrayBuffer[String]

    import java.io._
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.fs.FSDataInputStream

    val filename="it.txt.1604607878987"
    val fs = FileSystem.get(new Configuration())
    val dataInputStream: FSDataInputStream = fs.open(new Path(filename))
    val sourceEDR=new BufferedReader(new InputStreamReader(dataInputStream, "UTF-8")); }
    val outputEDRFile = ArrayBuffer[String]()
    buffer = new Array[Char](300)
    var num_of_chars = 0
    while (sourceEDR.read(buffer) > -1) {
        val str = new String(buffer)
        num_of_chars += str.length
        outputEDRFile += (str + "\n");}
        println(num_of_chars)
        

此代码在集群中运行并给我 3025000 个字符,我尝试在 dataproc 中运行此代码:

       val path_gs = new Path("gs://my-bucket")
       val filename="it.txt.1604607878987"

       val fs = path_gs.getFileSystem(new Configuration())        
       val dataInputStream: FSDataInputStream = fs.open(new Path(filename))
       val sourceEDR =new BufferedReader(new InputStreamReader(dataInputStream, "UTF-8")); }

       val outputEDRFile = ArrayBuffer[String]()
       buffer = new Array[Char](300)
       var num_of_chars = 0
       while (sourceEDR.read(buffer) > -1) {
       val str = new String(buffer)
       num_of_chars += str.length
       outputEDRFile += (str + "\n");}
       println(num_of_chars)

它给出了 3175025 个字符,我认为文件内容中添加了空格,或者我必须使用另一个接口从 dataproc 中的谷歌存储中读取文件? 我也尝试了其他编码选项,但它给出了相同的结果。 有什么帮助吗?

【问题讨论】:

  • 您可能需要先阅读 Scala 教程以了解如何使用(而不仅仅是模仿 Java 中的操作方式,这会导致丑陋和错误的 Scala)
  • 您可以比较 Spark 打印的两个文件的内容而不是比较长度吗?是编码不同还是最后缺少一些字符?

标签: java hadoop google-cloud-platform google-cloud-storage google-cloud-dataproc


【解决方案1】:

我没有找到使用缓冲区的解决方案,因此我尝试逐个字符地读取字符,这对我有用:

var i = 0
var r=0
val response = new StringBuilder
while ( ({r=sourceEDR.read(); r} != -1)) {
  val ch= r.asInstanceOf[Char]
  if(response.length < 300) { response.append(ch)}
  else {  val str = response.toString().replaceAll("[\\r\\n]", " ")
    i += str.length
    outputEDRFile += (str + "\n");
    response.setLength(0)
    response.append(ch)
  }
}
val str = response.toString().replaceAll("[\\r\\n]", " ")
i += str.length
outputEDRFile += (str + "\n");

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-30
    • 2022-06-17
    • 2021-07-30
    • 2020-12-06
    • 2023-01-30
    • 1970-01-01
    相关资源
    最近更新 更多