【问题标题】:Cascading - Group By field name issue级联 - 按字段名称分组问题
【发布时间】:2025-11-27 13:15:02
【问题描述】:

我正在尝试使用 Cascading 读取文件并在特定字段上使用 Group By。

但它返回源文件中的所有行。

源文件:

no,date,amt
1,3/10/2016,1000
1,3/10/2016,2000
1,3/11/2016,400
232,2/10/2016,1500

代码:

Fields tscnFields = new Fields("no", "date", "amt");
FileTap tscnFileTap = new FileTap(new TextDelimited(tscnFields,true, ","),  "C://Users//Test//tscn.txt");

    final Fields groupField = new Fields("date");

    Pipe pipe = new Pipe("test"); 
    pipe = new Each(pipe, new Debug()); 
    pipe = new GroupBy("group by date", pipe, groupField); 
    Fields outFields = new Fields("no", "date", "amt");
    FileTap sinkTap = new FileTap(new TextDelimited(outFields,true, ","), "C://Users//Test//out.txt", SinkMode.REPLACE);         Flow flow = flowConnector.connect("FlowMonitorTest", tscnFileTap, sinkTap, pipe);
    flow.complete();

我得到的输出是

['1', '3/10/2016', '1000']
['1', '3/10/2016', '2000']
['1', '3/11/2016', '400']
['232', '2/10/2016', '1500']
tuples count: 4

【问题讨论】:

  • 你想要什么样的输出?

标签: cascading


【解决方案1】:

如果您想要如下所示的输出

date
2/10/2016
3/10/2016
3/10/2016
3/11/2016

然后您必须检查此代码,该代码将按日期对元组进行分组,并仅返回带有日期字段的元组。以下是代码

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.local.LocalFlowConnector;
import cascading.operation.Debug;
import cascading.operation.Identity;
import cascading.pipe.Each;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.local.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.local.FileTap;
import cascading.tuple.Fields;

public class Main {
    public static void main(String[] args) {
        Tap sourceTap=new FileTap(new TextDelimited(true,","),
                "text");
        Tap sinkTap=new FileTap(new TextDelimited(true,","),
                "output");
        Pipe inputPipe=new Pipe("input_to_group");
        Fields groupField=new Fields("date");
        inputPipe=new Each(inputPipe,new Debug());
        inputPipe=new GroupBy(inputPipe,groupField);
        inputPipe=new Each(inputPipe,new Fields("date"),new Identity());
        Properties properties=new Properties();
        AppProps.setApplicationJarClass(properties, Main.class);
        FlowDef flowDef=FlowDef.flowDef().addSource(inputPipe, sourceTap)
                .addTailSink(inputPipe, sinkTap);
        Flow zodiacFlow=new LocalFlowConnector(properties).connect(flowDef);
        zodiacFlow.complete();
    }
}

希望这个答案能满足您的要求!
谢谢你

【讨论】:

    【解决方案2】:

    是的,显然你得到与输入相同的输出。

    您正在使用字段 'date' 对字段进行分组,但对分组的字段数据不做任何操作,因此您获得的输入与输出相同。

    应该怎么做:在分组数据后的级联中,我们需要调用Buffer来处理分组的数据(Buffer在每个大会)。

    在上面的示例中,您使用字段 'date' 对数据进行分组,因此以下三组将是每个 缓冲区操作的输入:

    no,date,amt
    1,3/10/2016,1000
    1,3/10/2016,2000
    
    no,date,amt
    1,3/11/2016,400
    
    no,date,amt
    232,2/10/2016,1500
    

    现在,通过这些输入,您可以执行更改输入的操作。

    注意: 在 Hadoop Cascading 中,每个数据操作都是在不同的分布式系统中进行的。为了确保在同一台机器上执行操作,我们进行了缓冲区操作(需要分组)。

    【讨论】: