【问题标题】:loopback event stream causing race condition导致竞争条件的环回事件流
【发布时间】:2019-05-18 00:58:26
【问题描述】:

我的总体目标是监视我的数据库是否有任何更改,并自动将这些更改广播给连接到我网站的任何用户。 我看到的问题是我有一个动作触发了对我的数据库的发布请求,然后同时触发了事件流,因为我正在观看的模型已经改变。这会导致初始操作完成,并且事件流触发的操作在完成之前被中断。

这是在我的数据库中创建新博客文章条目而触发的第一个操作

export const topicSubmit = (date, newTopic, newTopicBody, memberId, name) => {
    return {
        type: 'TOPIC_SUBMIT',
        payload: axios({
            method: 'post',
            url: `/api/blogPosts`,
            data: {
                "blogTitle": newTopic,
                "blogBody": newTopicBody,
                "date": date,
                "upVotes": 0,
                "numComments": 0,
                "voteNames": [],
                "memberId": memberId,
                "steamNameId": name
            }
        })
            .then(response => {
                return response.data
            })
            .catch(err => err)
    }
}

// this is the boot script that creates the change stream

var es = require('event-stream');
module.exports = function (app) {
    console.log('realtime boot script')
    var BlogPost = app.models.BlogPost;
    BlogPost.createChangeStream(function (err, changes) {
        changes.pipe(es.stringify()).pipe(process.stdout);
    });
}

// this is the event listener on my front end that will dispatch all
// changes made in my database to my front end

componentDidMount() {
        const { dispatch } = this.props;
          let urlToChangeStream = '/api/blogPosts/change-stream?_format=event-stream';
          let src = new EventSource(urlToChangeStream);
          src.addEventListener('data', function (msg) {
              let data = JSON.parse(msg.data);
          dispatch(liveChangeBlogs(data))
          });

我希望在事件侦听器调度“liveChangeBlogs”操作之前,“TOPIC_SUBMIT”操作应返回已完成 这是我在环回事件流https://loopback.io/doc/en/lb3/Realtime-server-sent-events.html中找到的文档@

【问题讨论】:

    标签: javascript loopbackjs event-stream


    【解决方案1】:

    我最终使用 Redux Thunk 解决了这个问题,向我的 componentDidMount 添加了一个 setTimeout 和一个闭包。 topicSubmit 操作和引导脚本没有改变。不确定 setTimeout 是否是正确的方法,但这是我能想到的绕过比赛案例的唯一方法。

     componentDidMount() {
            const { dispatch } = this.props;
            const newBlog = this.handleNewBlog;
            let urlToChangeStream = '/api/blogPosts/change-stream?_format=event-stream';
            let src = new EventSource(urlToChangeStream);
            src.addEventListener('data', function (msg) {
                newBlog(msg)
            });
    
            const newThread = this.handleNewThread;
            let urlToChangeStream2 = '/api/threads/change-stream?_format=event-stream';
            let src2 = new EventSource(urlToChangeStream2);
            src2.addEventListener('data', function (msg) {
                newThread(msg)
            });
            dispatch(getBlogs());
        }
    
      handleNewBlog(msg) {
            const { dispatch } = this.props;
            let data = JSON.parse(msg.data);
            if(data.data == undefined) {
                setTimeout(() => {
                    dispatch(getBlogs());
                }, 1000);
            } else {
                setTimeout(() => {
                    dispatch(liveChangeBlogs(data));
                }, 1000);
            }
        }
    
     handleNewThread(msg) {
            const { dispatch, viewingThreadId } = this.props;
            let data2 = JSON.parse(msg.data);
            console.log('data2: ', data2)
            if (data2.type == 'remove') {
                return dispatch(getThreadsById(viewingThreadId))
            }
            let id = data2.data.blogPostId
            setTimeout(() => {
                if (viewingThreadId === id) {
                    dispatch(getThreadsById(id));
                } else {
                    return
                }
            }, 1000);
        }
    

    【讨论】:

      【解决方案2】:

      我希望在事件侦听器调度“liveChangeBlogs”操作之前,“TOPIC_SUBMIT”操作应返回已完成

      恐怕这是不可能的。即使 LoopBack 服务器在发送对 POST 请求的响应之前拒绝发送事件流条目,它仍然无法保证客户端将在它之前接收(并处理!)对 POST 请求的响应处理事件流条目。

      我的建议是跟踪客户端中正在进行的请求,并删除同一客户端所做更改的事件流条目。

      大致如下:

      const pendingRequests = [];
      
      export const topicSubmit = (date, newTopic, newTopicBody, memberId, name) => {
          const data = {
                      "blogTitle": newTopic,
                      "blogBody": newTopicBody,
                      "date": date,
                      "upVotes": 0,
                      "numComments": 0,
                      "voteNames": [],
                      "memberId": memberId,
                      "steamNameId": name
                  };
          const entry = {type: 'CREATE', data}
          pendingRequests.push(entry);
      
          return {
              type: 'TOPIC_SUBMIT',
              payload: axios({
                  method: 'post',
                  url: `/api/blogPosts`,
                  data,
      
              })
                  .then(response => {
                      return response.data
                  })
                  .catch(err => err)
                  .finally(() => {
                    // remove the entry from pending requests
                    const ix = pendingRequests.indexOf(entry);
                    if (ix > -1) pendingRequests.splice(ix, 1);
                  })
          }
      }
      
      // this is the event listener on my front end that will dispatch all
      // changes made in my database to my front end
      // (...)
                src.addEventListener('data', function (msg) {
                    let data = JSON.parse(msg.data);
                    const ours = pendingRequests.find(it => {
                      it.type === data.type && /* check fields that uniquely identify model instance and/or the change being made */
                    });
                    if (ours) {
                      // this event was triggered by us, discard it
                      return; 
                    }
                dispatch(liveChangeBlogs(data))
                });
      

      【讨论】:

        猜你喜欢
        • 2022-09-23
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2021-12-07
        • 2021-09-03
        • 2014-02-21
        相关资源
        最近更新 更多