【Question Title】: Concatenate S3 files to read in EMR
【Posted】: 2013-05-02 23:06:35
【Question Description】:

I have an S3 bucket containing log files that I want to concatenate and then use as input to an EMR job. The log files live at paths like: bucket-name/[date]/product/out/[hour]/[minute-based-file]. I'd like to take every minute log, across every hour directory, across every date directory, and concatenate them into a single file. I want to use that file as the input to an EMR job. The original log files need to be preserved, and the newly combined log file can be written to a different S3 bucket.

I tried using hadoop fs -getmerge on the EMR master node over SSH, but got this error:

This file system object (file:///) does not support access to the request path 's3://target-bucket-name/merged.log'

There are some other files in the source S3 bucket, so I don't want to include all of its files. A wildcard match would look like this: s3n://bucket-name/*/product/out/*/log.*

The goal is to work around the problem of feeding EMR tens or hundreds of thousands of small (10 KB–3 MB) input files, and instead give it one large file that it can split more efficiently.
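The directory layout described above can be sketched as a couple of path-building helpers. The bucket name and wildcard come from the question itself; the helper names are illustrative only:

```ruby
require 'date'

# Sketch of the path layout from the question:
#   bucket-name/[date]/product/out/[hour]/[minute-based-file]
BUCKET = 'bucket-name' # placeholder bucket name

# Source prefix for one hour of one day's minute logs
def hour_prefix(date, hour)
  "s3n://#{BUCKET}/#{date.strftime('%Y-%m-%d')}/product/out/#{'%02d' % hour}/"
end

# Wildcard that matches only the minute logs, skipping unrelated files
def log_glob
  "s3n://#{BUCKET}/*/product/out/*/log.*"
end

puts hour_prefix(Date.new(2013, 5, 2), 7)
# s3n://bucket-name/2013-05-02/product/out/07/
```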

【Comments】:

  • I can successfully list these files with the Hadoop shell using: hadoop fs -ls s3n://bucket-name/*/product/out/*/log*
  • One option is to create a new EMR job that does the concatenation for you. For example, group all events into day or hour buckets and write them back to S3. Not sure how efficient that would be, but it would certainly leave you with one big blob to feed into your mappers.
  • You could do what @seedhead mentioned above, but instead of writing it back to S3, write it to HDFS and use the same cluster to run your actual job.
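A rough sketch of the last comment's suggestion, merging into HDFS on the same cluster instead of writing back to S3; the paths and helper name here are placeholders, not part of the question:

```ruby
# Hypothetical sketch: build the shell commands for merging matched logs
# into HDFS rather than S3. Note that -getmerge can only write to the
# local filesystem, so the merge is staged locally and then put into HDFS.
def merge_into_hdfs_commands(source_glob, hdfs_dest_dir)
  local = '/tmp/merged.log' # local staging file (placeholder path)
  ["hadoop fs -getmerge #{source_glob} #{local}",
   "hadoop fs -put #{local} #{hdfs_dest_dir}/merged.log"]
end

puts merge_into_hdfs_commands('s3n://bucket-name/*/product/out/*/log.*',
                              'hdfs:///user/hadoop/merged-logs')
```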

标签: hadoop amazon-web-services amazon-s3 elastic-map-reduce emr


【Solution 1】:

I ended up just writing a script that wraps a few Hadoop filesystem commands to do this.

#!/usr/bin/env ruby

require 'date'

# Merge minute-based log files into daily log files
# Usage: Run on EMR master (e.g. SSH to master then `ruby ~/merge-historical-logs.rb [FROM [TO]]`)

SOURCE_BUCKET_NAME      = 's3-logs-bucket'
DESTINATION_BUCKET_NAME = 's3-merged-logs-bucket'

# Optional date inputs
min_date = if ARGV[0]
  min_date_args = ARGV[0].split('-').map {|item| item.to_i}
  Date.new(*min_date_args)
else
  Date.new(2012, 9, 1)
end

max_date = if ARGV[1]
  max_date_args = ARGV[1].split('-').map {|item| item.to_i}
  Date.new(*max_date_args)
else
  Date.today
end

# Setup directories
hdfs_logs_dir = '/mnt/tmp/logs'
local_tmp_dir = './_tmp_merges'

puts "Cleaning up filesystem"
system "hadoop fs -rmr #{hdfs_logs_dir}"
system "rm -rf #{local_tmp_dir}*"

puts "Making HDFS directories"
system "hadoop fs -mkdir #{hdfs_logs_dir}"

# We will progress backwards, from max to min
date = max_date
while date >= min_date
  # Format date pieces
  year  = date.year
  month = "%02d" % date.month
  day   = "%02d" % date.day

  # Make a directory in HDFS to store this day's hourly logs
  today_hours_dir = "#{hdfs_logs_dir}/#{year}-#{month}-#{day}"
  puts "Making today's hourly directory"
  system "hadoop fs -mkdir #{today_hours_dir}"

  # Break the day's hours into a few chunks
  # This seems to avoid some problems when we run lots of getmerge commands in parallel
  (0..23).each_slice(8) do |hour_chunk|
    hour_chunk.each do |_hour|
      hour = "%02d" % _hour

      # Setup args to merge minute logs into hour logs
      source_file = "s3://#{SOURCE_BUCKET_NAME}/#{year}-#{month}-#{day}/product/out/#{hour}/"
      output_file = "#{local_tmp_dir}/#{hour}.log"

      # Launch each hour's getmerge in the background
      full_command = "hadoop fs -getmerge #{source_file} #{output_file}"
      puts "Forking: #{full_command}"
      fork { system full_command }
    end

    # Wait for this batch of getmerges to finish
    Process.waitall
  end

  # Delete the local temp files Hadoop created
  puts "Removing temp files"
  system "rm #{local_tmp_dir}/.*.crc"

  # Move local hourly logs to hdfs to free up local space
  puts "Moving local logs to HDFS"
  system "hadoop fs -put #{local_tmp_dir}/* #{today_hours_dir}"

  puts "Removing local logs"
  system "rm -rf #{local_tmp_dir}"

  # Merge the day's hourly logs into a single daily log file
  daily_log_file_name = "#{year}-#{month}-#{day}.log"
  daily_log_file_path = "#{local_tmp_dir}_day/#{daily_log_file_name}"
  puts "Merging hourly logs into daily log"
  system "hadoop fs -getmerge #{today_hours_dir}/ #{daily_log_file_path}"

  # Write the daily log file to another s3 bucket
  puts "Writing daily log to s3"
  system "hadoop fs -put #{daily_log_file_path} s3://#{DESTINATION_BUCKET_NAME}/daily-merged-logs/#{daily_log_file_name}"

  # Remove daily log locally
  puts "Removing local daily logs"
  system "rm -rf #{local_tmp_dir}_day"

  # Remove the hourly logs from HDFS
  puts "Removing HDFS hourly logs"
  system "hadoop fs -rmr #{today_hours_dir}"

  # Go back in time
  date -= 1
end
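For reference, the script's optional date-argument handling can be isolated into a standalone sketch. The helper name is mine; the parsing logic is lifted directly from the script above:

```ruby
require 'date'

# Parse an optional "YYYY-M-D" command-line string into a Date,
# falling back to a default when the argument is absent.
def parse_date_arg(arg, fallback)
  return fallback unless arg
  Date.new(*arg.split('-').map(&:to_i))
end

p parse_date_arg('2013-5-2', Date.today)    # 2013-05-02
p parse_date_arg(nil, Date.new(2012, 9, 1)) # 2012-09-01 (fallback)
```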

【Discussion】:
