【发布时间】:2018-07-05 18:51:51
【问题描述】:
使用Spring framework 和Google Cloud Platform 开发实时跟踪系统。
Spring Cloud GCP 可以轻松编写GCP PubSub 应用程序Spring Integration 方式。在他们的 github 页面上,我可以编写以下应用程序,例如:Github Samples
@Configuration
@Slf4j
public class GCPConfiguration {
/*
* Message sender code
* */
@Bean
@ServiceActivator(inputChannel = "pubSubOutputChannel")
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
PubSubMessageHandler adapter =
new PubSubMessageHandler(pubsubTemplate, "exampleTopic");
adapter.setPublishCallback(new ListenableFutureCallback<String>() {
@Override
public void onFailure(Throwable ex) {
log.info("There was an error sending the message.");
}
@Override
public void onSuccess(String result) {
log.info("Message was sent successfully.");
}
});
return adapter;
}
@MessagingGateway(defaultRequestChannel = "pubSubOutputChannel")
public interface PubSubOutboundGateway {
void sendToPubSub(String text);
}
/*
* Message receiver code
* */
@Bean
public MessageChannel pubsubInputChannel() {
return new DirectChannel();
}
@Bean
public PubSubInboundChannelAdapter messageChannelAdapter(
@Qualifier("pubsubInputChannel") MessageChannel inputChannel,
PubSubOperations pubSubTemplate) {
PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate, "exampleSubscription");
adapter.setOutputChannel(inputChannel);
adapter.setAckMode(AckMode.MANUAL);
return adapter;
}
@ServiceActivator(inputChannel = "pubsubInputChannel")
public void messageReceiver(String payload, @Header(GcpHeaders.ACKNOWLEDGEMENT) AckReplyConsumer ackReplyConsumer) {
log.info("Message arrived! Payload: " + payload);
ackReplyConsumer.ack();
}
}
跟踪设备将持续向此应用程序公开的TCP 端口发送数据,该端口需要转换然后持久化为BigQuery 和GC SQL。从TCP 端口获取数据并将其发布到GC PubSub 已经到位。
我不知道如何以及在何处添加来自GC PubSub 的Google Cloud Dataflow 代码
更新
目标是向GC BigQuery 和GC SQL 插入数据,所以回答会导致数据插入这些服务的答案很好。
【问题讨论】:
标签: java spring-integration spring-cloud google-cloud-dataflow google-cloud-pubsub