【发布时间】:2021-09-26 15:45:37
【问题描述】:
根据我的previous question 使用 Jedis 和线程。我将代码更改为使用JedisPool 而不是Jedis。但是仍然随着线程的增加程序卡住了。我试图增加.setMaxIdle(8000) 和.setMaxTotal(8000) 并临时修复它,但后来在其他运行中它在一些迭代后再次卡住。我猜是由于池中缺少连接(我关闭了它们),但似乎线程没有释放连接。
这是我的连接的更新版本:
import redis.clients.jedis.{JedisPool, JedisPoolConfig}
object redisOp{
@transient lazy val log: Logger = org.apache.log4j.LogManager.getLogger("myLogger")
def apply(set: RDD[Int]): Unit = {
val spark = SparkConstructor()
val sc = spark.sparkContext
// initialize Parents and Ranks key-values
val parents = set.map(i => ("p"+i, i.toString))
val ranks = set.map(i => ("r"+i, 1.toString))
sc.toRedisKV(parents) // using spark-redis packege here only, ignore it.
sc.toRedisKV(ranks)
log.warn("***Initialized Redis***")
}
val jedisConfig = new JedisPoolConfig() // Check from here (object's values and variables)
jedisConfig.setMaxIdle(8000) //TODO: a better configuration?
jedisConfig.setMaxTotal(8000)
lazy val pool = new JedisPool(jedisConfig, "localhost")
def find(u: Long): Option[Long] = { // returns leader of the set containing u
val r = pool.getResource
val res = Option(r.get(s"p$u")).flatMap(p => if (p.toLong == u) {
Some(u)
} else find(p.toLong))
r.close() // closing back to pool
res
}
// other methods are similar to find()...
}
【问题讨论】:
标签: scala apache-spark redis connection-pooling jedis