【问题标题】:Hazelcast Jet internal optimizationHazelcast Jet 内部优化
【发布时间】:2018-11-05 16:41:59
【问题描述】:

我检查了 Hazelcast Jet 以满足我的项目需求,但我发现文档在以下主题方面确实含糊不清:

1) 当我对两个列表流执行数据连接时...例如:

BatchStage<Trade> trades = p.drawFrom(list("trades"));
BatchStage<Entry<Integer, Broker>> brokers =    
p.drawFrom(list("brokers"));
BatchStage<Tuple2<Trade, Broker>> joined = trades.hashJoin(brokers,
    joinMapEntries(Trade::brokerId),
    Tuple2::tuple2);
joined.drainTo(Sinks.logger());

那么我可以告诉 Jet 下面实际会发生什么连接吗? 地图侧连接或减少侧连接......?我的意思是想象一下“经纪人”规模很小,而交易规模很大。执行这两组连接的最佳技术是地图侧连接,也就是广播连接....当 Jet 进行连接时,哪些数据将通过网络传输?是否有任何基于尺寸的优化?

2) 我正在测试以下场景:

简单的管道:

private Pipeline createPipeLine() {
    Pipeline p = Pipeline.create();
    BatchStage stage = p.drawFrom(Sources.<Date>list("master"));
    stage.drainTo(Sinks.logger());
    return p;
}

list("master") 不断被集群中的另一个节点填充。现在,当我将此管道提交到集群时,只有列表(“master”)的子集被排入记录器。我可以以某种方式将 Jet 作业设置为不断地将list("master") 排出到标准输出吗?

提前致谢

【问题讨论】:

标签: hazelcast hazelcast-jet


【解决方案1】:
  1. 来自 HashJoin 的 Javadoc:

    在实现上,哈希连接转换针对吞吐量进行了优化 这样每个计算成员都拥有所有丰富内容的本地副本 数据,存储在哈希表中(因此得名)。丰富的溪流是 在从主流中摄取任何数据之前,已完全消耗。

    对于您的示例,broker 列表中的所有项目将首先被所有成员消费,然后 trades 列表将被消费。

  2. IList 是一个批处理源,你需要一个流源来持续消费这些项目。您可以使用IQueue 作为源,这是为队列创建源的简单方法:

    StreamSource<Trade> queueSource = SourceBuilder.<IQueue<Trade>>stream("queueStream", 
            c -> c.jetInstance().getHazelcastInstance().getQueue("trades"))
        .<Trade>fillBufferFn((queue, buf) -> buf.add(queue.poll()))
        .build();
    

【讨论】:

  • 2.关于 IList,我错过了它只是批处理源,我很抱歉。
  • 我成功地将 IMap 配置为 StreamSource,然后按预期工作。 1.“每个计算成员都有一个所有丰富数据的本地副本”这对于如何根据处理数据的大小设计Pipeline作业来说是非常有用的信息。谢谢。
  • This blog post 还可以让您了解如何使用连接/扩充。
猜你喜欢
  • 2023-03-08
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-11-11
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多