【问题标题】:Two-way communication with a Java thread与 Java 线程的双向通信
【发布时间】:2010-10-18 09:11:47
【问题描述】:

在我的应用程序中,我执行了一些繁重的查找操作。这些操作必须在单个线程中完成(持久性框架限制)。

我想缓存结果。因此,我有一个类 UMRCache,有一个内部类 Worker:

public class UMRCache {
  private Worker worker;
  private List<String> requests = Collections.synchronizedList<new ArrayList<String>>());
  private Map<String, Object> cache = Collections.synchronizedMap(new HashMap<String, Object>());
  public UMRCache(Repository repository) {
    this.worker = new Worker(repository);
    this.worker.start();
  }

 public Object get(String key) {
   if (this.cache.containsKey(key)) {
    // If the element is already cached, get value from cache
     return this.cache.get(key);
   }
   synchronized (this.requests) {
     // Add request to queue
     this.requests.add(key);
     // Notify the Worker thread that there's work to do
     this.requests.notifyAll();
   }
   synchronized (this.cache) {
     // Wait until Worker has updated the cache
     this.cache.wait();
    // Now, cache should contain a value for key
     return this.cache.get(key);
   }
 }

 private class Worker extends Thread {
   public void run() {
      boolean doRun = true;
      while (doRun) {
         synchronized (requests) {
            while (requests.isEmpty() && doRun) {
               requests.wait(); // Wait until there's work to do
            }
            synchronized (cache) {
               Set<String> processed = new HashSet<String>();
               for (String key : requests) {
                 // Do the lookup
                 Object result = respository.lookup(key);
                 // Save to cache
                 cache.put(key, result);
                 processed.add(key); 
               }
               // Remove processed requests from queue
               requests.removeAll(processed);
               // Notify all threads waiting for their requests to be served
               cache.notifyAll();
            } 
         }
      }
   }
}

}

我有一个测试用例: 公共类 UMRCacheTest 扩展 TestCase { 私有UMRCache umrCache;

public void setUp() throws Exception {
    super.setUp();
    umrCache = new UMRCache(repository);
}

public void testGet() throws Exception {
    for (int i = 0; i < 10000; i++) {
        final List fetched = Collections.synchronizedList(new ArrayList());
        final String[] keys = new String[]{"key1", "key2"};
        final String[] expected = new String[]{"result1", "result2"}
        final Random random = new Random();

        Runnable run1 = new Runnable() {
            public void run() {
                for (int i = 0; i < keys.length; i++) {
                    final String key = keys[i];
                    final Object result = umrCache.get(key);
                    assertEquals(key, results[i]);
                    fetched.add(um);
                    try {
                        Thread.sleep(random.nextInt(3));
                    } catch (InterruptedException ignore) {
                    }
                }
            }
        };
        Runnable run2 = new Runnable() {
            public void run() {
                for (int i = keys.length - 1; i >= 0; i--) {
                    final String key = keys[i];
                    final String result = umrCache.get(key);
                    assertEquals(key, results[i]);
                    fetched.add(um);
                    try {
                        Thread.sleep(random.nextInt(3));
                    } catch (InterruptedException ignore) {
                    }
                }
            }
        };

        final Thread thread1 = new Thread(run1);
        thread1.start();
        final Thread thread2 = new Thread(run2);
        thread2.start();
        final Thread thread3 = new Thread(run1);
        thread3.start();
        thread1.join();
        thread2.join();
        thread3.join();
        umrCache.dispose();
        assertEquals(6, fetched.size());
    }
}

}

测试随机失败,大约有 10 次运行失败。它将在最后一个断言处失败:assertEquals(6, fetched.size()),在 assertEquals(key, results[i]) 处,或者有时测试运行器永远不会完成。

所以我的线程逻辑有些问题。有什么建议吗?

编辑:

我现在可能已经破解了,感谢所有帮助过的人。 解决方案似乎是:

 public Object get(String key) {
   if (this.cache.containsKey(key)) {
    // If the element is already cached, get value from cache
     return this.cache.get(key);
   }
   synchronized (this.requests) {
     // Add request to queue
     this.requests.add(key);
     // Notify the Worker thread that there's work to do
     this.requests.notifyAll();
   }
   synchronized (this.cache) {
     // Wait until Worker has updated the cache
     while (!this.cache.containsKey(key)) {
       this.cache.wait();
     }
    // Now, cache should contain a value for key
     return this.cache.get(key);
   }
 }

【问题讨论】:

  • 私有映射 cache = new Collections.synchronizedMap(new HashMap());错了...不能用第一个'new'编译,你确定你的复制/粘贴吗?
  • 你是对的。我已更正错误。

标签: java multithreading


【解决方案1】:

get() 方法逻辑可能会丢失结果并卡住



   synchronized (this.requests) {
     // Add request to queue
     this.requests.add(key);
     // Notify the Worker thread that there's work to do
     this.requests.notifyAll();
   }

   // ----- MOMENT1.  If at this moment Worker puts result into cache it
   // will be missed since notification will be lost

   synchronized (this.cache) {
     // Wait until Worker has updated the cache
     this.cache.wait();

    // ----- MOMENT2.  May be too late, since cache notifiation happened before at MOMENT1

    // Now, cache should contain a value for key
     return this.cache.get(key);
   }

【讨论】:

  • 啊,当然!似乎可以通过将整个块包装在 synchronized (this.cache) 中来解决问题,这样: synchronized (this.cache) { synchronized (this.requests) { this.requests.add(key); this.requests.notifyAll(); } this.cache.wait();返回 this.cache.get(key); } 似乎也没有显着的性能影响。谢谢楼主,很好看!
  • 这可能会导致死锁,因为 getter 将锁定缓存然后结果,而 Worker 锁定结果然后缓存。为避免死锁,线程必须以相同的顺序锁定对象。
  • 该死,你是对的。我偶尔会进行悬挂测试。但是,我对同步和等待()的顺序应该是什么完全空白。能给个提示吗?
  • 好吧,看起来 get() 现在更没问题了。但出于这个目的,我会使用 Java 并发包中的 ExecutorService + Future 接口。参考未来的例子 - 几乎就是你所做的
  • 感谢您的提示!我将深入了解 java.util.concurrent。
【解决方案2】:

您的测试中的变量 fetched 是一个 ArrayList,可从您的两个匿名 Runnable 实例访问和更新。

ArrayList 不是线程安全的,来自文档:

请注意,此实现不是 同步。如果多个线程 访问 ArrayList 实例 同时,并且至少其中之一 线程修改列表 在结构上,它必须是同步的 外部。 (结构修改 是任何添加或删除的操作 一个或多个元素,或显式 调整后备数组的大小;仅仅 设置元素的值不是 结构修改。)这是 通常由完成 在某个对象上同步 自然地封装了列表。如果不 这样的对象存在,列表应该是 “包装”使用 Collections.synchronizedList 方法。 这最好在创建时完成,以 防止意外不同步 访问列表:

因此我认为您的测试需要稍作调整。

【讨论】:

  • 谢谢,应该已经发现了。将 Collections.synchronizedList 包装添加到“获取”列表中。
  • 但是偶尔test runner挂掉的问题还是可见的。当我暂停挂起的运行器时,我可以看到“Thread-0”卡在 Worker.run() 方法中的 requests.wait() 上,而“Thread-1”卡在 this.cache.wait() 中UMRCache.get().
【解决方案3】:

我注意到您在缓存中的查找不是原子操作:

if (this.cache.containsKey(key)) {
    // If the element is already cached, get value from cache
    return this.cache.get(key);
}

由于您永远不会从代码中的缓存中删除,因此您始终会通过此代码获得一些价值。但是,如果将来您打算清理缓存,那么这里缺乏原子性将成为一个问题。

【讨论】:

  • 感谢您的意见。清除缓存目前不是问题,但将来肯定会。
猜你喜欢
  • 1970-01-01
  • 2020-08-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-05-24
相关资源
最近更新 更多