【问题标题】:How to make a thread wait until a variable reaches a specific value (Multi-threaded Java)如何让线程等待变量达到特定值(多线程 Java)
【发布时间】:2015-01-21 14:07:16
【问题描述】:

我有一个接受客户端连接的服务器程序。这些客户端连接可以属于许多流。例如,两个或多个客户端可以属于同一个流。在这些流中,我必须传递一条消息,但我必须等到所有流都建立起来。为此,我维护了以下数据结构。

ConcurrentHashMap<Integer, AtomicLong> conhasmap = new ConcurrentHashMap<Integer, AtomicLong>();

Integer 是流 ID,Long 是客户端编号。为了使给定流的一个线程等待 AtomicLong 达到特定值,我使用了以下循环。实际上,流的第一个数据包将其放入流 ID 和要等待的连接数。对于每个连接,我都会减少等待的连接数。

while(conhasmap.get(conectionID) != new AtomicLong(0)){
       // Do nothing
}

但是这个循环阻塞了其他线程。根据这个 answer 它进行易失性读取。如何修改代码以等待给定流的正确线程,直到它达到特定值?

【问题讨论】:

  • 为什么不使用 thread.wait() 并在服务器接受新连接后唤醒它?另一个不太优雅的解决方案是睡一会儿并检查值。
  • 使用Condition 可能吗?
  • @PbxMan 我正在等待服务器已经接受的线程。是的,睡一会儿并读取值可能会起作用,但是我正在寻找更好的方法。
  • 只是一个想法:我不能等待 AtomicLong 值并在线程等待线程然后值达到零时通知吗?
  • @fge 将检查条件

标签: java multithreading concurrency java.util.concurrent concurrenthashmap


【解决方案1】:

如果您使用的是 Java 8,CompletableFuture 可能非常适合。这是一个完整的、人为的示例,它正在等待 5 个客户端连接并向服务器发送消息(使用带有 offer/poll 的 BlockingQueue 进行模拟)。

在此示例中,当达到预期的客户端连接消息计数时,CompletableFuture 钩子完成,然后在您选择的任何线程上运行任意代码。

在此示例中,您没有任何复杂的线程等待/通知设置或繁忙的等待循环。

package so.thread.state;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class Main {

  public static String CONNETED_MSG = "CONNETED";
  public static Long EXPECTED_CONN_COUNT = 5L;

  public static ExecutorService executor = Executors.newCachedThreadPool();
  public static BlockingQueue<String> queue = new LinkedBlockingQueue<>();

  public static AtomicBoolean done = new AtomicBoolean(false);

  public static void main(String[] args) throws Exception {

    // add a "server" thread
    executor.submit(() -> server());

    // add 5 "client" threads
    for (int i = 0; i < EXPECTED_CONN_COUNT; i++) {
      executor.submit(() -> client());
    }

    // clean shut down
    Thread.sleep(TimeUnit.SECONDS.toMillis(1));
    done.set(true);
    Thread.sleep(TimeUnit.SECONDS.toMillis(1));
    executor.shutdown();
    executor.awaitTermination(1, TimeUnit.SECONDS);

  }

  public static void server() {

    print("Server started up");
    // track # of client connections established
    AtomicLong connectionCount = new AtomicLong(0L);

    // at startup, create my "hook"
    CompletableFuture<Long> hook = new CompletableFuture<>();
    hook.thenAcceptAsync(Main::allClientsConnected, executor);

    // consume messages
    while (!done.get()) {
      try {
        String msg = queue.poll(5, TimeUnit.MILLISECONDS);
        if (null != msg) {
          print("Server received client message");
          if (CONNETED_MSG.equals(msg)) {
            long count = connectionCount.incrementAndGet();

            if (count >= EXPECTED_CONN_COUNT) {
              hook.complete(count);
            }
          }
        }

      } catch (Exception e) {
        e.printStackTrace();
      }
    }

    print("Server shut down");

  }

  public static void client() {
    queue.offer(CONNETED_MSG);
    print("Client sent message");
  }

  public static void allClientsConnected(Long count) {
    print("All clients connected, count: " + count);
  }


  public static void print(String msg) {
    System.out.println(String.format("[%s] %s", Thread.currentThread().getName(), msg));
  }
}

