【问题标题】:Google PubSub with Dataflow using Spring Integration使用 Spring 集成的带有 Dataflow 的 Google PubSub
【发布时间】:2018-07-05 18:51:51
【问题描述】:

使用Spring frameworkGoogle Cloud Platform 开发实时跟踪系统。 Spring Cloud GCP 可以轻松编写GCP PubSub 应用程序Spring Integration 方式。在他们的 github 页面上,我可以编写以下应用程序,例如:Github Samples

@Configuration
@Slf4j
public class GCPConfiguration {
    /*
    *   Message sender code
    * */
    @Bean
    @ServiceActivator(inputChannel = "pubSubOutputChannel")
    public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
        PubSubMessageHandler adapter =
                new PubSubMessageHandler(pubsubTemplate, "exampleTopic");
        adapter.setPublishCallback(new ListenableFutureCallback<String>() {
            @Override
            public void onFailure(Throwable ex) {
                log.info("There was an error sending the message.");
            }

            @Override
            public void onSuccess(String result) {
                log.info("Message was sent successfully.");
            }
        });

        return adapter;
    }

    @MessagingGateway(defaultRequestChannel = "pubSubOutputChannel")
    public interface PubSubOutboundGateway {
        void sendToPubSub(String text);
    }

    /*
    *   Message receiver code
    * */
    @Bean
    public MessageChannel pubsubInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public PubSubInboundChannelAdapter messageChannelAdapter(
            @Qualifier("pubsubInputChannel") MessageChannel inputChannel,
            PubSubOperations pubSubTemplate) {
        PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate, "exampleSubscription");
        adapter.setOutputChannel(inputChannel);
        adapter.setAckMode(AckMode.MANUAL);
        return adapter;
    }

    @ServiceActivator(inputChannel = "pubsubInputChannel")
    public void messageReceiver(String payload, @Header(GcpHeaders.ACKNOWLEDGEMENT) AckReplyConsumer ackReplyConsumer) {
        log.info("Message arrived! Payload: " + payload);

        ackReplyConsumer.ack();
    }
}

跟踪设备将持续向此应用程序公开的TCP 端口发送数据,该端口需要转换然后持久化为BigQueryGC SQL。从TCP 端口获取数据并将其发布到GC PubSub 已经到位。 我不知道如何以及在何处添加来自GC PubSubGoogle Cloud Dataflow 代码

更新

目标是向GC BigQueryGC SQL 插入数据,所以回答会导致数据插入这些服务的答案很好。

【问题讨论】:

    标签: java spring-integration spring-cloud google-cloud-dataflow google-cloud-pubsub


    【解决方案1】:

    您的问题似乎是如何使用 Google Cloud Dataflow 从 Pub/Sub 流式传输到 Bigquery。

    Dataflow 网站从其示例 page 链接到此 example。请注意,这是使用 SDK 的 1.x 版本,而不是 2.x Apache Beam 版本。你可以找到一个类似的例子here

    还有一个谷歌提供的template可以使用,不需要编码。

    编辑:该模板最近开源,可在 github 上找到。

    【讨论】:

      猜你喜欢
      • 2015-12-24
      • 2016-11-26
      • 2020-04-05
      • 1970-01-01
      • 2022-11-13
      • 1970-01-01
      • 1970-01-01
      • 2019-09-26
      • 2018-02-09
      相关资源
      最近更新 更多