【问题标题】:Java: Synchronize two scheduled threads on ScheduledExecutorServiceJava:在 ScheduledExecutorService 上同步两个计划线程
【发布时间】:2019-05-13 12:33:23
【问题描述】:

我想创建一个程序:

  • 执行一项长时间运行的任务,比如说 ChildJob
  • 暂停主要任务 ChildJob,并每隔几秒提交一次完成的工作

我想出了这段代码,它调度两个线程,每隔几秒提交一次(目前提交部分是一个 sop 语句),并且并行运行 ChildJob。

我面临的问题是,我无法正确同步两个线程。

  • 在调用 commit() 时,ChildJob 线程继续处理。如何让 ChildJob 线程等待?

我知道使 process() 方法同步不是一种选择,因为在这种情况下,commit() 作业甚至不会运行。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.Random;
import java.util.Date;


class Main {

    // Processing thread, commiting thread
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

    public static void main(String[] args) {

        Main app = new Main();
        ChildJob workUnit = new ChildJob();

        Thread childJobThread = new Thread(new Runnable() 
        { 
            @Override
            public void run() 
            { 
                System.out.println("childJobThread: "+ Thread.currentThread().getName()+" Start. Time = " + new Date());
                try{ workUnit.process(); }
                catch(InterruptedException e){ e.printStackTrace(); } 
            } 
        }); 
        Thread committerThread = new Thread(new Runnable() 
        { 
            @Override
            public void run() 
            { 
                try{ workUnit.commit();  }
                catch(InterruptedException e){ e.printStackTrace(); } 
            } 
        }); 

        final ScheduledFuture<?> commitHandle = app.scheduler.scheduleAtFixedRate(committerThread, 1, 8, TimeUnit.SECONDS);
        final ScheduledFuture<?> jobHandle = app.scheduler.schedule(childJobThread, 0, TimeUnit.SECONDS);
        /*
        Makes the commitHandle run for 60 * 60 seconds, not needed, manually terminating currently.
        app.scheduler.schedule(new Runnable() {
                public void run() {
                    commitHandle.cancel(true);
                }
            }, 
            60 * 60, TimeUnit.SECONDS);*/
    }
}

class ChildJob {
    private int i = 0;

    public void process() throws InterruptedException 
    { 

         // synchronized(this) 
         // {   
            while(true) {
                System.out.println("ChildJob processing at: " + Thread.currentThread().getName() + " : " + new Date() + "----------: " + i++);
                Thread.sleep(1000);
            }
        //} 
    } 

    public void commit() throws InterruptedException 
    {   
        synchronized(this) 
        { 
            System.out.println("\ncommitterThread: " + Thread.currentThread().getName()+" Start. Time = " + new Date());
            // 3s sleep to check consistency from processing.
            Thread.sleep(3000);
            System.out.println("committerThread: " + Thread.currentThread().getName()+"     End. Time = " + new Date() + "\n");
        } 
    } 
}

现在的结果:

childJobThread: pool-1-thread-1 Start. Time = Mon May 13 17:51:31 IST 2019
ChildJob processing at: pool-1-thread-1 : Mon May 13 17:51:31 IST 2019----------: 0

committerThread: pool-1-thread-2 Start. Time = Mon May 13 17:51:32 IST 2019
ChildJob processing at: pool-1-thread-1 : Mon May 13 17:51:32 IST 2019----------: 1
ChildJob processing at: pool-1-thread-1 : Mon May 13 17:51:33 IST 2019----------: 2
ChildJob processing at: pool-1-thread-1 : Mon May 13 17:51:34 IST 2019----------: 3
committerThread: pool-1-thread-2     End. Time = Mon May 13 17:51:35 IST 2019

ChildJob processing at: pool-1-thread-1 : Mon May 13 17:51:35 IST 2019----------: 4
ChildJob processing at: pool-1-thread-1 : Mon May 13 17:51:36 IST 2019----------: 5
ChildJob processing at: pool-1-thread-1 : Mon May 13 17:51:37 IST 2019----------: 6
ChildJob processing at: pool-1-thread-1 : Mon May 13 17:51:38 IST 2019----------: 7
ChildJob processing at: pool-1-thread-1 : Mon May 13 17:51:39 IST 2019----------: 8

