【问题标题】:Test the double check locking mechanism in singleton for 100% code coverage在单例中测试双重检查锁定机制以实现 100% 的代码覆盖率
【发布时间】:2017-08-22 15:04:20
【问题描述】:

我正在尝试实现 100% 的代码覆盖率并尝试为单例中的双重检查锁定机制编写单元测试。

if (test == null) {
    synchronized (Test.class) {
        if (test == null) {
            test = injector.getInstance(Test.class);
        }
    }
    return test;
}

如何测试第二次空检查被命中的场景?我尝试了很多阅读,但找不到方法。

【问题讨论】:

标签: java unit-testing junit singleton


【解决方案1】:

更新:我从 100% 可靠变体的测试中删除了完全不必要的线程代码


不可能对使用 OP 中所示的 synchronized 块的经典双重检查锁定习语进行 100% 可靠测试,因为您需要在外部 ifsynchronized 之间进行一些挂钩块或最新同步块,以便您知道何时通过外部 if 并且现在正在等待 synchronized 块。或者更准确地说,是在读取字段值之后等待之前。

我的答案将被拆分,我首先将展示一种如何做一个漂亮(但不是 100%)可靠的方法来测试成语的“外部如果通过内部如果失败”的情况。然后我将展示三个选项,如何稍微修改生产代码以使其易于测试 100% 可靠。

还有一个免责声明,我的示例代码将在 Spock 中,因此是 Groovy 代码,因为恕我直言,即使您的生产代码使用 Java,编写测试也更胜一筹,尤其是在访问私有字段或方法时只要它们不是最终的,它就可以隐式地在 Groovy 中工作。但是代码应该很容易相应地转换为纯 JUnit Java 代码,特别是只要您的字段和方法不是私有的。


所以这里首先通过一些变化和内联解释来测试这个成语的非常可靠的方法。

def 'field should not get changed once assigned even if outer check succeeds'() {
    given:
        def insideSynchronizedBlock = new CountDownLatch(1)
        def continueTest = new CountDownLatch(1)
        // this is the value the field is set to after the
        // test passed the outer if and before checking the
        // inner if
        def testFieldValue = new Object()

    when:
        // get the object the synchronized block is synchronizing on
        // this can be some class object, internal field or whatever,
        // in OP this is the Test class object so we are using it here too
        def initializationLock = Test

        // this thread will enter the synchronized block and then
        // wait for the continueTest latch and thus hinder
        // the main thread to enter it prematurely but wait at
        // the synchronized block after checking the outer if
        //
        // if inside the synchronized block content there is something
        // that can be mocked like some generator method call,
        // alternatively to manually synchronizing on the lock and setting
        // the field in question, the generator method could be mocked
        // to first block on the continueTest latch and then return testFieldValue
        def synchronizedBlocker = Thread.start {
            synchronized (initializationLock) {
                // signal that we now block the synchronized block
                insideSynchronizedBlock.countDown()

                // wait for the continueTest latch that should be triggered
                // after the outer if was passed so that now the field
                // gets assigned so that the inner if will fail
                //
                // as we are in a different thread, an InterruptedException
                // would just be swallowed, so repeat the waiting until the
                // latch actually reached count 0, alternatively you could
                // mark the test as failed in the catch block if your test
                // framework supports this from different threads, you might
                // also consider to have some maximum waiting time that is
                // checked if an interrupt happened
                while (continueTest.count != 0) {
                    try {
                        continueTest.await()
                    } catch (InterruptedException ignore) {
                        // ignore, await is retried if necessary
                    }
                }
                // as the outer if was passed, now set the field value
                // in the testee to the test field value, we called the
                // testee field test here as was in OP example
                // after this the synchronized block will be exited
                // and the main test code can continue and check the inner if
                //
                // in the mocking case mentioned above, here testFieldValue
                // would be returned by the mocked injector method
                testee.test = testFieldValue
            }
        }

        // wait for the synchronizedBlocker to block the synchronized block
        // here if an InterruptedException happens, the test will fail,
        // alternatively you could also here do some while-loop that retries
        // the waiting, also with an optional maximum waiting time
        insideSynchronizedBlock.await()

        // this thread will trigger the continueTest latch and thus
        // implicitly cause the field to be set and the synchronized
        // block to be exited
        //
        // this is the problematic part why this solution is not 100% reliable
        // because this thread should start only after the main test code has
        // checked the outer if, otherwise the field value might be set before
        // the outer if is checked and so the outer if will already fail
        //
        // the test here will not detect this but stay green, the only way this
        // would be observed is because branch coverage would drop by one if this
        // happened; unfortunately that this happens is not too abstract but a
        // real "problem", so if you want to maintain 100% branch coverage, you
        // have to take additional measures, like for example these:
        //
        // - minimize the delay between the starting of this thread and the main
        //   test code actually passing the outer if check to increase probability
        //   that the if check was passed before this thread gets scheduled
        //   if you for example call the method in question via reflection because
        //   it is private and you don't want ot open it up for testing, this is
        //   slower than usual and you should find and get the method reference
        //   before starting this thread and you might also call it before on some
        //   dummy instance, so that the JIT compiler has a chance to optimize the
        //   access and so on
        //
        //   just do anything you can to minimize the time between starting this
        //   thread and passing the outer if in the main test code, but whatever
        //   you do, it is most probably not enough to have halfway reliable 100%
        //   branch coverage so you might need to take additional measures
        //
        // - add a yield() to this thread before calling countDown on the
        //   continueTest latch to give the main thread a chance to pass the outer
        //   if, if that is enough in your situation
        //
        //   if not, instead add a sleep() to this thread at the same place to
        //   give the main thread more time to pass the outer if, how long this
        //   sleep must be cannot be said and you just have to test what value
        //   you need to have halfway reliable results, it might already be
        //   enough to sleep for 50 ms or even less, just try it out
        //
        // - repeat the test X times so that at least with one of the tries
        //   the intended branch was hit and thus branch coverage will stay at
        //   100%, how big X is again has to be determined from case to case
        //   by trial and error or by wasting time by setting a very high value
        //
        //   in case of Spock being used the repeating can simply be achieved
        //   by adding a `where: i << (1..1000)` after the cleanup block to repeat
        //   the test a thousand times, for more information read about data
        //   driven testing in Spock, the data variable is simply ignored here
        //   and only cares for the identically repeated execution of the test
        def synchronizedUnBlocker = Thread.start {
            continueTest.countDown()
        }

    then:
        // here hopefully the outer if is passed before the synchronizedUnBlocker
        // is scheduled and then we wait at the synchronized block
        // then the synchronizedUnBlocker is scheduled and counts down the latch
        // the synchronizedBlocker is waiting for which in turn will set the
        // field value to testFieldValue
        // then the synchronizedBlocker exist and thus unblocks the synchronized
        // block, allowing the inner if to be tested here which now should fail
        // and the field value staying at testFieldValue
        // that the testFieldValue stays set is verified here, but this would also
        // be green if the outer if failed already because the synchronizedUnBlocker
        // was scheduled before the outer if was passed
        testee.getCommandPrefix(message) == testFieldValue

    cleanup:
        // wait for the helper threads to die,
        // this is not necessarily required they will die anyway
        synchronizedBlocker?.join()
        synchronizedUnBlocker?.join()
}

如前所述,(据我所知)100% 可靠地测试双重检查锁定的唯一方法是修改生产代码,以便能够在两个 if 检查之间和主线程阻塞之前挂钩一些监视器。更准确地说,是在读取外部 if 的字段值之后和线程阻塞之前。

这可以通过多种方式实现,我在这里展示了其中的一些,因此您可以选择您最喜欢的一种,或者根据它们开发自己的方式。这些替代方案没有经过性能测试,我不会对它们做任何性能说明,我只是说它们仍然可以正常工作,并且可以 100% 可靠地测试,并具有 100% 的分支覆盖率。如果您熟悉微基准测试,请使用这些方法进行一些适当的微基准测试性能测试,并希望我将它们添加到答案中,只需告诉我,我会添加信息。

这是我要修改的原始双重检查锁定Java方法:

public class Test {
    private final Object fieldInitializationLock = new Object();
    private volatile String field;
    public String getField() {
        String field = this.field;
        if (field == null) {
            synchronized (fieldInitializationLock) {
                field = this.field;
                if (field == null) {
                    field = "";
                    this.field = field;
                }
            }
        }
        return field;
    }
}

这里根据我上面的解释进行相应的测试:

import spock.lang.Specification

import java.util.concurrent.CountDownLatch

class DoubleCheckedLockingTest extends Specification {
    def 'field should not get changed once assigned even if outer check succeeds'() {
        given:
            def testee = new Test()
            def insideSynchronizedBlock = new CountDownLatch(1)
            def continueTest = new CountDownLatch(1)
            def testFieldValue = new String()

        when:
            def synchronizedBlocker = Thread.start {
                synchronized (testee.fieldInitializationLock) {
                    insideSynchronizedBlock.countDown()
                    while (continueTest.count != 0) {
                        try {
                            continueTest.await()
                        } catch (InterruptedException ignore) {
                            // ignore, await is retried if necessary
                        }
                    }
                    testee.field = testFieldValue
                }
            }

            insideSynchronizedBlock.await()
            def synchronizedUnBlocker = Thread.start {
                continueTest.countDown()
            }

        then:
            testee.getField().is(testFieldValue)

        cleanup:
            synchronizedBlocker?.join()
            synchronizedUnBlocker?.join()
    }
}

第一个变体可能也是我最喜欢的一个。

它使用更高级别的同步工具 ReadWriteLock 来执行双重检查锁定,而不是在某个监控对象上使用 volatile 字段和同步块。请注意,对于此变体,该字段不再需要为 volatile,我们也不需要防止多次缓慢访问 volatile 字段的本地字段。

这是我的最爱,因为与所有其他 100% 可靠的解决方案一样,它需要更改生产源,但仍然只有生产代码。该逻辑只是使用不同的高级工具实现的,并且通过这样做突然变得可以正确测试。其他方法可能看起来更简单,需要的更改更少,但它们都引入了专门用于使习语可测试且没有生产价值的代码。

所以这里是修改后的Java源代码:

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Test {
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private final Lock readLock = readWriteLock.readLock();
    private final Lock writeLock = readWriteLock.writeLock();
    private String field;
    public String getField() {
        readLock.lock();
        try {
            if (field == null) {
                readLock.unlock();
                try {
                    writeLock.lock();
                    try {
                        if (field == null) {
                            field = "";
                        }
                    } finally {
                        writeLock.unlock();
                    }
                } finally {
                    readLock.lock();
                }
            }
            return field;
        } finally {
            readLock.unlock();
        }
    }
}

这里是相应的测试。与原始测试的不同之处在于,它 100% 可靠并且不需要额外的线程。我们需要设置一个最终变量readLock,但由于反射,这工作得很好,并且可以用于测试代码恕我直言。如果这不可能或不想要,我们还可以使用一些模拟工具,例如 PowerMock,它能够模拟 new ReentrantReadWriteLock() 调用。

我们只是装饰了writeLock.lock() 调用。在我们进行实际锁定之前,我们设置了相关字段,这样内部的if 就会失败,这正是我们想要实现的。

import spock.lang.Specification

class DoubleCheckedLockingTest extends Specification {
    def 'field should not get changed once assigned even if outer check succeeds'() {
        given:
            def testee = new Test()
            def testFieldValue = new String()

        and:
            Test.getDeclaredField("writeLock")
                    .tap { it.accessible = true }
                    .set(testee, Spy(testee.writeLock))
            testee.writeLock.lock() >> {
                testee.field = testFieldValue
                callRealMethod()
            }

        expect:
            testee.getField().is(testFieldValue)
    }
}

所有其他变体都人为地引入了一些东西,以在被检查的 if 条件和被获取的监视器之间进行挂钩,以便可以向同步阻塞器发出信号以继续之前的方法已经自动执行的操作。


下面的变体引入了一些可以用来设置相关字段的东西,并且在生产时什么都不做。在此示例中,我在 Object 上使用 hashCode() 并将其替换为测试中的存根,但您可以使用任何其他变体,例如拥有一些什么都不做的类,只需创建一个新对象,然后使用 PowerMock 或相似的。唯一重要的一点是,您可以在测试时对其进行检测,在生产时它什么也不做或尽可能少做,并且不会被编译器优化掉。

这里是 Java 代码,其中只添加了带有 testHook 的两行:

