【问题标题】:sum the every n number of row values in pig对 pig 中每 n 个行值求和
【发布时间】:2017-06-01 10:20:10
【问题描述】:

我有这样的数据。

 1:23:0.20
 2:34:0.50
 3:67:0.90
 4:87:0.10
 5:23:0.12

我正在尝试像这样对每 2 行最后一列的值求和。

0.20+0.50 = 0.70
0.90+0.10 = 1.0

然后像这样打印

 1:23:0.20:0.70
 2:34:0.50:0.70
 3:67:0.90:1.0
 4:87:0.10:1.0
 5:23:0.12

这是我的猪脚本

 data = LOAD '/home/user/Documents/test/test.txt' using PigStorage(':') AS (tag:int,rssi:chararray,weightage:chararray,seqnum:int);
B = FOREACH (GROUP data ALL) {
A_ordered = ORDER data BY rssi;
GENERATE FLATTEN(CUSTOM_UDF(A_ordered));
}

我尝试使用 java UDF。但不能正常工作。

this is what I tried.

public List<String> sumValues() {
    List<String> processedList = new ArrayList<>();
    if (entries == null) {
        return processedList;
    } else {
        double columnSum = 0;
        List<String> tempList = new ArrayList<>(); 
        int length = entries.size();
        for (int index = 1; index <= length; index++) {
            tempList.add(entries.get(index - 1)); 
            String[] splitValues = entries.get(index - 1).split(DELIMITER);
            if (splitValues.length >= MIN_SPLIT_STRING_LENGTH) {

                try {
                    double lastValue = Double.parseDouble(splitValues[WEIGHTAGE_INDEX]);
                    columnSum = columnSum + lastValue;

                    if ((index % ROWS_TO_BE_SUMMED == 0) || (index == length)) {
                        for (String tempString : tempList) {
                            processedList.add(tempString + ":" + columnSum);
                        }
                        tempList.clear(); // Clear the temporary array
                        columnSum = 0;
                    }
                } catch (NumberFormatException e) {
                    System.out.println("Invalid weightage");
                }
            } else {
                System.out.println("Invalid input");
            }
        }
    }
    return processedList;
}


@Override
public String exec(Tuple input) throws IOException {
    System.out.println("------INSIDE EXEC FUCTION ----" + input);
    if (input != null && input.size() != 0) {
        try {
            String str = (String) input.get(0);
            if (str != null) {
                String splitStrings[] = str.split(":");
                if (splitStrings != null && splitStrings.length >= 3 && splitStrings[2].equals(EXIT)) {
                    List<String> processedList = sumValues();
                    String sum = processedList.toString();
                    System.out.println("SUM VALUE----:" + sum);
                    return sum;
                } else {
                    System.out.println("INPUT VALUE----:" + str);
                    entries.add(str);
                    return null;
                }
            }
        } catch (Exception e) {
            return null;
        }
    }
    return null;
}
}

上面的代码打印空结果。 任何帮助将不胜感激。

【问题讨论】:

    标签: java hadoop apache-pig bigdata


    【解决方案1】:

    这可以在 PIG 本身中完成。根据数据集中的偶数行生成另一列,例如 f11,并从中减去 1 以创建具有相同 id 的 2 行的集合。这将允许您将这两条记录分组到新的列并对最后一列求和。然后将新集合与关系加入并获得所需的列。

    注意:对于 n 行求和,使用 f1%n_value。

    A = LOAD 'input.txt' USING PigStorage(':') AS (f1:int,f2:int,f3:double);
    B = FOREACH A GENERATE f1,(f1%2 == 0 ? (f1-1):f1) AS f11,f2,f3;
    C = GROUP B BY f11;
    D = FOREACH C GENERATE group AS f11,SUM(f3) AS Total;
    E = JOIN B BY f11,D BY f11;
    F = FOREACH E GENERATE B.f1,B.f2,B.f3,D.Total;-- Note:use B::f1,B::f2,B::f3,D::Total if '.' doesn't work.
    

    输出

    一个

    1,23,0.20
    2,34,0.50
    3,67,0.90
    4,87,0.10
    5,23,0.12
    

    B - 根据偶数行号添加新的第二列 - 1。

    1,1,23,0.20
    2,1,34,0.50
    3,3,67,0.90
    4,3,87,0.10
    5,5,23,0.12
    

    C - 按新的第二列分组

    1,{(1,23,0.20),(2,34,0.50)}
    3,{(3,67,0.90),(4,87,0.10)}
    5,{(5,23,0.12)}
    

    D - 分组后生成总和

    1,0.70
    3,1.0
    5,0.12
    

    E - 使用新列将上一步中的数据集与 B 连接

    1,1,23,0.20,1,0.70
    2,1,34,0.50,1,0.70
    3,3,67,0.90,3,1.0
    4,3,87,0.10,3,1.0
    5,5,23,0.12,5,0.12
    

    E - 获取所需的列。

    1,23,0.20,0.70
    2,34,0.50,0.70
    3,67,0.90,1.0
    4,87,0.10,1.0
    5,23,0.12,0.12
    

    【讨论】:

      【解决方案2】:

      在您的 udf 中,您收到 tuple(int, chararray, chararray, int) 并尝试获取第一个元素为 String。当您用try...catch 包围代码时,您看不到肯定出现在那里的ClassCastException。 Aso 您不需要按: 拆分值,因为您已经加载了它拆分。

      【讨论】:

      • 不,它只打印空结果
      猜你喜欢
      • 2015-10-19
      • 2020-04-03
      • 2013-07-13
      • 2017-02-18
      • 1970-01-01
      • 1970-01-01
      • 2021-06-24
      • 2013-02-22
      • 2017-11-16
      相关资源
      最近更新 更多