【问题标题】:RxJS: Queue the first source until the second source meet the predicateRxJS:将第一个源排队,直到第二个源满足谓词
【发布时间】:2018-04-23 00:54:39
【问题描述】:

如何用 RxJS 实现这个场景。

当我想获取用户(第一个来源)时,我需要检查互联网连接(第二个来源)。如果 Internet 未连接,则将第一个源排队,直到 Internet 重新打开。如果互联网重新打开,则发出第一​​个源。我可以制作后一个,但在第二个来源为真之前,我无法弄清楚如何将第一个来源排队。

请在此处查看我的代码:

const userEpic = action$ =>
  action$.ofType(USER_REQUEST)
    .withLatestFrom(action$.ofType(CONNECTION))
    .filter(([first, second]) => second.value === true)
    .do(a => console.log('can fetch a user'))    
    .mapTo({ type: SUCCESS });

http://jsbin.com/zomikukiqi/1/edit?js,console,output

【问题讨论】:

  • 你好。你能进一步澄清这一点吗?示例代码似乎与描述不匹配(没有用户,没有互联网检查,行为似乎不相关等)很可能排队 observables 不是理想的解决方案,所以也尝试在没有这种假设的情况下进行解释,如果可能的话:)
  • 嗨对不起,我修改了示例代码以反映描述。我还改进了 jsbin 代码。在 jsbin 中单击,断开连接然后启动请求,然后单击连接,我应该在控制台中看到 can fetch a user。调度第一个源的最后一个值应该没问题,或者甚至更好地调度所有尚未发出的值。我查看了buffer 运算符,但不知道如何缓冲第一个源。感谢收看。

标签: javascript reactjs rxjs react-redux redux-observable


【解决方案1】:

尝试buffer() 存储 USER_REQUEST,直到 CONNECTION 触发缓冲区刷新。

const USER_REQUEST = 'USER_REQUEST';
const CONNECTION = 'CONNECTION';
const SUCCESS = 'SUCCESS';

const requestUser = () => ({ type: USER_REQUEST });
const connection = (x) => ({ type: CONNECTION, value: x });

const bufferedUserRequests$ = (action$, store) =>
  action$.ofType(USER_REQUEST)
    .filter(x => !store.getState().connectionStatus)    
    .buffer(action$.ofType(CONNECTION))

const unbufferedUserRequests$ = (action$, store) =>
  action$.ofType(USER_REQUEST)
    .filter(x => store.getState().connectionStatus)    
    .map(request => [request])

const userEpic = (action$, store) =>
   Rx.Observable.merge(
     bufferedUserRequests$(action$, store), 
     unbufferedUserRequests$(action$, store)
   )
   .flatMap(bufferArray => 
     bufferArray.map(request => Rx.Observable.of(request))
   )
   .do(a => console.log('can fetch a user'))
   .mapTo({ type: SUCCESS })

const reducer = (state = { connectionStatus: false}, action) => {
  switch (action.type) {
    case CONNECTION:
      return { ...state, connectionStatus: action.value }
    default:
      return state;
  }
};

// components/App.js

const { connect } = ReactRedux;

let App = ({ requestUser, connection, connectionStatus }) => (
  <div>
    <button onClick={requestUser}>Start Request</button>
    <p>Internet Connection: {connectionStatus.toString()}</p>
    <button onClick={() => connection(false)}>Disconnect</button>
    <button onClick={() => connection(true)}>Connect</button>
  </div>
);

App = connect(
  ({ connectionStatus }) => ({ connectionStatus }),
  { requestUser, connection }
)(App);

// redux/configureStore.js

const { Provider } = ReactRedux;
const { createStore, applyMiddleware } = Redux;
const { createEpicMiddleware } = ReduxObservable;

const epicMiddleware = createEpicMiddleware(userEpic);

const store = createStore(reducer,
  applyMiddleware(epicMiddleware)
);

// index.js

ReactDOM.render(
  <Provider store={store}>
    <App />
  </Provider>,
  document.getElementById('root')
);
<script src="https://unpkg.com/react@15.2.1/dist/react.min.js"></script>
<script src="https://unpkg.com/react-dom@15.2.1/dist/react-dom.min.js"></script>
<script src="https://unpkg.com/redux@^3.5.2/dist/redux.min.js"></script>
<script src="https://unpkg.com/react-redux@4.4.5/dist/react-redux.min.js"></script>
<script src="https://unpkg.com/@reactivex/rxjs/dist/global/Rx.js"></script>
<script src="https://unpkg.com/redux-observable/dist/redux-observable.min.js"></script>
  <div id="root"></div>

【讨论】:

  • 感谢@richard-matsen。这就是我要找的。 mergeAll也可以简化这段代码:.flatMap(bufferArray =&gt; bufferArray.map(request =&gt; Rx.Observable.of(request)) )
  • 确实可以,很好看。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2015-08-25
  • 1970-01-01
  • 1970-01-01
  • 2010-12-13
  • 2018-03-17
  • 2020-05-12
  • 1970-01-01
相关资源
最近更新 更多