【问题标题】:Exception handling in Spring XD streamsSpring XD 流中的异常处理
【发布时间】:2015-11-09 12:16:07
【问题描述】:

如何创建一个故障安全的 Spring XD 流,它会在一条特定消息触发异常后继续正常运行(即记录错误但继续使用流中的下一条消息),而无需添加 try catch( Throwable) 在每个 Stream 步骤中?

使用 Reactor 或 RxJava 模型有什么简单的方法吗?

使用 Reactor 的示例流:

@Override
public Publisher<Tuple> process(Stream<GenericMessage> inputStream) {
  return inputStream
      .flatMap(SomeClass::someFlatMap)
      .filter(SomeClass::someFilter)
      .when(Throwable.class, t -> log.error("error", t));
}

【问题讨论】:

    标签: java rx-java spring-xd project-reactor


    【解决方案1】:

    RxJava 可以被处理器模块使用。在创建订阅时,需要创建订阅并处理错误,订阅者需要添加一个 onError 处理程序:

           subject = new SerializedSubject(PublishSubject.create());
            Observable<?> outputStream = processor.process(subject);
            subscription = outputStream.subscribe(new Action1<Object>() {
                @Override
                public void call(Object outputObject) {
                    if (ClassUtils.isAssignable(Message.class, outputObject.getClass())) {
                        getOutputChannel().send((Message) outputObject);
                    } else {
                        getOutputChannel().send(MessageBuilder.withPayload(outputObject).build());
                    }
                }
            }, new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    logger.error(throwable.getMessage(), throwable);
                }
            }, new Action0() {
                @Override
                public void call() {
                    logger.error("Subscription close for [" + subscription + "]");
                }
            });
    

    在此处查看更多示例:https://github.com/spring-projects/spring-xd/tree/master/spring-xd-rxjava/src

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-10-08
      • 2020-05-27
      • 2021-11-12
      • 1970-01-01
      相关资源
      最近更新 更多