4 字节 (int) * 1_000_000_000 ~ 4 Gb
4 字节 (int) * 1_000_000_000 / 64 字节 = 62500000 次(对于 L1 缓存)
4 字节 (int) * 1_000_000_000 / 64 字节 = 62500000 次(对于 L2 缓存)
采取的延迟,对于主内存 100 ns from here,我们都应该知道 100 s。
如果全部在 L1 缓存内(英特尔为 64 字节行),则接近 31.25 毫秒。
但在此之前还有 L2/L3 缓存(相同的行大小)是 218,75 毫秒。
您可以看到顺序读取 1 Mb(换句话说,这是最好的情况),因此对于 4 Gb,它是 4024 * 250 µs = 1006000 µs ~= 1 s。
SSD 磁盘的延迟较小,但并非如此简单。有研究(现在可能已经过期)表明,大多数可供所有人购买的 SSD 磁盘都不能承受非常高的负载率(原因 - 它们失败了,更有趣的是 - 它们有自己的垃圾收集器,可以增加潜伏)。但也有适用于 SSD 磁盘环境的解决方案,例如 Aerospike,当然,一般来说,SSD 比 HDD 更快。
只是为了理解。在典型的笔记本电脑(我的:intel core i5、x64、16Gb RAM)上,我需要从 580 毫秒到 875 毫秒来计算 10 亿个 int 元素的长和。
我还可以看到 Clickhouse 速度从 300 Mb/s 到 354.66 Mb/s 来计算我 SSD 上 Int32 列的总和。
(请注意,两种情况下的 sum 都没有意义,因为类型溢出)
当然,我们也有 CUDA 作为变体,甚至是简单的线程(假设多个线程计算总和,我们可以轻松实现)。
那么...我们能做什么?
有两种缩放类型:垂直和水平。大多数数据库更喜欢水平扩展,我想原因很简单。
水平缩放比垂直缩放更简单。对于垂直扩展,您需要人员(或者您应该自己拥有)在不同领域非常好的专业知识。
例如,在我的生活中,我应该了解很多关于 Java/HotSpot/OS 架构/Many-many 技术/框架/DB 的知识,以便在创建高性能应用程序/算法时编写或理解不同决策的好处。
而这才刚刚开始,还有比我更难的专家。
其他数据库使用垂直扩展,更准确地说,它们使用针对特定场景/查询的特殊优化。
所有决策都是在不同操作之间进行妥协的。
例如,对于 Top N 问题,Vertica 和 Druid 有特定的实现,它们正好解决了这个任务。
在 Cassandra 中,为了使所有选择快速,您应该为具有不同表示的一个表创建多个表或多个视图,以便对特定查询有效,当然,由于数据重复,会花费更多存储空间。
最大的实际问题之一是即使您也可以在一秒钟内读取 10 亿行 - 您不可能同时在同一张表中写入。
换句话说,主要问题 - 很难同时满足所有用户对所有用户任务的请求。
是否有更好的、经过验证的方法来处理数十亿行?
一些例子:
- RAM 与内存映射文件的组合(以保持开销):当您使用内存映射文件(或随机访问文件)时,您可以存储更多数据,并且使用良好的磁盘可以获得高读/写率。李>
- Indexed memory-segments:这个想法是按索引拆分数据,该索引将与段关联,并在段内进行查询,而不处理所有数据。
- 任务的特定存储:当你知道你的数据和需求时,你可以想出存储,这对它来说非常有效,但对其他人来说不是(在你的情况下“找到一个名字”,你可以通过索引和分区数据字母、前缀等);
- 在 C/C++ 中进行复杂计算,有时会更快。 :) 这有点好笑,但是真实的故事。口口相传,C++ 的编程和支持更复杂,但如果你有足够的专业知识,在 C++ 上编写快速应用程序会更容易;
- 数据复制(针对不同查询以多种方式存储数据);
- 代码生成(动态生成代码,将针对每个特定任务进行优化);
- 当然是多线程:如果您可以有效地共享 cpu 资源,请在多个线程中执行一项任务;
- 当然是缓存:缓存结果,基于磁盘/RAM/网络(我指的是外部缓存服务器);
在某些情况下,使用自己的解决方案可能更昂贵(且有效),然后是定制。在某些情况下,它不是...
字符串的比较比较复杂,所以我想你需要从计算比较两个字符串需要多少时间开始。
这个简单的例子展示了我们需要多少时间来比较 Java 中的两个字符串。
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@BenchmarkMode(Mode.SampleTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@Threads(1)
public class StringEquals {
@Param({"0", "5", "10"})
int prefix;
String theSamePart, theSamePartQuery;
@Setup(Level.Invocation)
public void setData() {
String value = String.valueOf(ThreadLocalRandom.current().nextInt());
theSamePart = prefix > 0 ? value.substring(Math.min(prefix, value.length())) : value;
value = String.valueOf(ThreadLocalRandom.current().nextInt());
theSamePartQuery = prefix > 0 ? theSamePart + value.substring(Math.min(prefix, value.length())) : value;
}
@Benchmark
public boolean equals(StringEquals stringEquals) {
return stringEquals.theSamePart.equals(stringEquals.theSamePartQuery);
}
public static void main(String[] args) throws Exception {
new Runner(new OptionsBuilder()
.include(StringEquals.class.getSimpleName())
.measurementIterations(10)
.warmupIterations(10)
.build()).run();
}
}
结果:
Benchmark (prefix) Mode Cnt Score Error Units
StringEquals.equals 0 sample 3482270 0,047 ± 0,011 us/op
StringEquals.equals:equals·p0.00 0 sample 0,022 us/op
StringEquals.equals:equals·p0.50 0 sample 0,035 us/op
StringEquals.equals:equals·p0.90 0 sample 0,049 us/op
StringEquals.equals:equals·p0.95 0 sample 0,058 us/op
StringEquals.equals:equals·p0.99 0 sample 0,076 us/op
StringEquals.equals:equals·p0.999 0 sample 0,198 us/op
StringEquals.equals:equals·p0.9999 0 sample 8,636 us/op
StringEquals.equals:equals·p1.00 0 sample 9519,104 us/op
StringEquals.equals 5 sample 2686616 0,037 ± 0,003 us/op
StringEquals.equals:equals·p0.00 5 sample 0,021 us/op
StringEquals.equals:equals·p0.50 5 sample 0,028 us/op
StringEquals.equals:equals·p0.90 5 sample 0,044 us/op
StringEquals.equals:equals·p0.95 5 sample 0,048 us/op
StringEquals.equals:equals·p0.99 5 sample 0,060 us/op
StringEquals.equals:equals·p0.999 5 sample 0,238 us/op
StringEquals.equals:equals·p0.9999 5 sample 8,677 us/op
StringEquals.equals:equals·p1.00 5 sample 1935,360 us/op
StringEquals.equals 10 sample 2989681 0,039 ± 0,001 us/op
StringEquals.equals:equals·p0.00 10 sample 0,021 us/op
StringEquals.equals:equals·p0.50 10 sample 0,030 us/op
StringEquals.equals:equals·p0.90 10 sample 0,049 us/op
StringEquals.equals:equals·p0.95 10 sample 0,056 us/op
StringEquals.equals:equals·p0.99 10 sample 0,074 us/op
StringEquals.equals:equals·p0.999 10 sample 0,222 us/op
StringEquals.equals:equals·p0.9999 10 sample 8,576 us/op
StringEquals.equals:equals·p1.00 10 sample 325,632 us/op
因此假设您需要 1_000_000_000 个字符串,在 99.99% 的情况下,您需要大约 8_000_000_000 us = 8000 s 来处理 10 亿个字符串。
相比之下,我们可以尝试以并行方式进行:
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import java.util.concurrent.*;
@State(Scope.Benchmark)
@BenchmarkMode(Mode.SampleTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Threads(1)
public class SearchBillionForkJoin {
static final int availableProcessors = 4; // Runtime.getRuntime().availableProcessors()
static final int size = 10_000_000, bucketSize = size / availableProcessors;
static final int handlersCount = availableProcessors;
@Param({"50", "100"})
int spinner;
String[] a;
Callable<Integer>[] callables;
ForkJoinTask<Integer>[] tasks;
QueryHolder queryHolder;
@Setup(Level.Trial)
public void setup() {
callables = new Callable[handlersCount];
queryHolder = new QueryHolder();
a = new String[size];
for (int i = 0; i < callables.length; ++i) {
switch (i) {
case 0:
callables[i] = createForBucket(queryHolder, a, 0, bucketSize);
break;
case 1:
callables[i] = createForBucket(queryHolder, a, bucketSize, bucketSize * 2);
break;
case 2:
callables[i] = createForBucket(queryHolder, a, bucketSize * 2, bucketSize * 3);
break;
case 3:
callables[i] = createForBucket(queryHolder, a, bucketSize * 3, size);;
break;
}
}
tasks = new ForkJoinTask[handlersCount];
}
@Setup(Level.Invocation)
public void setData() {
for (int i = 0; i < a.length; ++i) {
a[i] = String.valueOf(ThreadLocalRandom.current().nextInt());
}
queryHolder.query = String.valueOf(ThreadLocalRandom.current().nextInt());
}
@Benchmark
public Integer forkJoinPoolWithoutCopy() {
try {
for (int i = 0; i < tasks.length; ++i) {
tasks[i] = ForkJoinPool.commonPool().submit(callables[i]);
}
Integer position = -1;
boolean findMore = true;
head:
while(position == -1 && findMore) {
findMore = false;
for (int i = 0; i < tasks.length; ++i) {
if (tasks[i].isDone() && !tasks[i].isCancelled()) {
final Integer value = tasks[i].get();
if (value > -1) {
position = value;
for (int j = 0; j < tasks.length; ++j) {
if (j != i && !tasks[j].isDone()) {
tasks[j].cancel(true);
}
}
break head;
}
} else {
findMore = true;
}
}
int counter = spinner;
while (counter > 0) --counter;
}
return position;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) throws Exception {
new Runner(new OptionsBuilder()
.include(SearchBillionForkJoin.class.getSimpleName())
.jvmArgs("-Xmx10G")
.measurementIterations(10)
.warmupIterations(10)
.build()).run();
}
static boolean isDone(ForkJoinTask[] tasks) {
for (int i = 0; i < tasks.length; ++i) {
if (!tasks[i].isDone()) {
return false;
}
}
return true;
}
static Callable<Integer> createForBucket(QueryHolder queryHolder, String[] a, int start, int end) {
return new Callable<Integer>() {
@Override
public Integer call() throws Exception {
for (int j = start; j < end; ++j) {
if (queryHolder.query.equals(a[j])) {
return j;
}
}
return -1;
}
};
}
static class QueryHolder {
String query = null;
}
}
我使用 10_000_000 和 4 个线程(用于 4 个 cpu 核心),因为我没有足够的内存。
结果看起来仍然不合适。
Benchmark (spinner) Mode Cnt Score Error Units
SearchBillionForkJoin.forkJoinPoolWithoutCopy 50 sample 166 47,136 ± 1,989 ms/op
SearchBillionForkJoin.forkJoinPoolWithoutCopy:forkJoinPoolWithoutCopy·p0.00 50 sample 5,521 ms/op
SearchBillionForkJoin.forkJoinPoolWithoutCopy:forkJoinPoolWithoutCopy·p0.50 50 sample 47,055 ms/op
SearchBillionForkJoin.forkJoinPoolWithoutCopy:forkJoinPoolWithoutCopy·p0.90 50 sample 54,788 ms/op
SearchBillionForkJoin.forkJoinPoolWithoutCopy:forkJoinPoolWithoutCopy·p0.95 50 sample 56,653 ms/op
SearchBillionForkJoin.forkJoinPoolWithoutCopy:forkJoinPoolWithoutCopy·p0.99 50 sample 61,352 ms/op
SearchBillionForkJoin.forkJoinPoolWithoutCopy:forkJoinPoolWithoutCopy·p0.999 50 sample 63,635 ms/op
SearchBillionForkJoin.forkJoinPoolWithoutCopy:forkJoinPoolWithoutCopy·p0.9999 50 sample 63,635 ms/op
SearchBillionForkJoin.forkJoinPoolWithoutCopy:forkJoinPoolWithoutCopy·p1.00 50 sample 63,635 ms/op
SearchBillionForkJoin.forkJoinPoolWithoutCopy 100 sample 162 51,288 ± 4,031 ms/op
SearchBillionForkJoin.forkJoinPoolWithoutCopy:forkJoinPoolWithoutCopy·p0.00 100 sample 5,448 ms/op
SearchBillionForkJoin.forkJoinPoolWithoutCopy:forkJoinPoolWithoutCopy·p0.50 100 sample 49,840 ms/op
SearchBillionForkJoin.forkJoinPoolWithoutCopy:forkJoinPoolWithoutCopy·p0.90 100 sample 67,030 ms/op
SearchBillionForkJoin.forkJoinPoolWithoutCopy:forkJoinPoolWithoutCopy·p0.95 100 sample 90,505 ms/op
SearchBillionForkJoin.forkJoinPoolWithoutCopy:forkJoinPoolWithoutCopy·p0.99 100 sample 110,920 ms/op
SearchBillionForkJoin.forkJoinPoolWithoutCopy:forkJoinPoolWithoutCopy·p0.999 100 sample 121,242 ms/op
SearchBillionForkJoin.forkJoinPoolWithoutCopy:forkJoinPoolWithoutCopy·p0.9999 100 sample 121,242 ms/op
SearchBillionForkJoin.forkJoinPoolWithoutCopy:forkJoinPoolWithoutCopy·p1.00 100 sample 121,242 ms/op
换句话说,63,635 毫秒 * 100 = 6363,5 毫秒 = 6 秒。
例如,如果您可以使用亲和锁(每个线程一个完整的 cpu),则可以改进此结果。但可能太复杂了。
让我们尝试使用片段来表达想法:
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
@State(Scope.Benchmark)
@BenchmarkMode(Mode.SampleTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Threads(1)
public class SearchInMapBillionForkJoin {
static final int availableProcessors = 8; // Runtime.getRuntime().availableProcessors()
static final int size = 10_000_000, bucketSize = size / availableProcessors;
static final int handlersCount = availableProcessors;
Map<Integer, List<StringWithIndex>> strings;
QueryHolder queryHolder;
ForkJoinTask<Integer>[] tasks;
Callable<Integer>[] callables;
@Param({"50", "100"})
int spinner;
@Setup(Level.Trial)
public void setup() throws Exception {
queryHolder = new QueryHolder();
strings = new ConcurrentHashMap<>();
tasks = new ForkJoinTask[handlersCount];
callables = new Callable[handlersCount];
setData();
}
public void setData() throws Exception {
final int callableBucket = size / handlersCount;
for (int i = 0; i < handlersCount; ++i) {
callables[i] = createGenerateForBucket(strings, callableBucket);
tasks[i] = ForkJoinPool.commonPool().submit(callables[i]);
}
while(!isDone(tasks)) {
int counter = spinner;
while (counter > 0) --counter;
}
Map<Integer, Integer> distribution = new HashMap<>();
for (List<StringWithIndex> stringWithIndices : strings.values()) {
distribution.compute(stringWithIndices.size(), (key, value) -> value == null ? 1 : value + 1);
}
int maxListSize = 0;
for (int i = 0; i < handlersCount; ++i) {
Integer max = tasks[i].get();
if (max > maxListSize) {
maxListSize = max;
}
}
System.out.println("maxListSize = " + maxListSize);
System.out.println("list size distribution = " + distribution);
System.out.println("map size = " + strings.size());
distribution = null;
queryHolder.query = String.valueOf(ThreadLocalRandom.current().nextInt());
}
@Benchmark
public Integer findInSegment() {
final String query = this.queryHolder.query;
final Integer hashCode = query.hashCode();
final Map<Integer, List<StringWithIndex>> strings = this.strings;
if (strings.containsKey(hashCode)) {
List<StringWithIndex> values = strings.get(hashCode);
if (!values.isEmpty()) {
final int valuesSize = values.size();
if (valuesSize > 100_000) {
final int bucketSize = valuesSize / handlersCount;
callables[0] = createSearchForBucket(query, values, 0, bucketSize);
callables[1] = createSearchForBucket(query, values, bucketSize, bucketSize * 2);
callables[2] = createSearchForBucket(query, values, bucketSize * 2, bucketSize * 3);
callables[3] = createSearchForBucket(query, values, bucketSize * 3, values.size());
try {
for (int i = 0; i < callables.length; ++i) {
tasks[i] = ForkJoinPool.commonPool().submit(callables[i]);
}
Integer position = -1;
boolean findMore = true;
head:
while (position == -1 && findMore) {
findMore = false;
for (int i = 0; i < tasks.length; ++i) {
if (tasks[i].isDone() && !tasks[i].isCancelled()) {
final Integer value = tasks[i].get();
if (value > -1) {
position = value;
for (int j = 0; j < tasks.length; ++j) {
if (j != i && !tasks[j].isDone()) {
tasks[j].cancel(true);
}
}
break head;
}
} else {
findMore = true;
}
}
int counter = spinner;
while (counter > 0) --counter;
}
return position;
} catch (Exception e) {
throw new RuntimeException(e);
}
} else {
for (StringWithIndex stringWithIndex : values) {
if (query.equals(stringWithIndex.value)) {
return stringWithIndex.index;
}
}
}
}
}
return -1;
}
public static void main(String[] args) throws Exception {
new Runner(new OptionsBuilder()
.include(SearchInMapBillionForkJoin.class.getSimpleName())
.jvmArgs("-Xmx6G")
.measurementIterations(10)
.warmupIterations(10)
.build()).run();
}
static class StringWithIndex implements Comparable<StringWithIndex> {
final int index;
final String value;
public StringWithIndex(int index, String value) {
this.index = index;
this.value = value;
}
@Override
public int compareTo(StringWithIndex o) {
int a = this.value.compareTo(o.value);
if (a == 0) {
return Integer.compare(this.index, o.index);
}
return a;
}
@Override
public int hashCode() {
return this.value.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj instanceof StringWithIndex) {
return this.value.equals(((StringWithIndex) obj).value);
}
return false;
}
}
static class QueryHolder {
String query = null;
}
static Callable<Integer> createSearchForBucket(String query, List<StringWithIndex> values, int start, int end) {
return new Callable<Integer>() {
@Override
public Integer call() throws Exception {
for (int j = start; j < end; ++j) {
StringWithIndex stringWithIndex = values.get(j);
if (query.equals(stringWithIndex.value)) {
return stringWithIndex.index;
}
}
return -1;
}
};
}
static Callable<Integer> createGenerateForBucket(Map<Integer, List<StringWithIndex>> strings,
int count) {
return new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int maxListSize = 0;
for (int i = 0; i < count; ++i) {
String value = String.valueOf(ThreadLocalRandom.current().nextInt());
List<StringWithIndex> values = strings.computeIfAbsent(value.hashCode(), k -> new ArrayList<>());
values.add(new StringWithIndex(i, value));
if (values.size() > maxListSize) {
maxListSize = values.size();
}
}
return maxListSize;
}
};
}
static boolean isDone(ForkJoinTask[] tasks) {
for (int i = 0; i < tasks.length; ++i) {
if (!tasks[i].isDone()) {
return false;
}
}
return true;
}
}
结果:
Benchmark (spinner) Mode Cnt Score Error Units
SearchInMapBillionForkJoin.findInSegment 50 sample 5164328 ≈ 10⁻⁴ ms/op
SearchInMapBillionForkJoin.findInSegment:findInSegment·p0.00 50 sample ≈ 10⁻⁵ ms/op
SearchInMapBillionForkJoin.findInSegment:findInSegment·p0.50 50 sample ≈ 10⁻⁴ ms/op
SearchInMapBillionForkJoin.findInSegment:findInSegment·p0.90 50 sample ≈ 10⁻⁴ ms/op
SearchInMapBillionForkJoin.findInSegment:findInSegment·p0.95 50 sample ≈ 10⁻⁴ ms/op
SearchInMapBillionForkJoin.findInSegment:findInSegment·p0.99 50 sample ≈ 10⁻⁴ ms/op
SearchInMapBillionForkJoin.findInSegment:findInSegment·p0.999 50 sample ≈ 10⁻⁴ ms/op
SearchInMapBillionForkJoin.findInSegment:findInSegment·p0.9999 50 sample 0.009 ms/op
SearchInMapBillionForkJoin.findInSegment:findInSegment·p1.00 50 sample 18.973 ms/op
SearchInMapBillionForkJoin.findInSegment 100 sample 4642775 ≈ 10⁻⁴ ms/op
SearchInMapBillionForkJoin.findInSegment:findInSegment·p0.00 100 sample ≈ 10⁻⁵ ms/op
SearchInMapBillionForkJoin.findInSegment:findInSegment·p0.50 100 sample ≈ 10⁻⁴ ms/op
SearchInMapBillionForkJoin.findInSegment:findInSegment·p0.90 100 sample ≈ 10⁻⁴ ms/op
SearchInMapBillionForkJoin.findInSegment:findInSegment·p0.95 100 sample ≈ 10⁻⁴ ms/op
SearchInMapBillionForkJoin.findInSegment:findInSegment·p0.99 100 sample ≈ 10⁻⁴ ms/op
SearchInMapBillionForkJoin.findInSegment:findInSegment·p0.999 100 sample ≈ 10⁻⁴ ms/op
SearchInMapBillionForkJoin.findInSegment:findInSegment·p0.9999 100 sample 0.005 ms/op
SearchInMapBillionForkJoin.findInSegment:findInSegment·p1.00 100 sample 0.038 ms/op
在做任何全局性结论之前,很高兴知道对这个例子的一些批评:
- 因为人工基准数据在列表大小之间有很好的分布:一个例子:maxListSize = 3,列表大小分布 = {1=9954167, 2=22843, 3=49},地图大小 = 9977059。maxListSize 为所有迭代只有 4 次。
- 因此我们永远不会进入“if (valuesSize > 100_000)”分支;
- 此外,在大多数情况下,我们可能不会进入“} else { for (StringWithIndex stringWithIndex : values) {”,因为“if (strings.containsKey(hashCode))”条件;
- 与之前的测试相比,此测试在不同的 PC(8 cpu、32 Gb RAM、amd64)上运行;
在这里你可以明白,检查map(或内存段)中是否有key,显然,比检查所有数据更好。这个主题非常广泛。有很多人从事性能工作,可以说“性能优化是一个无限的过程”。 :)
我还应该提醒一下“预优化是不好的”,我补充说,这并不意味着你应该不加思索地设计你的系统,不合理。
免责声明:
所有这些信息都可能是错误的。它仅供参考,不得纳入任何合同。在将其用于生产场景之前,您应该自行检查。而且您不应该在生产代码中使用此信息指的是我。我不对可能的金钱损失负责。所有这些信息均不涉及我曾经工作过的任何公司。我不隶属于任何 MySQL/MongoDB/Cassandra/BigTable/BigData 以及 Apache Ignite/Hazelcast/Vertica/Clickhouse/Aerospike 或任何其他数据库。