public class Test {
    private final Object fieldInitializationLock = new Object();
    private volatile String field;
    private Object testHook;
    public String getField() {
        String field = this.field;
        if (field == null) {
            testHook.hashCode();
            synchronized (fieldInitializationLock) {
                field = this.field;
                if (field == null) {
                    field = "";
                    this.field = field;
                }
            }
        }
        return field;
    }
}

以及相应的测试代码,其中testHook 被检测存根替换:

import spock.lang.Specification

class DoubleCheckedLockingTest extends Specification {
    def 'field should not get changed once assigned even if outer check succeeds'() {
        given:
            def testee = new Test()
            def testFieldValue = new String()

        and:
            testee.testHook = Stub(Object)
            testee.testHook.hashCode() >> {
                testee.field = testFieldValue
                1
            }

        expect:
            testee.getField().is(testFieldValue)
    }
}

以下变体将if 中的等号检查替换为Objects.isNull(...),然后使用PowerMock 调用仪器。从生产代码的角度来看,这个变体也不算太糟糕,但目前的缺点是,必须使用 PowerMock,并且 PowerMock 与默认和推荐的 JaCoCo on-the-fly 工具不协调,所以测试的类将具有0% 的测试覆盖率,或者必须使用 JaCoCo 离线检测进行检测。只是因为可以做好,不管容易还是相反,我在这里提一下。

Java 类:

import static java.util.Objects.isNull;

public class Test {
    private final Object fieldInitializationLock = new Object();
    private volatile String field;
    public String getField() {
        String field = this.field;
        if (isNull(field)) {
            synchronized (fieldInitializationLock) {
                field = this.field;
                if (field == null) {
                    field = "";
                    this.field = field;
                }
            }
        }
        return field;
    }
}

以及相应的测试:

import groovy.transform.CompileStatic
import org.junit.runner.RunWith
import org.mockito.stubbing.Answer
import org.powermock.core.classloader.annotations.PrepareForTest
import org.powermock.modules.junit4.PowerMockRunner
import spock.lang.Specification

import static org.mockito.ArgumentMatchers.isNull
import static org.mockito.Mockito.when
import static org.mockito.Mockito.withSettings
import static org.powermock.api.mockito.PowerMockito.mockStatic

@RunWith(PowerMockRunner)
@PrepareForTest(Test)
class DoubleCheckedLockingTest extends Specification {
    // this helper is necessary, or PowerMock cannot properly mock
    // the system class, as it instruments the test class to intercept
    // and record the call to the system class
    // without compile static the dynamic Groovy features prevent this
    @CompileStatic
    def staticallyCompiledHelper(Answer answer) {
        when(Objects.isNull(isNull())).thenAnswer(answer)
    }

    def 'field should not get changed once assigned even if outer check succeeds'() {
        given:
            def testee = new Test()
            def testFieldValue = new String()

        and:
            mockStatic(Objects, withSettings().stubOnly())
            staticallyCompiledHelper {
                // as the field is value is already read
                // we can already trigger the continueTest latch
                // first and do the null-check second
                testee.field = testFieldValue
                it.callRealMethod()
            }

        expect:
            testee.getField().is(testFieldValue)
    }
}

【讨论】:

    【解决方案2】:

    更新:在与@vampire 讨论后,我得出的结论是:

    如果没有纯双重检查锁定代码的检测,这是不可能实现的。

    所以,这只有在您满足以下条件时才有可能:

    • 通过在外部ifsynchronized 关键字之间添加一些可以模拟的内容来检测代码(当然,这违背了双重检查锁定习惯的目的,即最小化外部if 之间的线程干扰机会检查和synchronized 块)
    • 可以模拟你的注射器和
    • 有办法在同步块之外泄漏test 引用

    示例

    双重检查锁定代码:

    private volatile TestObject testObject;
    
    private AtomicBoolean state = new AtomicBoolean(false);
    
    public void ensureTestObject() {
      if (testObject == null) {
        injector.tick();
        synchronized (TestObject.class) {
          if (testObject == null) {
            testObject = injector.getInstance(TestObject.class);
          } else {
            state.set(true);
          }
        }
      }
    }
    
    // @VisibleForTesting
    void setTestObject(TestObject testObject) {
      this.testObject = testObject;
    }
    
    public boolean failedInnerIf() {
      return state.get();
    }
    

    测试失败的内部if 检查:

    @Test
    public void shouldFailInnerNullCheck() throws Exception {
      for (int i = 0; i < 10000; i++) {
        // given
        CountDownLatch insideSynchronizedBlock = new CountDownLatch(1);
        CountDownLatch insideOuterIf = new CountDownLatch(1);
        CountDownLatch continueTest = new CountDownLatch(1);
        Injector injector = mock(Injector.class);
        when(injector.getInstance(any()))
            .thenAnswer(invocation -> {
              insideSynchronizedBlock.countDown();
              try {
                continueTest.await();
              } catch (InterruptedException e) {
                fail();
              }
              return new TestObject();
            })
            .thenReturn(new TestObject());
        when(injector.tick())
            .thenReturn("tic")
            .thenAnswer(invocation -> {
              insideOuterIf.countDown();
              try {
                continueTest.await();
              } catch (InterruptedException e) {
                fail();
              }
              return "tac";
            })
            .thenReturn("toc");
    
        DoubleCheckedLocking doubleCheckedLocking = new DoubleCheckedLocking(injector);
    
        // when
        Thread thread1 = new Thread(() ->
            doubleCheckedLocking.ensureTestObject()
        );
        Thread thread2 = new Thread(() -> {
          try {
            insideOuterIf.await();
          } catch (InterruptedException e) {
            fail();
          }
          doubleCheckedLocking.setTestObject(new TestObject());
          continueTest.countDown();
        });
        Thread thread3 = new Thread(() -> {
          try {
            insideSynchronizedBlock.await();
          } catch (InterruptedException e) {
            fail();
          }
          doubleCheckedLocking.ensureTestObject();
        });
    
        thread1.start();
        thread2.start();
        thread3.start();
    
        thread1.join();
        thread2.join();
        thread3.join();
    
        // then
        assertTrue("Failed at try " + i, doubleCheckedLocking.failedInnerIf());
      }
    }
    

    它是如何工作的?

    • 我们并行启动 3 个线程
    • thread1 块内的 synchronized
    • thread2thread3 在外部if 检查之后伪造testObject 的并行初始化(这是我们需要检测代码injector.tick() 的原因)
    • 'thread3' 第二次调用ensureTestObject(),一旦thread1 离开synchronized 块并且thread2 同时更改了测试对象,内部if 检查将失败

    【讨论】:

    • 如何确保在主线程继续之前没有运行thread2?我试图在这里实现您的建议,但除非我在await 之后和setTestObject 之前在thread2 内添加任意Thread.sleep(...)Thread.yield(),否则我最常做内部空检查失败,因为首先安排了thread2 并且在主线程有机会进行外部null 检查之前设置测试对象。
    • 实际上,setTestObject 不是多余的吗?鉴于可以以某种方式确保首先调度主线程并在线程2倒计时continueTest之前执行外部null检查,那么在主线程可以执行内部线程之前,线程1在任何情况下都已经为test分配了一个值null-check,因为它必须等待 thread1 离开同步块。
    • 最好不要在thread2和主线程中两次insideSynchronizedBlock.await(),而是先在主线程中执行insideSynchronizedBlock.await(),然后才真正启动thread2,这也增加了概率在thread2运行之前进行外部null检查,因为主线程仍然是活动线程,在启动thread2后可以直接继续,但这仍然取决于运气。
    • 包装continueTest.await() 可能是个好主意,它实际上是在线程1 上执行的try-catch 和try-catch 一会儿检查计数是否为0,或者thread1 可以如果等待被中断,则继续过早。
    • 对于您的第一个问题:我认为我没有完全理解这个问题。整个练习的目标是我们得到一个"inner null check failed"。这就是这个测试的全部内容。对于您的第二个问题:当一个线程在外部if 内部但在内部if 内部之前,您如何更改test 的值?对于你的第三个问题:有趣的提议。我会考虑并试一试。对于你的第 4 个问题:是的,但我宁愿把它放在一个 try-catch 中,然后如果它被打断就失败。
    猜你喜欢
    • 2023-03-21
    • 1970-01-01
    • 2020-03-23
    • 1970-01-01
    • 1970-01-01
    • 2012-01-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多