【问题标题】:Merging observable collection with collection of observables将可观察集合与可观察集合合并
【发布时间】:2026-02-24 03:45:01
【问题描述】:

我有一个返回可观察对象集合的 API 调用。对于该可观察集合中的每个对象,我需要执行另一个 API 调用以获取可观察值。

将 observable 集合与 observables 集合合并的正确方法是什么,这样结果是一个 observable 集合,我们可以去掉额外的数据?

IE: (1:n) -> 1

具体来说

// getMarkets API response:
[   {
    "data": [
      {
        "mkt_name": "X",
        ...
      },
      {...}, {...}, {...} 
      //there is a lot of these, cardinality changes over time
    ]}
]
//getTicker(market=X) API response:
[   {
    "data": [
      {
        "last_trade": "651.4000000000",
        ...
      }
      //there is only one of this, "last_trade" value changes a lot more over time
    ],
    "notifications": []
    }
]

我需要

//getMarketsWithTickers Service response (! Also Observable !)
[ 
  {"market" : "X", "last_trade" : "651.4000000000" },  
  {...}, 
  {...}, 
  {...}
]

如何使用getMarkets/getTicker observables 实现getMarketWithTickers

注意:我对这整个 RxJS 事物是新手。

谢谢

【问题讨论】:

    标签: angular typescript rxjs observable


    【解决方案1】:

    这是一个完整的可运行示例,展示了您可以做什么;

    import { Observable } from 'rxjs/Observable';
    import 'rxjs/add/observable/of';
    import 'rxjs/add/operator/switchMap';
    import 'rxjs/add/observable/forkJoin';
    import 'rxjs/add/operator/map';
    
    // what you already have (simulated):
    interface Market {
      name: string;
    }
    
    interface Trade {
      lastTrade: string;
    }
    
    function getMarkets() {
      return Observable.of({data: [{name: 'm1'}, {name: 'm2'}, {name: 'm3'}] as Array<Market>})
    }
    
    function getTicker(marketName: string) {
      return Observable.of({data: {lastTrade: marketName.substring(1)}});
    }
    
    // what you can do:
    getMarkets()
      .map(obj => obj.data.map(market => market.name)) //1
      .switchMap(marketNames =>
        Observable.forkJoin( // 4
          ...marketNames.map(market => getTicker(market) // 2
          .map(ticker => +ticker.data.lastTrade))) // 3
      )
      .subscribe(trades => console.log(trades.join(', ')));
    

    简短说明(阅读文档了解更多详细信息):

    1. 您使用地图运算符将可观察的市场集合转换为可观察的市场名称
    2. 当您收到市场名称数组时,您为每个市场名称创建一个 Ticker 的 observable。
    3. Ticker 的每个 observable 都转换为 lastTrade 的 observable,再次使用 map 运算符
    4. 您使用 forkJoin 方法创建一个可观察对象,该可观察对象发出一个数组,该数组包含 lastTrade 的每个可观察对象的(最后一个)发出事件

    【讨论】:

    • 我正试图围绕forkJoin 方法中的语法进行思考:前面的... 是否表明它实际上是一个可观察对象的集合? - 无论如何,我现在还有其他类似HTTP 429: too many requests 的问题。您对如何限制这个东西有任何见解,以免它淹没服务器吗? getTicker 请求?谢谢
    • 关于第二个问题:我希望浏览器为您限制请求:它通常只允许对给定主机的 N 个并发请求。您可以检查一下,并检查为什么您的服务器拒绝这些请求,因为我不确定一个接一个地放弃请求会解决任何问题。
    • 好的,谢谢。实际上,我只是看到了一些关于 Websockets API 的文档,尤其是一些叫做 SocketCluster 的文档。我的理解是,它的工作方式与上面的代码几乎相同,但端点实际上是用于订阅(我正在使用的当前 API 是 REST API,因此在提交大量请求时会出现错误,但提供者也暴露了一个websocket API,我认为它没有这个问题,因为它是一个订阅模型)。无论如何,谢谢,对不起,如果我听起来很新,我习惯用 C 编写后端代码。