【问题标题】:Why is Observable functionality getting executed twice for a single call?为什么 Observable 功能会在一次调用中执行两次?
【发布时间】:2019-04-29 02:59:33
【问题描述】:

程序的完整结构

注释:

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface UserAnnotation {
}

然后创建了一个拦截器:

public class UserInterceptor implements MethodInterceptor {

    private static final Logger logger = LoggerFactory.getLogger(UserInterceptor.class);

    @Inject
    UserService userService; // this is not working

    public Object invoke(MethodInvocation invocation) throws Throwable {
        logger.info("UserInterceptor : Interceptor Invoked");
        Object result = invocation.proceed();
        Observable<List<User>> observable = (Observable<List<Sample>>) result;
        observable.flatMap(Observable::from).subscribe(object -> {
            User user = (User)object
            SampleSender sender = new SampleSender();
            sender.setBoolean(user.isBoolean());
            logger.info("Pushing Data into Sender");
            userService.insert(String.join("_", "key", "value"), sender); 
        }
        return result;
    }
}

然后我创建了一个 GuiceModule 如下:-

public class UserModule extends AbstractModule {
    @Override
    protected void configure() {
        SampleInterceptor interceptor = new SampleInterceptor()
        requestInjection(interceptor);
        bindInterceptor(Matchers.any(), Matchers.annotatedWith(SampleAnnotation.class), interceptor);
}

}

我使用上述注释的类是

// This class also have so many method and this was already declared and using in another services, I created a sample class here
class UserClassForInterceptor {

      @Inject
      AnotherClass anotherClass;
      // this userMethod() is not a new method, its already created, 
      // now I am adding annotation to it, because after finishing this functionality, 
      // I want something should be done, so created annotation and added here
      @UserAnnotation
      public Observable<List<Sample>> userMethod() {
            logger.info("This is printing only once");
            return anotherClass.getUser().flatMap(user ->{
                logger.info("This is also printing twice");
                // this logger printed twise means, this code snippet is getting executed twise
            });
      }
}

public class AnotherClass{
          public Observable<User> getUser(){
           Observable<Sample> observableSample = methodReturnsObservableSample();
           logger.info("Getting this logger only once");
           return observableSample.map(response-> {
               logger.info("This logger is printing twice");
               //here have code to return observable of User
           });
      }
}

如果我删除可观察对象内的注释记录器,则只会打印一次,但是当我使用注释时,这些记录器会被打印两次。我不知道为什么会这样。

我有一个RestModule,我使用它来绑定UserClassForInterceptor,如下所示

public final class RestModule extends JerseyServletModule {
    // other classes binding
    bind(UserClassForInterceptor.class).in(Scopes.SINGLETON);
    // other classes binding
    install(new SampleModule());
}

现在我有一个引导类,我在其中绑定RestModule

public class Bootstrap extends ServerBootstrap {
   binder.install(new RestModule());
}

用法:-

@Path("service/sample")
public class SampleRS {
    @Inject
    UserClassForInterceptor userClassForInterceptor;

    public void someMethod() {
        userClassForInterceptor.sampleMethod();
    }
}

【问题讨论】:

  • 您的UserAnnotation 是如何实现的?那将是寻找双重调用的第一个地方。此外,您在getTestObject() 中缺少return 关键字。
  • 我已经更新了代码和UserAnnocation 实现,其用法等与我在链接中分享的SampleAnnotation 相同
  • getTestObject() 中,您创建一个以getUser().flatMap... 开头的观察者链。但是,它没有任何反应,因为没有订阅。您向我们展示的代码不会表现出您描述的行为
  • 用我的示例应用程序结构更新问题
  • 我再说一遍,在getTestObject()方法中,getUser().flatMapUser( user -&gt; { ... });这个语句是没有作用的,因为没有订阅。该语句内的日志记录语句无法执行。这意味着您没有向我们展示展示您描述的行为的代码。

标签: java-8 rx-java


【解决方案1】:

您创建了一个注释@UserAnnotation,以及一个与该注释一起使用的拦截器类。您将注释附加到方法userMethod()

你的拦截器例程做的第一件事是调用userMethod()来获取它返回的observable,然后拦截器订阅返回的observable,导致第一条日志消息出现。最终,拦截器将 observable 返回给原始调用者。当其他东西订阅返回的 observable 时,观察者链会第二次被激活,因此日志消息会出现两次。

RxJava 有副作用

虽然 RxJava 是“函数式反应式编程”概念的实现,但您(以函数式方式)构建的观察者链仅在订阅时才起作用,并且这些订阅具有副作用。日志输出是一种副作用,可能是最良性的;更改变量或调用具有副作用的方法会产生更广泛的影响。

当一个观察者链(正确地)被构建时,它会充当一个潜在的计算,直到有一个订阅者。如果您需要有多个订阅者,就像您的问题域一样,那么您必须决定是否需要为每个订阅激活观察者链,正常情况下,还是只为所有重叠订阅激活一次。

如果您希望所有重叠订阅共享同一个 observable,那么您可以使用 share() 运算符。有许多相关的运算符会影响可观察对象和订阅的生命周期。这是一个概述:How to use RxJava share() operator?

面向方面的编程:拦截器和Guice

您的代码使用 Guice 来提供一种称为“面向方面编程”的功能。这允许您将代码引入程序以解决横切关注点,或通过设置受控网关来增强其功能。使用 Guice 或类似的 AOP 方法需要纪律

在您的情况下,您使用拦截过程通过订阅具有非平凡副作用的观察者链来导致无法解释的(直到现在)副作用。想象一下,您拦截的方法建立了一个一次性连接,而您的拦截器在完成工作时用尽了该连接,导致原始调用者无法使用该连接。

您需要的纪律是了解拦截器必须遵循的规则。想想诸如“首先,不要伤害”之类的规则。

以 FRP 方式做事

如果您需要在处理用户信息时添加额外的步骤,那么您应该在拦截器中构造一个新的 observable 来执行此操作,但前提是原始调用者订阅了 observable:

    Object result = invocation.proceed();
    Observable<List<User>> observable = (Observable<List<Sample>>) result;
    Observable<List<User>> newObservable = observable
      .doOnNext( sampleList ->
         Observable.fromIterable( sampleList )
           .subscribe(object -> {
             User user = (User)object
             SampleSender sender = new SampleSender();
             sender.setBoolean(user.isBoolean());
             logger.info("Pushing Data into Sender");
             userService.insert(String.join("_", "key", "value"), sender); 
           }));
    return newObservable;

通过返回修改后的观察者链,您不会从原始观察者链中引入副作用,并确保您在自己的代码中引入的副作用在原始观察者时触发链已订阅。

【讨论】:

    【解决方案2】:

    这段代码也帮助了我

    public Object invoke(MethodInvocation invocation) throws Throwable {
        Object result = null;
        try{
            logger.debug("Interceptor Invoked");
            result = invocation.proceed();
            Observable<List<User>> observable = (Observable<List<User>>)result;
    
            return observable
                    .doOnNext(this::updateUser);
        }
        catch(Exception ex){
            logger.error("Error: ",ex);
        }
        return result;
    }
    
    private void updateUser(List<User> users) {
        if(CollectionUtils.isNotEmpty(users)) {
            for(User user: users) {
                SampleSender sender = new SampleSender();
                sender.setBoolean(user.isBoolean());
                logger.info("Pushing Data into Sender");
                userService.insert(String.join("_", "key", "value"), sender); 
            }
        }
    }
    

    【讨论】:

      猜你喜欢
      • 2018-08-18
      • 2011-12-01
      • 2023-03-16
      • 1970-01-01
      • 1970-01-01
      • 2016-10-07
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多