【问题标题】:How to insert data into a 4 node cassandra cluster using datastax cassandra-core-api?如何使用 datastax cassandra-core-api 将数据插入 4 节点 cassandra 集群?
【发布时间】:2015-10-17 07:26:15
【问题描述】:

我有一个单节点 (DataStax) Casandra 集群,我必须从文件中插入大约 10gb 的数据。我编写了一个 java 程序来读取文件并将数据存储如下:

 import java.io.BufferedReader;
 import java.io.FileReader;
 import java.io.IOException;
 import java.util.Date;
 import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;

 public class Xb {

//cluster and session for cassandra connection
private static Cluster cluster;
private static Session session;

//variables for storing file elements
private static String taxid;
private static String geneid;
private static String status;
private static String rna_version;
private static String rna_gi;

private static String protein_version;
private static String protein_gi;
private static String gen_nuc_ver;

private static String gen_nuc_gi;
private static String start_gen_acc;
private static String end_gen_acc;

private static String orientation;
private static String assembly;

     private static String mature_ver;

     private static String mature_gi;

     private static String symbol;

    //Connecting the cassandra node(local host)
    public static Cluster connect(String node){
    return Cluster.builder().addContactPoint(node).build();
   }
    public static void main(String[] args) {
    private static String symbol;
    long lStartTime = new Date().getTime();
    // TODO Auto-generated method stub
    //call connect by passing localhost 
    cluster =connect("localhost");
    session = cluster.connect();
    //session.execute("CREATE KEYSPACE test1 WITH REPLICATION =" +"{'class':'SimpleStrategy','replication_factor':3}");
    //session.createtable('genomics');
    //use test1 : triggers the use of test1 keyspace
    session.execute("USE test1");
    //for counting the lines in the file
    int lineCount=0;

    try
    {
        //Reading the file
        FileReader fr = new FileReader("/home/syedammar/gene2refseq/gene2refseq");
        BufferedReader bf = new BufferedReader(fr);
        String line;
        //iterating over each line in file
        while((line= bf.readLine())!=null){
                lineCount++;
                //splitting the line based on tab spaces
                String[] a =line.split("\\s+");
                System.out.println("Line Count now is ->"+lineCount);
                //System.out.println("This is content"+line+" OVER HERE");
                /*for(int i =0;i<a.length;i++){
                System.out.println(i+"->"+a[i]);
              }*/
                //assigning the values to the corresponding variables
                taxid =a[0];
                geneid=a[1];
                status=a[2];
                rna_version=a[3];
                rna_gi=a[4];
                protein_version=a[5];
                protein_gi=a[6]; 
                gen_nuc_ver=a[7];
                gen_nuc_gi=a[8];
                start_gen_acc=a[9];
                end_gen_acc=a[10];
                orientation=a[11];
                assembly=a[12];
                mature_ver=a[13];
                mature_gi=a[14];
                symbol=a[15];

            //Writing the insert query
            PreparedStatement statement = session.prepare(
            "INSERT INTO test.genomics " +
            "(taxid, " +
            "geneid, " +
            "status, " +
            "rna_version, " +
            "rna_gi, " +
            "protein_version, " +
            "protein_gi, " +
            "gen_nuc_ver, " +
            "gen_nuc_gi, " +
            "start_gen_acc, " +
            "end_gen_acc, " +
            "orientation, " +
            "assembly, " +
            "mature_ver, " +
            "mature_gi," +
            "symbol" + 
            ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"); 

            //create the bound statement and initialise it with your prepared statement
            BoundStatement boundStatement = new BoundStatement(statement); 

            session.execute( // this is where the query is executed
            boundStatement.bind( // here you are binding the 'boundStatement'
            taxid,geneid,status,rna_version,rna_gi,protein_version,protein_gi,gen_nuc_ver,gen_nuc_gi,start_gen_acc,end_gen_acc,orientation,assembly,mature_ver,mature_gi,symbol));
    }//end of while
} //end of try
    catch(IOException e){
        e.printStackTrace();
    }   
        long lEndTime = new Date().getTime(); 
        long difference = lEndTime - lStartTime;
        int seconds = (int) (difference / 1000) % 60 ; //converting milliseconds to seconds
        System.out.println("Elapsed seconds: " + seconds);
        System.out.println("No of lines read are :"+ lineCount);
        System.out.println("Record's entered into cassandra successfully");

        session.close();
        cluster.close();http://stackoverflow.com/editing-help

    }//end of m}// end of class

这很好,我得到了存储在 Cassandra 中的记录。

现在我已经建立了一个 4 节点 Cassandra 集群,我想做同样的任务,读取同一个文件并将其内容存储到 4 节点集群中。

我的问题是我将如何做到这一点,我需要将这个程序提供给哪个节点。我该如何处理?

我的问题是如何与 4 节点集群建立连接,我必须在上面的代码中进行哪些更改。好像这部分会有一些变化

 public static Cluster connect(String node){
    return Cluster.builder().addContactPoint(node).build();
} 

会发生什么变化,N 我将该程序提供给哪个节点?我不清楚它会如何发生。还请告诉我,将整个数据插入 4 节点集群所需的时间是否与单节点相同,还是会更快。

谢谢

【问题讨论】:

    标签: java datastax datastax-java-driver cassandra-2.1 nosql


    【解决方案1】:

    有关如何使用 DataStax java 驱动程序以最佳方式将数据加载到 Cassandra 的一个很好的示例(参考程序),请查看 Brian Hess's Cassandra-loader

    我需要为这个程序提供哪个节点

    所有 cassandra 节点都是平等的,它们都可以写入。但是,司机会为您处理这件事。只需给它一些节点作为端点,当它建立连接时,它就会知道存在哪些节点。它还将知道哪些节点拥有哪些数据并相应地执行写入。

    在 4 中插入整个数据需要相同的时间吗 节点集群与单个节点一样还是会更快。

    考虑到复制因素后,您的集群将随着您添加节点而线性扩展。因此,您将能够线性增加吞吐量。即,如果 3 个节点 RF3 可以进行 X 次写入,则具有 RF3 的 6 个节点可以进行 ~2X 次写入。

    【讨论】:

    • 谢谢!!我还有另一个问题。我在 3 节点集群中插入了大约 1700 万列,n 时查询 -"select count() from keyspace.table name";它给出了一个操作超时错误,好像我从 keyspace.tablename 限制 50000 查询“select count()”它可以工作。你能告诉我如何解决这个超时错误吗?此外,我对 Cassandra 中的计数器表还有另一个疑问,它与非列表有什么不同,它的局限性是什么
    • 我希望这不是您打算在生产中使用的查询。您的数据存储在多台机器上,并且您不会在全表扫描中获得良好的响应时间。如果这只是为了测试,您可以加快读取超时。
    • 抱歉,上面的帖子是 rows ,而不是 columns 。
    • 正如我之前所问的那样“在 3-4 节点集群中插入数据(大约 1700 万条记录)所花费的时间是否与在单个节点中所花费的时间相同,我需要更清楚地说明这一点. 我花了大约 4 个多小时才将数据加载到单个节点中,而当我尝试使用 3 个节点的集群时,RF=3 它花费了两倍以上的时间,是这样还是我做错事了。
    • 相同的硬件?你的射频是多少?
    猜你喜欢
    • 2020-08-19
    • 1970-01-01
    • 2014-08-28
    • 2012-11-29
    • 1970-01-01
    • 2016-08-14
    • 2012-05-02
    • 2014-11-15
    • 2015-12-24
    相关资源
    最近更新 更多