【问题标题】:Why I m still getting data from my webflux spring boot even if I unsubscribe?为什么即使我取消订阅,我仍然从我的 webflux spring boot 中获取数据?
【发布时间】:2018-12-08 14:12:52
【问题描述】:

我正在学习响应式网络,对于一个教程,我想从我的 spring webflux rest 服务到 angular 6 客户端获取一些 twitter 主题标签结果搜索。

当在 chrome 中点击我的localhost:8081/search/test 时,我会以一种反应的方式获取 json 格式的推文(通过推文和浏览器显示每一条推文)。

所以为了更开心,我做了一个小的角度搜索输入,我会在控制台推文中显示

问题是,当我搜索 java 标签时,我会得到控制台日志,然后如果我尝试搜索 spring 标签,我将在控制台中记录 spring 推文,而 Java 推文仍在继续

我做了一些研究,发现我应该为我的消费者取消订阅 Flux。

我试图实现这个但没有成功

这是我尝试过的

Spring WebFlux 控制器

private TwitHashLocalService localService;
    private TwitHashRemoteService remoteService;

    @GetMapping(value="search/{tag}",produces=MediaType.TEXT_EVENT_STREAM_VALUE) 
    public Flux<Tweet> getByTag(@PathVariable String tag) throws TwitterException{
        return localService.findByTag(tag).mergeWith(remoteService.findByTag(tag).doOnNext(tweet -> localService.save(tweet)));
    }

我的服务

本地 mongo 数据库

private MongoService mongoService;

    public Flux<Tweet> findByTag(String tag) {

        return mongoService.findByTag(tag);
    }

远程 Twitter 流通量

public Flux<Tweet> findByTag(String hashtag) throws TwitterException {

    return Flux.create(sink -> {
        TwitterStream twitterStream = new TwitterStreamFactory(configuration).getInstance();
        twitterStream.onStatus(status -> sink.next(Tweet.fromStatus(status,hashtag)));
        twitterStream.onException(sink::error);
        twitterStream.filter(hashtag);
        sink.onCancel(twitterStream::shutdown);
    }); 

}

角度

我的反应式推特搜索服务

import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { ITweet } from './itweet';
import { Observable, of } from 'rxjs';


@Injectable({
    providedIn: 'root'
})
export class ReactiveTwitterService {

    myTweets: ITweet[] = new Array();

    tweetTag: string;

    baseUrl = 'http://localhost:8081/search';

    constructor(private http_client: HttpClient) { }

    getTweetStream(tag): Observable<Array<ITweet>> {

        this.myTweets = [];
        const url = this.baseUrl + '/' + tag;

        return Observable.create(observer => {
            const eventSource = new EventSource(url);
            eventSource.onmessage = (event) => {
                console.log('received event');
                const json = JSON.parse(event.data);
                console.log(json);
                console.log(json.tweetData.name, json.tweetData.text, json.tag);
                this.myTweets.push(new ITweet(json.tweetData.name, json.tweetData.text, json.tag));
                observer.next(this.myTweets);
            };
            eventSource.onerror = (error) => {
                // readyState === 0 (closed) means the remote source closed the connection,
                // so we can safely treat it as a normal situation. Another way of detecting the end of the stream
                // is to insert a special element in the stream of events, which the client can identify as the last one.
                if (eventSource.readyState === 0) {
                    console.log('The stream has been closed by the server.');
                    eventSource.close();
                    observer.complete();
                } else {
                    observer.error('EventSource error: ' + error);
                }
            };
        });   
    }   
}

组件搜索栏

import { Component, OnInit, HostListener } from '@angular/core';
import { ReactiveTwitterService } from '../reactive-twitter.service';
import { Observable, Subscription } from 'rxjs';
import { ITweet } from '../itweet';

@Component({
    selector: 'app-serach-bar',
    templateUrl: './serach-bar.component.html',
    styleUrls: ['./serach-bar.component.css']
})
export class SerachBarComponent implements OnInit {
    innerWidth: number;


    subscription: Subscription = new Subscription();
    placeholder = 'search';

    styleClass = {
        wide_screen: 'w3-input w3-light-grey',
        break_point: 'w3-input w3-white'
    };

    tweets: Observable<ITweet[]>;

    constructor(private twiterService: ReactiveTwitterService) { }

    doSearch(tag) {
        console.log('test' + tag);
        this.subscription.unsubscribe();
        this.tweets = this.twiterService.getTweetStream(tag);
        this.subscription.add(this.tweets.subscribe());
    }


    ngOnInit() {
    }

    @HostListener('window:resize', ['$event'])
    onResize(event) {
        this.innerWidth = window.innerWidth;
    }

    getStyle() {
        return (innerWidth > 769) ? this.styleClass.wide_screen : this.styleClass.break_point;
    }
}

正如您在搜索中看到的那样,我试图在研究之前取消订阅,但这不起作用

我该怎么办?

【问题讨论】:

  • 我只是尝试关闭 eventSource 如果它已创建但没有成功

标签: angular spring-webflux project-reactor angular2-observables unsubscribe


【解决方案1】:

1) 确保取消在 RxJs 中传播

从我看到的代码来看,Angular 端没有取消处理程序。

尝试执行以下操作,在 JS 端对取消订阅做出反应:

Observable.create(observer => {
        const eventSource = new EventSource(url);

        // your code here 

        return () => eventSource.close(); // on cancel function
    }); 

2) 添加.log()ing

我建议向 Reactor 的管道添加额外的日志记录,这样就可以清楚地知道哪些信号通过管道传播。为此目的使用.log() 运算符。

3) 确保 EventSource 在浏览器端真正关闭。

使用浏览器调试控制台观察所有打开的服务器连接。确保在更改标签/清理搜索请求后关闭连接

4) 记住最终一致性

所有通过响应式管道传播的事件都是异步且非阻塞的,因此在服务器端的实际操作和最终取消之间可能会有一些延迟

【讨论】:

  • 谢谢,我想说的是,根据您的建议,我还通过取消订阅然后为下一个搜索值重新实例化新的订阅包装器来更改取消订阅
  • 很高兴为您提供帮助)
  • 最后一个问题,你有什么书推荐,所以我可以在生产环境中掌握反应
  • :D 你可以看看我的一个 XD -> packtpub.com/application-development/…。它与生产并不完全相关,但绝对可以从中学到一些东西:D。另一本好书是->manning.com/books/reactive-design-patterns
  • 我很高兴得到了一位作家的帮助 :) 愿上帝保佑你
猜你喜欢
  • 2018-07-17
  • 2017-06-22
  • 1970-01-01
  • 2021-12-17
  • 1970-01-01
  • 1970-01-01
  • 2021-09-10
  • 2022-01-22
  • 2019-07-23
相关资源
最近更新 更多