首先,正如@xtreme-biker 所说,性能很大程度上取决于您的硬件。具体来说,我的第一个建议是检查您是在虚拟机还是本机主机上运行。在我的情况下,使用带有 SDD 驱动器的 i7 上的 CentOS VM,我每秒可以读取 123,000 个文档,但在同一驱动器上的 Windows 主机上运行的完全相同的代码每秒可以读取多达 387,000 个文档。
接下来,假设您确实需要阅读完整的合集。这就是说您必须执行全扫描。假设您不能更改 MongoDB 服务器的配置,只能优化您的代码。
然后一切都归结为什么
collection.find().forEach((Block<Document>) document -> count.increment());
确实如此。
快速展开 MongoCollection.find() 表明它实际上是这样做的:
ReadPreference readPref = ReadPreference.primary();
ReadConcern concern = ReadConcern.DEFAULT;
MongoNamespace ns = new MongoNamespace(databaseName,collectionName);
Decoder<Document> codec = new DocumentCodec();
FindOperation<Document> fop = new FindOperation<Document>(ns,codec);
ReadWriteBinding readBinding = new ClusterBinding(getCluster(), readPref, concern);
QueryBatchCursor<Document> cursor = (QueryBatchCursor<Document>) fop.execute(readBinding);
AtomicInteger count = new AtomicInteger(0);
try (MongoBatchCursorAdapter<Document> cursorAdapter = new MongoBatchCursorAdapter<Document>(cursor)) {
while (cursorAdapter.hasNext()) {
Document doc = cursorAdapter.next();
count.incrementAndGet();
}
}
这里 FindOperation.execute() 相当快(不到 10 毫秒),大部分时间都花在了 while 循环中,特别是在私有方法 QueryBatchCursor.getMore() 中
getMore() 调用DefaultServerConnection.command() 其时间主要用于两个操作:1) 从服务器获取字符串数据和2) 转换字符串数据进入 BsonDocument。
事实证明,Mongo 非常聪明,它会执行多少次网络往返来获取大型结果集。它将首先使用 firstBatch 命令获取 100 个结果,然后获取更大的批次,其中 nextBatch 是批次大小,具体取决于集合大小达到限制。
所以,在木头下会发生这样的事情来取第一批。
ReadPreference readPref = ReadPreference.primary();
ReadConcern concern = ReadConcern.DEFAULT;
MongoNamespace ns = new MongoNamespace(databaseName,collectionName);
FieldNameValidator noOpValidator = new NoOpFieldNameValidator();
DocumentCodec payloadDecoder = new DocumentCodec();
Constructor<CodecProvider> providerConstructor = (Constructor<CodecProvider>) Class.forName("com.mongodb.operation.CommandResultCodecProvider").getDeclaredConstructor(Decoder.class, List.class);
providerConstructor.setAccessible(true);
CodecProvider firstBatchProvider = providerConstructor.newInstance(payloadDecoder, Collections.singletonList("firstBatch"));
CodecProvider nextBatchProvider = providerConstructor.newInstance(payloadDecoder, Collections.singletonList("nextBatch"));
Codec<BsonDocument> firstBatchCodec = fromProviders(Collections.singletonList(firstBatchProvider)).get(BsonDocument.class);
Codec<BsonDocument> nextBatchCodec = fromProviders(Collections.singletonList(nextBatchProvider)).get(BsonDocument.class);
ReadWriteBinding readBinding = new ClusterBinding(getCluster(), readPref, concern);
BsonDocument find = new BsonDocument("find", new BsonString(collectionName));
Connection conn = readBinding.getReadConnectionSource().getConnection();
BsonDocument results = conn.command(databaseName,find,noOpValidator,readPref,firstBatchCodec,readBinding.getReadConnectionSource().getSessionContext(), true, null, null);
BsonDocument cursor = results.getDocument("cursor");
long cursorId = cursor.getInt64("id").longValue();
BsonArray firstBatch = cursor.getArray("firstBatch");
然后cursorId 用于获取每个下一批。
在我看来,驱动程序实现的“问题”是 String 到 JSON 解码器被注入,但 JsonReader(decode() 方法所依赖的)没有注入。这种方式甚至可以达到com.mongodb.internal.connection.InternalStreamConnection,您已经在套接字通信附近。
因此,我认为你几乎没有什么可以改进MongoCollection.find() 除非你深入到InternalStreamConnection.sendAndReceiveAsync()
您无法减少往返次数,也无法更改将响应转换为 BsonDocument 的方式。并非没有绕过驱动程序并编写自己的客户端,我怀疑这是一个好主意。
P.D.如果您想尝试上面的一些代码,您需要 getCluster() 方法,该方法需要对mongo-java-driver 进行恶意破解。
private Cluster getCluster() {
Field cluster, delegate;
Cluster mongoCluster = null;
try {
delegate = mongoClient.getClass().getDeclaredField("delegate");
delegate.setAccessible(true);
Object clientDelegate = delegate.get(mongoClient);
cluster = clientDelegate.getClass().getDeclaredField("cluster");
cluster.setAccessible(true);
mongoCluster = (Cluster) cluster.get(clientDelegate);
} catch (NoSuchFieldException | SecurityException | IllegalArgumentException | IllegalAccessException e) {
System.err.println(e.getClass().getName()+" "+e.getMessage());
}
return mongoCluster;
}