【Question title】: Sending a PubSub message manually in Dataflow
【Posted】: 2019-01-04 14:37:56
【Question】:

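How can I send a PubSub message manually in Dataflow (that is, without using PubsubIO)?

Importing google-cloud-dataflow-java-sdk-all 2.5.0 (via Maven) already pulls in a version of com.google.pubsub.v1, and with that version I could not find a simple way to send a message to a Pubsub topic (for example, it does not let you work with Publisher instances, which is the approach described in the official documentation).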

【Comments】:

    Tags: java google-cloud-dataflow publish-subscribe apache-beam google-cloud-pubsub


    【Solution 1】:

    Would you consider using PubsubUnboundedSink? A quick example:

    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.PCollection;
    
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubJsonClient;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
    
    
    public class PubsubTest {
    
        public static void main(String[] args) {
    
            DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
                            .as(DataflowPipelineOptions.class); 
    
            // writes message to "output_topic"
            TopicPath topic = PubsubClient.topicPathFromName(options.getProject(), "output_topic");
    
            Pipeline p = Pipeline.create(options);
    
            p
            .apply("input string", Create.of("This is just a message"))
            .apply("convert to Pub/Sub message", ParDo.of(new DoFn<String, PubsubMessage>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
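                    // Encode explicitly as UTF-8 instead of relying on the platform default charset.
                    c.output(new PubsubMessage(
                        c.element().getBytes(java.nio.charset.StandardCharsets.UTF_8), null));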
                }
            }))
            .apply("write to topic", new PubsubUnboundedSink(
                PubsubJsonClient.FACTORY,
                StaticValueProvider.of(topic), // topic
                "timestamp", // timestamp attribute
                "id", // ID attribute
                5 // number of shards
            )); 
    
            p.run();
        }
    }
    

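    【Comments】:

    • Unfortunately, the point of my question is to avoid Dataflow components. I want to be able to publish pubsub messages from anywhere in my code, and I cannot wire every stage to a pubsub sink. That would be unacceptable both for code maintenance and for visualizing the Dataflow graph, which would become very messy.
    • I ran into the same problem with com.google.pubsub.v1, because I could not import Publisher. One possibility is to exclude it from dataflow-java-sdk-all in the dependency declaration and import the latest version; another is to call the API directly with an HTTP client library.
    • I found a workaround and will post an answer today when I have time.
    【Solution 2】: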

    Here is an approach I found while browsing https://github.com/GoogleCloudPlatform/cloud-pubsub-samples-java/blob/master/dataflow/src/main/java/com/google/cloud/dataflow/examples/StockInjector.java:

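    import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
    import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
    import com.google.api.client.http.HttpBackOffIOExceptionHandler;
    import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler;
    import com.google.api.client.http.HttpRequest;
    import com.google.api.client.http.HttpRequestInitializer;
    import com.google.api.client.http.HttpResponse;
    import com.google.api.client.http.HttpTransport;
    import com.google.api.client.http.HttpUnsuccessfulResponseHandler;
    import com.google.api.client.json.JsonFactory;
    import com.google.api.client.json.jackson2.JacksonFactory;
    import com.google.api.client.util.ExponentialBackOff;
    import com.google.api.client.util.Preconditions;
    import com.google.api.client.util.Sleeper;
    import com.google.api.services.pubsub.Pubsub;
    import com.google.api.services.pubsub.model.PublishRequest;
    import com.google.api.services.pubsub.model.PubsubMessage;
    import java.io.IOException;
    import java.security.GeneralSecurityException;
    import java.util.Collections;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
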
    public class PubsubManager {
        private static final Logger logger = LoggerFactory.getLogger(PubsubManager.class);
        private static final JsonFactory JSON_FACTORY = JacksonFactory.getDefaultInstance();
        private static final Pubsub pubsub = createPubsubClient();
    
        public static class RetryHttpInitializerWrapper implements HttpRequestInitializer {
    
            // Intercepts the request for filling in the "Authorization"
            // header field, as well as recovering from certain unsuccessful
            // error codes wherein the Credential must refresh its token for a
            // retry.
            private final GoogleCredential wrappedCredential;
    
            // A sleeper; you can replace it with a mock in your test.
            private final Sleeper sleeper;
    
            private RetryHttpInitializerWrapper(GoogleCredential wrappedCredential) {
                this(wrappedCredential, Sleeper.DEFAULT);
            }
    
            // Use only for testing.
            RetryHttpInitializerWrapper(
                    GoogleCredential wrappedCredential, Sleeper sleeper) {
                this.wrappedCredential = Preconditions.checkNotNull(wrappedCredential);
                this.sleeper = sleeper;
            }
    
            @Override
            public void initialize(HttpRequest request) {
                final HttpUnsuccessfulResponseHandler backoffHandler =
                        new HttpBackOffUnsuccessfulResponseHandler(
                                new ExponentialBackOff())
                                .setSleeper(sleeper);
                request.setInterceptor(wrappedCredential);
                request.setUnsuccessfulResponseHandler(
                        new HttpUnsuccessfulResponseHandler() {
                            @Override
                            public boolean handleResponse(HttpRequest request,
                                                          HttpResponse response,
                                                          boolean supportsRetry)
                                    throws IOException {
                                if (wrappedCredential.handleResponse(request,
                                        response,
                                        supportsRetry)) {
                                    // If credential decides it can handle it, the
                                    // return code or message indicated something
                                    // specific to authentication, and no backoff is
                                    // desired.
                                    return true;
                                } else if (backoffHandler.handleResponse(request,
                                        response,
                                        supportsRetry)) {
                                    // Otherwise, we defer to the judgement of our
                                    // internal backoff handler.
                                    logger.info("Retrying " + request.getUrl());
                                    return true;
                                } else {
                                    return false;
                                }
                            }
                        });
                request.setIOExceptionHandler(new HttpBackOffIOExceptionHandler(
                        new ExponentialBackOff()).setSleeper(sleeper));
            }
        }
    
        /**
         * Creates a Cloud Pub/Sub client.
         */
        private static Pubsub createPubsubClient() {
            try {
                HttpTransport transport = GoogleNetHttpTransport.newTrustedTransport();
                GoogleCredential credential = GoogleCredential.getApplicationDefault();
                HttpRequestInitializer initializer =
                        new RetryHttpInitializerWrapper(credential);
                return new Pubsub.Builder(transport, JSON_FACTORY, initializer).build();
            } catch (IOException | GeneralSecurityException e) {
                // Pass the exception as the last argument so the stack trace is logged.
                logger.error("Could not create Pubsub client", e);
            }
            return null;
        }
    
        /**
         * Publishes the given message to a Cloud Pub/Sub topic.
         * outputTopic must be the full resource name: projects/<project>/topics/<topic>.
         */
        public static void publishMessage(String message, String outputTopic) {
            int maxLogMessageLength = 200;
            if (message.length() < maxLogMessageLength) {
                maxLogMessageLength = message.length();
            }
            logger.info("Received ...." + message.substring(0, maxLogMessageLength));
    
            // Publish message to Pubsub.
            PubsubMessage pubsubMessage = new PubsubMessage();
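            // Encode explicitly as UTF-8 instead of relying on the platform default charset.
            pubsubMessage.encodeData(message.getBytes(java.nio.charset.StandardCharsets.UTF_8));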
    
            PublishRequest publishRequest = new PublishRequest();
            publishRequest.setMessages(Collections.singletonList(pubsubMessage));
            try {
                pubsub.projects().topics().publish(outputTopic, publishRequest).execute();
            } catch (java.io.IOException e) {
                logger.error("Failed to publish message to " + outputTopic, e);
            }
        }
    }
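
The retry wrapper above leans on ExponentialBackOff to space out retries. As a rough illustration of what that means, the following sketch computes the base wait times without the random jitter the real class applies. BackoffSketch and baseIntervals are hypothetical names, and the 500 ms initial interval, 1.5 multiplier and 60 s cap are assumed to match the library defaults; check the Javadoc of the version you use.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not part of the Google API client library.
final class BackoffSketch {

    private BackoffSketch() {}

    /**
     * Returns the first `retries` base wait times in milliseconds. Each wait is the
     * previous one times `multiplier`, capped at `maxMillis`. Jitter is left out.
     */
    static List<Long> baseIntervals(long initialMillis, double multiplier,
                                    long maxMillis, int retries) {
        List<Long> intervals = new ArrayList<>();
        long current = Math.min(initialMillis, maxMillis);
        for (int i = 0; i < retries; i++) {
            intervals.add(current);
            // Grow the interval, truncating to whole milliseconds, then apply the cap.
            current = Math.min((long) (current * multiplier), maxMillis);
        }
        return intervals;
    }

    public static void main(String[] args) {
        // Assumed defaults: 500 ms initial interval, 1.5x multiplier, 60 s cap.
        System.out.println(baseIntervals(500, 1.5, 60_000, 5));
        // prints [500, 750, 1125, 1687, 2530]
    }
}
```

In practice each wait is randomized around these values, so concurrent workers that fail at the same moment do not all retry in lockstep.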
    

    【Comments】:

      【Solution 3】:

      You can send pubsub messages with the PubsubIO writeMessages method.

      Dataflow pipeline steps:

      Pipeline p = Pipeline.create(options);
      p.apply("Transformer1", ParDo.of(new Fn.method1()))
       .apply("Transformer2", ParDo.of(new Fn.method2()))
       .apply("PubsubMessageSend", PubsubIO.writeMessages()
           .to(PubSubConfig.getTopic(options.getProject(), options.getpubsubTopic())));
      

      In your PipelineOptions, define the project name and the pubsubTopic that the pub/sub messages should be sent to.
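
The source of PubSubConfig.getTopic is not shown in this answer. Pub/Sub identifies a topic by its full resource name, projects/<project>/topics/<topic>, so a helper of that kind presumably builds that string from the two option values. A minimal sketch under that assumption, where TopicNames and topicPath are hypothetical names:

```java
import java.util.Objects;

// Hypothetical stand-in for a helper like PubSubConfig.getTopic, whose source is not shown.
final class TopicNames {

    private TopicNames() {}

    /** Builds the full topic resource name: projects/<project>/topics/<topic>. */
    static String topicPath(String project, String topic) {
        Objects.requireNonNull(project, "project");
        Objects.requireNonNull(topic, "topic");
        if (project.isEmpty() || topic.isEmpty()) {
            throw new IllegalArgumentException("project and topic must be non-empty");
        }
        return String.format("projects/%s/topics/%s", project, topic);
    }

    public static void main(String[] args) {
        System.out.println(topicPath("my-project", "output_topic"));
        // prints projects/my-project/topics/output_topic
    }
}
```

The same full name is what the publish call in Solution 2 expects as its topic argument.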

      【Comments】:
