【发布时间】:2023-11-18 15:26:01
【问题描述】:
我们有一个用例,在 UI 的任何操作上,我们需要同步读取来自 google pub/sub 主题 A 的消息并将这些消息移动到主题 B。
以下是为处理此行为而编写的代码,它来自 Google Pub Sub 文档,用于同步访问主题。
public static int subscribeSync(String projectId, String subscriptionId, Integer numOfMessages, int count, String acknowledgementTopic) throws IOException {
SubscriberStubSettings subscriberStubSettings =
SubscriberStubSettings.newBuilder()
.setTransportChannelProvider(
SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
.setMaxInboundMessageSize(20 * 1024 * 1024) // 20MB (maximum message size).
.build())
.build();
try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
PullRequest pullRequest =
PullRequest.newBuilder()
.setMaxMessages(numOfMessages)
.setSubscription(subscriptionName)
.build();
// Use pullCallable().futureCall to asynchronously perform this operation.
PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
List<String> ackIds = new ArrayList<>();
for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
// START - CODE TO PUBLISH MESSAGE TO TOPIC B
**publishMessage(message.getMessage(),acknowledgementTopic,projectId);**
// END - CODE TO PUBLISH MESSAGE TO TOPIC B
ackIds.add(message.getAckId());
}
// Acknowledge received messages.
AcknowledgeRequest acknowledgeRequest =
AcknowledgeRequest.newBuilder()
.setSubscription(subscriptionName)
.addAllAckIds(ackIds)
.build();
// Use acknowledgeCallable().futureCall to asynchronously perform this operation.
subscriber.acknowledgeCallable().call(acknowledgeRequest);
count=pullResponse.getReceivedMessagesList().size();
}catch(Exception e) {
log.error(e.getMessage());
}
return count;
}
以下是向主题 B 发布消息的示例代码
public static void publishMessage(PubsubMessage pubsubMessage,String Topic,String projectId) {
Publisher publisher = null;
ProjectTopicName topicName =ProjectTopicName.newBuilder().setProject(projectId).setTopic(Topic).build();
try {
// Publish the messages to normal topic.
publisher = Publisher.newBuilder(topicName).build();
} catch (IOException e) {
log.error(e.getMessage());
}
publisher.publish(pubsubMessage);
}
这是处理此用例的正确方法还是可以以其他方式处理。我们不想使用 Cloud Dataflow。有人可以让我们知道这是否正常或有问题。 该代码有效,但有时消息即使在同步消费之后仍保留在主题 A 上。 谢谢'
【问题讨论】:
-
您能否详细解释一下您的用例以及为什么需要从 A 获取消息然后发布到 B?
-
@guillaume blaquiere 此用例要求消息必须根据按钮单击等 UI 操作从主题 A 移动到 B。
标签: java google-cloud-platform google-cloud-pubsub