【问题标题】:Apache Spark with custom InputFormat for HadoopRDDApache Spark 与 HadoopRDD 的自定义 InputFormat
【发布时间】:2025-12-16 04:40:01
【问题描述】:

我目前正在研究 Apache Spark。我已经为 Apache Hadoop 实现了一个自定义 InputFormat,它通过 TCP 套接字读取键值记录。我想将此代码移植到 Apache Spark 并将其与 hadoopRDD() 函数一起使用。我的 Apache Spark 代码如下:

public final class SparkParallelDataLoad {

    public static void main(String[] args) {
        int iterations = 100;
        String dbNodesLocations = "";
        if(args.length < 3) {
            System.err.printf("Usage ParallelLoad <coordinator-IP> <coordinator-port> <numberOfSplits>\n");
            System.exit(1);
        }
        JobConf jobConf = new JobConf();
        jobConf.set(CustomConf.confCoordinatorIP, args[0]);
        jobConf.set(CustomConf.confCoordinatorPort, args[1]);
        jobConf.set(CustomConf.confDBNodesLocations, dbNodesLocations);

        int numOfSplits = Integer.parseInt(args[2]);

        CustomInputFormat.setCoordinatorIp(args[0]);
        CustomInputFormat.setCoordinatorPort(Integer.parseInt(args[1]));

        SparkConf sparkConf = new SparkConf().setAppName("SparkParallelDataLoad");

        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        JavaPairRDD<LongWritable, Text> records = sc.hadoopRDD(jobConf, 
                CustomInputFormat.class, LongWritable.class, Text.class, 
                numOfSplits);

        JavaRDD<LabeledPoint> points = records.map(new Function<Tuple2<LongWritable, Text>, LabeledPoint>() {

            private final Log log = LogFactory.getLog(Function.class);
            /**
             * 
             */
            private static final long serialVersionUID = -1771348263117622186L;

            private final Pattern SPACE = Pattern.compile(" ");
            @Override
            public LabeledPoint call(Tuple2<LongWritable, Text> tuple)
                    throws Exception {
                if(tuple == null || tuple._1() == null || tuple._2() == null)
                    return null;
                double y = Double.parseDouble(Long.toString(tuple._1.get()));
                String[] tok = SPACE.split(tuple._2.toString());
                double[] x = new double[tok.length];
                for (int i = 0; i < tok.length; ++i) {
                    if(tok[i].isEmpty() == false)
                        x[i] = Double.parseDouble(tok[i]);
                }
                return new LabeledPoint(y, Vectors.dense(x));
            }

        });

        System.out.println("Number of records: " + points.count());
        LinearRegressionModel model = LinearRegressionWithSGD.train(points.rdd(), iterations);
        System.out.println("Model weights: " + model.weights());

        sc.stop();
    }
}

在我的项目中,我还必须决定哪个 Spark Worker 将连接到哪个数据源(类似于具有 1:1 关系的“匹配”过程)。因此,我创建了数量等于数据源数量的InputSplits,以便我的数据并行发送到SparkContext。我的问题如下:

  1. 方法InpuSplit.getLength() 的结果是否影响RecordReader 返回的记录数?详细来说,我在测试运行中看到一个 Job 在只返回一条记录后结束,这只是因为我有一个从 CustomInputSplit.getLength() 函数返回的值 0。

  2. 在 Apache Spark 上下文中,至少在执行 records.map() 函数调用时,worker 的数量是否等于我的 InputFormat 产生的 InputSplits 的数量?

上述问题 2 的答案对我的项目非常重要。

谢谢你, 尼克

【问题讨论】:

  • 我认为您在问题的许多地方都使用了InputSplit.getLength,而您的意思是InputFormat.getSplits。抱歉,如果我感到困惑。
  • 我有类似的东西,我的示例项目在 github 上,你可以在这里查看。 github.com/animeshj/bigdata

标签: hadoop apache-spark


【解决方案1】:

是的。Spark 的sc.hadoopRDD 将创建一个具有InputFormat.getSplits 报告的分区数的RDD。

hadoopRDD 的最后一个参数称为minPartitions(代码中的numOfSplits)将用作InputFormat.getSplits 的提示。但是getSplits返回的数字不管是大是小都会被尊重。

查看代码https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L168

【讨论】:

  • 感谢您的回答。您是否还知道从InputSplits 读取数据是否并行发生?换个方式表达上一个问题:RDD 是否会通过从数据源中检索数据来并行创建?
  • 将为每个拆分创建一个任务。每个工作 CPU 核心拾取一项任务。如果您有足够的工作核心,所有拆分都将并行完成。否则,每个核心将进行一次拆分,然后在完成后拾取更多工作,直到完成所有任务。
  • 好的,我明白了。感谢您的宝贵时间和帮助。