【问题标题】:Inserting into Cassandra with Akka Streams使用 Akka Streams 插入 Cassandra
【发布时间】:2016-09-07 11:29:18
【问题描述】:

我正在学习 Akka Streams,作为练习,我想将日志插入 Cassandra。问题是我无法让流将日志插入数据库。

我天真地尝试了以下方法:

object Application extends AkkaApp with LogApacheDao {

  // The log file is read line by line
  val source: Source[String, Unit] = Source.fromIterator(() => scala.io.Source.fromFile(filename).getLines())

  // Each line is converted to an ApacheLog object
  val flow: Flow[String, ApacheLog, Unit] = Flow[String]
    .map(rawLine => {
      rawLine.split(",") // implicit conversion Array[String] -> ApacheLog
    })

  // Log objects are inserted to Cassandra
  val sink: Sink[ApacheLog, Future[Unit]] = Sink.foreach[ApacheLog] { log => saveLog(log) }

  source.via(flow).to(sink).run()

}

saveLog() 在 LogApacheDao 中是这样定义的(为了更清晰的代码,我省略了列值):

val session = cluster.connect()

session.execute(s"CREATE KEYSPACE IF NOT EXISTS $keyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};")

session.execute(s"DROP TABLE IF EXISTS $keyspace.$table;")

session.execute(s"CREATE TABLE $keyspace.$table (...)")

val preparedStatement = session.prepare(s"INSERT INTO $keyspace.$table (...) VALUES (...);")

def saveLog(logEntry: ApacheLog) = {
    val stmt = preparedStatement.bind(...)

    session.executeAsync(stmt)
  }

进入接收器时从 Array[String] 到 ApacheLog 的转换没有问题(使用 println 验证)。还有,keyspace和table都创建好了,但是执行到saveLog的时候,好像有什么东西被阻塞了,没有插入。

我没有收到任何错误,但 Cassandra 驱动程序核心 (3.0.0) 不断给我:

Connection[/172.17.0.2:9042-1, inFlight=0, closed=false] was inactive for 30 seconds, sending heartbeat
Connection[/172.17.0.2:9042-2, inFlight=0, closed=false] heartbeat query succeeded

我应该补充一点,我使用的是 dockerized Cassandra。

【问题讨论】:

  • 我只阅读了有关 akka 流的信息,所以这只是一个猜测,您确定 saveLog 没有抛出被吞下的异常吗?可以肯定的是,我会在异常上打印/捕捉它。这些 cassandra 日志显示连接在 30 秒内处于非活动状态,然后发送心跳以保持其打开。
  • 我将尝试从 saveLog 中捕获最终的异常,感谢您的建议
  • 即使在处理异常时(使用 onComplete(...)),我也无法收到任何错误消息。在我看来,执行似乎无法执行bind() 方法。

标签: scala cassandra akka-stream


【解决方案1】:

【讨论】:

    猜你喜欢
    • 2018-02-06
    • 2019-04-13
    • 1970-01-01
    • 1970-01-01
    • 2018-02-22
    • 1970-01-01
    • 1970-01-01
    • 2022-01-10
    • 1970-01-01
    相关资源
    最近更新 更多