Question title: How to cast a string in Apache Flink Datastream in Scala?
Posted: 2021-05-24 21:02:00
Question:

I'm writing a Scala script that processes a CSV file in Apache Flink using the Datastream API. I need to fix the format of some columns and then cast them to the correct types.

My current code looks like this:

package org.myorg.tareac

import org.apache.flink.api.scala._
import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction}

object BatchJob {

  def main(args: Array[String]) {
    // set up the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    
    val inputPath = "file:////home/almu/Practicas_BigData/Tarea_Flink_B/RIA_exportacion_datos_diarios_Huelva_20140206.csv"
    
    val input = env.readCsvFile[(String, String, String, String, String, String,
                                 String, String, String, String, String, String,
                                 String, String, String, String, String, String)](inputPath, fieldDelimiter=";", ignoreFirstLine=true)


    input.print()
    
    val casted_data = input.flatMap((IDPROVINCIA: String, SPROVINCIA: String, IDESTACION: String, SESTACION: String,
                                     FECHA: String, ANIO: String, TEMPMAX: String, HORMINTEMPMAX: String, TEMPMIN: String,
                                     HORMINTEMPMIN: String, TEMPMEDIA: String, HUMEDADMAX: String, HUMEDADMIN: String,
                                     HUMEDADMEDIA: String, VELVIENTO: String, DIRVIENTO: String, RADIACION: String, PRECIPITACION: String) => {
    
                                  IDESTACION.replace("\"", "").cast(Types.Int);
                                  SESTACION.replace("\"", "");
                                  FECHA.substring(6,9).cast(Int);
                                  RADIACION.replace(",", ".").replace("", 0).cast(Double);
                                  PRECIPITACION.replace(",", ".").replace("", 0).cast(Double) 
                                 })
                                 

    // execute program
    env.execute("Flink Batch CSV Scala Processing")
  }
}

However, when I run mvn clean package, I get this error:

[ERROR] /home/almu/Practicas_BigData/Tarea_Flink_B/tareac/src/main/scala/batch/ProcessFileBatch.scala:54: error: value cast is not a member of String
[ERROR]                                   IDESTACION.replace("\"", "").cast(Types.Int);
[ERROR]                                                                ^
[ERROR] one error found

How can I do the cast correctly?

Question comments:

    Tags: scala apache-flink data-stream


    Answer 1:

    File contents:

    Jack,12,num_123,北京

    Code:

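      import org.apache.flink.api.scala._

      // batch (DataSet) environment, as in the question; inputPath points at the file above
      val env = ExecutionEnvironment.getExecutionEnvironment
      val input = env.readCsvFile[(String, String, String, String)](inputPath, fieldDelimiter = ",", ignoreFirstLine = false)

      input
        // keep the name, parse the age, and parse the number after "_"
        .map((value: (String, String, String, String)) => {
          (value._1, value._2.toInt, value._3.substring(value._3.indexOf("_") + 1).toInt)
        })
        // print() triggers execution in the DataSet API, so env.execute() is not needed here
        .print()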
    

    Result:

    (Jack,12,123)
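The same idea applied to the question's 18-column file might look like the sketch below. It is hedged: the case class `Reading`, the object `RowCleaner`, and the choice to return `None` for empty or malformed decimals are hypothetical, and only IDESTACION, SESTACION, RADIACION and PRECIPITACION are handled (FECHA is left out because its format isn't shown). The key point is that `map` receives one tuple per row, not 18 separate arguments.

```scala
import scala.util.Try

// Hypothetical typed record; field names follow the question's CSV columns.
final case class Reading(
    idEstacion: Int,
    sEstacion: String,
    radiacion: Option[Double],
    precipitacion: Option[Double]
)

object RowCleaner {
  // Remove the double quotes wrapping some fields, e.g. "\"12\"" -> "12"
  private def unquote(s: String): String = s.replace("\"", "").trim

  // Comma decimal separator -> dot; empty or malformed input -> None (assumed handling)
  private def decimal(s: String): Option[Double] =
    Try(s.trim.replace(",", ".").toDouble).toOption

  // One function over the whole row: _3 = IDESTACION, _4 = SESTACION,
  // _17 = RADIACION, _18 = PRECIPITACION (column order from the question)
  def clean(row: (String, String, String, String, String, String,
                  String, String, String, String, String, String,
                  String, String, String, String, String, String)): Reading =
    Reading(
      idEstacion = unquote(row._3).toInt,
      sEstacion = unquote(row._4),
      radiacion = decimal(row._17),
      precipitacion = decimal(row._18)
    )
}
```

In the job this would presumably replace the flatMap call as input.map(RowCleaner.clean _), with import org.apache.flink.api.scala._ in scope so Flink can derive type information for the case class. map fits here because each input row yields exactly one output row; flatMap expects a function that returns a collection.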
    


      Answer 2:

      .cast(Types.Int) 替换为.toInt
