【问题标题】:Prevent to create object twice防止两次创建对象
【发布时间】:2017-12-27 21:06:30
【问题描述】:

我有以下代码:

  type Request = EitherT[IO, Throwable, KkProducerRecordMetadata]

  def create(producer: => KkProducerCreator)
  : IO[Producer[String, String]]
  = IO {
    try {
      new KafkaProducer[String, String](properties(producer))
    } catch {
      case e: InstanceAlreadyExistsException => ???
    }

  }

  def send(producer: => IO[Producer[String, String]])
          (record: => KkProducerRecord)
  : Request = EitherT(for {
    p <- producer
    m <- IO {
      try {
        //If MaxBlockMs is not set,
        //then after 60s it will throw an exception
        val pr = new ProducerRecord[String, String](record.topic, record.key, record.value)
        val meta = p.send(pr).get()
        p.flush()

        Right(KkProducerRecordMetadata(meta.hasOffset,
          meta.hasTimestamp,
          meta.offset,
          meta.partition,
          meta.timestamp,
          meta.topic))
      } catch {
        case e: Exception => Left(e)
      }
    }
  } yield m)

  def close(producer: => IO[Producer[String, String]])
  : IO[Unit]
  = producer.map { p =>
    p.flush()
    p.close()
  }  

函数create创建一个kafka生产者并包装到IO中,因为它可能产生副作用。

函数send 会将消息发送到kafka,正如您在第一个参数中看到的那样,它需要producer

我使用的功能如下:

  //Creates a producer
  private val pSignIn: IO[Producer[String, String]] =
    KkProducer.create(KkProducerCreator(sys.env.get("KAFKA_SERVER").get,
      "AUTH-SIGNIN-PRODUCER",
      List(MaxBlockMsConfig(4000))))

并向kafka发送消息:

KkProducer.send(pSignIn)(KkProducerRecord(AuthTopology.SignInReqTopic,
        AuthTopology.SignInKey, a))  

如您所见,每次调用 send 函数时,都会创建一个新的 producer 实例,然后我得到一个异常:

javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=AUTH-SIGNIN-PRODUCER  

如何防止生产者以功能方式创建两次? 我正在考虑创建一个生产者地图并将 id 作为键?
会是StateT 的解决方案吗?

【问题讨论】:

    标签: scala functional-programming kafka-producer-api


    【解决方案1】:

    确保您只创建一次生产者。将生产者实例分配给vallazy val,然后将 val 传递给函数。

    sendclose函数的参数由名称调用改为普通参数(按值调用)。

    通过为每个 send 调用创建生产者,每次通过变量名称引用时都会评估按名称调用参数。

    像下面这样声明你send

    def send(producer: IO[Producer[String, String]])(record: => KkProducerRecord)
    

    同样适用于close

    例子:

    def foo(code: => Int) = code + code + 1
    
    def bar(code: Int) = code + code + 1
    

    Scala REPL

    scala> :paste
    // Entering paste mode (ctrl-D to finish)
    
    def foo(code: => Int) = code + code + 1
    
    def bar(code: Int) = code + code + 1
    
    
    // Exiting paste mode, now interpreting.
    
    foo: (code: => Int)Int
    bar: (code: Int)Int
    
    scala> foo({println("evaluated"); 1})
    evaluated
    evaluated
    res1: Int = 3
    
    scala> bar({println("evaluated"); 1})
    evaluated
    res2: Int = 3
    

    注意:如果foo println 块被评估两次,不像 bar。

    【讨论】:

    • 我以为Call by name 只会评估一次。不是偷懒评价?
    • @zero_coding 传递给按名称调用变量的代码将保持未评估状态,并且每次调用变量名称时都会进行评估
    • 不起作用,仍然得到例外:javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=AUTH-SIGNIN-PRODUCER
    • @zero_coding 按名称调用和lazy val 是两个不同的东西
    • @zero_coding 你不止一次地调用生产者创建两个。检查你的代码。或粘贴你的新代码
    猜你喜欢
    • 1970-01-01
    • 2014-06-01
    • 1970-01-01
    • 2020-03-05
    • 2014-08-13
    • 1970-01-01
    • 2018-09-29
    • 1970-01-01
    • 2010-09-05
    相关资源
    最近更新 更多