本文是我们学院课程中名为Java Concurrency Essentials的一部分 。
在本课程中,您将深入探讨并发的魔力。 将向您介绍并发和并发代码的基础知识,并学习诸如原子性,同步和线程安全之类的概念。 在这里查看 !
1.简介
本文介绍了多线程应用程序的测试。 我们实现一个简单的阻塞队列,并在压力测试条件下测试其阻塞行为以及行为和性能。 最后,我们阐明了用于多线程类的单元测试的可用框架。
2. SimpleBlockingQueue
在本节中,我们将实现一个非常基本且简单的阻塞Queue 。 这个队列除了保留我们放入其中的元素并在调用get()时将它们还给他们之外,什么也没做。 在新元素可用之前, get()应该会阻塞。
显然, java.util.concurrent包已经提供了这样的功能,并且不需要再次实现它,但是出于演示目的,我们在这里这样做是为了展示如何测试这样的类。
作为队列的数据结构,我们从java.util包中选择一个标准的LinkedList 。 此列表未同步,调用其get()方法不会阻塞。 因此,我们必须同步访问列表,并且必须添加阻止功能。 后者可以通过一个简单的while()循环来实现,当队列为空时,该循环调用列表上的wait()方法。 如果队列不为空,则返回第一个元素:
public class SimpleBlockingQueue<T> {
private List<T> queue = new LinkedList<T>();
public int getSize() {
synchronized(queue) {
return queue.size();
}
}
public void put(T obj) {
synchronized(queue) {
queue.add(obj);
queue.notify();
}
}
public T get() throws InterruptedException {
while(true) {
synchronized(queue) {
if(queue.isEmpty()) {
queue.wait();
} else {
return queue.remove(0);
}
}
}
}
}
测试阻止操作
尽管此实现非常简单,但是测试所有功能(尤其是阻止功能)并不是那么容易。 当我们只在空队列上调用get()时,当前线程将被阻塞,直到另一个线程将新项插入队列中。 这意味着在单元测试中我们至少需要两个不同的线程。 当一个线程阻塞时,另一个线程等待特定的时间。 如果在此期间另一个线程不执行其他代码,则可以假定阻止功能正在运行。 检查阻塞线程是否不执行任何其他代码的一种方法是,在引发异常或执行get()调用后的行时,添加一些设置的布尔标志:
private static class BlockingThread extends Thread {
private SimpleBlockingQueue queue;
private boolean wasInterrupted = false;
private boolean reachedAfterGet = false;
private boolean throwableThrown;
public BlockingThread(SimpleBlockingQueue queue) {
this.queue = queue;
}
public void run() {
try {
try {
queue.get();
} catch (InterruptedException e) {
wasInterrupted = true;
}
reachedAfterGet = true;
} catch (Throwable t) {
throwableThrown = true;
}
}
public boolean isWasInterrupted() {
return wasInterrupted;
}
public boolean isReachedAfterGet() {
return reachedAfterGet;
}
public boolean isThrowableThrown() {
return throwableThrown;
}
}
标志wasInterrupted指示阻塞线程是否被中断,标志标志reachedAfterGet显示执行了get之后的行,最后throwableThrown会告诉我们是否throwableThrown了任何类型的Throwable 。 使用这些标志的getter方法,我们现在可以编写一个单元测试,该单元测试首先创建一个空队列,启动BlockingThread ,等待一段时间,然后将新元素插入队列。
@Test
public void testPutOnEmptyQueueBlocks() throws InterruptedException {
final SimpleBlockingQueue queue = new SimpleBlockingQueue();
BlockingThread blockingThread = new BlockingThread(queue);
blockingThread.start();
Thread.sleep(5000);
assertThat(blockingThread.isReachedAfterGet(), is(false));
assertThat(blockingThread.isWasInterrupted(), is(false));
assertThat(blockingThread.isThrowableThrown(), is(false));
queue.put(new Object());
Thread.sleep(1000);
assertThat(blockingThread.isReachedAfterGet(), is(true));
assertThat(blockingThread.isWasInterrupted(), is(false));
assertThat(blockingThread.isThrowableThrown(), is(false));
blockingThread.join();
}
在插入之前,所有标志都应为false。 如果是这样的话,我们把一个新的元素到队列中,检查标志reachedAfterGet设置为true。 所有其他标志仍应为false。 最后,我们可以join() blockingThread 。
测试正确性
先前的测试仅显示了如何测试阻塞操作。 更有趣的是真正的多线程测试用例,其中我们有多个线程在队列中添加元素,还有一堆工作线程从队列中检索这些值。 基本上,这意味着创建一些将新元素插入队列的生产者线程,并设置一堆调用get()的工作线程。
但是,我们如何知道工作线程从队列中获得了与生产者线程之前插入的元素完全相同的元素呢? 一种可能的解决方案是让第二个队列在其中基于某些唯一ID(例如UUID)添加和删除元素。 但是,由于我们处于多线程环境中,因此还必须同步对第二个队列的访问,并且某些唯一ID的创建也将强制进行某种同步。
更好的解决方案是一些无需任何额外同步即可实现的数学方法。 最简单的方法是使用整数值作为队列元素,这些值是从线程本地随机生成器中检索的。 特殊的类ThreadLocalRandom为每个线程管理一个随机生成器。 因此,我们没有任何同步开销。 生产者线程计算由它们插入的元素的总和,而工作线程也计算其本地和。 最后,将所有生产者线程的总和与所有消费者线程的总和进行比较。 如果相同,则可以假定我们很有可能已检索到之前插入的所有任务。
以下单元测试通过将使用者线程和生产者线程作为任务提交到固定线程池来实现这些想法:
@Test
public void testParallelInsertionAndConsumption() throws InterruptedException, ExecutionException {
final SimpleBlockingQueue<Integer> queue = new SimpleBlockingQueue<Integer>();
ExecutorService threadPool = Executors.newFixedThreadPool(NUM_THREADS);
final CountDownLatch latch = new CountDownLatch(NUM_THREADS);
List<Future<Integer>> futuresPut = new ArrayList<Future<Integer>>();
for (int i = 0; i < 3; i++) {
Future<Integer> submit = threadPool.submit(new Callable<Integer>() {
public Integer call() {
int sum = 0;
for (int i = 0; i < 1000; i++) {
int nextInt = ThreadLocalRandom.current().nextInt(100);
queue.put(nextInt);
sum += nextInt;
}
latch.countDown();
return sum;
}
});
futuresPut.add(submit);
}
List<Future<Integer>> futuresGet = new ArrayList<Future<Integer>>();
for (int i = 0; i < 3; i++) {
Future<Integer> submit = threadPool.submit(new Callable<Integer>() {
public Integer call() {
int count = 0;
try {
for (int i = 0; i < 1000; i++) {
Integer got = queue.get();
count += got;
}
} catch (InterruptedException e) {
}
latch.countDown();
return count;
}
});
futuresGet.add(submit);
}
latch.await();
int sumPut = 0;
for (Future<Integer> future : futuresPut) {
sumPut += future.get();
}
int sumGet = 0;
for (Future<Integer> future : futuresGet) {
sumGet += future.get();
}
assertThat(sumPut, is(sumGet));
}
我们使用CountDownLatch来等待所有线程完成。 最后,我们可以计算所有提交和检索的整数的总和,并断言两者相等。
很难预测执行不同线程的顺序。 它取决于许多动态因素,例如操作系统处理的中断以及调度程序如何选择下一个要执行的线程。 为了实现更多的上下文切换,可以调用Thread.yield()方法。 这为调度程序提供了一个提示,即当前线程愿意让CPU支持其他线程。 如javadoc所述,这只是一个提示,即JVM可以完全忽略该提示并进一步执行当前线程。 但是出于测试目的,可以使用这种方法引入更多的上下文切换,从而引发竞争条件等。
测试性能
除了类的正确行为,另一方面是它的性能。 在许多实际应用中,性能是一项重要要求,因此必须进行测试。
我们可以利用ExecutorService设置不同数量的线程。 每个线程都应该将一个元素插入到我们的队列中,然后再从中获取它。 在外部循环中,我们增加线程数,以查看线程数如何影响吞吐量。
@Test
public void testPerformance() throws InterruptedException {
for (int numThreads = 1; numThreads < THREADS_MAX; numThreads++) {
long startMillis = System.currentTimeMillis();
final SimpleBlockingQueue<Integer> queue = new SimpleBlockingQueue<Integer>();
ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
for (int i = 0; i < numThreads; i++) {
threadPool.submit(new Runnable() {
public void run() {
for (long i = 0; i < ITERATIONS; i++) {
int nextInt = ThreadLocalRandom.current().nextInt(100);
try {
queue.put(nextInt);
nextInt = queue.get();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
}
threadPool.shutdown();
threadPool.awaitTermination(5, TimeUnit.MINUTES);
long totalMillis = System.currentTimeMillis() - startMillis;
double throughput = (double)(numThreads * ITERATIONS * 2) / (double) totalMillis;
System.out.println(String.format("%s with %d threads: %dms (throughput: %.1f ops/s)", LinkedBlockingQueue.class.getSimpleName(), numThreads, totalMillis, throughput));
}
}
为了了解我们的简单队列实现的性能,我们可以将其与JDK的实现进行比较。 一个候选的是LinkedBlockingQueue 。 它的两个方法put()和take()工作方式与我们的实现类似,期望LinkedBlockingQueue有选择地受限制,因此必须跟踪所插入元素的数量,并在队列已满时让当前线程进入休眠状态。 此功能需要额外的簿记和插入操作检查。 另一方面,JDK实现不使用同步块,并且已经通过繁琐的性能测量来实现。
当我们使用LinkedBlockingQueue实现与上述相同的测试用例时,两个测试用例的输出如下:
该图清楚地表明,对于LinkedBlockingQueue实现,吞吐率(即每时间单位执行的操作数)几乎是其两倍。 但是它也表明,只有一个线程,两种实现每秒执行大约相同数量的操作。 添加更多线程可提高吞吐量,尽管曲线很快会针对其饱和值收敛。 添加更多线程不会再提高应用程序的性能。
3.测试框架
您可以查看可用的测试框架,而不必编写自己的框架来为应用程序实现多线程测试用例。 本节重点介绍其中两个:JMock和Grobo Utils。
JMock
为了进行压力测试,模拟框架JMock提供了Blitzer类。 该类实现的功能与我们在“测试正确性”部分中所做的类似,因为它在内部设置了一个ThreadPool向其提交执行某些特定操作的任务。 您提供要执行的任务/动作的数量以及构造函数的线程数量:
Blitzer blitzer = new Blitzer(25000, 6);
此实例具有方法blitz() ,您只需向其提供Runnable接口的实现:
@Test
public void stressTest() throws InterruptedException {
final SimpleBlockingQueue<Integer> queue = new SimpleBlockingQueue<Integer>();
blitzer.blitz(new Runnable() {
public void run() {
try {
queue.put(42);
queue.get();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
assertThat(queue.getSize(), is(0));
}
因此,与使用ExecutorService相比, Blitzer类使压力测试的实现更加简单。
Grobo实用程序
Grobo Utils是一个框架,为测试多线程应用程序提供支持。 本文描述了该框架背后的思想。
与前面的示例相似,我们拥有类MultiThreadedTestRunner ,该类在内部构造线程池并以给定数量的Runnable实现作为单独的线程执行。 Runnable实例必须实现一个称为TestRunnable的特殊接口。 值得一提的是,它的唯一方法runTest()会引发异常。 这样,线程内引发的异常会影响测试结果。 当我们使用普通的ExecutorService时,情况并非如此。 这里的任务必须实现Runnable并且它的唯一方法run()不会引发任何异常。 这些任务中引发的异常会被吞没,并且不会破坏测试。
构造完MultiThreadedTestRunner之后,我们可以调用其runTestRunnables()方法并提供在测试失败之前应等待的毫秒数。 最后, assertThat()调用将验证队列再次为空,因为所有测试线程都已删除了先前添加的元素。
public class SimpleBlockingQueueGroboUtilTest {
private static class MyTestRunnable extends TestRunnable {
private SimpleBlockingQueue<Integer> queue;
public MyTestRunnable(SimpleBlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void runTest() throws Throwable {
for (int i = 0; i < 1000000; i++) {
this.queue.put(42);
this.queue.get();
}
}
}
@Test
public void stressTest() throws Throwable {
SimpleBlockingQueue<Integer> queue = new SimpleBlockingQueue<Integer>();
TestRunnable[] testRunnables = new TestRunnable[6];
for (int i = 0; i < testRunnables.length; i++) {
testRunnables[i] = new MyTestRunnable(queue);
}
MultiThreadedTestRunner mttr = new MultiThreadedTestRunner(testRunnables);
mttr.runTestRunnables(2 * 60 * 1000);
assertThat(queue.getSize(), is(0));
}
}
翻译自: https://www.javacodegeeks.com/2015/09/testing-concurrent-applications.html