【Question Title】: Apache Beam Counter/Metrics not available in Flink WebUI
【Posted】: 2018-08-07 09:47:22
【Question Description】:

I am using Flink 1.4.1 and Beam 2.3.0, and would like to know whether metrics can be made available in the Flink WebUI (or anywhere at all), just like they are in the Dataflow WebUI.

I have used counters like this:

import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
...
Counter elementsRead = Metrics.counter(getClass(), "elements_read");
...
elementsRead.inc();

But I cannot find the "elements_read" count anywhere in the Flink WebUI (neither under Task Metrics nor under Accumulators). I thought this would be straightforward after BEAM-773.

【Discussion】:

    Tags: java apache-flink metrics apache-beam


    【Solution 1】:

    After selecting your job in the dashboard, you will see the DAG for that job, and below the DAG there is a list of tabs.

    1. Click the "Task Metrics" tab
    2. Click one of the boxes in your DAG
    3. Click the "Add Metric" button to display that operator's metrics

    【Discussion】:

    • Tried that, but no luck. My counter is not in the list of metrics. How did you create your Beam counters/metrics?
    • Hmm... can you see your counter in the Accumulators tab?
    • @robosoul, any progress on this? I am facing the same issue: all I can see are the standard metrics, with no sign of my custom ones.
    • @diegoreico .. I can see the metrics in the Accumulators tab but not in the Metrics tab. I am using Flink version 1.12.0 with the latest Apache Beam master branch code.
    【Solution 2】:

    Querying metrics is not supported when your pipeline runs in detached mode. Refer to this:

    public class FlinkDetachedRunnerResult implements PipelineResult {
    
      FlinkDetachedRunnerResult() {}
    
      @Override
      public State getState() {
        return State.UNKNOWN;
      }
    
      @Override
      public MetricResults metrics() {
        throw new UnsupportedOperationException("The FlinkRunner does not currently support metrics.");
      }
    
      @Override
      public State cancel() throws IOException {
        throw new UnsupportedOperationException("Cancelling is not yet supported.");
      }
    
      @Override
      public State waitUntilFinish() {
        return State.UNKNOWN;
      }
    
      @Override
      public State waitUntilFinish(Duration duration) {
        return State.UNKNOWN;
      }
    
      @Override
      public String toString() {
        return "FlinkDetachedRunnerResult{}";
      }
    }
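
    With the classic Flink CLI, detached submission is what the -d flag requests, so one way to keep an attached run (where metrics() can be queried) is simply to submit the job without it. This is a sketch; the main class and jar names below are placeholders:

    ```shell
    # Attached submission: the client blocks until the job finishes,
    # so the runner can return a result with queryable metrics.
    flink run -c com.example.MyBeamPipeline target/my-pipeline-bundled.jar --runner=FlinkRunner

    # Detached submission (-d): returns immediately and yields the
    # FlinkDetachedRunnerResult above, whose metrics() throws.
    # flink run -d -c com.example.MyBeamPipeline target/my-pipeline-bundled.jar --runner=FlinkRunner
    ```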
    

    However, I was able to see the metrics using the slf4j reporter.
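
    For reference, the slf4j reporter is enabled in conf/flink-conf.yaml roughly as follows (the 60-second interval is an example value):

    ```yaml
    # Log all metrics periodically via SLF4J (they appear in the TaskManager logs)
    metrics.reporters: slf4j
    metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
    metrics.reporter.slf4j.interval: 60 SECONDS
    ```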

    【Discussion】:

    • @zorro how were you able to view the metrics through the slf4j reporter?
    【Solution 3】:
    from apache_beam.metrics.metric import Metrics
    from apache_beam.metrics.metric import MetricsFilter
    from apache_beam.options.pipeline_options import PipelineOptions
    import apache_beam as beam
    import csv
    import logging
    
    GAME_DATA = [
    'user1_team1,team1,18,1447686663000,2015-11-16 15:11:03.921',
    'user1_team1,team1,18,1447690263000,2015-11-16 16:11:03.921',
    'user2_team2,team2,2,1447690263000,2015-11-16 16:11:03.955',
    'user3_team3,team3,8,1447690263000,2015-11-16 16:11:03.955',
    'user4_team3,team3,5,1447690263000,2015-11-16 16:11:03.959',
    'user1_team1,team1,14,1447697463000,2015-11-16 18:11:03.955',
    'robot1_team1,team1,9000,1447697463000,2015-11-16 18:11:03.955',
    'robot2_team2,team2,1,1447697463000,2015-11-16 20:11:03.955',
    'robot2_team2,team2,9000,1447697463000,2015-11-16 21:11:03.955',
    'robot1_team1,1000,2447697463000,2915-11-16 21:11:03.955',
    'robot2_team2,9000,1447697463000,2015-11-16 21:11:03.955']
    
    class ParseGameEventFn(beam.DoFn):
        def __init__(self):
            super(ParseGameEventFn, self).__init__()
            # The counter must be created in __init__, inside the DoFn
            self.game_events = Metrics.counter(self.__class__, 'game_events')
    
        def process(self, element, *args, **kwargs):
            try:
                self.game_events.inc()
                row = list(csv.reader([element]))[0]
                if int(row[2]) < 5:
                   return
                yield {
                    'user': row[0],
                    'team': row[1],
                    'score': int(row[2]),
                    'timestamp': int(row[3]) / 1000.0,
                }
            except Exception as ex:
                logging.error('Parse error on {}: {}'.format(element, ex))
    
    pipeline_options = PipelineOptions()  # default options; add runner flags as needed
    with beam.Pipeline(options=pipeline_options) as pipeline:
        results = (
            pipeline
            | "Create" >> beam.Create(GAME_DATA)
            | "Parsing" >> beam.ParDo(ParseGameEventFn())
            | "AddEventTimestamps" >> beam.Map(
                 lambda elem: beam.window.TimestampedValue(elem, elem['timestamp']))
            | "Print" >> beam.Map(print))
    
    metric_results = pipeline.result.metrics().query(MetricsFilter().with_name('game_events'))
    outputs_user_counter = metric_results['counters'][0]
    print(outputs_user_counter.committed)
    

    Flink configuration for Prometheus in conf/flink-conf.yaml:

    metrics.reporters: prom
    metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
    metrics.reporter.prom.port: 9250-9260
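
    On the Prometheus side, a minimal scrape configuration matching that port range could look like this (job name and host are assumptions):

    ```yaml
    scrape_configs:
      - job_name: 'flink'
        static_configs:
          # One target per TaskManager/JobManager port from the 9250-9260 range
          - targets: ['localhost:9250', 'localhost:9251', 'localhost:9252']
    ```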
    

    I can see the metrics in the Accumulators tab but not in the Metrics tab. I am using Flink version 1.12.0 with the latest Apache Beam master branch code.

    【Discussion】:
