【Question title】: Multiple column join using Apache Beam
【Posted】: 2018-07-19 05:42:38
【Question】:

I have two files, prices_1.txt (source) and products_1.txt (target). I have already written a program that joins the two on a single column. Now I want to join on multiple columns, but I don't know how to do that with apache-beam. The single-column join program and the data files I am using are given below. The program joins the two files on the SLID column. How can I join on SLID and PRODID together? Please guide me through this.

prices_1.txt

SLID,PRODID,REGIONID,STDPRICE,MINPRICE,STARTDATE,ENDDATE
9,100860,101,130,124,2002-01-01,2002-12-31
4,100860,102,132,125.6,2002-01-01,2003-05-31
7,100860,103,135,128,2003-06-01,
11,100861,105,239,231.2,2002-01-01,2002-12-31
2,100861,107,242,233.6,2003-01-01,2003-05-31
6,100861,106,245,236,2003-06-01,
4,100870,104,122.8,122.4,2003-01-01,
3,100871,101,154,153.2,2002-01-01,2002-12-31
1,100890,108,445,440.5,2003-06-01,2003-07-31
5,100890,105,449.7,446.4,2002-01-01,
10,101863,102,98.0,99.1,2002-04-01,2003-03-15
8,102130,103,178.9,182.5,2002-07-01,2003-04-12

products_1.txt

SLID,PRODID,NAME
4,100860,"Motherboard"
2,100861,"Flat Monitor"
3,100870,"Processor 5 GHZ"
1,100871,"Printer"
8,100890,"Digital Camera"
11,101860,"Memory Card 1GB"
9,101863,"Video Accelerator"
10,102130,"Scanner"
6,200376,"Network card"
7,200380,"Flash card"
5,300001,"LCD Monitor"
12,10987,"Mouse"

Program code

import java.util.Iterator;
import java.util.concurrent.TimeUnit;

import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

public class JoinExample {
public static void main(String[] args) throws Exception
    {
        long start = System.currentTimeMillis();
        PipelineOptions options = PipelineOptionsFactory.create().as(HadoopFileSystemOptions.class);
        options.setRunner(SparkRunner.class);
        Pipeline pipeline = Pipeline.create(options);

        PCollection<String> prices = pipeline.apply(TextIO.read().from("/home/ICEStore/apachebeam/prices_1.txt"));
        PCollection<String> products = pipeline.apply(TextIO.read().from("/home/ICEStore/apachebeam/products_1.txt"));
        PCollection<String> formattedResults = joinEvents(prices , products);
        formattedResults.apply(TextIO.write().to("/home/ICEStore/apachebeam/temp/join").withoutSharding());

        pipeline.run().waitUntilFinish();
        long end = System.currentTimeMillis();
        System.out.println("Elapsed time: " + TimeUnit.MILLISECONDS.toSeconds(end - start) + " s");
    }

    static PCollection<String> joinEvents(PCollection<String> prices,PCollection<String> products) throws Exception 
    {

        final TupleTag<String> priceInfoTag = new TupleTag<String>();
        final TupleTag<String> productInfoTag = new TupleTag<String>();

        PCollection<KV<String, String>> pricesInfo = prices.apply(ParDo.of(new ExtractPricesDataFn()));
        PCollection<KV<String, String>> productsInfo = products.apply(ParDo.of(new ExtractProductsDataFn()));

        PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
                .of(priceInfoTag, pricesInfo)
                .and(productInfoTag, productsInfo)
                .apply(CoGroupByKey.<String>create());

        PCollection<KV<String, String>> finalResultCollection =
                kvpCollection.apply(ParDo.of(
                        new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
                            @ProcessElement
                            public void processElement(ProcessContext c) {
                                KV<String, CoGbkResult> e = c.element();

                                Iterator<String> iter1 = e.getValue().getAll(priceInfoTag).iterator();
                                Iterator<String> iter2 = e.getValue().getAll(productInfoTag).iterator();

                                // Emit only when both sides matched (an inner join);
                                // calling next() on an empty side would throw
                                // NoSuchElementException.
                                if (iter1.hasNext() && iter2.hasNext()) {
                                    c.output(KV.of(e.getKey(), iter1.next() + iter2.next()));
                                }
                            }
                        }));

        PCollection<String> formattedResults = finalResultCollection
                .apply(ParDo.of(new DoFn<KV<String, String>, String>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        // The join key is already part of the value,
                        // so emit only the value.
                        c.output(c.element().getValue());
                    }
                }));
        return formattedResults;
    }

    static class ExtractPricesDataFn extends DoFn<String, KV<String, String>> {
        @ProcessElement
        public void processElement(ProcessContext context) throws Exception {
            String[] row = context.element().split(",");
            // Key each price row by SLID (column 0), keeping the whole line as the value.
            context.output(KV.of(row[0], context.element()));
        }
    }

    static class ExtractProductsDataFn extends DoFn<String, KV<String, String>> {
        @ProcessElement
        public void processElement(ProcessContext context) throws Exception {
            String[] row = context.element().split(",");
            // Key each product row by SLID (column 0) as well.
            context.output(KV.of(row[0], context.element()));
        }
    }
}

【Comments】:

  • From your question I understand that you want to join on both "SLID" and "PRODID" at the same time, correct? Could you edit the post to show the "desired" output?

标签: google-cloud-dataflow apache-beam


【Solution 1】:

A simple way to solve this is to build a two-column key in both ExtractPricesDataFn and ExtractProductsDataFn.

For example:

static class ExtractPricesDataFn extends DoFn<String, KV<String, String>> {
    @ProcessElement
    public void processElement(ProcessContext context) throws Exception {
        String[] row = context.element().split(",");
        // Composite key: SLID and PRODID joined with a comma.
        context.output(KV.of(row[0] + "," + row[1], context.element()));
    }
}

static class ExtractProductsDataFn extends DoFn<String, KV<String, String>> {
    @ProcessElement
    public void processElement(ProcessContext context) throws Exception {
        String[] row = context.element().split(",");
        // Same composite key, so matching rows group together under CoGroupByKey.
        context.output(KV.of(row[0] + "," + row[1], context.element()));
    }
}
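The composite-key construction used in both DoFns above can be factored into one small helper so the two sides cannot drift apart (a sketch; the `JoinKeys` class and `compositeKey` method are names of my own, not from the original answer):

```java
// Hypothetical helper: builds the composite join key "SLID,PRODID"
// from a raw CSV line. Assumes the first two fields never contain
// embedded commas or quotes.
class JoinKeys {
    static String compositeKey(String csvLine) {
        String[] row = csvLine.split(",");
        return row[0] + "," + row[1];
    }
}
```

Both ExtractPricesDataFn and ExtractProductsDataFn could then call `JoinKeys.compositeKey(context.element())` when building the KV.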

【Comments】:

  • Thanks for the reply, but this fails when the key columns use multiple data types.
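The comment above mentions failures with mixed data types in the key columns. Since CoGroupByKey here compares keys as plain strings, the two sides only match when both render the key identically; one hedged workaround is to normalize each key field to a canonical string before concatenating. The sketch below assumes the mismatch comes from whitespace or numeric formatting (e.g. `100860` vs `100860.0`); `KeyNormalizer` is hypothetical, not part of Beam:

```java
// Hypothetical helper: canonicalizes key fields before they are
// concatenated into a composite join key.
class KeyNormalizer {
    // Trims whitespace and, when the field is numeric, strips any
    // trailing ".0" so "100860.0" and "100860" produce the same key.
    static String normalize(String field) {
        String f = field.trim();
        try {
            double d = Double.parseDouble(f);
            if (d == Math.floor(d) && !Double.isInfinite(d)) {
                return String.valueOf((long) d); // integral value: render without decimals
            }
            return String.valueOf(d);
        } catch (NumberFormatException e) {
            return f; // non-numeric fields are compared as trimmed strings
        }
    }

    static String compositeKey(String slid, String prodId) {
        return normalize(slid) + "," + normalize(prodId);
    }
}
```

Each ExtractFn would then emit `KV.of(KeyNormalizer.compositeKey(row[0], row[1]), context.element())` so both sides agree on the key's string form.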