【问题标题】:Bulk insert/update using R to mongoDB使用 R 批量插入/更新到 mongoDB
【发布时间】:2016-03-24 19:10:23
【问题描述】:

我正在尝试使用 R 将大量数据(数百万个文档)从我将在不同时间获得的各种数据帧中插入到 mongodb 中。

每个数据框都将具有相同的主 ID,但可以具有相同或不同的属性。

如果记录存在,我想添加任何新属性并附加任何现有属性。如果记录不存在,我想创建它。

这在 R 中是否可能有效?我曾尝试使用美妙的 mongolite 包,但由于存在重复记录,插入选项失败。

非常感谢任何指针。

谢谢

伊恩

id<-LETTERS[1:5]
value1<-paste0("value1_",letters[1:5])
value2<-paste0("value2_",letters[1:5])
value3<-paste0("additional_value_1",letters[1:5])
df1<-as.data.frame(cbind(id,value1))
df2<-as.data.frame(cbind(id,value2))
df3<-as.data.frame(cbind(id,value3))

colnames(df1)<-c('_id','value1')
colnames(df2)<-c('_id','value2')
colnames(df3)<-c('_id','value1')

desired_value1<-paste0( "[",paste(paste0("'",value1,"'"),paste0("'",value3,"'"),sep=","),"]")
df4<-cbind(id,desired_value1,value2)
df4<-as.data.frame(cbind(id,desired_value1,value2))
colnames(df4)<-c("_id","value1","value2")

【问题讨论】:

  • 抱歉,现在应该修复了。它是值1和值3的串联。df4显示了按顺序插入df1、df2和df3后应该在mongodb中的值。 df1 和 df3 都包含数据 value1,因此应该附加在一起而不是覆盖。

标签: r mongodb


【解决方案1】:

注意

这个答案使用library(rmongodb),Cran 不再支持它。


这个答案将部分取决于您如何获得“新”data.frames。在不知道您的数据设置和数据大小的情况下,我也无法回答 有效 部分,但希望这可以帮助您入门。另外,我发现从 R 向 mongo 插入/检索数百万条记录非常慢。

这样做的一种方法是,对于您获得的每个新 data.frame,将匹配的记录带回 R 并“加入/更新”它们,然后仅针对这些文档更新数据库,同时使用update / upsert查询。

我的大部分 r-mongodb 工作也使用library(rmongodb)

稍微修改您的数据以使用id 而不是_id

id<-LETTERS[1:5]
value1<-paste0("value1_",letters[1:5])
value2<-paste0("value2_",letters[1:5])
value3<-paste0("additional_value_1",letters[1:5])
df1<-as.data.frame(cbind(id,value1), stringsAsFactors = F)  ## removed factor levels
df2<-as.data.frame(cbind(id,value2), stringsAsFactors = F)
df3<-as.data.frame(cbind(id,value3), stringsAsFactors = F)

colnames(df1)<-c('id','value1')
colnames(df2)<-c('id','value2')
colnames(df3)<-c('id','value1')

desired_value1<-paste0( "[",paste(paste0("'",value1,"'"),paste0("'",value3,"'"),sep=","),"]")
df4<-cbind(id,desired_value1,value2)
df4<-as.data.frame(cbind(id,desired_value1,value2))
colnames(df4)<-c("_id","value1","value2")

第一步是插入数据库

library(rmongodb)  ## my preferred r mongodb package 
library(jsonlite)  ## for viewing/checking results 
library(data.table) ## for fast rbind & data frame manipulation

mongo <- mongo.create()
mongo.is.connected(mongo)
# [1] TRUE

db <- "test"
coll <- "test"

bs <- mongo.bson.from.df(df1)
ns <- paste0(db, ".", coll)

## insert.batch - insert each 'row' of the df as a document
mongo.insert.batch(mongo = mongo, 
                   ns = ns, 
                   lst = bs)  
# [1] TRUE

检索所有文件以检查上传

f <- mongo.bson.from.list(list("_id" = 0))  ## to ignore the _id field
res <- mongo.find.all(mongo = mongo, ns = ns, fields = f)
toJSON(res, pretty=T)
# [
#   {
#     "id": ["A"],
#     "value1": ["value1_a"]
#   },
#   {
#     "id": ["B"],
#     "value1": ["value1_b"]
#   },
#   {
#     "id": ["C"],
#     "value1": ["value1_c"]
#   },
#   {
#     "id": ["D"],
#     "value1": ["value1_d"]
#   },
#   {
#     "id": ["E"],
#     "value1": ["value1_e"]
#   }
# ] 

现在,如果我们想将 df2$value2 添加到这些文档中,我们可以将它们带回 R 中进行操作,然后更新数据库

qry <- list("id" = list("$in" = df2$id))
## mongo shell query: db.test.find({"id" : { "$in" : ["A", "B", ..., ]}})
qry <- mongo.bson.from.list(qry)
f <- list("_id" = 0)
res <- mongo.find.all(mongo = mongo, 
                      ns = ns,
                      query = qry,
                      fields = f)

dt_res <- rbindlist(res)

## set our df2 to data.table, and join onto dt_res
setDT(df2)

## add a new row to df2, with a new id, to check the update.upsert works
df2 <- rbindlist(list(df2, data.table(id = "Z", value2 = "value2_z")))

dt_res <- dt_res[ df2, on="id"]  ## left join to keep our 'z' row
dt_res
#    id   value1   value2
# 1:  A value1_a value2_a
# 2:  B value1_b value2_b
# 3:  C value1_c value2_c
# 4:  D value1_d value2_d
# 5:  E value1_e value2_e
# 6:  Z       NA value2_z

我们现在可以使用 udpate and upsert 使用这些新值更新数据库

for(i in 1:nrow(dt_res)){

  crit <- mongo.bson.from.list(list("id" = dt_res[i, id]))
  d <- c(dt_res[i, ])
  mongo.update(mongo = mongo, 
               ns = ns, 
               criteria = crit, 
               objNew = d, 
               flags = c(mongo.update.upsert))    
}

通过返回所有内容检查文档网络是否已更新

f <- mongo.bson.from.list(list("_id" = 0))  ## to ignore the _id field
res <- mongo.find.all(mongo = mongo, ns = ns, fields = f)
toJSON(res, pretty=T)

# [
#   {
#     "id": ["A"],
#     "value1": ["value1_a"],
#     "value2": ["value2_a"]
#   },
#   {
#     "id": ["B"],
#     "value1": ["value1_b"],
#     "value2": ["value2_b"]
#   },
#   {
#     "id": ["C"],
#     "value1": ["value1_c"],
#     "value2": ["value2_c"]
#   },
#   {
#     "id": ["D"],
#     "value1": ["value1_d"],
#     "value2": ["value2_d"]
#   },
#   {
#     "id": ["E"],
#     "value1": ["value1_e"],
#     "value2": ["value2_e"]
#   },
#   {
#     "id": ["Z"],
#     "value1": {},
#     "value2": ["value2_z"]
#   }
# ] 

请注意,这包括我们的新 'z' id

【讨论】:

  • 非常有用。是否可以对 Mongo 进行批量写入而不是循环遍历数据框中的所有行?
  • @Iain - 目前我不确定如何在没有循环的情况下进行批量 update。我很想知道你是否找到一个!该解决方案在扩展到您的完整数据集时效果如何?
猜你喜欢
  • 1970-01-01
  • 2014-10-06
  • 1970-01-01
  • 1970-01-01
  • 2017-07-12
  • 1970-01-01
  • 2011-09-29
  • 2011-04-15
  • 2020-12-01
相关资源
最近更新 更多