【问题标题】:Metrics from multiple threads来自多个线程的指标
【发布时间】:2011-05-18 13:44:39
【问题描述】:

所以这似乎是一个非常常见的用例,也许我想多了,但我在保持来自多个线程的集中度量时遇到了问题。假设我有多个工作线程都在处理记录,并且每 1000 条记录我想吐出一些指标。现在我可以让每个线程记录单独的指标,然后获取吞吐量数字,但我必须手动将它们相加(当然时间界限并不准确)。下面是一个简单的例子:

public class Worker implements Runnable {

   private static int count = 0;
   private static long processingTime = 0;

   public void run() {
       while (true) {
          ...get record
          count++;
          long start = System.currentTimeMillis();
          ...do work
          long end = System.currentTimeMillis();
          processingTime += (end-start);
          if (count % 1000 == 0) {
              ... log some metrics
              processingTime = 0;
              count = 0;
          }
       }
    }
}

希望这有点道理。我也知道这两个静态变量可能是 AtomicInteger 和 AtomicLong 。 . .但也许不是。对人们有什么样的想法感兴趣。我曾考虑过使用原子变量并使用 ReeantrantReadWriteLock - 但我真的不希望指标停止处理流程(即指标对处理的影响应该非常小)。谢谢。

【问题讨论】:

    标签: java concurrency metrics


    【解决方案1】:

    将实际处理任务转移到另一个线程可能是个好主意。这个想法是封装您的数据并快速将其传递给处理线程,从而最大限度地减少对正在执行有意义工作的线程的影响。

    有一个小的切换争用,但这个成本通常比任何其他类型的同步要小很多,在许多情况下它应该是一个很好的候选者。我认为 M. Jessup 的解决方案与我的非常接近,但希望以下代码清楚地说明了这一点。

    public class Worker implements Runnable {
    
       private static final Metrics metrics = new Metrics();
    
       public void run() {
          while (true) {
            ...get record
            long start = System.currentTimeMillis();
            ...do work
            long end = System.currentTimeMillis();
            // process the metric asynchronously
            metrics.addMetric(end - start);
         }
      }
    
      private static final class Metrics {
         // a single "background" thread that actually handles
         // processing
         private final ExecutorService metricThread = 
               Executors.newSingleThreadExecutor();
         // data (no synchronization needed)
         private int count = 0;
         private long processingTime = 0;
    
         public void addMetric(final long time) {
            metricThread.execute(new Runnable() {
               public void run() {
                  count++;
                  processingTime += time;
                  if (count % 1000 == 0) {
                     ... log some metrics
                     processingTime = 0;
                     count = 0;
                  }
               }
            });
          }
       }
    }
    

    【讨论】:

    • +1 但我总是想知道这样的解决方案。因为实际上正在实现你想要的?也就是说,从操作系统抢占线程以允许此日志记录的成本是否会减少其他工作线程正在实现的指标数量(如果 number_of_working_threads > cores_on_machine,这只是一个有效的问题)。而不是在您已经获得指标后在同一个线程上执行此操作。
    • 没错,这在某些条件下效果最好。我会将您的条件稍微修改为 number_of_CPU_busy_threads > cores_on_machine。实际上,许多线程的大部分处理时间都处于空闲状态(为 I/O 阻塞等)。在大多数情况下,真正活跃的线程数不会超过核心数,否则无论如何你实际上已经超过了 CPU 容量。如果操作本质上是“序列化”(例如记录到文件),这种模式也很有效。
    • 我真的很喜欢这个答案 - 不能说它会如何影响处理时间,但我的猜测是指标线程将保持相当繁忙,而不会实际影响正在运行的记录处理器。
    【解决方案2】:

    我建议如果您不希望日志记录干扰处理,您应该有一个单独的日志工作线程,并让您的处理线程简单地提供某种类型的值对象,可以移交。在示例中,我选择了 LinkedBlockingQueue,因为它能够使用 offer() 阻塞很短的时间,并且您可以将阻塞推迟到从队列中提取值的另一个线程。您可能需要在 MetricProcessor 中增加逻辑以根据您的要求对数据等进行排序,但即使这是一个长时间运行的操作,它也不会阻止 VM 线程调度程序同时重新启动实际处理线程。

    public class Worker implements Runnable {
    
      public void run() {
        while (true) {
          ... do some stuff
          if (count % 1000 == 0) {
            ... log some metrics
            if(MetricProcessor.getInstance().addMetrics(
                new Metrics(processingTime, count, ...)) {
              processingTime = 0;
              count = 0;
            } else {
              //the call would have blocked for a more significant
              //amount of time, here the results
              //could be abandoned or just held and attempted again
              //as a larger data set later
            }
          }
        }
      }
    }
    
    public class WorkerMetrics {
      ...some interesting data
      public WorkerMetrics(... data){
        ...
      }
      ...getter setters etc
    }
    
    public class MetricProcessor implements Runnable {
      LinkedBlockingQueue metrics = new LinkedBlockingQueue();
      public boolean addMetrics(WorkerMetrics m) {
        return metrics.offer(m); //This may block, but not for a significant amount of time.
      }
    
      public void run() {
        while(true) {
          WorkMetrics m = metrics.take(); //wait here for something to come in
          //the above call does all the significant blocking without
          //interrupting the real processing
          ...do some actual logging, aggregation, etc of the metrics
        }
      }
    }
    

    【讨论】:

    • LinkedBlockingQueue 在添加时肯定会阻塞。你在考虑 ConcurrentLinkedQueue 吗? drainTo 也会阻塞。添加线程的情况绕过了原来的问题,无论你做什么,你都会有某种多线程的序列化。在我看来,添加另一个线程没有用。
    • 是的,它会阻止提供,但是与它可能阻止的任何时间相比,如果它正在执行完整的指标记录,它阻止的时间是微不足道的。 Op 做了声明“但我真的不希望指标停止处理流程”,所以在这里我们委托以允许处理流程继续。并且确定 drainTo 块,但如果那是在 Metrics 处理中,则无关紧要,因为它与主处理分开。如果您没有另一个线程,如何在不阻塞一个或多个处理线程的情况下进行序列化?
    • 如果为记录指标完成了大量工作,您可以在另一个线程中设置参数。但是您应该更新您的答案并删除 add、offer 和 take 不会阻止的陈述。这对 OP 没有帮助,因为它不是事实。
    • 点了,更新了答案以更清楚地表明报价可能会短暂阻止。
    • 基本上我认为这与最佳答案相同 - 在后台 Executor 服务使用与此非常相似的 BlockingQueue - 但隐藏了一些复杂性。一个很好的答案,但我认为另一个更优雅。
    【解决方案3】:

    如果您依赖 count 的状态和 processingTime 的状态来同步,那么您将不得不使用 Lock。例如,当++count % 1000 == 0 为真时,您想评估当时处理时间的指标。

    在这种情况下,使用 ReentrantLock 是有意义的。我不会使用 RRWL,因为实际上并没有发生纯读取的实例。它始终是一个读/写集。但是你需要锁定所有的

      count++
      processingTime += (end-start);
      if (count % 1000 == 0) {
          ... log some metrics
          processingTime = 0;
          count = 0;
      }
    

    无论 count++ 是否会在该位置,您也需要锁定该位置。 最后,如果您使用的是 Lock,则不需要 AtomicLong 和 AtomicInteger。它只是增加了开销并且不是线程安全的。

    【讨论】:

    • 我相信我在实际问题中说过。
    • 当你写的时候,“但也许不是”我以为你的意思是你会因为不使用 AtomicLong 来承受并发性的打击 :)
    猜你喜欢
    • 2021-12-21
    • 1970-01-01
    • 1970-01-01
    • 2022-01-01
    • 2020-10-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多