Hadoop fs -put bandwidth (brute-force version)
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// scalastyle:off println
package com.weibo.tools

import java.io.{BufferedInputStream, FileInputStream}
import java.net.URI
import java.util.concurrent.TimeUnit

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.{SparkConf, SparkContext}

object Bandwidthlimited_local2HDFS_Writer {
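  val kiloByte = 1024

  // Uploads at most one buffer from inStream, then sleeps ~1 s, so one buffer
  // per second approximates the requested bandwidth.
  // Returns the running total of uploaded bytes (unchanged at end of stream).
  def upload_one_buffer(inStream : java.io.BufferedInputStream,
    outputStream : org.apache.hadoop.fs.FSDataOutputStream,
    log_buffer : Array[Byte],
    pre_buffer_sum : Long,
    totalSize : Long
  ) : Long = {
    val readSize = inStream.read(log_buffer)
    if (readSize <= 0) {
      // read() returns -1 at end of stream: nothing to write
      pre_buffer_sum
    } else {
      // write only the bytes actually read, without copying the buffer
      outputStream.write(log_buffer, 0, readSize)
      outputStream.hflush()
      TimeUnit.MILLISECONDS.sleep(999)
      // println(s"${inStream} uploading. ${pre_buffer_sum + readSize} uploaded. readSize : ${readSize}. ${(pre_buffer_sum + readSize) * 100 / totalSize}% finished. ")
      pre_buffer_sum + readSize
    }
  }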

  // Uploads localSrcPath to remoteTarPath on the default Hadoop filesystem at
  // roughly `bandwidth` KB/s: one buffer of `bandwidth` KB, then a ~1 s sleep.
  // If the target already exists, the data is appended to it.
  // Returns the number of bytes uploaded.
  def LocalLog2HDFS_Writer(sc : SparkContext,
    localSrcPath : String,
    remoteTarPath : String,
    bandwidth : String
  ) : Long = {
    sc.hadoopConfiguration.setBoolean("dfs.support.append", true)
    // a URI without scheme and authority resolves to the default filesystem
    val hdfs = FileSystem.get(new URI("/"), sc.hadoopConfiguration)
    val filePath = new Path(remoteTarPath)
    val inStream = new BufferedInputStream(new FileInputStream(localSrcPath))
    // available() returns an Int, which caps the usable file size at ~2 GB
    val totalSize = inStream.available
    if (hdfs.exists(filePath)) println(hdfs.getFileStatus(filePath).toString)
    else hdfs.create(filePath).close()
    val outputStream = hdfs.append(filePath)
    val buffer_size = kiloByte * bandwidth.toInt  // bandwidth is given in KB
    val log_buffer = new Array[Byte](buffer_size)
    var buffer_sum = 0L
    try {
      // full buffers: at most one buffer per ~1 s
      while (inStream.available >= buffer_size) {
        val readSize = inStream.read(log_buffer)
        buffer_sum += readSize
        // write only the bytes actually read, without copying the buffer
        outputStream.write(log_buffer, 0, readSize)
        // hflush makes the written data visible to new readers
        outputStream.hflush()
        println(s"${localSrcPath} uploading. ${buffer_sum} uploaded. readSize : ${readSize}. ${buffer_sum * 100 / totalSize}% finished. ")
        TimeUnit.MILLISECONDS.sleep(999)
      }
      // remaining tail, smaller than one buffer
      if (inStream.available > 0) {
        val readSize = inStream.read(log_buffer)
        buffer_sum += readSize
        outputStream.write(log_buffer, 0, readSize)
        outputStream.hflush()
        println(s"${localSrcPath} uploading. ${buffer_sum} uploaded. readSize : ${readSize}. ${buffer_sum * 100 / totalSize}% finished. ")
      }
    } finally {
      inStream.close()
      outputStream.close()
    }
    buffer_sum
  }

  def Local2HDFS_Writer(sc : SparkContext, args: Array[String]) : Long = {
    val helper_info = """    Usage: Bandwidthlimited_local2HDFS_Writer localSrcPath remoteTarPath bandwidth
    bandwidth: upload rate in KB/s as a plain integer, e.g. 10 for about 10 KB/s
    localSrcPath must point to a file no larger than ~1.999 GB"""
    println(helper_info)
    require(args.length >= 3, helper_info)
    val localSrcPath = args(0)
    val remoteTarPath = args(1)
    val bandwidth = args(2)
    LocalLog2HDFS_Writer(sc, localSrcPath, remoteTarPath, bandwidth)
  }
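
  // Uploads each (localSrcPath, remoteTarPath) pair in turn; returns how many were uploaded.
  def LocalLogReducer2HDFS(sc : SparkContext, taskList : List[(String, String)], bandwidth : String) : Int = {
    var sum = 0
    // foreach, not iterator.map: map on an iterator is lazy and would never run the uploads
    taskList.foreach { case (localSrcPath, remoteTarPath) =>
      LocalLog2HDFS_Writer(sc, localSrcPath, remoteTarPath, bandwidth)
      sum += 1
    }
    sum
  }

  // placeholder, not implemented yet
  def LocalLogReducer(sc : SparkContext, srcParentPath : String, bandwidth : String) : Unit = {}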

  def main(args: Array[String]) : Unit = {
    val conf = new SparkConf()
      .setAppName("Bandwidthlimited_local2HDFS_Writer")
      .setMaster("local[1]")
    val sc = new SparkContext(conf)
    Local2HDFS_Writer(sc, args)
    sc.stop()
  }
}

https://github.com/Suanec/Betn_repo/blob/32d56acd3b57efc15573389619ed7793efdf298c/joyCodes/assembly_lib/src/main/scala/Bandwidthlimited_local2HDFS_Writer.scala
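The core of the writer above is a read, write, sleep loop. The dependency-free sketch below shows the same idea using plain `java.io` streams, so it can be tried without Hadoop or Spark. It is an illustration under stated assumptions, not the author's code. The names `ThrottledCopy` and `copyThrottled` are hypothetical. The sketch reads until `read` returns -1 instead of polling `available()`, keeps the byte count in a `Long`, and sleeps only for whatever remains of each interval rather than a fixed 999 ms.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}
import java.util.concurrent.TimeUnit

// Hypothetical helper, not part of the original repository.
object ThrottledCopy {
  /** Copies `in` to `out` in chunks of at most `chunkBytes`, pacing so that each
    * chunk takes roughly `intervalMs` milliseconds. Returns total bytes copied. */
  def copyThrottled(in: InputStream, out: OutputStream,
                    chunkBytes: Int, intervalMs: Long): Long = {
    require(chunkBytes > 0, "chunkBytes must be positive")
    val buffer = new Array[Byte](chunkBytes)
    var total = 0L // Long, so sizes above Int.MaxValue are counted correctly
    var windowStart = System.nanoTime()
    var readSize = in.read(buffer)
    while (readSize != -1) {
      out.write(buffer, 0, readSize) // only the bytes actually read
      out.flush()
      total += readSize
      // sleep only for what is left of this interval, so write time is not added on top
      val elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - windowStart)
      if (intervalMs > elapsedMs) TimeUnit.MILLISECONDS.sleep(intervalMs - elapsedMs)
      windowStart = System.nanoTime()
      readSize = in.read(buffer)
    }
    total
  }
}
```

Called with `chunkBytes = 1024 * bandwidthKB` and `intervalMs = 1000`, it should average roughly `bandwidthKB` KB/s. Since `FSDataOutputStream` is a `java.io.OutputStream`, the stream returned by `hdfs.append` could in principle be passed in, although plain `flush()` does not give the reader-visibility guarantee of `hflush()`.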

 

This is a brute-force version: to get the feature working first, it uses Spark + Scala on top of the Hadoop API to implement a rate-limited upload. Known issues:

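1. According to the HDFS project, append itself is not safe, and it is not recommended for production environments.

2. The rate limit is implemented by throttling how fast the stream is read and written (one buffer of `bandwidth` KB, then a sleep of about 1 s), so the instantaneous rate may oscillate, but the average matches the target.

3. The bandwidth limit is given in KB (per second); keep this in mind.

4. File size is limited by the input stream: `available()` returns an `Int`, so only files up to about 1.999 GB are currently guaranteed to work. Beyond that, progress monitoring may fail, and the file may be uploaded repeatedly or arrive garbled.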
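The numbers behind notes 2 to 4 can be checked with a little arithmetic. The sketch below assumes, as the code above does, one buffer of `bandwidth × 1024` bytes per window of roughly one second. `ThrottleMath` and its members are hypothetical helpers for illustration only.

```scala
// Hypothetical helpers, not part of the original repository.
object ThrottleMath {
  val kiloByte = 1024

  /** Bytes written per ~1 s window for a `bandwidth` argument given in KB. */
  def bytesPerWindow(bandwidthKB: Int): Long = kiloByte.toLong * bandwidthKB

  /** Number of ~1 s windows (so roughly seconds) needed to upload `fileBytes`. */
  def windowsNeeded(fileBytes: Long, bandwidthKB: Int): Long = {
    val perWindow = bytesPerWindow(bandwidthKB)
    (fileBytes + perWindow - 1) / perWindow // ceiling division
  }

  /** Largest size InputStream.available() can report (Int.MaxValue bytes), in GiB. */
  val availableCeilingGiB: Double = Int.MaxValue.toDouble / (1024L * 1024 * 1024)
}
```

For example, a 1 MiB file at bandwidth 10 needs 103 windows, about 103 s, and `Int.MaxValue` bytes is just under 2 GiB, which matches the 1.999G ceiling in note 4. `java.io.File.length` and `java.nio.file.Files.size` both return a `Long` and would avoid that ceiling when computing the total size.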
