【问题标题】:Convert an Observable to an async generator将 Observable 转换为异步生成器
【发布时间】:2017-10-22 17:14:12
【问题描述】:

我正在尝试将 rxjs 与 babeljs 结合使用来创建一个异步生成器函数,该函数在调用 next 时产生,在调用 error 时抛出,并在调用 complete 时结束。我遇到的问题是我无法从回调中屈服。

我可以await 一个 Promise 来处理返回/抛出要求。

async function *getData( observable ) {
    await new Promise( ( resolve, reject ) => {
        observable.subscribe( {
            next( data ) {
                yield data; // can't yield here
            },
            error( err ) {
                reject( err );
            },
            complete() {
                resolve();
            }
        } );
    } );
}

( async function example() {
    for await( const data of getData( foo ) ) {
        console.log( 'data received' );
    }
    console.log( 'done' );
}() );

这可能吗?

【问题讨论】:

    标签: rxjs babeljs ecmascript-next reactive


    【解决方案1】:

    我问了橡皮鸭,然后我写了下面的代码来做我想要的:

    function defer() {
        const properties = {},
            promise = new Promise( ( resolve, reject ) => {
                Object.assign( properties, { resolve, reject } );
            } );
            return Object.assign( promise, properties );
    }
    
    async function *getData( observable ) {
        let nextData = defer();
        const sub = observable.subscribe( {
            next( data ) {
                const n = nextData;
                nextData = defer();
                n.resolve( data );
            },
            error( err ) {
                nextData.reject( err );
            },
            complete() {
                const n = nextData;
                nextData = null;
                n.resolve();
            }
        } );
        try {
            for(;;) {
                const value = await nextData;
                if( !nextData ) break;
                yield value;
            }
        } finally {
            sub.unsubscribe();
        }
    }
    

    【讨论】:

      【解决方案2】:

      我认为这个解决方案的一个问题是 observable 可以在一批中生成多个值(不延迟)。这是我的建议:

      const defer = () => new Promise (resolve =>
          setTimeout (resolve, 0));
      
      async function* getData (observable)
      {
          let values = [];
          let error = null;
          let done = false;
          observable.subscribe (
              data => values.push (data),
              err => error = err,
              () => done = true);
          for (;;)
          {
              if (values.length)
              {
                  for (const value of values)
                      yield value;
                  values = [];
              }
              if (error)
                  throw error;
              if (done)
                  return;
              await defer ();
          }
      }
      

      【讨论】:

        猜你喜欢
        • 2018-11-24
        • 2021-10-03
        • 1970-01-01
        • 2015-05-30
        • 1970-01-01
        • 2018-07-08
        • 2019-08-17
        • 2021-08-24
        • 2023-02-10
        相关资源
        最近更新 更多