【问题标题】:MongoDB Batch read implementation issue with change stream replica set更改流副本集的 MongoDB 批量读取实现问题
【发布时间】:2020-02-05 13:02:58
【问题描述】:

问题:
推理生成过程每秒将大约 300 个推理数据写入 MongoDB 集合。另一个进程利用 MongoDB 的更改流特性来回读这些推论并进行后处理。目前,调用变更流函数 API (mongoc_change_stream_next()) 时只返回单个推理数据。因此,总共需要 300 次这样的调用才能在 1 秒内获取存储的所有推理数据。但是,每次读取后,需要大约 50 毫秒的时间来执行单个/多个推理数据的后处理。由于采用单一数据返回模型,引入了 15 倍的有效延迟。为了解决这个问题,我们正在尝试实现一个与 MongoDB 的更改流功能一致的批量读取机制。我们尝试了各种选项来实现相同的功能,但在每次更改流 API 调用后仍然只获得一个数据。有没有办法解决这个问题?

平台:
操作系统:Ubuntu 16.04
Mongo-c 驱动程序:1.15.1
Mongo 服务器:4.0.12

已尝试的选项:
将光标的批量大小设置为大于 1。

int main(void) {
    const char *uri_string = "mongodb://localhost:27017/replicaSet=set0";
    mongoc_change_stream_t *stream;
    mongoc_collection_t *coll;
    bson_error_t error;
        mongoc_uri_t *uri;
    mongoc_client_t *client;

    /*
    * Add the Mongo DB blocking read and scall the inference parse function with the Json
                 * */
    uri = mongoc_uri_new_with_error (uri_string, &error);
    if (!uri) {
        fprintf (stderr,
        "failed to parse URI: %s\n"
        "error message:       %s\n",
        uri_string,
        error.message);
        return -1;
    }

    client = mongoc_client_new_from_uri (uri);
    if (!client) {
        return -1;
    }

    coll = mongoc_client_get_collection (client,  <DB-NAME>, <collection-name>);
    stream = mongoc_collection_watch (coll, &empty, NULL);
    mongoc_cursor_set_batch_size(stream->cursor, 20);
    while (1){
        while (mongoc_change_stream_next (stream, &doc)) {
            char *as_json = bson_as_relaxed_extended_json (doc, NULL); 
            ............
            ............
            //post processing consuming 50 ms of time
            ............
            ............
        }
        if (mongoc_change_stream_error_document (stream, &error, &err_doc)) {
            if (!bson_empty (err_doc)) {
                fprintf (stderr,
                "Server Error: %s\n",
                bson_as_relaxed_extended_json (err_doc, NULL));
            } else {
                fprintf (stderr, "Client Error: %s\n", error.message);
            }
            break;
        }
    }
    return 0;
}

【问题讨论】:

    标签: mongodb mongodb-replica-set changestream mongodb-replica


    【解决方案1】:

    目前,更改时仅返回单个推理数据 调用流函数API(mongoc_change_stream_next())

    从技术上讲,并不是返回单个文档。这是因为mongoc_change_stream_next() 迭代底层游标,将每个bson 设置为下一个文档。因此,即使返回的批量大小超过 1,它仍然需要对每个文档进行迭代。

    你可以试试:

    • 创建单独的线程来并行处理文档,因此您不必为每个文档等待 50 毫秒或累积等待 15 秒。

    • 循环遍历一批文档,即50个缓存然后执行批处理

    • 在单独的线程上对它们进行批处理(以上两者的组合)

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2013-10-22
      • 1970-01-01
      • 2023-03-13
      • 1970-01-01
      • 2021-07-20
      • 2022-01-01
      • 1970-01-01
      相关资源
      最近更新 更多