【问题标题】:How to forward incoming data via REST to an SSE stream in Quarkus如何通过 REST 将传入数据转发到 Quarkus 中的 SSE 流
【发布时间】:2020-04-23 06:45:32
【问题描述】:

在我的设置中,我想通过 SSE 通道(服务器发送的事件)转发某些状态更改。通过调用 REST 端点来启动状态更改。因此,我需要将传入的状态更改转发到 SSE 流。

在 Quarkus 中完成此任务的最佳/最简单方法是什么。

我能想到的一个解决方案是使用 EventBus (https://quarkus.io/guides/reactive-messaging)。 SSE 端点将订阅状态更改并将其推送到 SSE 通道。状态更改端点发布适当的事件。

这是一个可行的解决方案吗?还有其他(更简单的)解决方案吗?在任何情况下我都需要使用响应式的东西来完成这个吗?

非常感谢任何帮助!

【问题讨论】:

    标签: server-sent-events quarkus


    【解决方案1】:

    最简单的方法是使用 rxjava 作为流提供程序。首先,您需要添加 rxjava 依赖项。它可以来自 quarkus 中的响应式依赖项,例如 kafka,也可以直接使用它(如果您不需要任何流式库):

            <dependency>
                <groupId>io.reactivex.rxjava2</groupId>
                <artifactId>rxjava</artifactId>
                <version>2.2.19</version>
            </dependency>
    

    这是关于如何每秒发送随机双精度值的示例:

        @GET
        @Path("/stream")
        @Produces(MediaType.SERVER_SENT_EVENTS)
        @SseElementType("text/plain")
        public Publisher<Double> stream() {
            return Flowable.interval(1, TimeUnit.SECONDS).map(tick -> new Random().nextDouble());
        }
    

    我们创建了新的 Flowable,它将每秒触发一次,并且在每个滴答声中,我们生成下一个随机双精度数。研究有关如何创建 Flowable 的任何其他选项,例如 Flowable.fromFuture() 以使其适应您的特定代码逻辑。

    P.S 代码将在您每次查询此端点时生成新的 Flowable,我这样做是为了节省空间,在您的情况下,我假设您将拥有一个事件源,您可以构建一次并使用相同的实例每次时间端点查询

    【讨论】:

    • 感谢您的指点。你能看看the solution I have come up with吗?这是一种有效的方法吗?
    • @robbit 看起来不错?,我用的是 flowable,因为我比较熟悉它
    【解决方案2】:

    Dmytro,感谢您为我指明了正确的方向。 我选择了与 Kotlin 相关的 Mutiny。我的代码现在看起来像这样:

    data class DeviceStatus(var status: Status = Status.OFFLINE) {
        enum class Status {OFFLINE, CONNECTED, ANALYZING, MAINTENANCE}
    }
    
    @ApplicationScoped
    class DeviceStatusService {
        var deviceStatusProcessor: PublishProcessor<DeviceStatus> = PublishProcessor.create()
        var deviceStatusQueue: Flowable<DeviceStatus> = Flowable.fromPublisher(deviceStatusProcessor)
    
        fun pushDeviceStatus(deviceStatus: DeviceStatus) {
            deviceStatusProcessor.onNext(deviceStatus)
        }
    
        fun getStream(): Multi<DeviceStatus> {
            return Multi.createFrom().publisher(deviceStatusQueue)
        }
    }
    
    @Path("/deviceStatus")
    class DeviceStatusResource {
        private val LOGGER: Logger = Logger.getLogger("DeviceStatusResource")
    
        @Inject
        @field: Default
        lateinit var deviceStatusService: DeviceStatusService
    
        @POST
        @Consumes(MediaType.APPLICATION_JSON)
        fun status(status: DeviceStatus): Response {
            LOGGER.info("POST /deviceStatus " + status.status);
            deviceStatusService.pushDeviceStatus(status)
            return Response.ok().build();
        }
    
        @GET
        @Path("/eventStream")
        @Produces(MediaType.SERVER_SENT_EVENTS)
        @SseElementType(MediaType.APPLICATION_JSON)
        fun stream(): Multi<DeviceStatus>? {
            return deviceStatusService.getStream()
        }
    }
    

    作为最小设置,服务可以直接使用 deviceStatusProcessor 作为发布者。但是,Flowable 增加了缓冲。 欢迎对实施提出意见。

    【讨论】:

      猜你喜欢
      • 2022-01-04
      • 2019-10-29
      • 2017-07-22
      • 1970-01-01
      • 2020-12-02
      • 1970-01-01
      • 1970-01-01
      • 2018-07-15
      • 2012-06-10
      相关资源
      最近更新 更多