【Question title】: Using broadcast variables in Apache Spark
【Posted】: 2018-04-27 00:45:42
【Question】:

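I get an error when I use a broadcast variable inside a CoGroupFunction. If the appProvider.value() call is commented out, the error goes away. Do you know how to fix this? Is the error related to how the variable is defined or initialized?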

public class UsageJobDS implements Serializable {
  private static final Logger log = org.apache.log4j.LogManager.getLogger("myLogger");

  Broadcast<Provider> appProvider;

  void init() {
    // init broadcast variable
    ....
  }
  public static void main(String[] args) {
    UsageJobDS ujb = new UsageJobDS();
    ujb.init();
    ujb.run();
  }

  void run() {
    // usageCharges and edrs are defined elsewhere (not shown)
    KeyValueGroupedDataset<Long, Row> charges = usageCharges.groupByKey(x -> x.getLong(x.fieldIndex("si__subscription_id")), Encoders.LONG());

    Dataset<ProcessEdr> cogg = edrs.cogroup(charges, rateEDRs, Encoders.bean(ProcessEdr.class));

    log.warn("Count cogg " + cogg.count());
  }

  // Stored in an instance field; reading appProvider makes the lambda capture `this`
  CoGroupFunction<Long, EDR2, Row, ProcessEdr> rateEDRs = (subscription_id, edrsIter, chargesIter) -> {
    Logger log = org.apache.log4j.LogManager.getLogger("myLogger");
    log.warn("inside rateEDRs function");

    ArrayList<ProcessEdr> results = new ArrayList<>();
    while (edrsIter.hasNext()) {
      EDR2 edr = edrsIter.next(); // advance the iterator; rating logic not shown
      appProvider.value(); // HERE
    }
    return results.iterator();
  };

}

I get this error:

java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.opencell.spark.jobs.UsageJobDS.rateEDRs of type org.apache.spark.api.java.function.CoGroupFunction in instance of org.opencell.spark.jobs.UsageJobDS
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
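One plausible explanation, not confirmed anywhere in this thread, is a known limitation of Java serialization with circular references rather than anything specific to broadcast variables. Because the rateEDRs lambda reads the instance field appProvider, it captures this (the UsageJobDS instance), and that instance holds the same lambda in its rateEDRs field. A serializable lambda is written out as a java.lang.invoke.SerializedLambda and is only turned back into a lambda (via readResolve) after it has been read completely. While the captured UsageJobDS is still being read, the back-reference stored in its rateEDRs field therefore points at the unresolved SerializedLambda, which cannot be assigned to a field of type CoGroupFunction. That would also explain why commenting out appProvider.value() helps: the lambda then no longer captures this (assuming nothing else in its body touches an instance member). The pure-JDK sketch below reproduces the same kind of ClassCastException without Spark; SerSupplier, Job, roundTrip, and shipFieldLambda are hypothetical names, and plain ObjectOutputStream/ObjectInputStream stand in for Spark's Java-serialization-based closure shipping (an assumption).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

public class LambdaCycleRepro {

    // Serializable functional interface, playing the role of CoGroupFunction
    // (Spark's Java function interfaces also extend java.io.Serializable).
    public interface SerSupplier<T> extends Supplier<T>, Serializable {}

    // Hypothetical stand-in for UsageJobDS.
    public static class Job implements Serializable {
        // Stand-in for the Broadcast<Provider> appProvider field.
        String appProvider = "provider-A";

        // Reads an instance field, so the lambda captures `this`. Since `this`
        // also holds the lambda, the object graph is a cycle: lambda -> Job -> lambda.
        public SerSupplier<String> rateEDRs = () -> appProvider;
    }

    // Assumption: plain Java serialization stands in for how Spark ships closures.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    // Serializes the lambda itself (not the Job) as the top-level object, so the
    // lambda is reached before the Job it captures, then deserializes it.
    public static String shipFieldLambda() {
        Job job = new Job();
        try {
            return "ok: " + roundTrip(job.rateEDRs).get();
        } catch (ClassCastException e) {
            return "ClassCastException: " + e.getMessage();
        } catch (IOException | ClassNotFoundException e) {
            return "other: " + e;
        }
    }

    public static void main(String[] args) {
        System.out.println(shipFieldLambda());
    }
}
```

The printed message should name java.lang.invoke.SerializedLambda and the field LambdaCycleRepro$Job.rateEDRs, the same shape as the UsageJobDS.rateEDRs error in the question.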

【Discussion】:

    Tags: java apache-spark lambda apache-spark-sql spark-dataframe


    【Answer 1】:

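    It works if the cogroup function is written inline in the cogroup call, as in the following code, instead of being passed through the rateEDRs field. However, the cause of the error is still unknown.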

            Dataset<ProcessEdr> cogg = edrs.cogroup(charges, (subscription_id, edrsIter, chargesIter) -> {
                ArrayList<ProcessEdr> results = new ArrayList<>();
    
                System.out.println("App Provider name" + appProvider.value().getIssuer_name());
    
                return results.iterator();
            }, Encoders.bean(ProcessEdr.class)); 
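A plausible reason the inline version works (an interpretation, not something the thread confirms): the inline lambda still captures this, but it is not stored in any field of UsageJobDS. When the captured UsageJobDS is deserialized, its rateEDRs field refers to a different lambda, whose SerializedLambda is read completely and resolved before being assigned, so no unresolved SerializedLambda ends up in a typed field. Copying the needed value into a local variable avoids capturing the enclosing object altogether. The pure-JDK sketch below shows both variants; SerSupplier, Job, inlineLambda, localCopyLambda, and ship are hypothetical names, and plain Java serialization stands in for Spark's closure serialization (an assumption).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

public class LambdaCaptureFix {

    // Serializable functional interface, playing the role of CoGroupFunction.
    public interface SerSupplier<T> extends Supplier<T>, Serializable {}

    // Hypothetical stand-in for UsageJobDS that still keeps a field lambda capturing `this`.
    public static class Job implements Serializable {
        String appProvider = "provider-A";                 // stand-in for Broadcast<Provider>
        SerSupplier<String> rateEDRs = () -> appProvider;  // field lambda, as in the question

        // Like the answer: a new lambda created inside a method. It still captures `this`,
        // but no field of Job points back at it.
        public SerSupplier<String> inlineLambda() {
            return () -> "inline " + appProvider;
        }

        // Alternative: copy the field into a local variable, so the lambda captures
        // only that value and the enclosing Job is not serialized at all.
        public SerSupplier<String> localCopyLambda() {
            final String provider = appProvider;
            return () -> "local " + provider;
        }
    }

    // Assumption: plain Java serialization stands in for how Spark ships closures.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    // Serializes and deserializes the lambda, then invokes it;
    // returns the exception's simple class name if the round trip fails.
    public static String ship(SerSupplier<String> fn) {
        try {
            return roundTrip(fn).get();
        } catch (IOException | ClassNotFoundException | ClassCastException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        Job job = new Job();
        System.out.println(ship(job.inlineLambda()));     // inline provider-A
        System.out.println(ship(job.localCopyLambda()));  // local provider-A
    }
}
```

In the Spark job, the corresponding local-copy form would be something like final Broadcast<Provider> provider = appProvider; before building the function, and then calling provider.value() inside it; the lambda then carries only the Broadcast handle instead of the whole UsageJobDS instance.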
    
