【问题标题】:What are the practical differences between an AsyncIterable and an Observable?AsyncIterable 和 Observable 之间的实际区别是什么?
【发布时间】:2023-03-08 02:55:01
【问题描述】:

我最近对这个话题很感兴趣。似乎 AsyncIterables 和 Observables 都具有类似流的特性,尽管它们的使用方式略有不同。

你可以像这样使用异步迭代

const myAsyncIterable = async function*() { yield 1; yield 2; yield 3; }

const main = async () => {
  for await (const number of myAsyncIterable()) {
    console.log(number)
  }
}

main()

你可以像这样消费一个 observable

const Observable = rxjs
const { map } = rxjs.operators

Observable.of(1, 2, 3).subscribe(x => console.log(x))
<script src="https://unpkg.com/rxjs/bundles/rxjs.umd.min.js"></script>

我的首要问题是基于RxJS pr

如果 observable 的发射速度比循环完成的速度快,那么随着缓冲区越来越满,内存就会增加。我们可以提供使用不同策略的其他方法(例如,仅使用最近的值等),但将其保留为默认值。请注意,循环本身可能有多个等待,这会加剧问题。

在我看来,异步迭代器本质上没有背压问题,那么在 Observable 上实现 Symbol.asyncIterator (@@asyncIterator) 并默认使用背压策略是否正确?鉴于 AsyncIterables,甚至还需要 Observables 吗?

理想情况下,您可以通过代码示例向我展示 AsyncIterables 和 Observables 之间的实际区别。

【问题讨论】:

  • 默认为背压策略”是什么意思?
  • 您似乎已经回答了正文中标题中的问题(异步迭代是基于拉的,可观察的是基于推的)。你真的想知道其中的区别吗?
  • @Bergi 特别是,当缓冲区变得更满时允许内存建立的背压策略(正如贡献者似乎暗示的那样)。不,对我来说这听起来不是一个好策略。另外,我可能只是稍微回答了我的问题,但我正在寻找更充实的东西。
  • 我们是否需要规范中的 Observable?” - 这是一个基于意见的问题,我们无法在此回答。不过,您链接的提案文本应该为此提供很好的论据。
  • 非常接近(迭代器与可迭代)重复:What is the difference between async generators and Observables?

标签: javascript ecmascript-6 promise rxjs observable


【解决方案1】:

可观察到的东西令人费解,我的理解可能存在缺陷。但是异步迭代器只是一个返回承诺的迭代器,它可以在“实时”事件流(热可观察)中解析未来的事件。它可以使用队列来实现,如下所示。

function* iterateClickEvents(target) {
  const queue = []
  target.addEventListener('click', e => queue.shift()?.fulfill(e))
  while (true)
    yield new Promise(fulfill => queue.push({fulfill}))
}

//use it
for await (const e of iterateClickEvents(myButton))
  handleEvent(e)

然后您可以实现流畅的运算符,例如:

class FluentIterable {
  constructor(iterable) {
    this.iterable = iterable
  }
  filter(predicate) {
    return new FluentIterable(this.$filter(predicate))
  }
  async* $filter(predicate) {
    for await (const value of this.iterable)
      if (predicate(value))
        yield value
  }
  async each(fn) {
    for await (const value of this.iterable)
      fn(value)
  }
}

//use it
new FluentIterable(iterateClickEvents(document.body))
  .filter(e => e.target == myButton)
  .each(handleEvent)
  .catch(console.error)

https://codepen.io/ken107/pen/PojZjgB

您可以实现一个map 运算符,它返回inner 迭代器的结果。事情从那里开始变得复杂。

