【问题标题】:Java, Multi Threading, ExecutorServiceJava,多线程,ExecutorService
【发布时间】:2012-09-06 17:16:52
【问题描述】:

以下是我的代码的某些部分,它使用了线程。目的是从数据库中检索所有记录(大约 5,00,000 条)并向它们发送警报电子邮件消息。我面临的问题是变量 emailRecords 变得非常繁重,并且花费了太多时间来发送电子邮件。如何通过使用多线程来快速处理,以便并行处理 5,00,000 条记录?我尝试使用 ExecutorService 但在实现时感到困惑。我在方法 checkName()、getRecords() 和 sendAlert() 中搞混了。所有这三种方法都被相关地使用。那么,executorService在哪里使用呢??

请向我提供如何处理以下代码以及需要编辑哪个部分的建议?提前致谢!!

public class sampledaemon implements Runnable {

    private static List<String[]> emailRecords = new ArrayList<String[]>();

    public static void main(String[] args) {
        if (args.length != 1) {
            return;
        }

        countryName = args[0];

        try {
            Thread t = null;
            sampledaemon daemon = new sampledaemon();
            t = new Thread(daemon);
            t.start();
        } catch (Exception e) {
            e.printStackTrace()
        }

    }

    public void run() {
        Thread thisThread = Thread.currentThread();
        try {
            while (true) {
                checkName(countryName);
                Thread.sleep(TimeUnit.SECONDS.toMillis(10));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void checkName(String countryName) throws Exception {
        Country country = CountryPojo.getDetails(countryName)

        if (country != null) {
            getRecords(countryconnection);
        }
    }

    private void getRecords(Country country, Connection con) {
        String users[] = null;
        while (rs.next()) {
            users = new String[2];
            users[0] = rs.getString("userid");
            users[1] = rs.getString("emailAddress");
            emailRecords.add(props);

            if (emailRecords.size() > 0) {
                sendAlert(date, con);
            }
        }
    }

    void sendAlert(String date, Connection con) {
        for (int k = 0; k < emailRecords.size(); k++) {
            //check the emailRecords and send email 
        }
    }
}

【问题讨论】:

标签: java multithreading executorservice


【解决方案1】:

据我所知,您很可能是单线程数据检索,而多线程用于电子邮件发送。粗略地说,您将循环浏览您的结果集并建立一个记录列表。当该列表达到一定大小时,您制作一个副本并将该副本发送到线程中进行处理,并清除原始列表。在结果集的末尾,检查列表中是否有未处理的记录,并将其发送到池中。

最后,等待线程池处理完所有记录。

类似的东西:

protected void processRecords(String countryName) {
  ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 10, TimeUnit.SECONDS, 
        new ArrayBlockingQueue<Runnable>(5), new ThreadPoolExecutor.CallerRunsPolicy());

   List<String[]> emaillist = new ArrayList<String>(1000);

   ResultSet rs = ....

   try {
     while (rs.next()) {
        String user[] = new String[2];
        users[0] = rs.getString("userid");
        users[1] = rs.getString("emailAddress");

        emaillist.add(user);
        if (emaillist.size() == 1000) {
            final List<String[]> elist = new ArrayList<String[]>(emaillist);
            executor.execute(new Runnable() {
                public void run() {
                    sendMail(elist);
                }
            }
            emaillist.clear();
        }
     }
   }
   finally {
     DbUtils.close(rs);
   }

   if (! emaillist.isEmpty()) {
            final List<String[]> elist = emaillist;
            executor.execute(new Runnable() {
                public void run() {
                    sendMail(elist);
                }
            }
            emaillist.clear();
   }

   // wait for all the e-mails to finish.
   while (! executor.isTerminated()) {
       executor.shutdown();
       executor.awaitTermination(10, TimeUnit.DAYS);
   }


}

【讨论】:

  • 非常感谢。有些问题,当一次从数据库中检索 10000 条(我限制了 sql 查询)记录时,只有 1000 条被加载到电子邮件列表并被处理。我们已经提到了 10 个最大线程,那么一次会有 10 个线程处理这 1000 条记录吗?这样在单线程中,如果 1 条记录需要 2 秒来处理,则 10,000 条记录将需要 20,000 秒。但是现在实施后,10条记录需要2秒,10,000条需要2000秒?并且在处理完 1000 条记录后,它会加载另外 1000 条记录并继续直到达到 10,000 条??
  • 按照它的设置方式,您最多可以同时执行 11 个 1000 块。 10 个在线程池中,1 个在主线程中(根据 CallerRunsPolicy)。您可以很好地进行不同的设置(例如使用 SynchronousQueue 而不是 ArrayBlockingQueue)以阻止提交,直到线程可用,但我不想对此进行全面讨论,因为您可以通过谷歌搜索到很多很棒的资源找到这些东西。
  • PS:除了有 11 个并发执行的块之外,您还可以有 5 个块在队列中(我给 ArrayBlockingQueue 的容量为 5)。当队列填满并提交另一个项目时,CallerRunsPolicy 就会启动。因此,即使有 100 万条记录,您一次也只能加载 16 个块。
  • 谢谢。目前我将使用 ArrayBlockingQueue 并研究 SynchronousQueue。另外,请您指导我如何检查哪个线程需要多少时间来处理?
【解决方案2】:

使用FixedThreadPool 的好处是您不必一次又一次地进行昂贵的线程创建过程,它一开始就完成 ...见下文..

ExecutorService executor = Executors.newFixedThreadPool(100);

ArrayList<String> arList =  Here your Email addresses from DB will go in ;

for(String s : arList){

     executor.execute(new EmailAlert(s));

 }



public class EmailAlert implements Runnable{

  String addr;

   public EmailAlert(String eAddr){


         this.addr = eAddr;

       }


  public void run(){


     // Do the process of sending the email here..

  }

 }

【讨论】:

  • 我认为 OP 试图避免的问题是必须先将列表中的所有记录排队,然后再对该列表进行任何工作(如发送电子邮件)
  • @Kumar,谢谢。但是 arList 只能在 Country 对象验证后才能填充,它会检索该特定国家的电子邮件地址。这就是为什么它在验证后从 checkName() 调用的方法 getResult() 中的原因。我被困在如何实现包含所有这些方法的 executorService ......或者至少并行发送电子邮件。
  • @MadanMadan 你可以做一些事情,比如验证 Country 对象,然后检索它的电子邮件并将其存储在 ArrayList 中......直到你完成了这一切......然后使用我提到的 Executors上面使这些电子邮件并行发送......或者你可以做这样的事情......在验证单个国家对象并检索其电子邮件后,将其发送到执行者以执行,同时继续检索电子邮件。 ..
  • @kumar 你的意思是在getRecords()方法里面我应该在调用sendAlert()方法时使用ExecutorService??
【解决方案3】:

创建第二个线程来完成所有工作而不是在主线程中完成相同的工作并不能帮助您避免在处理任何一个之前用 500 万条记录填充 emailRecords 列表的问题他们。

听起来您的目标是能够从数据库中读取数据并并行发送电子邮件。与其担心代码,不如先为您想要完成的工作考虑一种算法。像这样的:

  1. 在一个线程中,从数据库中查询记录,为每个结果,添加一个作业到 ExecutorService
  2. 该工作向一个人/地址/记录发送电子邮件。

或者

  1. 分批从数据库中读取记录 N(50、100、1000 等)
  2. 将每个批次提交给 executorService

【讨论】:

  • 所以你发布的代码根本不适合算法。您通常需要至少两个类来处理这个问题:db-querying 逻辑(它具有对 ExecutorService 的引用)和 Runnable/Callable 的子类,您将在第一个类中使用它需要的所有数据进行实例化。工作并将其传递给ExecutorService.submit()
  • @matt谢谢。我可以一次将查询结果限制为 10000 个。但是我很困惑在哪里使用 ExecutorService?在代码中, 1. 线程的公共 void run() 方法调用 checkName() 方法,其中 Country 对象被初始化。 2. checkName() 方法调用 getRecords() 方法,其中数据库查询运行并填充 emailRecords 3. 如果 emailRecords>0,则调用 sendAlert() 方法。我被困在这里。一次检索 10,000 条记录,但是我们如何将 executorService 用于方法 sendAlert() 以便并行使用而不会发生冲突?如果可以的话,请举个例子..
  • 在我上一条评论中,我提到了如何构建它。扔掉旧代码和对方法 A 调用 B 调用 C 调用 D 的担忧。让处理 DB 查询的类引用 executorService 并在处理查询结果时向其提交新作业。
猜你喜欢
  • 2020-09-24
  • 1970-01-01
  • 2019-03-15
  • 2011-11-01
  • 2011-08-22
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-08-17
相关资源
最近更新 更多