【问题标题】:Python script to query the S3 CloudFront access logs from AWS Athena and import the select query to an S3 bucket用于从 AWS Athena 查询 S3 CloudFront 访问日志并将选择查询导入 S3 存储桶的 Python 脚本
【发布时间】:2020-11-25 07:51:39
【问题描述】:

我计划每天在 Lambda 函数上运行此脚本并获取 IP 地址列表并查找任何可疑 IP 地址并添加到 WAF 阻止规则。但我的问题出在脚本中,我可能需要手动更改表名称,因为它无法用 AWS Athena 上的相同现有表覆盖,并且需要更新当天的 S3 bucket_input。那么有没有办法自动化这个呢

#!/usr/bin/env python3
import boto3

#Function for executing athena queries
def run_query(query, database, s3_output):
    client = boto3.client('athena')
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': s3_accesslog1
            },
        ResultConfiguration={
            'OutputLocation': s3_output,
            }
        )
    print('Execution ID: ' + response['QueryExecutionId'])
    return response
    
#Athena configuration
s3_input = 's3://smathena/athenatest/'
s3_ouput = 's3://python-demo/Test-Athena/'
database = 's3_accesslog1'
table = 'Test_output'

#Athena database and table definition
create_database = "CREATE DATABASE IF NOT EXISTS %s;" % (database)
create_table = \
  """CREATE EXTERNAL TABLE IF NOT EXISTS %s.%s (
  `Date` DATE,
   Time STRING,
   Location STRING,
   SCBytes BIGINT,
   RequestIP STRING,
   Method STRING,
   Host STRING,
   Uri STRING,
   Status INT,
   Referrer STRING,
   UserAgent STRING,
   UriQS STRING,
   Cookie STRING,
   ResultType STRING,
   RequestId STRING,
   HostHeader STRING,
   Protocol STRING,
   CSBytes BIGINT,
   TimeTaken FLOAT,
   XForwardFor STRING,
   SSLProtocol STRING,
   SSLCipher STRING,
   ResponseResultType STRING,
   CSProtocolVersion STRING,
   FleStatus STRING,
   FleEncryptedFields INT,
   CPort INT,
   TimeToFirstByte FLOAT,
   XEdgeDetailedResult STRING,
   ScContent STRING,
   ScContentLen BIGINT,
   ScRangeStart BIGINT,
   ScRangeEnd BIGINT
   )
   ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
   LOCATION '%s'
   TBLPROPERTIES ('skip.header.line.count' = '2');""" % ( database, table, s3_input )

#Query definitions
query_1 = "SELECT requestip, count(*) FROM %s.%s group by requestip order by count(*) desc" % (database, table)
query_2 = "SELECT * FROM %s.%s where useragent = googlebot" % (database, table)

#Execute all queries
queries = [ create_database, create_table, query_1, query_2 ]
for q in queries:
   print("Executing query: %s" % (q))
   res = run_query(q, database, s3_ouput)

【问题讨论】:

  • 查看您的~/.aws/config 文件。第 7 行是否包含import boto3?如果是这样,那不应该在那里。
  • 是的,~/.aws/config 文件有 import boto3,我已经删除并执行了脚本,但现在我得到 Executing query: $ ./Athena_Script.py CREATE DATABASE IF NOT EXISTS s3_accesslog1;回溯(最后一次调用):文件“./Athena_Script.py”,第 75 行,在 res = run_query(q, database, s3_ouput) 文件“./Athena_Script.py”,第 10 行,在 run_query 'Database ': s3_accesslog1 NameError: name 's3_accesslog1' is not defined
  • 请编辑您的问题以显示当前的错误输出,而不是将其放在(难以阅读的)评论中。您可以删除您现在已修复的错误。
  • 谢谢,我已经编辑了问题并更新了错误。
  • 表示您使用的 IAM 用户无权使用 Athena。事实上,它表示权限明确拒绝访问。它还提到了一个名为primary 的工作组,所以也许您可以在命令中指定一个不同的工作组,它可能会起作用?或者,它可能在整个用户身上。

标签: python amazon-web-services amazon-s3 amazon-cloudfront


【解决方案1】:

我认为问题在于该命令正在运行:

QueryExecutionContext={
            'Database': s3_accesslog1
            },

但是,数据库并不存在——事实上,您正在尝试创建它!

因此,请尝试删除该条目并查看它是否有效。这意味着您需要在每个查询中指定数据库/模式,但这通常是避免歧义的好主意。

【讨论】:

  • 感谢约翰的回复。删除 QueryExecutionContext 后,我​​收到了上面更新的错误。
  • 当我的查询出现错误时,我通常会看到 no viable alternative 错误。要对其进行调试,请获取显示的查询并在 Amazon Athena 界面中手动运行它。我刚刚这样做了,似乎表名不能包含连字符。可以使用下划线代替。
猜你喜欢
  • 2019-08-26
  • 2020-09-19
  • 1970-01-01
  • 2017-07-04
  • 1970-01-01
  • 2019-04-13
  • 1970-01-01
  • 1970-01-01
  • 2022-11-11
相关资源
最近更新 更多