【讨论】:

    【解决方案2】:

    主要区别在于哪一方决定何时迭代。

    对于异步迭代器,客户端通过调用await iterator.next() 来决定。源决定何时解决承诺,但客户端必须首先要求下一个值。因此,消费者从源中“拉取”数据。

    Observables 注册一个回调函数,当有新值进入时,observable 立即调用该回调函数。因此,源“推送”给消费者。

    通过使用 Subject 并将其映射到异步迭代器的下一个值,可以轻松地使用 Observable 来使用异步迭代器。然后,只要您准备好使用下一个项目,您就可以在主题上调用 next。这是一个代码示例

    const pull = new Subject();
    const output = pull.pipe(
      concatMap(() => from(iter.next())),
      map(val => { 
        if(val.done) pull.complete();
        return val.value;
      })
    );
    //wherever you need this 
    output.pipe(
    
    ).subscribe(() => {
      //we're ready for the next item
      if(!pull.closed) pull.next();
    });
    
    

    【讨论】:

    • 从消费的角度来看,Observable 能做什么而 AsyncIterable 不能?
    • 我正在寻找一些实用的东西,有代码示例
    【解决方案3】:

    This 是当前的实现Observable[Symbol.asyncIterator]

    这是在数组上实现的Symbol.asyncIterator 的基本示例:

    const dummyPromise = (val, time) => new Promise(res => setTimeout(res, time * 1000, val));
    
    const items = [1, 2, 3];
    
    items[Symbol.asyncIterator] = async function * () {
      yield * await this.map(v => dummyPromise(v, v));
    }
    
    !(async () => {
      for await (const value of items) {
    
      console.log(value);
    }
    })();
    /* 
    1 - after 1s
    2 - after 2s
    3 - after 3s
    */
    

    我对生成器(sync 生成器)的理解是它们是可暂停函数,这意味着您可以立即请求一个值,然后在 10 秒后请求另一个值。异步生成器遵循相同的方法,除了它们生成的值是异步,这意味着您必须await 才能获得它。

    例如:

    const dummyPromise = (val, time) => new Promise(res => setTimeout(res, time * 1000, val));
    
    const items = [1, 2, 3];
    items[Symbol.asyncIterator] = async function * () {
      yield * await this.map(v => dummyPromise(v, v));
    }
    
    const it = items[Symbol.asyncIterator]();
    
    (async () => {
      // console.log(await it.next())
      await it.next();
    
      setTimeout(async () => {
        console.log(await it.next());
      }, 2000); // It will take 4s in total
    })();
    

    回到Observable的实现:

    async function* coroutine<T>(source: Observable<T>) {
      const deferreds: Deferred<IteratorResult<T>>[] = [];
      const values: T[] = [];
      let hasError = false;
      let error: any = null;
      let completed = false;
    
      const subs = source.subscribe({
        next: value => {
          if (deferreds.length > 0) {
            deferreds.shift()!.resolve({ value, done: false });
          } else {
            values.push(value);
          }
        },
        error: err => { /* ... */ },
        complete: () => { /* ... */ },
      });
    
      try {
        while (true) {
          if (values.length > 0) {
            yield values.shift();
          } else if (completed) {
            return;
          } else if (hasError) {
            throw error;
          } else {
            const d = new Deferred<IteratorResult<T>>();
            deferreds.push(d);
            const result = await d.promise;
            if (result.done) {
              return;
            } else {
              yield result.value;
            }
          }
        }
      } catch (err) {
        throw err;
      } finally {
        subs.unsubscribe();
      }
    }
    

    据我了解:

    • values 用于跟踪同步值 如果您有of(1, 2, 3),则values 数组将在到达while(true) { } 之前包含[1, 2, 3]。而且因为您使用的是for await (const v of ...), 你会像在 it.next(); it.next(); it.next() ... 一样请求值。

      换句话说,一旦您可以从迭代器中使用一个值,您就会立即请求下一个值,直到数据生产者无法提供任何东西。

    • deferreds 用于异步值 所以在你的第一个 it.next()values 数组是空的(意味着 observable 没有同步发出),所以它会回退到最后一个 else,它只是创建一个添加到 deferreds 的承诺, 之后,该承诺为 awaited,直到 resolvesrejects

      当 observable 最终发出时,deferreds 不会为空,因此等待的 promise 将 resolve 与新到达的值。

    const src$ = merge(
      timer(1000).pipe(mapTo(1)),
      timer(2000).pipe(mapTo(2)),
      timer(3000).pipe(mapTo(3)),
    );
    
    !(async () => {
      for await (const value of src$) {
        console.log(value);
      }
    })();
    

    StackBlitz

    【讨论】:

    • 当自己已经在处理一系列承诺时,implement the async-iterator interface yourself 比使用异步生成器函数更容易(也更有效)。
    • 有趣!你的回答让我想起了数据加载器。谢谢你。我个人觉得这种方法更容易掌握,虽然它可能不是最有效的
    猜你喜欢
    • 2021-06-12
    • 1970-01-01
    • 1970-01-01
    • 2010-10-22
    • 1970-01-01
    • 2021-04-09
    • 2012-05-08
    • 2011-03-06
    • 2020-02-16
    相关资源
    最近更新 更多