【Question title】: Google Cloud Dataflow issue with writing the data (TextIO or DatastoreIO)
【Posted】: 2017-06-05 02:29:17
【Question description】:

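OK, folks. Another Dataflow question from a Dataflow newbie. (I only started playing with it this week.)

I'm building a data pipeline that takes in a list of product names and generates autocomplete data. The data-processing part seems to work fine, but I'm missing something obvious, because when I add my last ".apply" to write the data out with either DatastoreIO or TextIO, I get the following syntax error in my IDE: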

"The method apply(DatastoreV1.Write) is undefined for the type ParDo.SingleOutput<KV<String,List<String>>,Entity>"

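It gives me the option of adding a cast to the method receiver, but that obviously isn't the answer. Is there another step I need to perform before trying to write the data out? My last step before the write is a call to an Entity helper for Dataflow that changes my pipeline structure from <KV<String, List<String>>> to <Entity>, which seems to me like what I need in order to write to Datastore.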

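I got so frustrated with this over the last few days that I even decided to write the data to some AVRO files instead, so I could load it into Datastore by hand. Imagine how thrilled I was when I got all of that done and hit the exact same error in the exact same place on my call to TextIO. That is why I think I must be missing something very obvious here.

Here is my code. I've included all of it for reference, but you probably only need to look at main[] at the bottom. Any input would be greatly appreciated! Thanks!

MrSimmons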

package com.client.autocomplete;

import com.client.autocomplete.AutocompleteOptions;


import com.google.datastore.v1.Entity;
import com.google.datastore.v1.Key;
import com.google.datastore.v1.Value;

import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
import org.apache.beam.sdk.coders.DefaultCoder;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import com.google.api.services.bigquery.model.TableRow;
import com.google.common.base.MoreObjects;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.extensions.jackson.ParseJsons;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.List;
import java.util.ArrayList;

/*
 * A simple Dataflow pipeline to create autocomplete data from a list of
 * product names. It then loads that prefix data into Google Cloud Datastore for consumption by
 * a Google Cloud Function. That function will take in a prefix and return a list of 10 product names
 * 
 * Pseudo Code Steps
 * 1. Load a list of product names from Cloud Storage
 * 2. Generate prefixes for use with autocomplete, based on the product names
 * 3. Merge the prefix data together with 10 products per prefix
 * 4. Write that  prefix data to the Cloud Datastore as a KV with a <String>, List<String> structure
 * 
 */

public class ClientAutocompletePipeline {
    private static final Logger LOG = LoggerFactory.getLogger(ClientAutocompletePipeline.class);


    /**
     * A DoFn that keys each product name by all of its prefixes.
     * This creates one row in the PCollection for each prefix<->product_name pair
     */
    private static class AllPrefixes
    extends DoFn<String, KV<String, String>> {
        private final int minPrefix;
        private final int maxPrefix;

        public AllPrefixes(int minPrefix) {
            this(minPrefix, 10);
        }

        public AllPrefixes(int minPrefix, int maxPrefix) {
            this.minPrefix = minPrefix;
            this.maxPrefix = maxPrefix;
        }
        @ProcessElement
        public void processElement(ProcessContext c) {
            String productName= c.element().toString();
            for (int i = minPrefix; i <= Math.min(productName.length(), maxPrefix); i++) {
                c.output(KV.of(productName.substring(0, i), c.element()));
            }
        }
    }

    /**
     * Takes as input the top product names per prefix, and emits an entity
     * suitable for writing to Cloud Datastore.
     *
     */
    static class FormatForDatastore extends DoFn<KV<String, List<String>>, Entity> {
        private String kind;
        private String ancestorKey;

        public FormatForDatastore(String kind, String ancestorKey) {
            this.kind = kind;
            this.ancestorKey = ancestorKey;
        }

        @ProcessElement
        public void processElement(ProcessContext c) {
            // Initialize an EntityBuilder and get it a valid key
            Entity.Builder entityBuilder = Entity.newBuilder();
            Key key = makeKey(kind, ancestorKey).build();
            entityBuilder.setKey(key);

            // New HashMap to hold all the properties of the Entity
            Map<String, Value> properties = new HashMap<>();
            String prefix = c.element().getKey();
            String productsString = "Products[";

            // iterate through the product names and add each one to the productsString
            for (String productName : c.element().getValue()) {
                // products.add(productName);
                productsString += productName + ", ";
            }
            productsString += "]";

            properties.put("prefix", makeValue(prefix).build());            
            properties.put("products", makeValue(productsString).build());
            entityBuilder.putAllProperties(properties);
            c.output(entityBuilder.build());
        }
    }


    /**
     * Options supported by this class.
     *
     * <p>Inherits standard Beam example configuration options.
     */
    public interface Options
    extends AutocompleteOptions {
        @Description("Input text file")
        @Validation.Required
        String getInputFile();
        void setInputFile(String value);

        @Description("Cloud Datastore entity kind")
        @Default.String("prefix-product-map")
        String getKind();
        void setKind(String value);

        @Description("Whether output to Cloud Datastore")
        @Default.Boolean(true)
        Boolean getOutputToDatastore();
        void setOutputToDatastore(Boolean value);

        @Description("Cloud Datastore ancestor key")
        @Default.String("root")
        String getDatastoreAncestorKey();
        void setDatastoreAncestorKey(String value);

        @Description("Cloud Datastore output project ID, defaults to project ID")
        String getOutputProject();
        void setOutputProject(String value);
    }


    public static void main(String[] args)  throws IOException{

        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

        //  create the pipeline  
        Pipeline p = Pipeline.create(options);

        PCollection<String> toWrite = p

            // A step to read in the product names from a text file on GCS
            .apply(TextIO.read().from("gs://sample-product-data/clean_product_names.txt"))

            // Next expand the product names into KV pairs with prefix as key (<KV<String, String>>)
            .apply("Explode Prefixes", ParDo.of(new AllPrefixes(2)))

            // Apply a GroupByKey transform to the PCollection "flatCollection" to create "productsGroupedByPrefix".
            .apply(GroupByKey.<String, String>create())

            // Now format the PCollection for writing into the Google Datastore
            .apply("FormatForDatastore", ParDo.of(new FormatForDatastore(options.getKind(),
                    options.getDatastoreAncestorKey())) 

            // Write the processed data to the Google Cloud Datastore
            // NOTE: This is the line that I'm getting the error on!!
            .apply(DatastoreIO.v1().write().withProjectId(MoreObjects.firstNonNull(
                    options.getOutputProject(), options.getOutputProject()))));

        // Run the pipeline.
        PipelineResult result = p.run();
    }
}

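【Question discussion】:

    Tags: java google-cloud-datastore google-cloud-dataflow


    【Solution 1】:

    I think you need another closing parenthesis. I've removed some of the extraneous bits and re-indented according to the parentheses: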

    PCollection<String> toWrite = p
        .apply(TextIO.read().from("..."))
        .apply("Explode Prefixes", ...)
        .apply(GroupByKey.<String, String>create())
        .apply("FormatForDatastore", ParDo.of(new FormatForDatastore(
          options.getKind(), options.getDatastoreAncestorKey()))
            .apply(...);
    

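    Specifically, you need another parenthesis to close apply("FormatForDatastore", ...). As written, the code tries to call ParDo.of(...).apply(...), which doesn't work because the object returned by ParDo.of(...) has no such apply method.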
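To see the effect of the missing parenthesis without pulling in Beam, here is a minimal plain-Java sketch. Everything in it (`Stage`, `Transform`, `transformOf`, `Wrap`, `Upper`) is a hypothetical stand-in invented for illustration, not part of the Beam API: `Stage` plays the role of the PCollection, and `transformOf` plays the role of ParDo.of.

```java
import java.util.Locale;

public class ParenPlacementSketch {
    /** Hypothetical stand-in for a transform; note that it has no apply method. */
    interface Transform<A, B> {
        B expand(A input);
    }

    /** Hypothetical stand-in for a PCollection: holds a value and chains transforms. */
    static final class Stage<T> {
        private final T value;

        Stage(T value) {
            this.value = value;
        }

        /** Runs the transform on the held value and returns the next stage. */
        <R> Stage<R> apply(String name, Transform<T, R> transform) {
            return new Stage<>(transform.expand(value));
        }

        T get() {
            return value;
        }
    }

    /** Hypothetical stand-in for ParDo.of(...): returns the transform unchanged. */
    static <A, B> Transform<A, B> transformOf(Transform<A, B> fn) {
        return fn;
    }

    /** Surrounds the input with a prefix and a suffix. */
    static final class Wrap implements Transform<String, String> {
        private final String prefix;
        private final String suffix;

        Wrap(String prefix, String suffix) {
            this.prefix = prefix;
            this.suffix = suffix;
        }

        @Override
        public String expand(String input) {
            return prefix + input + suffix;
        }
    }

    /** Upper-cases the input. */
    static final class Upper implements Transform<String, String> {
        @Override
        public String expand(String input) {
            return input.toUpperCase(Locale.ROOT);
        }
    }

    static String run(String input) {
        // Broken variant (does not compile): with only two ')' after the
        // constructor arguments, the second .apply is invoked on the Transform
        // returned by transformOf, and Transform has no apply method:
        //   .apply("Wrap", transformOf(new Wrap("[", "]"))
        //       .apply("Upper", transformOf(new Upper())))
        return new Stage<>(input)
            .apply("Wrap", transformOf(new Wrap("[", "]")))  // three ')' close Wrap, transformOf, apply
            .apply("Upper", transformOf(new Upper()))        // so this apply is called on the Stage
            .get();
    }

    public static void main(String[] args) {
        System.out.println(run("lamp")); // prints "[LAMP]"
    }
}
```

The same counting applies to the pipeline in the question: the constructor call, ParDo.of, and the enclosing apply each need their own closing parenthesis before the next .apply begins.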

    【Discussion】:

    • Thanks so much for the input. Are you saying I need to add a closing parenthesis after options.getDatastoreAncestorKey()))? If so, when I do that I get a red syntax error highlighting the whole apply I just added, telling me: Type mismatch: cannot convert from ParDo.SingleOutput<KV<String,List<String>>,Entity> to PTransform<? super PCollection<KV<String,Iterable<String>>>,OutputT>
    • Yes, you need to add that parenthesis. The subsequent error indicates that you have a transform that expects a collection of KV<String, List<String>> as input, but you are giving it a collection of KV<String, Iterable<String>>. The FormatForDatastore DoFn should extend DoFn<KV<String, Iterable<String>>, Entity>.
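The type change suggested in the last comment should not require rewriting the loop in FormatForDatastore, because a for-each loop only needs an Iterable. The plain-Java sketch below illustrates this under some assumptions: `IterableInputSketch` and `formatProducts` are hypothetical names, and the sketch uses `String.join` instead of the question's manual concatenation, which also drops the trailing ", " that the original loop leaves before the closing bracket.

```java
import java.util.Arrays;
import java.util.List;

public class IterableInputSketch {
    /** Builds the "products" property text from any Iterable, not only a List. */
    static String formatProducts(Iterable<String> productNames) {
        // String.join accepts an Iterable and inserts the separator only
        // between elements, so there is no trailing ", ".
        return "Products[" + String.join(", ", productNames) + "]";
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("lamp", "laptop");

        // A List is an Iterable, so it can be passed directly.
        System.out.println(formatProducts(names)); // prints "Products[lamp, laptop]"

        // An Iterable that is not a List works as well; this is the shape of
        // value a grouped collection hands to the next step.
        Iterable<String> notAList = () -> names.iterator();
        System.out.println(formatProducts(notAList)); // prints "Products[lamp, laptop]"
    }
}
```

Declaring the input as Iterable<String> rather than List<String> accepts a strictly wider set of values, which is why the DoFn's input type, and not the preceding GroupByKey step, is the thing to change.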