【发布时间】:2025-12-16 04:40:01
【问题描述】:
我目前正在研究 Apache Spark。我已经为 Apache Hadoop 实现了一个自定义 InputFormat,它通过 TCP 套接字读取键值记录。我想将此代码移植到 Apache Spark 并将其与 hadoopRDD() 函数一起使用。我的 Apache Spark 代码如下:
public final class SparkParallelDataLoad {
public static void main(String[] args) {
int iterations = 100;
String dbNodesLocations = "";
if(args.length < 3) {
System.err.printf("Usage ParallelLoad <coordinator-IP> <coordinator-port> <numberOfSplits>\n");
System.exit(1);
}
JobConf jobConf = new JobConf();
jobConf.set(CustomConf.confCoordinatorIP, args[0]);
jobConf.set(CustomConf.confCoordinatorPort, args[1]);
jobConf.set(CustomConf.confDBNodesLocations, dbNodesLocations);
int numOfSplits = Integer.parseInt(args[2]);
CustomInputFormat.setCoordinatorIp(args[0]);
CustomInputFormat.setCoordinatorPort(Integer.parseInt(args[1]));
SparkConf sparkConf = new SparkConf().setAppName("SparkParallelDataLoad");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaPairRDD<LongWritable, Text> records = sc.hadoopRDD(jobConf,
CustomInputFormat.class, LongWritable.class, Text.class,
numOfSplits);
JavaRDD<LabeledPoint> points = records.map(new Function<Tuple2<LongWritable, Text>, LabeledPoint>() {
private final Log log = LogFactory.getLog(Function.class);
/**
*
*/
private static final long serialVersionUID = -1771348263117622186L;
private final Pattern SPACE = Pattern.compile(" ");
@Override
public LabeledPoint call(Tuple2<LongWritable, Text> tuple)
throws Exception {
if(tuple == null || tuple._1() == null || tuple._2() == null)
return null;
double y = Double.parseDouble(Long.toString(tuple._1.get()));
String[] tok = SPACE.split(tuple._2.toString());
double[] x = new double[tok.length];
for (int i = 0; i < tok.length; ++i) {
if(tok[i].isEmpty() == false)
x[i] = Double.parseDouble(tok[i]);
}
return new LabeledPoint(y, Vectors.dense(x));
}
});
System.out.println("Number of records: " + points.count());
LinearRegressionModel model = LinearRegressionWithSGD.train(points.rdd(), iterations);
System.out.println("Model weights: " + model.weights());
sc.stop();
}
}
在我的项目中,我还必须决定哪个 Spark Worker 将连接到哪个数据源(类似于具有 1:1 关系的“匹配”过程)。因此,我创建了数量等于数据源数量的InputSplits,以便我的数据并行发送到SparkContext。我的问题如下:
方法
InpuSplit.getLength()的结果是否影响RecordReader返回的记录数?详细来说,我在测试运行中看到一个 Job 在只返回一条记录后结束,这只是因为我有一个从CustomInputSplit.getLength()函数返回的值 0。-
在 Apache Spark 上下文中,至少在执行
records.map()函数调用时,worker 的数量是否等于我的InputFormat产生的InputSplits的数量?
上述问题 2 的答案对我的项目非常重要。
谢谢你, 尼克
【问题讨论】:
-
我认为您在问题的许多地方都使用了
InputSplit.getLength,而您的意思是InputFormat.getSplits。抱歉,如果我感到困惑。 -
我有类似的东西,我的示例项目在 github 上,你可以在这里查看。 github.com/animeshj/bigdata
标签: hadoop apache-spark