首先,我建议您去掉 .replace(/[\/\+\=]/g, ''),因为它会失去随机性,事实上,将一些仅在这些字符上不同的唯一 orderId 映射为相同。
我的建议是使用 base58 编码器base-x,它将直接编码为您想要的内容。这个编码器库允许您传入要用于编码的确切字符集,并且它只是使用它。
这是我建议的代码,您可以插入:
const base58Encode = require('base-x')('123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz').encode;
然后,在您创建 orderID 的地方,更改为:
const idSeed = crypto.randomBytes(16)
const orderId = base58Encode(idSeed);
我不知道 dup 的概率(你需要一个加密/统计学家),但我在没有 dup 的情况下运行了 10,000,000 orderId 值,我重复了 10 次但仍然没有得到一个重复。显然,这并不意味着它不会发生,但我也在做这种快速射击,Date.now() 可能甚至没有太大不同。我不能运行它超过 10,000,000 次,因为我用完内存试图将所有先前的 orderId 值存储在 Set 对象中以检查重复。您可以增加 nodejs 的内存并以更高的值运行它,或者将其放入 shell 脚本并一遍又一遍地运行。
如果您想自己反复运行,这是我的 dup 检查程序:
const crypto = require('crypto');
function addCommas(str) {
var parts = (str + "").split("."),
main = parts[0],
len = main.length,
output = "",
i = len - 1;
while(i >= 0) {
output = main.charAt(i) + output;
if ((len - i) % 3 === 0 && i > 0) {
output = "," + output;
}
--i;
}
// put decimal part back
if (parts.length > 1) {
output += "." + parts[1];
}
return output;
}
let set = new Set();
const numToTry = 10_000_000;
const debugMultiple = 100_000;
for (let i = 0; i < numToTry; i++) {
if (i !== 0 && i % debugMultiple === 0) {
console.log(`Attempt #${addCommas(i)}`);
}
const idSeed = crypto.randomBytes(16).toString('base64') + '' + Date.now();
const orderId = Buffer.from(idSeed).toString('base64').replace(/[\/\+\=]/g, '');
//console.log(orderId);
if (set.has(orderId)) {
console.log(`Found conflict after ${addCommas(i)} attempts`);
console.log(`Conflicting orderId = ${orderId}`);
process.exit(1);
}
set.add(orderId);
}
console.log(`No dups found after ${addCommas(numToTry)} attempts`);
在花费大量时间之前,我会调查您的数据库,看看它是否会为您生成一个可以用作 orderId 的唯一键。这是一个常见的数据库问题。
这是一个较新的版本,我可以通过它运行多达 1,000,000,000 个 ID。仍然没有冲突。因为我不可能在内存中拥有一个包含 1,000,000,000 个 ID 的巨型 Set 对象,所以我集思广益,想出了很多方法来做到这一点。我考虑过使用 redis 服务器并将 id 存储在其中,因为它可以使用更多内存。但是,后来我想出了一个基于磁盘的解决方案,可以根据需要扩展。基本思路如下:
-
您的一个 orderId 值如下所示:
zz6h6q6oRELJXmh4By4NUw1587006335064`
当我生成一个新的 orderId 时,如果我可以将它分成一个基于磁盘的“桶”,其中只包含具有相同开头字符的 id,那么我可以将所有 id 拆分到许多不同的文件中。
这个想法是,如果以相同的两个字符开头的每个 id 都存储在同一个文件中,那么任何其他文件中的其他 id 都不可能与该文件中的 id 匹配。
然后您可以分两次完成您的工作。第一遍生成 1,000,000,000 个 id,并在生成时根据 id 开头的字符将它们写入适当的存储桶文件。
在生成所有 id 并将其写入相应的存储桶文件后,第二遍是一次遍历每个存储桶文件,将所有 id 加载到 Set 对象中,看看是否任何冲突。如果不匹配,则清除该 Set 并转到下一个文件。这使您可以分段执行内存受限部分(处理 Set 对象),以便为大量 id 使用更少的内存。
-
那么,那么问题是如何将 id 分成桶文件?由于 base64 id 值中的每个字节最多代表 64 个可能的值,因此如果仅使用 id 的前两个字符来确定存储桶,您将获得多达 64*64=4096 个存储桶。出于某种原因(这一定与crypto.randomBytes(16) 的工作方式有关),我只发现实际 orderId 值中实际出现了大约 3800 个存储桶。
但是,如果您将 1,000,000,000 个值拆分为 3800 个存储桶,则每个存储桶将获得大约 263,000 个 ID。我们之前已经证明我们可以轻松地处理内存中的 15,000,000 个 id,因此这应该足以一次处理内存中的每个存储桶。事实上,如果我有足够的耐心,我们可能会使用仅基于前两个字符的存储桶达到 10,000,000,000。
如果您想要更多的存储桶,它们可以基于前三个字符,但是您开始为单个目录获取太多文件,并且您必须开始在目录之间拆分文件,这可以完成,但会使事情复杂化。
-
所以,我需要创建一个基于 id 前两个字符的存储桶文件名。 id 区分大小写(base64 使用大小写来表示不同的值)。我的 Windows 文件系统不区分大小写,所以我不能直接使用前两个字母作为文件名。因此,我创建了一个简单的算法,它采用两个字符的混合大小写前缀并将其转换为四个字符的小写名称。它将小写字母“a”映射到“a_”,将非小写字符(如“B”映射到“bb”)。因此,小写值后跟_,大写值后跟自身的第二个副本。所以,你会有这样的 id 映射:
"ab" => "a_b_"
"AB" => "aabb"
"aB" => "a_BB"
"Ab" => "aab_"
非字母字符(如数字)只是映射到自身的两倍,就像任何非小写字符一样。因此,有了这个,我可以获得一个 id 值,获取前两个字符,查看它属于哪个文件名并将其附加到该文件。
出于性能原因,我创建了一个 Bucket 类,该类维护一个等待写入内存的 id 缓存。当特定存储桶中的缓存达到一定长度(我现在已设置为 3000)时,我会立即将它们全部附加到文件中并清除存储桶缓存。当我完成生成所有 ID 后,我会遍历所有存储桶并清除所有剩余的 ID。使用这种写缓存,id 的生成主要受 CPU 限制,而不是磁盘限制。磁盘利用率约为 30%。 CPU 的一个核心在 id 生成期间被固定。这可能会通过一些 WorkerThread 来加速。
-
因此,一旦将所有 id 写入存储桶文件并且内存中根本没有任何内容,就可以一次读取每个存储桶文件,将它们的所有 id 加载到一个 Set 中,看看是否有任何冲突。每个存储桶文件都是以行分隔的 id 列表,它们都以相同的前缀开头,如下所示:
zzoexm2FE8DIrHnXpp8qw1587003338798
zzuP6LpusKIMeYrfl0WJnQ1587003338885
zz1itmTqA3yaFNo1KFUhg1587003338897
zz3TEFeqH965OTFCrFTjJQ1587003338904
zz8XQKvq11fCqn9kB4O2A1587003338904
zzaKMTFPct5ls7WW3YmcQ1587003338927
zzyX3htzIqi4zOq4Cxdg1587003338928
zzoHu6vIHMEgNMVY46Qw1587003338962
所以,我只是逐行读取给定的存储桶文件,并根据该存储桶文件的 Set 检查每个 id。如果它已经在集合中,则存在冲突。输出该冲突并中止。如果它不是 Set,请将其添加到 Set 并继续使用该存储桶文件中的其余 id。由于此存储桶文件包含以相同两个字符开头的所有 id,因此任何其他存储桶文件中的任何其他 id 都不会与这些冲突,因此您可以将所有这些 id 相互比较。
bucket 文件的读取受磁盘限制。在 3844 个桶文件中运行 1,000,000,000 个 id 时,每个桶文件约为 5MB,即 22GB 数据。每个文件都必须被读取并解析成行,然后将每个 id 添加到 Set 中。
我尝试了几种不同的机制来逐行读取文件,发现它们非常慢。我从readLine 接口开始,它允许您通过readStream 逐行迭代。它很慢。然后,我只是用fs.readFile() 将整个文件读入内存,变成一个巨大的字符串,然后在其上调用.split("\n") 将其分成几行。这实际上比readLine 好,但仍然很慢。我推测数据副本太多,这意味着垃圾收集器不得不大量工作。
因此,最后我编写了自己的readFile 版本,它将整个文件读入一个可重用的缓冲区,并通过直接解析二进制缓冲区将其拆分为行。这在此过程中至少保存了几个数据副本,并节省了大量的 GC 工作。它并不快,但它更快。重用缓冲区还为我节省了很多单独的 5MB 分配。
-
第一遍(生成 ID)受 CPU 限制。我推测我可以通过启动一些工作线程(可能像 6 个,因为我有一个 8 核 CPU)并让它们在生成 id 时加紧工作,从而加快速度。我会将 1/6 的数量分配给每个 Worker 线程,当它们累积到 1000 左右时,他们会将这 1000 个发送回主线程,主线程会将它们插入正确的存储桶中。但是,在我开始使用 WorkerThreads 之前,我需要做一些基准测试,看看在 crypto.randomBytes() 函数与其他地方相比,第一次传递的总时间有多少,以确保它是值得的。
-
第二次通过它完全是磁盘绑定,但实际的磁盘吞吐量是可怕的(比如 60MB/s)。要么我的磁盘真的很烂,nodejs 不太擅长这种类型的文件 I/O,要么在处理 3800 个大文件时开销很大(读取目录条目,为第一个扇区寻找磁盘,读取尽可能多的顺序扇区你可以,再次寻找,等等......)。我可以在我最快的 SSD 上试用它,但我真的不想每次玩这个时都将 20GB 写入我的 SSD。
我尝试增加 UV_THREADPOOL_SIZE,认为可能 nodejs 排队了太多的读/写。但是,当我增加线程池大小时,性能实际上变得更糟了。我猜它的默认值 4 足以让一个磁盘控制器保持忙碌。除此之外,您只是要求磁盘头在不同文件之间跳转,而读取一个文件的所有内容会更有效,然后转到下一个文件等等。
虽然第二遍主要是磁盘绑定,但仍有大约 30% 的时间花在与磁盘无关的内容上(基于我插入的一些高分辨率计时器)。因此,如果它不会对磁盘争用造成太大伤害,那么您可以将不同存储桶文件的处理分散到一组 WorkerThread 中。您至少会在该进程的 CPU 部分获得并行性。不过,您可能会遇到更多的磁盘争用,所以我不确定它是否会有所帮助。
最后,存储桶文件可以在驱动器之间拆分,甚至理想情况下在单独的 SATA 控制器之间拆分。我有很多驱动器和几个 SATA 控制器可以尝试,但它非常适合我的系统。
这是桶系统的代码。
// unique-test.js
const crypto = require('crypto');
const readline = require('readline');
const fs = require('fs');
const fsp = fs.promises;
const path = require('path');
const {fastReadFileLines} = require('./fast-read-file.js');
function delay(t, v) {
return new Promise(resolve => {
setTimeout(resolve, t, v);
})
}
function addCommas(str) {
var parts = (str + "").split("."),
main = parts[0],
len = main.length,
output = "",
i = len - 1;
while(i >= 0) {
output = main.charAt(i) + output;
if ((len - i) % 3 === 0 && i > 0) {
output = "," + output;
}
--i;
}
// put decimal part back
if (parts.length > 1) {
output += "." + parts[1];
}
return output;
}
// make a unique filename using first several letters of
// the string. Strings are case sensitive, bucket filenames
// cannot be so it has to be case neutralized while retaining
// uniqueness
function makeBucketKey(str) {
let piece = str.substr(0,2);
let filename = [];
// double up each character, but
for (let ch of piece) {
filename.push(ch);
if (ch >= 'a' && ch <= 'z') {
filename.push("_")
} else {
filename.push(ch);
}
}
return filename.join("").toLowerCase();
}
// this value times the number of total buckets has to fit in memory
const bucketCacheMax = 3000;
class Bucket {
constructor(filename, writeToDisk = true) {
this.items = [];
this.filename = filename;
this.cnt = 0;
this.writeToDisk = writeToDisk;
// We dither the bucketCacheMax so that buckets aren't all trying to write at the same time
// After they write once (and are thus spread out in time), then they will reset to full cache size
let dither = Math.floor(Math.random() * bucketCacheMax) + 10;
if (Math.random() > 0.5) {
dither = -dither;
}
this.bucketCacheMax = bucketCacheMax + dither;
}
// add an item to cache, flush to disk if necessary
async add(item) {
++this.cnt;
this.items.push(item);
if (this.items.length > this.bucketCacheMax) {
// the dithered cache size is only used on the first write
// to spread out the writes. After that, we want a full cache size
let priorBucketCacheMax = this.bucketCacheMax;
this.bucketCacheMax = bucketCacheMax;
await this.flush();
}
}
// write any cached items to disk
async flush() {
if (this.writeToDisk && this.items.length) {
let data = this.items.join("\n") + "\n";
this.items.length = 0;
if (this.flushPending) {
throw new Error("Can't call flush() when flush is already in progress");
}
function flushNow() {
this.flushPending = true;
return fsp.appendFile(this.filename, data).finally(() => {
this.flushPending = false;
});
}
// we write to disk with retry because we once go EBUSY (perhaps from a backup program)
let retryCntr = 0;
const retryMax = 10;
const retryDelay = 200;
const retryBackoff = 200;
let lastErr;
function flushRetry() {
if (retryCntr > retryMax) {
throw lastErr;
}
return flushNow.call(this).catch(err => {
lastErr = err;
console.log("flushNow error, retrying...", err);
return delay(retryDelay + (retryCntr++ * retryBackoff)).then(() => {
return flushRetry.call(this);
});
});
}
return flushRetry.call(this);
}
this.items.length = 0;
}
delete() {
return fsp.unlink(this.filename);
}
get size() {
return this.cnt;
}
}
class BucketCollection {
constructor(dir, writeToDisk = true) {
// map key is bucketID, value is bucket object for that key
this.buckets = new Map();
this.dir = dir;
}
add(key, data) {
let bucket = this.buckets.get(key);
if (!bucket) {
let filename = path.join(this.dir, key);
bucket = new Bucket(filename, writeToDisk);
this.buckets.set(key, bucket);
}
return bucket.add(data);
}
async flush() {
// this could perhaps be sped up by doing 4 at a time instead of serially
for (let bucket of this.buckets.values()) {
await bucket.flush();
}
}
async delete() {
// delete all the files associated with the buckets
for (let bucket of this.buckets.values()) {
await bucket.delete();
}
}
get size() {
return this.buckets.size;
}
getMaxBucketSize() {
let max = 0;
for (let bucket of this.buckets.values()) {
max = Math.max(max, bucket.size);
}
return max;
}
}
// program options
let numToTry = 100_000;
let writeToDisk = true;
let cleanupBucketFiles = true;
let skipAnalyze = false;
let analyzeOnly = false;
// -nodisk don't write to disk
// -nocleanup erase bucket files when done
// -analyzeonly analyze files in bucket directory only
if (process.argv.length > 2) {
let args = process.argv.slice(2);
for (let arg of args) {
arg = arg.toLowerCase();
switch(arg) {
case "-nodisk":
writeToDisk = false;
break;
case "-nocleanup":
cleanupBucketFiles = false;
break;
case "-skipanalyze":
skipAnalyze = true;
break;
case "-analyzeonly":
analyzeOnly = true;
break;
default:
if (/[^\d,]/.test(arg)) {
console.log(`Unknown argument ${arg}`);
process.exit(1);
} else {
numToTry = parseInt(arg.replace(/,/g, ""), 10);
}
}
}
}
let bucketDir = path.join(__dirname, "buckets");
let collection = new BucketCollection(bucketDir, writeToDisk);
console.log(`Running ${addCommas(numToTry)} random ids`);
const debugMultiple = 100_000;
async function analyze() {
let cntr = 0;
const cntrProgress = 10;
const cntrProgressN = 10n;
let buffer = null;
let times = [];
async function processFile(file) {
if (cntr !== 0 && cntr % cntrProgress === 0) {
let sum = 0n;
for (let i = 0; i < cntrProgress; i++) {
sum += times[i];
}
console.log(`Checking bucket #${cntr}, Average readFileTime = ${sum / cntrProgressN}`);
times.length = 0;
}
++cntr;
let set = new Set();
let startT = process.hrtime.bigint();
let buffer = null;
let result = await fastReadFileLines(file, buffer);
let data = result.lines;
// keep reusing buffer which may have been made larger since last time
buffer = result.buffer;
//let data = (await fsp.readFile(file, "utf8")).split("\n");
let afterReadFileT = process.hrtime.bigint();
for (const lineData of data) {
let line = lineData.trim();
if (line) {
if (set.has(line)) {
console.log(`Found conflict on ${data}`);
} else {
set.add(line);
}
}
}
let loopT = process.hrtime.bigint();
let divisor = 1000n;
let readFileTime = (afterReadFileT - startT) / divisor;
times.push(readFileTime);
// console.log(`readFileTime = ${readFileTime}, loopTime = ${(loopT - afterReadFileT) / divisor}`);
/*
let rl = readline.createInterface({input:fs.createReadStream(file), crlfDelay: Infinity});
for await (const line of rl) {
let data = line.trim();
if (data) {
if (set.has(data)) {
console.log(`Found conflict on ${data}`);
} else {
set.add(data);
}
}
}
*/
}
if (analyzeOnly) {
let files = await fsp.readdir(bucketDir);
for (let file of files) {
let fullPath = path.join(bucketDir, file)
await processFile(fullPath);
}
} else {
for (let bucket of collection.buckets.values()) {
await processFile(bucket.filename);
}
}
}
async function makeRandoms() {
let start = Date.now();
if (analyzeOnly) {
return analyze();
}
for (let i = 0; i < numToTry; i++) {
if (i !== 0 && i % debugMultiple === 0) {
console.log(`Attempt #${addCommas(i)}`);
}
const idSeed = crypto.randomBytes(16).toString('base64') + '' + Date.now();
const orderId = idSeed.toString('base64').replace(/[\/\+\=]/g, '');
//console.log(orderId);
let bucketKey = makeBucketKey(orderId);
await collection.add(bucketKey, orderId);
}
console.log(`Total buckets: ${collection.size}, Max bucket size: ${collection.getMaxBucketSize()}`);
//console.log(`No dups found after ${addCommas(numToTry)} attempts`);
await collection.flush();
let delta = Date.now() - start;
console.log(`Run time for creating buckets: ${addCommas(delta)}ms, ${addCommas((delta / numToTry) * 1000)}ms per thousand`);
if (!skipAnalyze) {
console.log("Analyzing buckets...")
await analyze();
}
if (cleanupBucketFiles) {
console.log("Cleaning up buckets...")
await collection.delete();
}
}
makeRandoms();
而且,这是我更快的 readfile 函数的依赖文件(位于同一目录中):
// fast-read-file.js
const fsp = require('fs').promises;
async function fastReadFile(filename, buffer = null) {
let handle = await fsp.open(filename, "r");
let bytesRead;
try {
let stats = await handle.stat();
if (!buffer || buffer.length < stats.size) {
buffer = Buffer.allocUnsafe(stats.size);
}
// clear any extra part of the buffer so there's no data leakage
// from a previous file via the shared buffer
if (buffer.length > stats.size) {
buffer.fill(0, stats.size);
}
let ret = await handle.read(buffer, 0, stats.size, 0);
bytesRead = ret.bytesRead;
if (bytesRead !== stats.size) {
// no data leaking out
buffer.fill(0);
throw new Error("bytesRead not full file size")
}
} finally {
handle.close().catch(err => {
console.log(err);
});
}
return {buffer, bytesRead};
}
async function fastReadFileLines(filename, buf = null) {
const {bytesRead, buffer} = await fastReadFile(filename, buf);
let index = 0, targetIndex;
let lines = [];
while (index < bytesRead && (targetIndex = buffer.indexOf(10, index)) !== -1) {
// the buffer may be larger than the actual file data
// so we have to limit our extraction of data to only what was in the actual file
let nextIndex = targetIndex + 1;
// look for CR before LF
if (buffer[targetIndex - 1] === 13) {
--targetIndex;
}
lines.push(buffer.toString('utf8', index, targetIndex));
index = nextIndex;
}
// check for data at end of file that doesn't end in LF
if (index < bytesRead) {
lines.push(buffer.toString('utf8', index, bytesRead));
}
return {buffer, lines};
}
module.exports = {fastReadFile, fastReadFileLines};
// if called directly from command line, run this test function
// A file of ids named "zzzz" must exist in this directory
if (require.main === module) {
let buffer = Buffer.alloc(1024 * 1024 * 10, "abc\n", "utf8");
fastReadFileLines("zzzz", buffer).then(result => {
let lines = result.lines;
console.log(lines[0]);
console.log(lines[1]);
console.log(lines[2]);
console.log("...");
console.log(lines[lines.length - 3]);
console.log(lines[lines.length - 2]);
console.log(lines[lines.length - 1]);
}).catch(err => {
console.log(err);
});
}
您首先在运行它的位置下创建一个名为“buckets”的子目录。然后,您从命令行运行它:
node unique-test.js 1,000,000,000
有一些受支持的命令行选项(主要在调试期间使用):
-nodisk Don't write to disk
-nocleanup Don't cleanup generated disk files when done
-skipAnalyze Just generate bucket files, don't analyze them
-analyzeOnly Use previously generated bucket files and analyze them
您在命令行中传递的数字是要生成多少个 id。如果什么都不通过,则默认为 100,000。为了便于阅读,它处理逗号。