committerThread: pool-1-thread-2 Start. Time = Mon May 13 17:51:40 IST 2019
ChildJob processing at: pool-1-thread-1 : Mon May 13 17:51:40 IST 2019----------: 9
ChildJob processing at: pool-1-thread-1 : Mon May 13 17:51:41 IST 2019----------: 10
ChildJob processing at: pool-1-thread-1 : Mon May 13 17:51:43 IST 2019----------: 11
committerThread: pool-1-thread-2     End. Time = Mon May 13 17:51:43 IST 2019

ChildJob processing at: pool-1-thread-1 : Mon May 13 17:51:44 IST 2019----------: 12
ChildJob processing at: pool-1-thread-1 : Mon May 13 17:51:45 IST 2019----------: 13
ChildJob processing at: pool-1-thread-1 : Mon May 13 17:51:46 IST 2019----------: 14
ChildJob processing at: pool-1-thread-1 : Mon May 13 17:51:47 IST 2019----------: 15
ChildJob processing at: pool-1-thread-1 : Mon May 13 17:51:48 IST 2019----------: 16

想要的结果:

childJobThread: pool-1-thread-1 Start. Time = Mon May 13 17:51:31 IST 2019
ChildJob processing at: pool-1-thread-1 : Mon May 13 17:51:31 IST 2019----------: 0

committerThread: pool-1-thread-2 Start. Time = Mon May 13 17:51:32 IST 2019
committerThread: pool-1-thread-2     End. Time = Mon May 13 17:51:35 IST 2019

ChildJob processing at: pool-1-thread-1 : Mon May 13 17:51:35 IST 2019----------: 1
ChildJob processing at: pool-1-thread-1 : Mon May 13 17:51:36 IST 2019----------: 2
ChildJob processing at: pool-1-thread-1 : Mon May 13 17:51:37 IST 2019----------: 3
ChildJob processing at: pool-1-thread-1 : Mon May 13 17:51:38 IST 2019----------: 4
ChildJob processing at: pool-1-thread-1 : Mon May 13 17:51:39 IST 2019----------: 5

committerThread: pool-1-thread-2 Start. Time = Mon May 13 17:51:40 IST 2019
committerThread: pool-1-thread-2     End. Time = Mon May 13 17:51:43 IST 2019

ChildJob processing at: pool-1-thread-1 : Mon May 13 17:51:44 IST 2019----------: 6
ChildJob processing at: pool-1-thread-1 : Mon May 13 17:51:45 IST 2019----------: 7
ChildJob processing at: pool-1-thread-1 : Mon May 13 17:51:46 IST 2019----------: 8
ChildJob processing at: pool-1-thread-1 : Mon May 13 17:51:47 IST 2019----------: 9
ChildJob processing at: pool-1-thread-1 : Mon May 13 17:51:48 IST 2019----------: 10

【问题讨论】:

  • 为什么要暂停线程并从另一个提交?您可以从 ChildJob 线程提交 - 更简单可靠。
  • 因为,我需要根据时间使我的提交事件保持一致。说,每(n)秒。而且 ChildJob.process() 方法可以根据处理量放大或缩小,因此在代码中没有任何意义可以放置 commit() 代码。我正在尝试实现检查点/自动保存类型的功能。

标签: java multithreading thread-synchronization scheduledexecutorservice


【解决方案1】:

尝试在while 中进行同步,比如:


while(true) {
  synchronized(this) 
    {   
      System.out.println("ChildJob processing at: " + Thread.currentThread().getName() + " : " + new Date() + "----------: " + i++);
      Thread.sleep(1000);
    }
  } 
}

这个想法是,commit应该有机会闯入job的运行。

样本输出

childJobThread: pool-1-thread-1 Start. Time = Mon May 13 20:42:46 CST 2019
ChildJob processing at: pool-1-thread-1 : Mon May 13 20:42:46 CST 2019----------: 0
ChildJob processing at: pool-1-thread-1 : Mon May 13 20:42:47 CST 2019----------: 1
ChildJob processing at: pool-1-thread-1 : Mon May 13 20:42:48 CST 2019----------: 2
ChildJob processing at: pool-1-thread-1 : Mon May 13 20:42:49 CST 2019----------: 3
ChildJob processing at: pool-1-thread-1 : Mon May 13 20:42:50 CST 2019----------: 4
ChildJob processing at: pool-1-thread-1 : Mon May 13 20:42:52 CST 2019----------: 5
ChildJob processing at: pool-1-thread-1 : Mon May 13 20:42:53 CST 2019----------: 6

