【Question title】: Insert/Update R data.table into PostgreSQL table
【Posted】: 2019-08-12 14:48:16
【Question】:

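I have a PostgreSQL database in which a table and its columns are already defined. The table's primary key is the combination of the (Id, datetime) columns. I need to periodically insert data for different IDs from an R data.table into the database. However, if data for a given (Id, datetime) combination already exists, it should be updated (overwritten). How can I do this with the RPostgres or RPostgreSQL package?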

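When I try to insert a data.table containing some (Id, datetime) rows that already exist in the table, I get an error saying that the primary key constraint is violated: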

dbWriteTable(con, table, dt, append = TRUE, row.names = FALSE)

Error in connection_copy_data(conn@ptr, sql, value) : 
  COPY returned error: ERROR:  duplicate key value violates unique constraint "interval_data_pkey"
DETAIL:  Key (id, dttm_utc)=(a0za000000CSdLoAAL, 2018-10-01 05:15:00+00) already exists.
CONTEXT:  COPY interval_data, line 1

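【Comments】:

  • This question is too broad. Please read tutorials/vignettes/books and make a serious attempt, then come back with a specific question about your implementation.
  • Insert into a new (or reused but truncated) table, then merge-insert it into the target table. Sorry, this is a pure SQL question, not an R question...
  • @RYoda Do you mean creating a new temporary table and then merging it with the target table? Could you point to a resource on how to do the "merge insert" operation you mentioned?
  • See the MERGE statement in the PostgreSQL documentation: postgresql.org/message-id/attachment/23520/sql-merge.html
  • I get a syntax error when I try the MERGE statement, and the answer here seems to indicate that PostgreSQL has no MERGE statement? stackoverflow.com/questions/49368083/…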

Tags: r data.table rpostgresql


【Solution 1】:

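You can use my pg package, which has upsert functionality, or just take the upsert code from there: https://github.com/jangorecki/pg/blob/master/R/pg.R#L249 It is basically what others said in the comments: write the data to a temporary table, then insert into the target table using an ON CONFLICT clause.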

pgSendUpsert = function(stage_name, name, conflict_by, on_conflict = "DO NOTHING", techstamp = TRUE, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
    stopifnot(!is.null(conn), is.logical(.log), is.logical(techstamp), is.character(on_conflict), length(on_conflict)==1L)
    cols = pgListFields(stage_name)
    cols = setdiff(cols, c("run_id","r_timestamp")) # remove techstamp to have clean column list, as the fresh one will be used, if any
    # sql
    insert_into = sprintf("INSERT INTO %s.%s (%s)", name[1L], name[2L], paste(if(techstamp) c(cols, c("run_id","r_timestamp")) else cols, collapse=", "))
    select = sprintf("SELECT %s", paste(cols, collapse=", "))
    if(techstamp) select = sprintf("%s, %s::INTEGER run_id, '%s'::TIMESTAMPTZ r_timestamp", select, get_run_id(), format(Sys.time(), "%Y-%m-%d %H:%M:%OS"))
    from = sprintf("FROM %s.%s", stage_name[1L], stage_name[2L])
    if(!missing(conflict_by)) on_conflict = paste(paste0("(",paste(conflict_by, collapse=", "),")"), on_conflict)
    on_conflict = paste("ON CONFLICT",on_conflict)
    sql = paste0(paste(insert_into, select, from, on_conflict), ";")
    pgSendQuery(sql, conn = conn, .log = .log)
}

#' @rdname pg
pgUpsertTable = function(name, value, conflict_by, on_conflict = "DO NOTHING", stage_name, techstamp = TRUE, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
    stopifnot(!is.null(conn), is.logical(.log), is.logical(techstamp), is.character(on_conflict), length(on_conflict)==1L)
    name = schema_table(name)
    if(!missing(stage_name)){
        stage_name = schema_table(stage_name)
        drop_stage = FALSE
    } else {
        stage_name = name
        stage_name[2L] = paste("tmp", stage_name[2L], sep="_")
        drop_stage = TRUE
    }
    if(pgExistsTable(stage_name)) pgTruncateTable(name = stage_name, conn = conn, .log = .log)
    pgWriteTable(name = stage_name, value = value, techstamp = techstamp, conn = conn, .log = .log)
    on.exit(if(drop_stage) pgDropTable(stage_name, conn = conn, .log = .log))
    pgSendUpsert(stage_name = stage_name, name = name, conflict_by = conflict_by, on_conflict = on_conflict, techstamp = techstamp, conn = conn, .log = .log)
}
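
Note that the default `on_conflict = "DO NOTHING"` above keeps existing rows untouched, whereas the question asks for them to be overwritten. A minimal sketch of the same staging-table approach with plain DBI calls and a `DO UPDATE` action might look like the following. The helper names `build_upsert_sql` and `upsert_dt` and the `value` column are hypothetical (not part of DBI, RPostgres, or the pg package), identifiers are assumed to be simple unquoted lowercase names, and the `temporary` argument of `dbWriteTable` is assumed to be supported by the driver (RPostgres accepts it).

```r
# Sketch only: build_upsert_sql() and upsert_dt() are hypothetical helpers.

# Build an "INSERT ... SELECT ... ON CONFLICT (...) DO UPDATE" statement.
# target, stage: table names; cols: columns to copy; keys: conflict columns.
# Assumes plain identifiers that need no quoting.
build_upsert_sql <- function(target, stage, cols, keys) {
  stopifnot(length(keys) > 0L, all(keys %in% cols))
  non_keys <- setdiff(cols, keys)
  action <- if (length(non_keys) == 0L) {
    "DO NOTHING"  # nothing besides the key to overwrite
  } else {
    # EXCLUDED refers to the row that was proposed for insertion
    paste("DO UPDATE SET",
          paste(sprintf("%s = EXCLUDED.%s", non_keys, non_keys), collapse = ", "))
  }
  sprintf("INSERT INTO %s (%s) SELECT %s FROM %s ON CONFLICT (%s) %s",
          target, paste(cols, collapse = ", "), paste(cols, collapse = ", "),
          stage, paste(keys, collapse = ", "), action)
}

# Stage the data.table in a temporary table, upsert it, then drop the stage.
# Returns the number of rows affected, as reported by dbExecute().
upsert_dt <- function(con, target, dt, keys, stage = paste0("tmp_", target)) {
  DBI::dbWriteTable(con, stage, dt, temporary = TRUE, overwrite = TRUE)
  on.exit(DBI::dbRemoveTable(con, stage), add = TRUE)
  DBI::dbExecute(con, build_upsert_sql(target, stage, names(dt), keys))
}

# Example of the generated statement ("value" is an assumed data column):
sql <- build_upsert_sql("interval_data", "tmp_interval_data",
                        cols = c("id", "dttm_utc", "value"),
                        keys = c("id", "dttm_utc"))
cat(sql, "\n")
```

With an open connection, the call would be something like `upsert_dt(con, "interval_data", dt, keys = c("id", "dttm_utc"))`. PostgreSQL rejects an `ON CONFLICT DO UPDATE` that would touch the same target row twice in one statement, so the staged data should not contain duplicate key combinations.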

【Comments】:
