【发布时间】:2016-10-31 21:44:19
【问题描述】:
有没有办法将数据从 Lambda 函数推送到 Kinesis 流?我搜索了互联网,但没有找到任何与之相关的示例。
谢谢。
【问题讨论】:
标签: amazon-web-services aws-lambda amazon-kinesis
有没有办法将数据从 Lambda 函数推送到 Kinesis 流?我搜索了互联网,但没有找到任何与之相关的示例。
谢谢。
【问题讨论】:
标签: amazon-web-services aws-lambda amazon-kinesis
是的,您可以将信息从 Lambda 发送到 Kinesis Stream,这非常简单。确保您以正确的权限运行 Lambda。
const AWS = require('aws-sdk');
const kinesisConstant = require('./kinesisConstants'); //Keep it consistent
const kinesis = new AWS.Kinesis({
apiVersion: kinesisConstant.API_VERSION, //optional
//accessKeyId: '<you-can-use-this-to-run-it-locally>', //optional
//secretAccessKey: '<you-can-use-this-to-run-it-locally>', //optional
region: kinesisConstant.REGION
});
const savePayload = (payload) => {
//We can only save strings into the streams
if( typeof payload !== kinesisConstant.PAYLOAD_TYPE) {
try {
payload = JSON.stringify(payload);
} catch (e) {
console.log(e);
}
}
let params = {
Data: payload,
PartitionKey: kinesisConstant.PARTITION_KEY,
StreamName: kinesisConstant.STREAM_NAME
};
kinesis.putRecord(params, function(err, data) {
if (err) console.log(err, err.stack);
else console.log('Record added:',data);
});
};
exports.save = (payload) => {
const params = {
StreamName: kinesisConstant.STREAM_NAME,
};
kinesis.describeStream(params, function(err, data) {
if (err) console.log(err, err.stack);
else {
//Make sure stream is able to take new writes (ACTIVE or UPDATING are good)
if(data.StreamDescription.StreamStatus === kinesisConstant.STATE.ACTIVE
|| data.StreamDescription.StreamStatus === kinesisConstant.STATE.UPDATING ) {
savePayload(payload);
} else {
console.log(`Kinesis stream ${kinesisConstant.STREAM_NAME} is ${data.StreamDescription.StreamStatus}.`);
console.log(`Record Lost`, JSON.parse(payload));
}
}
});
};
module.exports = {
STATE: {
ACTIVE: 'ACTIVE',
UPDATING: 'UPDATING',
CREATING: 'CREATING',
DELETING: 'DELETING'
},
STREAM_NAME: '<your-stream-name>',
PARTITION_KEY: '<string-value-if-one-shard-anything-will-do',
PAYLOAD_TYPE: 'String',
REGION: '<the-region-where-you-have-lambda-and-kinesis>',
API_VERSION: '2013-12-02'
}
const kinesis = require('./kinesis');
exports.handler = (event, context, callback) => {
console.log('LOADING handler');
const done = (err, res) => callback(null, {
statusCode: err ? '400' : '200',
body: err || res,
headers: {
'Content-Type': 'application/json',
},
});
kinesis.save(event); // here we send it to the stream
done(null, event);
}
【讨论】:
这应该完全像在您的计算机上那样完成。
这是nodejs中的一个例子:
let aws = require('aws');
let kinesis = new aws.Kinesis();
// data that you'd like to send
let data_object = { "some": "properties" };
let data = JSON.stringify(data_object);
// push data to kinesis
const params = {
Data: data,
PartitionKey: "1",
StreamName: "stream name"
}
kinesis.putRecord(params, (err, data) => {
if (err) console.error(err);
else console.log("data sent");
}
请注意,这段代码不会工作,因为Lambda 对您的信息流没有任何权限。
通过Lambda访问AWS资源时,最好使用IAM角色;
Lambda时,可以选择现有/创建角色。IAM,然后转到角色,然后选择您分配给Lambda 函数的角色名称。putRecord、putRecords)。然后,测试Lambda。
【讨论】:
是的,这可以做到,我试图完成同样的事情,并且能够在 Lambda 中使用 Node.js 4.3 运行时做到这一点,并且它也适用于 6.10 版。
代码如下:
在您的 Lambda 函数顶部声明以下内容:
var AWS = require("aws-sdk");
var kinesis = new AWS.Kinesis();
function writeKinesis(rawdata){
data = JSON.stringify(rawdata);
params = {Data: data, PartitionKey: "<PARTITION_KEY>", StreamName: "<STREAM_NAME>"};
kinesis.putRecord(params, (err, data) => {
if (err) console.error(err);
else console.log("data sent");
});
}
现在,在exports.handler 中调用函数:
writeKinesis(<YOUR_DATA>);
需要注意的几点... Kinesis 要摄取数据,必须对其进行编码。在下面的示例中,我具有从 CloudWatch 获取日志并将它们发送到 Kinesis 流的函数。
请注意,我将 buffer.toString('utf8') 的内容插入到 writeKinesis 函数中:
exports.handler = function(input, context) {
...
var zippedInput = new Buffer(input.awslogs.data, 'base64');
zlib.gunzip(zippedInput, function(error, buffer) {
...
writeKinesis(buffer.toString('utf8'));
...
}
...
}
最后,在 IAM 中配置适当的权限。您的 Lambda 函数必须在包含以下以下权限的 IAM 角色的上下文中运行。就我而言,我只是修改了默认的 lambda_elasticsearch_execution 角色,以包含一个名为“lambda_kinesis_execution”的策略,代码如下:
"Effect": "Allow",
"Action": [
"kinesis:*"
],
"Resource": [
"<YOUR_STREAM_ARN>"
]
【讨论】: