【问题标题】:Calculate time taken by worker threads while processing messages计算工作线程在处理消息时所花费的时间
【发布时间】:2019-10-23 16:38:33
【问题描述】:

检查我的工作线程从接收消息进行处理后运行了多长时间,然后在超过阈值时间限制时记录错误消息的最佳方法是什么。我认为需要在 WorkerManager 类中进行管理。

  1. 我的 WorkerManager 启动了工作线程
  2. 如果有来自提供者的消息,则工作线程通过调用服务类来处理它们。
  3. 如果没有消息,则它会进入休眠状态。

当我的工作线程正在处理消息时,如果处理时间超过 5 分钟,那么我想生成警告消息,但仍让工作线程继续处理。

问题

我想不断检查我的工作线程是否超过了对消息的处理时间 5 分钟,如果它们超过了阈值时间,那么我想记录一条错误消息,但仍然让工作线程按原样继续。

WorkerManager 类

public class WorkerManager implements Runnable {

    private MyWorker[] workers;
    private int workerCount;
    private boolean stopRequested;

    public WorkerManager(int count){
        this.workerCount = count;
    }

    @Override
    public void run(){
        stopRequested = false;
        boolean managerStarted = false;

        while (!stopRequested) {
            if(!managerStarted) {
                workers = new MyWorker[workerCount];
                for (int i = 0; i < workerCount; i++) {
                    final Thread workerThread = new Thread(workers[i], "Worker-" + (i + 1));
                    workerThread.start();
                }
                managerStarted = true;
            }
        }
    }

    public void stop(){
        stopRequested = true;
    }

    //Calll this
    public void cleanUpOnExit() {
        for(MyWorker w: workers){
            w.setStopRequested();
        }
    }
}

工人阶级

   public class MyWorker implements Runnable {

    private final int WAIT_INTERVAL = 200;
    private MyService myService;
    private MyProvider myProvider;
    private boolean stopRequested = false;

    public MyWorker(MyService myService, MyProvider myProvider){
        this.myService = myService;
        this.myProvider = myProvider;
    }

    public void setStopRequested() {
        stopRequested = true;
    }

    @Override
    public void run() {

        while (!stopRequested) {
            boolean processedMessage = false;
            List<Message> messages = myProvider.getPendingMessages();
            if (messages.size() != 0) {
                AdapterLog.debug("We have " + messages.size() + " messages");
                processedMessage = true;
                for (Message message : messages) {
                    processMessage(messages);
                }
            }

            if (!(processedMessage || stopRequested)) {
                // this is to stop the thread from spinning when there are no messages
                try {
                    Thread.sleep(WAIT_INTERVAL);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private void processMessage(Message messages){
        myService.process(messages);
    }
}

【问题讨论】:

    标签: java multithreading


    【解决方案1】:

    您的WorkerManager 需要一种方法来确定每个工作人员的最后一条消息的处理时间。因此工作人员需要跟踪最后处理消息的时间戳。

    然后,您的WorkerManager 可以检查每个工作人员的时间戳并在需要时生成警告。为了使用给定时间段检查工作人员,您可以使用执行器:

        ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        scheduledExecutorService.scheduleAtFixedRate(this::checkTimeoutProcessingMessages, 5l, 5l, TimeUnit.SECONDS);
    

    您可以检查从每个工作人员那里获取时间戳的时间:

        public void checkTimeoutProcessingMessages() {
            for (MyWorker worker : workers) {
                long lastProcessed = worker.getLastProcessedMessageTimestamp();
                long currentTimestamp = System.currentTimeMillis();
                if (lastProcessed + 5000 > currentTimestamp) {
                    //warn message
                }
            }
        }
    

    【讨论】:

    • 您的回答确实提供了解决方案,但并不完全。我想跟踪我的工作人员在调用服务时的处理时间。我无法让我的代码按预期工作,你能看看我是否遗漏了什么吗? - stackoverflow.com/questions/58612770/…
    猜你喜欢
    • 1970-01-01
    • 2014-05-19
    • 1970-01-01
    • 1970-01-01
    • 2016-09-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-02-19
    相关资源
    最近更新 更多