你会得到这样的输出

[pool-1-thread-1] Server started up
[pool-1-thread-5] Client sent message
[pool-1-thread-3] Client sent message
[pool-1-thread-2] Client sent message
[pool-1-thread-6] Client sent message
[pool-1-thread-4] Client sent message
[pool-1-thread-1] Server received client message
[pool-1-thread-1] Server received client message
[pool-1-thread-1] Server received client message
[pool-1-thread-1] Server received client message
[pool-1-thread-1] Server received client message
[pool-1-thread-4] All clients connected, count: 5
[pool-1-thread-1] Server shut down

【讨论】:

  • 事先不知道客户端连接数。可以有许多连接,并且这些连接具有不同的流。我必须等待特定的流。
  • @kani,这是一个人为的例子,你可以将这种模式与任何类型的逻辑一起使用(你现在拥有相同的商业逻辑)然后使用CompletableFuture.complete(..),我只是在尝试展示 Java 引入的新范式,以使复杂异步行为的组合更加流畅,然后围绕 Object.wait()Object.notify() 进行推理
  • 我的客户端线程是根据通信 ID 分组的。例如,如果有 15 个客户端连接,则 10 个属于一个通信 A,5 个属于通信 B。我必须等待 A 的 10 个线程和 B 的 5 个线程。CompletableFuture 可以吗?我看到它需要与执行程序一起收集线程,并且在启动线程时我无法区分通信流。
【解决方案2】:

你的表情:

conhasmap.get(conectionID) != new AtomicLong(0)

将始终为真,因为您正在比较永远不会相等的对象引用,而不是值。更好的表达方式是:

conhasmap.get(conectionID).longValue() != 0L)

,但是像这样在循环中没有等待/通知逻辑的循环并不是一个好习惯,因为它不断地使用 CPU 时间。相反,每个线程都应该在 AtomicLong 实例上调用 .wait(),并且当它递减或递增时,您应该在 AtomicLong 实例上调用 .notifyAll() 以唤醒每个等待线程以检查表达式。 AtomicLong 类在修改时可能已经在调用 notifyAll() 方法,但我不确定。

AtomicLong al = conhasmap.get(conectionID);
synchronized(al) {
    while(al.longValue() != 0L) {
        al.wait(100); //wait up to 100 millis to be notified
    }
}

在递增/递减的代码中,它看起来像:

AtomicLong al = conhasmap.get(conectionID);
synchronized(al) {
    if(al.decrementAndGet() == 0L) {
        al.notifyAll();
    }
}

我个人不会为此计数器使用 AtomicLong,因为您没有从 AtomicLong 的无锁行为中受益。只需使用 java.lang.Long 代替,因为无论如何您都需要在计数器对象上同步 wait()/notify() 逻辑。

【讨论】:

  • 感谢您指出对象引用比较。但是,该解决方案仍然无效。
  • 你觉得al锁和锁是一致的吗?代码在while循环中进入无限循环
  • 您可能遇到过频繁地减少计数器的情况。将 while 条件更改为:while(al.longValue() > 0L) 将 if 条件更改为:if(al.decrementAndGet()
  • 还要检查以确保您只为连接流实例化一次 AtomicLong。每个连接都需要增加/减少相同的 AtomicLong 实例,否则它们将锁定不同的对象实例,这将导致您看到的无限循环。
  • > &
猜你喜欢
  • 1970-01-01
  • 2019-10-31
  • 1970-01-01
  • 1970-01-01
  • 2010-09-22
  • 1970-01-01
  • 2023-03-11
  • 1970-01-01
相关资源
最近更新 更多