【问题标题】:How to move columns from one keyspace to other in Cassandra如何在 Cassandra 中将列从一个键空间移动到另一个键空间
【发布时间】:2019-08-16 07:40:22
【问题描述】:

我可以将下面提到的一些列(cc_payment,keyid)从 Cassandra 键空间 billing 移动到其他 Cassandra payments 键空间吗? payment_info 将是一个新表。

有什么办法可以移动吗?还是我需要 COPY TO csv 文件并使用 COPY FROM 选项导入?由于数据巨大,我正在寻找直接从一个键空间移动到另一个键空间的选项。我们正在使用 datastax cassandra。

感谢您的帮助。

    FROM
========

keyspace:  billing
create table if not exists billing_info (
      user_id text,
      billing_id timeuuid,
      cc_payment frozen<cc_payment>,
      keyid text;
      PRIMARY KEY((user_id), billing_id)
) WITH CLUSTERING ORDER BY (billing_id DESC);

    TO
======
keyspace:  payments
create table if not exists payment_info (
      user_id text,
      payment_id timeuuid,
      cc_payment frozen<cc_payment>,
      keyid text;
      PRIMARY KEY((user_id), payment_id)
) WITH CLUSTERING ORDER BY (payment_id DESC);

【问题讨论】:

  • 如果记录超过 200 万,COPY 将不起作用。两个表中的主键也不同,因此理想情况下,如果没有任何手动工作,您将无法直接执行此操作。我认为您可能会编写代码来在现有行中添加新列。
  • @AnilKapoor 我之前使用 COPY 导出了超过 3 亿行。一切都是为了控制页面和批量大小。
  • @Aaron 你能提供一些细节吗?我有兴趣看到这个。
  • @AnilKapoor 我提到描述了这个答案的相关属性:stackoverflow.com/questions/41448374/…

标签: cassandra datastax-enterprise cassandra-3.0


【解决方案1】:

有多种方法可以做到这一点:

直接复制文件,然后更改表结构

由于表只有一个列名不同,直接复制文件会快很多,如下:

  • 创建一个与billing.billing_info具有完全相同结构的表payments.payment_info
  • 停止写信给billing.billing_info

然后在集群的每个节点上,执行以下操作:

  • 为它刷新:nodetool flush billing billing_info
  • 切换到 Cassandra 的数据目录
  • 在运行 Cassandra 的同一用户下将文件 billing/billing_info-&lt;ID_of_the_table&gt;/* 复制到 payments/payment_info-&lt;ID_of_the_table&gt;/
  • 执行nodetool refreshpayments.payment_info`
  • 在 cqlsh 中检查数据是否可用
  • 使用以下命令重命名列:ALTER TABLE payments.payment_info RENAME billing_id TO payment_id

通过复制来迁移数据,例如,使用 DSBulk 或 Spark。

如果您使用的是 DSE,那么您可以使用DSBulk(最好使用最新版本)从一个表中卸载数据并加载到另一个表中。通过将数据写入标准输出并通过 Unix 管道从标准输入读取数据,此命令可以在不创建中间副本的情况下工作,尽管在这种情况下它会变慢,因为它无法实现必要的并行性。

在最简单的情况下,它将被如下调用,提供更改的字段名称之间的映射(有关详细信息,请参阅文档0:

dsbulk unload -k ks1 -t table1 -c json | dsbulk load -k ks2 -t table2 -c json -m "mapping_to_accomodate_changes_in_field_names"

但是,如果您不仅需要复制数据,还需要复制其他内容(例如 TTL 和 WriteTime),那么任务将更加复杂 - 在这种情况下,您需要显式导出它,然后加载数据分几批,每列分开。

【讨论】:

    【解决方案2】:

    Spark 你可以用这个小sn-p。你可以在 updateColumns 中做你需要做的事情

    val myKeyspace = "oldkeyspace" 
    val myTable = "oldtable"
    val newKeyspace = "newkeyspace" 
    val newTable = "newtabl"
    
    def updateColumns(row: CassandraRow): CassandraRow = { 
         val inputMap = row.toMap val newData = Map( "newColumn" -> "somevalue" ) 
         var outputMap = inputMap ++ newData CassandraRow.fromMap(outputMap) 
    }
    
    val result = sc.cassandraTable(myKeyspace, myTable) .map(updateColumns(_)) 
      .saveToCassandra(newKeyspace, newTable)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-01-01
      • 2017-03-22
      • 2021-10-12
      • 2021-04-12
      • 2019-12-30
      • 1970-01-01
      • 2013-08-22
      相关资源
      最近更新 更多