committerThread: pool-1-thread-2 Start. Time = Mon May 13 20:42:54 CST 2019
committerThread: pool-1-thread-2     End. Time = Mon May 13 20:42:57 CST 2019

ChildJob processing at: pool-1-thread-1 : Mon May 13 20:42:57 CST 2019----------: 7
ChildJob processing at: pool-1-thread-1 : Mon May 13 20:42:58 CST 2019----------: 8
ChildJob processing at: pool-1-thread-1 : Mon May 13 20:42:59 CST 2019----------: 9
ChildJob processing at: pool-1-thread-1 : Mon May 13 20:43:00 CST 2019----------: 10
ChildJob processing at: pool-1-thread-1 : Mon May 13 20:43:01 CST 2019----------: 11

committerThread: pool-1-thread-2 Start. Time = Mon May 13 20:43:02 CST 2019
committerThread: pool-1-thread-2     End. Time = Mon May 13 20:43:05 CST 2019

ChildJob processing at: pool-1-thread-1 : Mon May 13 20:43:05 CST 2019----------: 12
ChildJob processing at: pool-1-thread-1 : Mon May 13 20:43:06 CST 2019----------: 13
ChildJob processing at: pool-1-thread-1 : Mon May 13 20:43:07 CST 2019----------: 14
ChildJob processing at: pool-1-thread-1 : Mon May 13 20:43:08 CST 2019----------: 15
ChildJob processing at: pool-1-thread-1 : Mon May 13 20:43:09 CST 2019----------: 16
ChildJob processing at: pool-1-thread-1 : Mon May 13 20:43:10 CST 2019----------: 17
ChildJob processing at: pool-1-thread-1 : Mon May 13 20:43:11 CST 2019----------: 18
ChildJob processing at: pool-1-thread-1 : Mon May 13 20:43:12 CST 2019----------: 19
ChildJob processing at: pool-1-thread-1 : Mon May 13 20:43:13 CST 2019----------: 20
ChildJob processing at: pool-1-thread-1 : Mon May 13 20:43:14 CST 2019----------: 21
ChildJob processing at: pool-1-thread-1 : Mon May 13 20:43:15 CST 2019----------: 22
ChildJob processing at: pool-1-thread-1 : Mon May 13 20:43:16 CST 2019----------: 23

【讨论】:

  • 这根本行不通。一种更好的方法是使用内置原语,例如reentrant lock、condvars、latch 等。但更好的是完全避免同步问题,通过在一个线程中完成所有操作。多线程的好处是可以进行并行处理。如果您同步并因此序列化处理,您只会增加复杂性并失去好处。所以没有意义。
  • @SvetlinZarev 抱歉,我不太确定“根本不起作用”部分。让它在我的笔记本电脑上运行良好,“提交”在“作业”的运行之间运行,并且在“提交”期间没有“作业”运行。我认为这正是 OP 想要的。
  • 周期性地解锁一个锁然后立即再次锁定它的循环是线程饥饿的秘诀。
  • @SolomonSlow 我承认这不是解决这个问题的最佳解决方案,但它确实有效。另外我认为这种简单的方法可以帮助 OP 理解synchronized,这是一种选择。
  • 基于time 的解决方案本质上是不可靠的。想象一下,您在快速服务器类机器和树莓派上运行相同的应用程序。在服务器上,它每秒可能处理 100k 个元素,但在 ePi 上,它每秒可能处理不到 1 个元素。您在这里看到基于时间的commit 的问题吗?此外,如果计时器线程因 RuntimeException 而失败,它将永远不会被重新调度,因此它永远不会调用commmit
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2016-08-15
  • 1970-01-01
  • 2016-10-12
  • 1970-01-01
  • 2013-11-12
  • 1970-01-01
相关资源
最近更新 更多