【问题标题】:Unable to manage redis connections properly in express无法在 express 中正确管理 redis 连接
【发布时间】:2021-05-25 08:59:26
【问题描述】:

我正在使用 redis 包,我遇到了很多连接问题,连接突然出现 ECONNREFUSED。

我怀疑是因为我做了错误的连接管理。

这个项目的问题是我的应用程序向 api(ip 和端口)发送参数,api 必须根据这些值创建一个连接,获取一些数据并返回它。 我有数百台服务器,所以我不知道如何管理所有这些连接。

到目前为止,我在一个连接中管理它。这就是为什么我认为它失败了。

目前看起来像这样..

let redisClient;

const killRedis = () => {
    redisClient.quit()
    console.log("redis client shut down")
}


const createRedisClient = async (port, url) => {
    redisClient = require('redis').createClient(port, url, {
        no_ready_check: true,
        db: 1
    })
    redisClient.on('error', function (err) {
        console.log('Error ' + err);
        killRedis();
        return undefined;
    });
    redisClient.on('connect', function () {
        console.log('Connected to Redis');
    });

    return redisClient;
}
module.exports = { createRedisClient, }

它有点工作,但最终会失败,不时拒绝连接。

我在路线中像下面这样使用它。

const scanAll = async (port, url) => {
    const redisClient = await createRedisClient(port, url)
    if (!redisClient) return 500
    const scan = promisify(redisClient.scan).bind(redisClient);
    const found = [];
    let cursor = '0';
    do {
        const reply = await scan(cursor, 'MATCH', '*');
        cursor = reply[0];
        found.push(...reply[1]);
    } while (cursor !== '0');
    return found;
};

/* Return all the users id */
router.post('/user/list', async function (req, res, next) {
    const data = await scanAll(req.body.port, req.body.ip);
    console.log("data ", data)
    if (data === 500) {
        res.status(500).json({
            error: "Error, server connection refused"
        })
    }
    else if (data.length === 0) {
        res.status(204).json(data)
    } else {
        res.status(200).json(data);
    }

})

如何进行正确的连接管理?

编辑:我的新尝试,但我认为在进行 2 次同时请愿时我的人脉已经泛滥

let connections = []


findConnection = (ip, port) => {
    let connection = connections.filter(i => i.ip == ip && i.port == port)
    console.log("pre")
    console.log(connection)
    if (connection && connection.connection) {
        console.log("opcion1: ", connection.ip)
        console.log("connection already exists")
        return connection[0].connection
    } else {
        console.log("opcion2")
        console.log(connections)
        connections.push({
            ip: ip,
            port: port,
            connection: require('redis').createClient(port, ip, {
                no_ready_check: true,
                db: 1
            })
        })
        return connections.filter(i => i.ip == ip && i.port == port)[0].connection
    }
}
const createRedisClient = async (port, url) => {
    let redisClient = findConnection(url, port)
    redisClient.on('error', function (err) {
        console.log('Error ' + err);
        redisClient.quit()
        return undefined;
    });
    redisClient.on('connect', function () {
        console.log('Connected to Redis');
    });

    return redisClient;
}
module.exports = { createRedisClient, }

我注意到我收到以下错误

MaxListenersExceededWarning:可能的 EventEmitter 内存泄漏 检测到。添加了 11 个错误侦听器。使用emitter.setMaxListeners() 增加限制

编辑:最后一个实现问题

My current implementation is the following

    let connections = []

const killRedis = (redisClient, ip, port) => {
    redisClient.quit()
    connections = connections.filter((i) => { return i.ip !== ip && i.port != port })
}

const subscribe = (redisClient, url, port) => {
    redisClient.on('error', function (err) {
        console.log('Error ' + err);
        killRedis(redisClient, url, port)
        return undefined;
    });
    redisClient.on('connect', function () {
        console.log('Connected to Redis');
        return redisClient;
    });
}

findConnection = (ip, port) => {
    let connection = connections.filter(i => i.ip == ip && i.port == port)

    if (connection && connection.length > 0) {
        subscribe(connection[0].connection)
        return connection[0].connection
    } else {
        connections.push({
            ip: ip,
            port: port,
            connection: require('redis').createClient(port, ip, {
                no_ready_check: true,
                db: 1
            })
        })
        subscribe(connections.filter(i => i.ip == ip && i.port == port)[0].connection, ip, port)
        return connections.filter(i => i.ip == ip && i.port == port)[0].connection
    }
}
const createRedisClient = async (port, url) => {
    let redisClient = findConnection(url, port)
    return redisClient
}
module.exports = { createRedisClient }

几乎可以用了,问题是我不知道如何处理错误

我不确定如何处理错误事件侦听器。如果失败我应该返回一个未定义的,但似乎它没有这样做,

【问题讨论】:

  • 你的findConnection函数在连接存在时不需要调用subscribe。使用完整代码查看我的新答案。

标签: node.js express redis node-redis


【解决方案1】:

您可以为现有连接构建一个缓存池并重复使用这些连接,而不是为每个新请求创建一个新的 Redis 连接。这样,您就不会超过每个 Redis 连接的事件侦听器的限制。 如果池中尚不存在连接,您只需创建一个连接即可。

PS - 为简单起见,我当前的缓存池实现为每对主机和端口创建一个连接并存储它们。如果需要,您可以在 Cache Pool 中实现 LRU 缓存以驱逐未使用的 Redis 连接。

这样您应该能够解决您的连接管理问题,因为您将创建一次连接并重复使用它们。

cache-pool.js

const redis = require('redis');
const Q = require('q');

class CachePool {
    constructor() {
        this.cachedClients = {};
        this.pendingCachedClients = {};
    }

    getConnection(host, port) {
        const deferred = Q.defer();
        const connectionId = CachePool.getConnectionId(host, port);
        if (this.cachedClients[connectionId]) {
            deferred.resolve(this.cachedClients[connectionId]);
        } else {
            this.cachedClients[connectionId] = redis.createClient({
                host,
                port,
            });
            this.cachedClients[connectionId].on('connect', (connection) => {
                deferred.resolve(this.cachedClients[connectionId]);
            });
            this.cachedClients[connectionId].on('error', (err) => {
                deferred.reject(err);
            });
        }
        return deferred.promise;
    }

    static getConnectionId(host, port) {
        return `${host}:${port}`;
    }
}

module.exports = new CachePool();

cache.js

const { promisify } = require('util');

const CachePool = require('./cache-pool');

class Cache {
    constructor(host, port) {
        this.host = host;
        this.port = port;
    }

    connect() {
        return CachePool.getConnection(this.host, this.port);
    }

    async scanAll() {
        const redisConnection = await this.connect();
        const scan = promisify(redisConnection.scan).bind(redisConnection);
        const found = [];
        let cursor = '0';
        do {
            const reply = await scan(cursor, 'MATCH', '*');
            cursor = reply[0];
            found.push(...reply[1]);
        } while (cursor !== '0');
        return found;
    }
}

module.exports = Cache;

test-sever.js

const express = require('express');

const Cache = require('./cache');

const app = express();

app.use(express.json({ type: '*/json' }));

const port = 3000;

/* Return all the users id */
app.post('/user/list', async function (req, res, next) {
    const redisClient = new Cache(req.body.ip, req.body.port);
    const data = await redisClient.scanAll();
    if (data === 500) {
        res.status(500).json({
            error: "Error, server connection refused"
        })
    }
    else if (data.length === 0) {
        res.status(204).json(data)
    } else {
        res.status(200).json(data);
    }

})

app.listen(port, () => {
  console.log(`Example app listening at http://localhost:${port}`)
});

【讨论】:

    【解决方案2】:

    单连接解决方​​案或缓存池解决方案都可用。这两种解决方案会遇到一些问题。

    • 单连接解决方​​案:ECONNREFUSED

    我猜是redis maxclient到达时触发的错误,因为你的单连接解决方​​案在处理每个请求时都没有关闭客户端。因此,当未达到 maxclient 限制但以 ECONNREFUSED 结束时,它有点工作。

    你可以尝试解决,在request返回前加killRedis即可。

    const scanAll = async (port, url) => {
        const redisClient = await createRedisClient(port, url)
        if (!redisClient) return 500
        const scan = promisify(redisClient.scan).bind(redisClient);
        const found = [];
        let cursor = '0';
        do {
            const reply = await scan(cursor, 'MATCH', '*');
            cursor = reply[0];
            found.push(...reply[1]);
        } while (cursor !== '0');
        killRedis(); // add code here.
        return found;
    };
    
    • 缓存池解决方案:MaxListenersExceededWarning

    参考答案MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 message lis teners added. Use emitter.setMaxListeners() to increase limit

    对于每个请求,您在createRedisClient 函数中运行两次redisClient.on(),您执行两次订阅,然后添加了两个新的侦听器。默认限制为 10。由于缺少取消订阅,它将以 MaxListenersExceededWarning 结束。解决方案是将所有redisClient.on代码从createRedisClient 函数移动到findConnection函数,创建连接时订阅两次,与用户请求无关。

    • 添加注释代码
    findConnection = (ip, port) => {
        let connection = connections.filter(i => i.ip == ip && i.port == port)
    
        if (connection && connection.length > 0) {
            // subscribe(connection[0].connection) // should be deleted
            return connection[0].connection
        } else {
            connections.push({
                ip: ip,
                port: port,
                connection: require('redis').createClient(port, ip, {
                    no_ready_check: true,
                    db: 1
                })
            })
            subscribe(connections.filter(i => i.ip == ip && i.port == port)[0].connection, ip, port)
            return connections.filter(i => i.ip == ip && i.port == port)[0].connection
        }
    }
    

    【讨论】:

    • 这样的工作。但我面临的问题是我无法处理错误。如果连接失败,我只会得到 ````UnhandledPromiseRejectionWarning: AbortError: Stream connection ends and command aborted.`` 错误监听器会触发它。即使连接失败,createRedisClient 中的 redisClient 变量也是用连接对象定义的。所以我不确定如何识别错误
    • 我可以尝试记录 console.log(redisClient._events.error) ,但它返回一个 [Function] 并且不知道该怎么做。并且事件监听器只是帮助在控制台中记录错误,但它不会返回 undefined 或类似的东西
    • 如果出现连接错误,可以添加connect retry_strategy来停止无限连接重试循环,参考npmjs.com/package/redis。然后,当您运行 promisify 扫描功能时会发出错误。您可以在代码const reply = await scan(cursor, 'MATCH', '*'); 中添加try catch 来捕获错误,然后处理错误并返回
    【解决方案3】:

    你可以使用这个 npm 包

    npm i connect-redis
    

    【讨论】:

      【解决方案4】:

      我的最终实现按预期工作

      let connections = []
      
      const killRedis = (redisClient, ip, port) => {
          redisClient.quit()
          connections = connections.filter((i) => { return i.ip !== ip && i.port != port })
      }
      
      const subscribe = (redisClient, url, port) => new Promise((resolve, reject) => {
          redisClient.on('error', function (err) {
              killRedis(redisClient, url, port)
              reject(err)
          });
          redisClient.on('connect', function () {
              resolve(redisClient)
          });
      })
      
      findConnection = async (ip, port) => {
          let connection = connections.filter(i => i.ip == ip && i.port == port)
      
          if (connection && connection.length > 0) {
              return connection[0].connection
          } else {
              try {
                  let newConnection = require('redis').createClient(port, ip, {
                      no_ready_check: true,
                      db: 1
                  })
                  const client = await subscribe(newConnection, ip, port)
                  connections.push({
                      ip: ip,
                      port: port,
                      connection: newConnection
                  })
                  return client
              } catch (error) {
                  return undefined
              }
          }
      }
      const createRedisClient = async (port, url) => {
          let redisClient = await findConnection(url, port)
          return redisClient
      }
      module.exports = { createRedisClient }
      

      【讨论】:

        猜你喜欢
        • 2015-04-04
        • 2019-02-23
        • 2020-07-31
        • 1970-01-01
        • 2021-06-18
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2012-08-15
        相关资源
        最近更新 更多