【发布时间】:2019-11-13 07:47:12
【问题描述】:
我在 Python 中有一个 lambda 函数,它可以生成一些东西并返回一些需要插入到 Redshift 中的值。在 lambda 中,我将值推送到 Kinesis,后者将它们复制到 S3 中,然后复制到 Redshift 中。
lambda 中的值是以字符串形式获取的,如下所示:
final_string = 'a;b;d;c'
每个字母是 Redshift 表中不同列的值,因此分隔符是“;”。然后我将数据推送到 Kinesis Stream:
put_response = kinesis_client.put_record(StreamName = 'PixelTrack',
Data=json.dumps(final_string),
PartitionKey='first')
然后,Kinesis 流向 Kinesis Firehose 流提供数据。使用 Kinesis Firehose 在 S3 中生成的文件类似于(包括文件中的引号):
"a;b;c;d;c"
最后,我使用以下语句(在 Kinesis firehose 中配置)将数据复制到 redshift:
copy table
from blabla
BLANKSASNULL
DELIMITER ';'
EMPTYASNULL
NULL AS 'null'
ESCAPE
FILLRECORD;
当 Kinesis 中仅缓冲一个结果时,我已设法使其工作并获取 Redshift 中的值(不过,在 Redshift 中创建了一个新列)。因此,当在缓冲时间内只执行了一个 lambda 时,Redshift 表如下所示:
A B C D no_info_column
"a b c d" <null>
当我多次执行 lambda 时出现问题,因为我在 S3 中得到一个文件,其中包含以下文本:
"a,b,c,d" "a1,b1,c1,d1"
我在 Redshift 中收到错误 Extra column(s) found,因为复制语句无法找到行分隔符。
我尝试了以下方法但没有成功:
- 在 lambda 中返回字符串
- 搜索如何在副本中设置行分隔符 (SO question)
- 将列表转换为 json 而不是字符串。然后我在打开列表时遇到了括号问题
- 在复制语句中使用 REMOVEQUOTES
我最初的问题是:“如何从 s3 复制到 redshift,不同的行用双引号分隔”,但问题可能出在我的第一种方法中,所以我决定稍微提一下这个问题更广泛一点。
那么,我该如何解决呢?
【问题讨论】:
-
为什么不去掉 kinesis 直接从你的 lambda 插入 Redshift 呢?您可以在 python 中以编程方式轻松连接到 Redshift,这将为您节省大量时间和金钱
-
您使用的是 Kinesis Streams 还是 Kinesis Firehose?如果您要发送到 Redshift,那么更简单的方法是使用 Kinesis Firehose,因为它会为您处理整个过程,自动缓冲并将内容发送到 Redshift。
-
我创建了一个 Kinesis Stream,它将信息发送到 Kinesis Firehose(也许这就是问题所在?)。我不是直接从 lambda 插入,因为我问过并且被告知插入语句效率非常低,我们谈论的是每天需要插入 400 万行,这最终可能会大大降低数据库速度
-
@John Rotenstein 你给了我使用 Kinesis Firehose 而不是两者的想法,所以如果你愿意,你可以回答这个问题
标签: python amazon-web-services amazon-s3 aws-lambda amazon-redshift