【问题标题】:Java multi-thread scalability issueJava多线程可伸缩性问题
【发布时间】:2014-02-17 02:59:42
【问题描述】:

更多更新

正如所选答案中所解释的,问题出在 JVM 的垃圾收集算法中。 JVM 使用卡片标记算法来跟踪对象字段中修改的引用。对于字段的每个引用分配,它会将卡中的关联位标记为真——这会导致错误共享,从而阻止缩放。细节在这篇文章中有很好的描述:https://blogs.oracle.com/dave/entry/false_sharing_induced_by_card

选项 -XX:+UseCondCardMark(在 Java 1.7u40 及更高版本中)缓解了该问题,并使其几乎完美地扩展。


更新

我发现(Park Eung-ju 暗示)将一个对象分配给一个字段变量会产生影响。如果我删除分配,它会完美扩展。 我认为这可能与 Java 内存模型有关——例如,对象引用必须指向一个有效地址才能可见,但我并不完全确定。 double 和 Object 引用(可能)在 64 位机器上都有 8 字节大小,所以在我看来,分配 double 值和 Object 引用在同步方面应该是相同的。

谁有合理的解释?


这里有一个奇怪的 Java 多线程可伸缩性问题。

我的代码只是遍历一个数组(使用访问者模式)来计算简单的浮点运算并将结果分配给另一个数组。没有数据依赖,也没有同步,所以它应该是线性扩展的(2 线程速度快 2 倍,4 线程速度快 4 倍)。

当使用原始(双)数组时,它可以很好地扩展。当使用对象类型(例如字符串)数组时,它根本不会缩放(即使根本没有使用字符串数组的值...)

这是完整的源代码:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.CyclicBarrier;

class Table1 {
    public static final int SIZE1=200000000;
    public static final boolean OBJ_PARAM;

    static {
        String type=System.getProperty("arg.type");
        if ("double".equalsIgnoreCase(type)) {
            System.out.println("Using primitive (double) type arg");
            OBJ_PARAM = false;
        } else {
            System.out.println("Using object type arg");
            OBJ_PARAM = true;
        }
    }

    byte[] filled;
    int[] ivals;
    String[] strs;

    Table1(int size) {
        filled = new byte[size];
        ivals = new int[size];
        strs = new String[size];

        Arrays.fill(filled, (byte)1);
        Arrays.fill(ivals, 42);
        Arrays.fill(strs, "Strs");
    }

    public boolean iterate_range(int from, int to, MyVisitor v) {
        for (int i=from; i<to; i++) {
            if (filled[i]==1) {
                // XXX: Here we are passing double or String argument
                if (OBJ_PARAM) v.visit_obj(i, strs[i]);
                else v.visit(i, ivals[i]);
            }
        }
        return true;
    }
}

class HeadTable {
    byte[] filled;
    double[] dvals;
    boolean isEmpty;

    HeadTable(int size) {
        filled = new byte[size];
        dvals = new double[size];
        Arrays.fill(filled, (byte)0);

        isEmpty = true;
    }

    public boolean contains(int i, double d) {
        if (filled[i]==0) return false;

        if (dvals[i]==d) return true;
        return false;
    }

    public boolean contains(int i) {
        if (filled[i]==0) return false;

        return true;
    }
    public double groupby(int i) {
        assert filled[i]==1;
        return dvals[i];
    }
    public boolean insert(int i, double d) {
        if (filled[i]==1 && contains(i,d)) return false;

        if (isEmpty) isEmpty=false;
        filled[i]=1;
        dvals[i] = d;
        return true;
    }
    public boolean update(int i, double d) {
        assert filled[i]==1;
        dvals[i]=d;
        return true;
    }
}


class MyVisitor {
    public static final int NUM=128;

    int[] range = new int[2];
    Table1 table1;
    HeadTable head;
    double diff=0;
    int i;
    int iv;
    String sv;

    MyVisitor(Table1 _table1, HeadTable _head, int id) { 
        table1 = _table1;
        head = _head;
        int elems=Table1.SIZE1/NUM;
        range[0] = elems*id;
        range[1] = elems*(id+1);
    }

    public void run() {
        table1.iterate_range(range[0], range[1], this);
    }

    //YYY 1: with double argument, this function is called
    public boolean visit(int _i, int _v) {
        i = _i;
        iv = _v;
        insertDiff();
        return true;
    }

    //YYY 2: with String argument, this function is called
    public boolean visit_obj(int _i, Object _v) {
        i = _i;
        iv = 42;
        sv = (String)_v;
        insertDiff();
        return true;
    }

    public boolean insertDiff() {
        if (!head.contains(i)) {
            head.insert(i, diff);
            return true;
        }
        double old = head.groupby(i);
        double newval=Math.min(old, diff);
        head.update(i, newval);
        head.insert(i, diff);
        return true;
    }
}
public class ParTest1 {
    public static int THREAD_NUM=4;

    public static void main(String[] args) throws Exception {
        if (args.length>0) {
            THREAD_NUM = Integer.parseInt(args[0]);
            System.out.println("Setting THREAD_NUM:"+THREAD_NUM);
        }
        Table1 table1 = new Table1(Table1.SIZE1);
        HeadTable head = new HeadTable(Table1.SIZE1);

        MyVisitor[] visitors = new MyVisitor[MyVisitor.NUM];
        for (int i=0; i<visitors.length; i++) {
            visitors[i] = new MyVisitor(table1, head, i);
        }

        int taskPerThread = visitors.length / THREAD_NUM;
        MyThread[] threads = new MyThread[THREAD_NUM];
        CyclicBarrier barrier = new CyclicBarrier(THREAD_NUM+1);
        for (int i=0; i<THREAD_NUM; i++) {
            threads[i] = new MyThread(barrier);
            for (int j=taskPerThread*i; j<taskPerThread*(i+1); j++) {
                if (j>=visitors.length) break;
                threads[i].addVisitors(visitors[j]);
            }
        }

        Runtime r=Runtime.getRuntime();
        System.out.println("Force running gc");
        r.gc(); // running GC here (excluding GC effect)
        System.out.println("Running gc done");

        // not measuring 1st run (excluding JIT compilation effect)
        for (int i=0; i<THREAD_NUM; i++) {
            threads[i].start();
        }
        barrier.await();

        for (int i=0; i<10; i++) {
            MyThread.start = true;

            long s=System.currentTimeMillis();
            barrier.await();
            long e=System.currentTimeMillis();
            System.out.println("Iter "+i+"  Exec time:"+(e-s)/1000.0+"s");
        }
    }

}

class MyThread extends Thread {
    static volatile boolean start=true;
    static int tid=0;

    int id=0;
    ArrayList<MyVisitor> tasks;
    CyclicBarrier barrier;
    public MyThread(CyclicBarrier _barrier) {
        super("MyThread"+(tid++));

        barrier = _barrier;
        id=tid;
        tasks = new ArrayList(256);
    }

    void addVisitors(MyVisitor v) {
        tasks.add(v);
    }

    public void run() {
        while (true) {
            while (!start) { ; }

            for (int i=0; i<tasks.size(); i++) {
                MyVisitor v=tasks.get(i);
                v.run();
            }
            start = false;
            try { barrier.await();}
            catch (InterruptedException e) { break; }
            catch (Exception e) { throw new RuntimeException(e); }
        }
    }
}

Java代码可以无依赖编译,可以通过以下命令运行:

java -Darg.type=double -server ParTest1 2

您将工作线程的数量作为参数传递(上面使用 2 个线程)。 设置好数组后(从测量时间中排除),它会执行相同的操作 10 次,在每次迭代时打印出执行时间。 使用上面的选项,它使用双数组,并且它在 1,2,4 线程时扩展性很好(即执行时间减少到 1/2 和 1/4),但是

java -Darg.type=Object -server ParTest1 2

使用此选项,它使用对象(字符串)数组,并且根本不缩放! 我测量了 GC 时间,但它微不足道(而且我还强制在测量时间之前运行 GC)。我已经用 Java 6(更新 43)和 Java 7(更新 51)进行了测试,但它是一样的。

代码中有 XXX 和 YYY 的 cmets 描述了使用 arg.type=double 或 arg.type=Object 选项时的区别。

你能弄清楚这里传递的字符串类型参数是怎么回事吗?

【问题讨论】:

  • 请在您的问题中包含代码;指向外部地点的链接可能会被删除。
  • 您能否执行配置文件以查看是否有大量 GC 活动?第 126 行的(String) 似乎也是一个潜在问题。您可以尝试直接接受String 进行检查吗?
  • 我用GarbageCollectorMXBean测量了GC时间;这并不重要(我还强制在测量时间之前运行 GC)。类型转换不是问题——我在没有转换的情况下进行了测试,结果是一样的。对于包含代码,包含的时间太长。一旦我有足够数量的答案,我会尝试包含它..
  • 请在此处添加代码 (sn-p)。不鼓励在外部源中包含代码。
  • 问题中包含源代码。

标签: java multithreading jvm primitive-types visitor-pattern


【解决方案1】:

HotSpot VM 为引用类型 putfield 字节码生成以下程序集。

mov ref, OFFSET_OF_THE_FIELD(this)  <- this puts the new value for field.

mov this, REGISTER_A
shr 0x9, REGISTER_A
movabs OFFSET_X, REGISTER_B
mov %r12b, (REGISTER_A, REGISTER_B, 1)

putfield 操作在 1 条指令中完成。 但后面还有更多说明。

它们是“卡片标记”说明。 (http://www.ibm.com/developerworks/library/j-jtp11253/)

将引用字段写入卡片中的每个对象(512 字节),将将值存储在相同的内存地址中

我猜,从多个内核存储到相同的内存地址会导致缓存和管道混乱。

添加

byte[] garbage = new byte[600];

到 MyVisitor 定义。

然后每个 MyVisitor 实例将被间隔足够大,不共享卡片标记位,您将看到程序缩放。

【讨论】:

    【解决方案2】:

    这不是一个完整的答案,但可能会为您提供提示。

    我修改了你的代码

    Table1(int size) {
    filled = new byte[size];
    ivals = new int[size];
    strs = new String[size];
    
    Arrays.fill(filled, (byte)1);
    Arrays.fill(ivals, 42);
    Arrays.fill(strs, "Strs");
    }
    

    Table1(int size) {
    filled = new byte[size];
    ivals = new int[size];
    strs = new String[size];
    
    Arrays.fill(filled, (byte)1);
    Arrays.fill(ivals, 42);
    Arrays.fill(strs, new String("Strs"));
    }
    

    更改后,对象类型数组的 4 个线程的运行时间减少了。

    【讨论】:

    • 那么 JVM 不使用并发哈希映射来存储字符串吗?
    【解决方案3】:

    根据http://docs.oracle.com/javase/specs/jls/se7/html/jls-17.html#jls-17.7

    出于 Java 编程语言内存模型的目的,对非易失性 long 或 double 值的单次写入被视为两次单独的写入:每个 32 位一半。这可能会导致线程从一次写入中看到 64 位值的前 32 位,而从另一次写入中看到后 32 位。

    volatile long 和 double 值的写入和读取始终是原子的。

    对引用的写入和读取始终是原子的,无论它们是作为 32 位还是 64 位值实现的。

    分配引用总是原子的, 并且 double 不是原子的,除非它被定义为 volatile。

    问题是 sv 可以被其他线程看到,并且它的分配是原子的。 因此,使用 ThreadLocal 包装访问者的成员变量(i, iv, sv)即可解决问题。

    【讨论】:

      【解决方案4】:

      "sv = (字符串)_v;"有所作为。我还确认类型转换不是因素。仅仅访问 _v 并不能有所作为。为 sv 字段分配一些值会有所不同。但我无法解释为什么。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2011-10-10
        • 1970-01-01
        • 1970-01-01
        • 2011-02-16
        • 2015-09-23
        • 2012-05-26
        • 2010-10-09
        相关资源
        最近更新 更多