【问题标题】:Cassandra Batch InvalidQueryException - Batch too largeCassandra Batch InvalidQueryException - 批处理太大
【发布时间】:2017-05-24 06:59:17
【问题描述】:

我正在使用 Batch 将数据插入 Cassandra。当我运行作业时,我遇到了异常。

caused by: com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large   
  at com.datastax.driver.core.Responses$Error.asException(Responses.java:136)   
  at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:179)     
  at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:184)    
  at com.datastax.driver.core.RequestHandler.access$2500(RequestHandler.java:43)    
  at com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:798)   
  at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:617)    
  at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1005)  
  at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:928)   
  at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)     
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)   
  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)     
  at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)     
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)   
  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)

我已经阅读了很多关于这个问题的博客。但这没有帮助。我尝试在初始化时将“spark.cassandra.output.batch.size.bytes”设置为 spark conf。这仍然没有解决我的问题。我遇到了同样的错误。我的批处理有大约 1000 个插入语句。

请在下面找到我的代码。

CassandraConnector connector = CassandraConnector.apply(javaSparkContext.getConf());  
pairRDD.mapToPair(earnCalculatorKeyIterableTuple2 -> {  
            if (condition) {  
                do something......
            }  
            else {  
                Session session = connector.openSession();  
                BatchStatement batch = new   BatchStatement(BatchStatement.Type.UNLOGGED);            batch.setConsistencyLevel(ConsistencyLevel.valueOf(LOCAL_QUOROM));  
                PreparedStatement statement = session.prepare('my insert query');  
                for (condition) {  
                    if (!condition) {  
                        break;  
                    }  
                    Tuple2._2.forEach(s -> {  
                        if (!condition) {  
                            LOG.info(message);  
                        }  
                        else {  
                            BoundStatement boundStatement = statement.bind("bind variables");  
                            batch.add(boundStatement);  
                        }  
                    });  
                    session.execute(batch);  
                    batch.clear();  
                }  
                session.close();  
            }  
            return Tuple2;  
        });  
        return s;  
    }  

感谢任何帮助。

【问题讨论】:

  • 您真的在使用 Spark 吗?我问是因为您的跟踪似乎没有任何 Spark Cassandra 连接器级别,并且更改 batch.size.bytes 会更改插入中的语句数。
  • 是的,我正在使用 spark-cassandra 连接器。我试着给 batch.size.bytes = auto。仍然没有解决这个问题。
  • 你能提供一个代码示例吗?
  • 添加了代码。请看一下。

标签: apache-spark cassandra spark-cassandra-connector cassandra-3.0


【解决方案1】:

您正在手动创建批次并且您的批次太大。向每个批次添加更少的行。有很多方法可以手动执行此操作,但也许最简单的方法是添加一个计数器,每次添加 X 语句时提交一个批处理?

您更改的参数仅与saveToCassandra 完成的自动批处理有关。

【讨论】:

    猜你喜欢
    • 2017-05-10
    • 2021-08-20
    • 2020-09-09
    • 1970-01-01
    • 2019-12-21
    • 2016-08-05
    • 1970-01-01
    • 2017-03-07
    • 2016-06-29
    相关资源
    最近更新 更多