OpenTSDB Source Code Walkthrough: Writing Data to HBase

1. Introduction

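OpenTSDB is only a medium (a tool): underneath, it hands the data to HBase for storage. So what does the complete data flow look like? The analysis below walks through it.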

2. Source Code Analysis

OpenTSDB receives writes through RPC calls.

final class RpcHandler extends IdleStateAwareChannelUpstreamHandler {...}

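A very important class here is RpcHandler, which manages all RPC connections. The OpenTSDB authors describe it as "Stateless handler for all RPCs: telnet-style, built-in or plugin HTTP.", that is, a single stateless handler serves every kind of RPC: telnet-style commands, the built-in HTTP API, and HTTP plugins.
The class has a method, messageReceived, that receives RPC messages: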

  // Called when a message is received
  @Override
  public void messageReceived(final ChannelHandlerContext ctx,
                              final MessageEvent msgevent) {
    try {
      final Object message = msgevent.getMessage();
      if (message instanceof String[]) {
        handleTelnetRpc(msgevent.getChannel(), (String[]) message);
      } else if (message instanceof HttpRequest) { // an HTTP request is handled by handleHttpQuery() below
        handleHttpQuery(tsdb, msgevent.getChannel(), (HttpRequest) message);
      } else {
        logError(msgevent.getChannel(), "Unexpected message type "
                 + message.getClass() + ": " + message);
        exceptions_caught.incrementAndGet();
      }
    } catch (Exception e) {
      Object pretty_message = msgevent.getMessage();
      if (pretty_message instanceof String[]) {
        pretty_message = Arrays.toString((String[]) pretty_message);
      }
      logError(msgevent.getChannel(), "Unexpected exception caught"
               + " while serving " + pretty_message, e);
      exceptions_caught.incrementAndGet();
    }
  }

The method takes two parameters, final ChannelHandlerContext ctx and final MessageEvent msgevent, both of which are Netty classes. I won't cover Netty itself here.
Setting a breakpoint in messageReceived(final ChannelHandlerContext ctx, final MessageEvent msgevent) shows the following values in msgevent:

msgevent = "[id: 0x69793482, /192.168.211.2:4104 => /192.168.211.2:4399] RECEIVED: DefaultHttpRequest(chunked: false) POST /api/put?summary HTTP/1.1 Content-Length: 86 Content-Type: text/plain; charset=ISO-8859-1 Host: 192.168.211.2:4399 Connection: Keep-Alive User-Agent: Apache-HttpClient/4.5.2 (Java/1.8.0_77) Accept-Encoding: gzip,deflate"
channel = "[id: 0x69793482, /192.168.211.2:4104 => /192.168.211.2:4399]"
message = "DefaultHttpRequest(chunked: false)\r\nPOST /api/put?summary HTTP/1.1\r\nContent-Length: 86\r\nContent-Type: text/plain; charset=ISO-8859-1\r\nHost: 192.168.211.2:4399\r\nConnection: Keep-Alive\r\nUser-Agent: Apache-HttpClient/4.5.2 (Java/1.8.0_77)\r\nAccept-Encoding: gzip,deflate"
remoteAddress = "/192.168.211.2:4104"

The values fall into four parts: msgevent, channel, message, and remoteAddress. Note the DefaultHttpRequest(chunked: false) in the message part: this is a non-chunked request, and it comes up again below.
if (message instanceof String[]) {...} checks the message type. If the message is a String[], it is passed to handleTelnetRpc(msgevent.getChannel(), (String[]) message); if it is an HttpRequest, it goes to handleHttpQuery(tsdb, msgevent.getChannel(), (HttpRequest) message). Any other message type, and any exception thrown while serving the message, is logged and counted in exceptions_caught.
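The type-based dispatch in messageReceived() can be sketched in plain Java. This is a minimal sketch under assumptions: FakeHttpRequest is a hypothetical stand-in for Netty's HttpRequest, and the returned strings only name the handler that the real class would call (handleTelnetRpc or handleHttpQuery).

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for Netty's HttpRequest; not the real interface. */
final class FakeHttpRequest {
  final String method;
  final String uri;

  FakeHttpRequest(String method, String uri) {
    this.method = method;
    this.uri = uri;
  }
}

final class MessageDispatchSketch {
  // Same role as RpcHandler's exceptions_caught counter.
  static final AtomicLong exceptionsCaught = new AtomicLong();

  /** Returns the name of the handler a message would be routed to. */
  static String dispatch(Object message) {
    if (message instanceof String[]) {
      // telnet-style command, already split into words, e.g. "put sys.cpu ..."
      return "telnet:" + ((String[]) message)[0];
    } else if (message instanceof FakeHttpRequest) {
      return "http:" + ((FakeHttpRequest) message).uri;
    }
    // Unknown message type: count it, as messageReceived() does.
    exceptionsCaught.incrementAndGet();
    return "unexpected:" + message.getClass().getSimpleName();
  }

  public static void main(String[] args) {
    System.out.println(dispatch(new String[] {"put", "sys.cpu", "1356998400", "42", "host=a"}));
    System.out.println(dispatch(new FakeHttpRequest("POST", "/api/put?summary")));
    System.out.println(dispatch(42));
    System.out.println("exceptions_caught = " + exceptionsCaught.get());
  }
}
```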
handleHttpQuery(tsdb, msgevent.getChannel(), (HttpRequest) message) parses the HTTP request and dispatches it. Its implementation is:

Note: handleHttpQuery() is also a method of the RpcHandler class.
 /**
   * Finds the right handler for an HTTP query (either built-in or user plugin) 
   * and executes it. Also handles simple and pre-flight CORS requests if 
   * configured, rejecting requests that do not match a domain in the list.
   *
   * @param chan The channel on which the query was received.
   * @param req The parsed HTTP request.
   *
   *  01. This method is very important: every HTTP call (from a browser or any
   *  HTTP client) ends up here for handling. Later sections refer to it often, so keep it in mind.
   */
  private void handleHttpQuery(final TSDB tsdb, final Channel chan, final HttpRequest req) {
    AbstractHttpQuery abstractQuery = null; // the (abstract) query object
    try {
      // create the query instance
      abstractQuery = createQueryInstance(tsdb, req, chan);
      if (!tsdb.getConfig().enable_chunked_requests() && req.isChunked()) {
        logError(abstractQuery, "Received an unsupported chunked request: "
            + abstractQuery.request());
        abstractQuery.badRequest(new BadRequestException("Chunked request not supported."));
        return;
      }
      // NOTE: Some methods in HttpQuery have side-effects (getQueryBaseRoute and 
      // setSerializer for instance) so invocation order is important here.
      final String route = abstractQuery.getQueryBaseRoute();
      if (abstractQuery.getClass().isAssignableFrom(HttpRpcPluginQuery.class)) {
        if (applyCorsConfig(req, abstractQuery)) {
          return;
        }
        final HttpRpcPluginQuery pluginQuery = (HttpRpcPluginQuery) abstractQuery;
        final HttpRpcPlugin rpc = rpc_manager.lookupHttpRpcPlugin(route);
        if (rpc != null) {
          rpc.execute(tsdb, pluginQuery);
        } else {
          pluginQuery.notFound();
        }
      } else if (abstractQuery.getClass().isAssignableFrom(HttpQuery.class)) {
        final HttpQuery builtinQuery = (HttpQuery) abstractQuery;
        builtinQuery.setSerializer();
        if (applyCorsConfig(req, abstractQuery)) {
          return;
        }
        final HttpRpc rpc = rpc_manager.lookupHttpRpc(route);
        if (rpc != null) {
          rpc.execute(tsdb, builtinQuery);
        } else {
          builtinQuery.notFound();
        }
      } else {
        throw new IllegalStateException("Unknown instance of AbstractHttpQuery: " 
            + abstractQuery.getClass().getName());
      }
    } catch (BadRequestException ex) {
      if (abstractQuery == null) {
        LOG.warn("{} Unable to create query for {}. Reason: {}", chan, req, ex);
        sendStatusAndClose(chan, HttpResponseStatus.BAD_REQUEST);
      } else {
        abstractQuery.badRequest(ex);
      }
    } catch (Exception ex) {
      exceptions_caught.incrementAndGet();
      if (abstractQuery == null) {
        LOG.warn("{} Unexpected error handling HTTP request {}. Reason: {} ", chan, req, ex);
        sendStatusAndClose(chan, HttpResponseStatus.INTERNAL_SERVER_ERROR);
      } else {
        abstractQuery.internalError(ex);
      }
    }
  }

First, abstractQuery = createQueryInstance(tsdb, req, chan); creates a query object and makes abstractQuery refer to it.
createQueryInstance(tsdb, req, chan) looks like this:

 /**
   * Using the request URI, creates a query instance capable of handling 
   * the given request.
   *
   * @param tsdb the TSDB instance we are running within
   * @param request the incoming HTTP request
   * @param chan the {@link Channel} the request came in on.
   * @return a subclass of {@link AbstractHttpQuery}
   * @throws BadRequestException if the request is invalid in a way that
   * can be detected early, here.
   */
  private AbstractHttpQuery createQueryInstance(final TSDB tsdb,
        final HttpRequest request,
        final Channel chan) 
            throws BadRequestException {
    final String uri = request.getUri(); // the request URI

    if (Strings.isNullOrEmpty(uri)) {
      throw new BadRequestException("Request URI is empty");
    } else if (uri.charAt(0) != '/') { // the URI must start with a slash
      throw new BadRequestException("Request URI doesn't start with a slash");
    } else if (rpc_manager.isHttpRpcPluginPath(uri)) {
      // http_plugin_rpcs_received is an AtomicLong that counts plugin RPC requests
      http_plugin_rpcs_received.incrementAndGet();
      return new HttpRpcPluginQuery(tsdb, request, chan); // return the newly constructed query
    } else {
      http_rpcs_received.incrementAndGet();
      HttpQuery builtinQuery = new HttpQuery(tsdb, request, chan);
      return builtinQuery;
    }
  }
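The URI checks and routing in createQueryInstance() come down to a few string tests. The sketch below is a simplified stand-in, not OpenTSDB code: it assumes plugin routes live under /plugin/ (our simplification of rpc_manager.isHttpRpcPluginPath()), uses IllegalArgumentException in place of BadRequestException, and returns only the name of the query class that would be created.

```java
import java.util.concurrent.atomic.AtomicLong;

final class QueryRoutingSketch {
  // Assumption: plugin RPCs live under /plugin/ (simplified isHttpRpcPluginPath()).
  static final String PLUGIN_PREFIX = "/plugin/";

  // Same roles as http_rpcs_received and http_plugin_rpcs_received.
  static final AtomicLong httpRpcsReceived = new AtomicLong();
  static final AtomicLong httpPluginRpcsReceived = new AtomicLong();

  /** Returns the name of the query class createQueryInstance() would build. */
  static String createQueryKind(String uri) {
    if (uri == null || uri.isEmpty()) {
      throw new IllegalArgumentException("Request URI is empty");
    } else if (uri.charAt(0) != '/') {
      throw new IllegalArgumentException("Request URI doesn't start with a slash");
    } else if (uri.startsWith(PLUGIN_PREFIX)) {
      httpPluginRpcsReceived.incrementAndGet();
      return "HttpRpcPluginQuery";
    }
    httpRpcsReceived.incrementAndGet();
    return "HttpQuery";
  }

  public static void main(String[] args) {
    System.out.println(createQueryKind("/api/put?summary"));
    System.out.println(createQueryKind("/plugin/my/rpc"));
    try {
      createQueryKind("api/put");
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```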

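It breaks down into the following steps:

  • Get the request URI.
  • Validate the URI; if it is unacceptable, throw an exception right away.
  • Check whether it is an HTTP RPC plugin path (for most requests this returns false).
  • Otherwise, increment the http_rpcs_received counter
  • and create an HttpQuery object.
    The HttpQuery constructor is: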
 /**
   * Constructor.
   * @param request The request in this HTTP query.
   * @param chan The channel on which the request was received.
   */
  public HttpQuery(final TSDB tsdb, final HttpRequest request, final Channel chan) {
    super(tsdb, request, chan); // call the superclass constructor
    this.show_stack_trace =
      tsdb.getConfig().getBoolean("tsd.http.show_stack_trace");

    // create a serializer (used to parse HTTP JSON); its argument is this query
    this.serializer = new HttpJsonSerializer(this);
  }
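  • The constructor takes three arguments: the TSDB instance tsdb, the HttpRequest object request, and a Netty Channel (the connection the request arrived on). (Java NIO/RPC and Netty are still unclear to me; pointers from more experienced readers are welcome.)
  • It calls the superclass constructor AbstractHttpQuery(final TSDB tsdb, final HttpRequest request, final Channel chan), which mainly assigns several fields of the query object.
  • show_stack_trace says whether stack traces should be included in the output; its value comes from the tsd.http.show_stack_trace setting (in the Config class or the .properties file).
  • Finally, the query's serializer is set to a new HttpJsonSerializer instance.
    At this point a built-in query object, builtinQuery, has been created, and abstractQuery refers to it. Back in handleHttpQuery, there is an if check: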
      if (!tsdb.getConfig().enable_chunked_requests() && req.isChunked()) {
        logError(abstractQuery, "Received an unsupported chunked request: "
            + abstractQuery.request());
        abstractQuery.badRequest(new BadRequestException("Chunked request not supported."));
        return;
      }

This if checks for a chunked request. Its first condition comes from enable_chunked_requests():

/** @return whether or not chunked requests are supported
   *        (=> what is a chunked request?)
   * */
  public boolean enable_chunked_requests() {
    return enable_chunked_requests;
  }

So what is a chunked request? I searched and found this answer on Stack Overflow:

I’ve finally found the reason it thinks my request is chunked: OpenTSDB internally uses Netty for networking, and if Netty reads a block that doesn’t contain the complete request, then it’s flagged as chunked, even if there’s no Transfer-Encoding header in the request.

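Answer link: https://stackoverflow.com/questions/27841071/what-is-a-chunked-request-in-opentsdb
For an ordinary call like the one captured above (chunked: false), req.isChunked() is false, so this check passes and execution continues.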
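The chunked-request gate is a two-input decision. A minimal sketch that mirrors the condition in handleHttpQuery(); in OpenTSDB 2.x the enable flag is, as far as we know, read from the tsd.http.request.enable_chunked setting, so check the configuration docs for your version.

```java
final class ChunkedGateSketch {
  /**
   * True when handleHttpQuery() would answer "Chunked request not supported.":
   * the same condition as !enable_chunked_requests() && req.isChunked().
   */
  static boolean rejectChunked(boolean enableChunkedRequests, boolean requestIsChunked) {
    return !enableChunkedRequests && requestIsChunked;
  }

  public static void main(String[] args) {
    // The request captured in the debugger (chunked: false) always passes.
    System.out.println(rejectChunked(false, false));
    // A request Netty flagged as chunked is rejected unless chunking is enabled.
    System.out.println(rejectChunked(false, true));
    System.out.println(rejectChunked(true, true));
  }
}
```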
Next the query's base route, route, is obtained. In the debugger its value is api/put.
Next the code checks which kind of query abstractQuery is. Since we created an HttpQuery, the following branch runs:

else if (abstractQuery.getClass().isAssignableFrom(HttpQuery.class)) {
        final HttpQuery builtinQuery = (HttpQuery) abstractQuery;
        // set up a serializer for builtinQuery
        // 01. this method returns nothing; it works purely by side effect on builtinQuery
        builtinQuery.setSerializer();
        if (applyCorsConfig(req, abstractQuery)) { // note the return value here
          return;
        }
        // what is the value of rpc below?
        final HttpRpc rpc = rpc_manager.lookupHttpRpc(route);
        if (rpc != null) {
          rpc.execute(tsdb, builtinQuery);
        } else {
          builtinQuery.notFound();
        }
      }
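The route lookup in this branch is an exact-match map lookup keyed by the base route. The sketch below assumes a hypothetical MiniHttpRpc interface and a simplified baseRoute() that only strips the query string and surrounding slashes; the real getQueryBaseRoute() handles more cases (for example API version segments).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for OpenTSDB's HttpRpc interface. */
interface MiniHttpRpc {
  String execute(String uri);
}

final class RouteLookupSketch {
  // Exact-match registry, playing the role of http_commands.
  static final Map<String, MiniHttpRpc> HTTP_COMMANDS = new HashMap<>();

  static {
    HTTP_COMMANDS.put("api/put", uri -> "PutDataPointRpc handled " + uri);
    HTTP_COMMANDS.put("api/query", uri -> "QueryRpc handled " + uri);
  }

  /** Simplified base route: drop the query string, then leading/trailing slashes. */
  static String baseRoute(String uri) {
    int q = uri.indexOf('?');
    String path = q >= 0 ? uri.substring(0, q) : uri;
    int start = 0;
    int end = path.length();
    while (start < end && path.charAt(start) == '/') {
      start++;
    }
    while (end > start && path.charAt(end - 1) == '/') {
      end--;
    }
    return path.substring(start, end);
  }

  /** Exact match only: "api/puts" does not match "api/put". */
  static String handle(String uri) {
    MiniHttpRpc rpc = HTTP_COMMANDS.get(baseRoute(uri));
    return rpc != null ? rpc.execute(uri) : "404 Not Found";
  }

  public static void main(String[] args) {
    System.out.println(handle("/api/put?summary"));
    System.out.println(handle("/api/puts"));
  }
}
```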
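  • Step 1. Cast the query object to HttpQuery.
  • Step 2. setSerializer() sets up a serializer.
  • Step 3. Since CORS is not configured on the server, the body of if (applyCorsConfig(req, abstractQuery)) does not run, and rpc_manager.lookupHttpRpc(route) returns an rpc object.
    While debugging I wanted to see what rpc was, but it is not a simple value: it is an object implementing HttpRpc. For the api/put route it is the PutDataPointRpc instance registered in the http_commands map.
  • lookupHttpRpc is implemented as follows: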
/**
   * Lookup a built-in {@link HttpRpc} based on the given {@code queryBaseRoute}.
   * The lookup is based on exact match of the input parameter and the registered
   * {@link HttpRpc}s.
   *
   * @param queryBaseRoute the HTTP query's base route, with no trailing or leading slashes.  For example: {@code api/query}
   *
   * @return the {@link HttpRpc} for the given {@code queryBaseRoute} or
   * {@code null} if not found.
   */
  HttpRpc lookupHttpRpc(final String queryBaseRoute) {
    return http_commands.get(queryBaseRoute);
  }
  • The key step is execute(), which runs the query. execute() has many implementations (one per HttpRpc), but the one that runs here is PutDataPointRpc.execute().
    Now let's look at this most important method, execute(). Here is the full code:
 /**
   * Handles HTTP RPC put requests
   *
   * @param tsdb The TSDB to which we belong
   * @param query The HTTP query from the user
   * @throws IOException if there is an error parsing the query or formatting 
   * the output
   *
   * @throws BadRequestException if the user supplied bad data
   * @since 2.0
   */
  public void execute(final TSDB tsdb, final HttpQuery query)
    throws IOException {
    requests.incrementAndGet();
    
    if (query.method() != HttpMethod.POST) {
      throw new BadRequestException(HttpResponseStatus.METHOD_NOT_ALLOWED, 
          "Method not allowed", "The HTTP method [" + query.method().getName() +
          "] is not permitted for this endpoint");
    }
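    // query.serializer() returns a serializer => HttpJsonSerializer by default
    // 01. dps holds the values we want to write to HBase
    // 02. watch the value of dps
    // 03. stepping over query.serializer().parsePutV1() in the debugger paused for about a second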
    final List<IncomingDataPoint> dps = query.serializer().parsePutV1();
    if (dps.size() < 1) {
      throw new BadRequestException("No datapoints found in content");
    }

    // check whether the query string contains these parameters: details -> summary -> sync -> sync_timeout
    final boolean show_details = query.hasQueryStringParam("details");
    final boolean show_summary = query.hasQueryStringParam("summary");
    final boolean synchronous = query.hasQueryStringParam("sync");
    final int sync_timeout = query.hasQueryStringParam("sync_timeout") ? 
        Integer.parseInt(query.getQueryStringParam("sync_timeout")) : 0;
    final boolean host = query.hasQueryStringParam("host");

    // this is used to coordinate timeouts
    final AtomicBoolean sending_response = new AtomicBoolean();
    sending_response.set(false);
        
    final ArrayList<HashMap<String, Object>> details = show_details
      ? new ArrayList<HashMap<String, Object>>() : null;
    int queued = 0;

    // Deferred again
    final List<Deferred<Boolean>> deferreds = synchronous ? 
        new ArrayList<Deferred<Boolean>>(dps.size()) : null;
    for (final IncomingDataPoint dp : dps) {
        /** Handles passing a data point to the storage exception handler if
         *  we were unable to store it for any reason
         *
         * */
      final class PutErrback implements Callback<Boolean, Exception> {
        public Boolean call(final Exception arg) {
          handleStorageException(tsdb, dp, arg);
          hbase_errors.incrementAndGet();
          
          if (show_details) {
            details.add(getHttpDetails("Storage exception: " 
                + arg.getMessage(), dp));
          }
          return false;
        }
        public String toString() {
          return "HTTP Put Exception CB";
        }
      }
      
      /** Simply marks the put as successful */
      final class SuccessCB implements Callback<Boolean, Object> {
        @Override
        public Boolean call(final Object obj) {
          return true;
        }
        public String toString() {
          return "HTTP Put success CB";
        }
      }

      // check whether the data point's fields are valid
      try {
        // 01. first check the metric
        // the nested if (show_details) records a detail entry only when the user asked for details
        if (dp.getMetric() == null || dp.getMetric().isEmpty()) {
          if (show_details) {
            details.add(this.getHttpDetails("Metric name was empty", dp));
          }
          LOG.warn("Metric name was empty: " + dp);
          illegal_arguments.incrementAndGet();
          continue; // note: continue, not break, so the remaining points are still processed
        }

        // 02. then check the timestamp
        if (dp.getTimestamp() <= 0) {
          if (show_details) {
            details.add(this.getHttpDetails("Invalid timestamp", dp));
          }
          LOG.warn("Invalid timestamp: " + dp);
          illegal_arguments.incrementAndGet();
          continue;
        }

        // 03. then check the value
        if (dp.getValue() == null || dp.getValue().isEmpty()) {
          if (show_details) {
            details.add(this.getHttpDetails("Empty value", dp));
          }
          LOG.warn("Empty value: " + dp);
          invalid_values.incrementAndGet();
          continue;
        }

        // 04. finally check the tags
        if (dp.getTags() == null || dp.getTags().size() < 1) {
          if (show_details) {
            details.add(this.getHttpDetails("Missing tags", dp));
          }
          LOG.warn("Missing tags: " + dp);
          illegal_arguments.incrementAndGet();
          continue;
        }
        final Deferred<Object> deferred;

        // Using Tags.looksLikeInteger() here feels out of place; it should be pulled into a
        // separate util, otherwise it looks as if this code were processing tags
        if (Tags.looksLikeInteger(dp.getValue())) {
          // addPoint() returns a Deferred
          // 01. the addPoint() method
          deferred = tsdb.addPoint(
              dp.getMetric(),
              dp.getTimestamp(),
              Tags.parseLong(dp.getValue()), dp.getTags());
        } else {
          deferred = tsdb.addPoint(dp.getMetric(), dp.getTimestamp(), 
              Float.parseFloat(dp.getValue()), dp.getTags());
        }
        if (synchronous) {
          deferreds.add(deferred.addCallback(new SuccessCB()));
        }
        deferred.addErrback(new PutErrback());
        ++queued;
      } catch (NumberFormatException x) {
        if (show_details) {
          details.add(this.getHttpDetails("Unable to parse value to a number", 
              dp));
        }
        LOG.warn("Unable to parse value to a number: " + dp);
        invalid_values.incrementAndGet();
      } catch (IllegalArgumentException iae) {
        if (show_details) {
          details.add(this.getHttpDetails(iae.getMessage(), dp));
        }
        LOG.warn(iae.getMessage() + ": " + dp);
        illegal_arguments.incrementAndGet();
      } catch (NoSuchUniqueName nsu) {
        if (show_details) {
          details.add(this.getHttpDetails("Unknown metric", dp));
        }
        LOG.warn("Unknown metric: " + dp);
        unknown_metrics.incrementAndGet();
      }
    }
     /** A timer task that will respond to the user with the number of timeouts
     * for synchronous writes. */
    class PutTimeout implements TimerTask {
      final int queued;
      public PutTimeout(final int queued) {
        this.queued = queued;
      }
      @Override
      public void run(final Timeout timeout) throws Exception {
        if (sending_response.get()) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Put data point call " + query + 
                " already responded successfully");
          }
          return;
        } else {
          sending_response.set(true);
        }
        
        // figure out how many writes are outstanding
        int good_writes = 0;
        int failed_writes = 0;
        int timeouts = 0;
        for (int i = 0; i < deferreds.size(); i++) {
          try {
            if (deferreds.get(i).join(1)) {
              ++good_writes;
            } else {
              ++failed_writes;
            }
          } catch (TimeoutException te) {
            if (show_details) {
              details.add(getHttpDetails("Write timedout", dps.get(i)));
            }
            ++timeouts;
          }
        }
        writes_timedout.addAndGet(timeouts);
        final int failures = dps.size() - queued;
        if (!show_summary && !show_details) {
          throw new BadRequestException(HttpResponseStatus.BAD_REQUEST,
              "The put call has timedout with " + good_writes 
                + " successful writes, " + failed_writes + " failed writes and "
                + timeouts + " timed out writes.", 
              "Please see the TSD logs or append \"details\" to the put request");
        } else {
          final HashMap<String, Object> summary = new HashMap<String, Object>();
          summary.put("success", good_writes);
          summary.put("failed", failures + failed_writes);
          summary.put("timeouts", timeouts);
          if (show_details) {
            summary.put("errors", details);
          }
          
          query.sendReply(HttpResponseStatus.BAD_REQUEST, 
              query.serializer().formatPutV1(summary));
        }
      }
    }
    
    // now after everything has been sent we can schedule a timeout if so
    // the caller asked for a synchronous write.
    final Timeout timeout = sync_timeout > 0 ? 
        tsdb.getTimer().newTimeout(new PutTimeout(queued), sync_timeout, 
            TimeUnit.MILLISECONDS) : null;
    
    /** Serializes the response to the client */
    class GroupCB implements Callback<Object, ArrayList<Boolean>> {
      final int queued;
      public GroupCB(final int queued) {
        this.queued = queued;
      }
      
      @Override
      public Object call(final ArrayList<Boolean> results) {
        if (sending_response.get()) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Put data point call " + query + " was marked as timedout");
          }
          return null;
        } else {
          sending_response.set(true);
          if (timeout != null) {
            timeout.cancel();
          }
        }
        int good_writes = 0;
        int failed_writes = 0;
        for (final boolean result : results) {
          if (result) {
            ++good_writes;
          } else {
            ++failed_writes;
          }
        }
        
        final int failures = dps.size() - queued;
        if (!show_summary && !show_details) {
          if (failures + failed_writes > 0) {
            query.sendReply(HttpResponseStatus.BAD_REQUEST, 
                query.serializer().formatErrorV1(
                    new BadRequestException(HttpResponseStatus.BAD_REQUEST,
                "One or more data points had errors", 
                "Please see the TSD logs or append \"details\" to the put request")));
          } else {
            query.sendReply(HttpResponseStatus.NO_CONTENT, "".getBytes());
          }
        } else {
          final HashMap<String, Object> summary = new HashMap<String, Object>();
          if (sync_timeout > 0) {
            summary.put("timeouts", 0);
          }
          summary.put("success", results.isEmpty() ? queued : good_writes);
          summary.put("failed", failures + failed_writes);
          if (show_details) {
            summary.put("errors", details);
          }
          
          if (failures > 0) {
            query.sendReply(HttpResponseStatus.BAD_REQUEST, 
                query.serializer().formatPutV1(summary));
          } else {
            query.sendReply(query.serializer().formatPutV1(summary));
          }
        }
        
        return null;
      }
      @Override
      public String toString() {
        return "put data point serialization callback";
      }
    }
    
    /** Catches any unexpected exceptions thrown in the callback chain */
    class ErrCB implements Callback<Object, Exception> {
      @Override
      public Object call(final Exception e) throws Exception {
        if (sending_response.get()) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Put data point call " + query + " was marked as timedout");
          }
          return null;
        } else {
          sending_response.set(true);
          if (timeout != null) {
            timeout.cancel();
          }
        }
        LOG.error("Unexpected exception", e);
        throw new RuntimeException("Unexpected exception", e);
      }
      @Override
      public String toString() {
        return "put data point error callback";
      }
    }
    
    if (synchronous) {
      Deferred.groupInOrder(deferreds).addCallback(new GroupCB(queued))
        .addErrback(new ErrCB());
    } else {
      new GroupCB(queued).call(EMPTY_DEFERREDS);
    }
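Both PutTimeout and GroupCB may try to answer the client, and sending_response makes sure only the first one does. The sketch below shows that "reply once" guard with hypothetical names. It uses AtomicBoolean.compareAndSet(false, true), which checks and claims the flag in a single atomic step; the code above does the same job with a separate get() followed by set(true).

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class ReplyOnceSketch {
  // Plays the role of sending_response in execute().
  private final AtomicBoolean sendingResponse = new AtomicBoolean(false);
  // Hypothetical bookkeeping so the effect is observable.
  final AtomicInteger repliesSent = new AtomicInteger();
  volatile String winner;

  /** Returns true only for the first caller; later callers back off. */
  boolean tryReply(String who) {
    if (!sendingResponse.compareAndSet(false, true)) {
      return false; // the other callback already answered the client
    }
    winner = who;
    repliesSent.incrementAndGet();
    return true;
  }

  public static void main(String[] args) throws InterruptedException {
    ReplyOnceSketch put = new ReplyOnceSketch();
    Thread timeout = new Thread(() -> put.tryReply("PutTimeout"));
    Thread group = new Thread(() -> put.tryReply("GroupCB"));
    timeout.start();
    group.start();
    timeout.join();
    group.join();
    // Exactly one reply, whichever thread got there first.
    System.out.println(put.winner + " replied; replies sent = " + put.repliesSent.get());
  }
}
```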
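  • Only POST is accepted; any other method is rejected with 405 Method Not Allowed. (Why only POST? Presumably because a put carries its data points in the request body and changes stored data.)
  • final List<IncomingDataPoint> dps = query.serializer().parsePutV1(); returns the values to be written to HBase, including the metric, tag pairs, and value, which can be inspected in the debugger. If the content yields no data points, a "No datapoints found in content" exception is thrown.
  • Step 2. Check whether the query contains details, summary, and the other parameters. These flags drive the if checks later on.
  • Step 3. Define an AtomicBoolean, sending_response, which tracks whether a response has already been sent to the client. It starts out false (sending_response.set(false)).
  • Then a for loop processes each element of dps, each an IncomingDataPoint. Processing has a normal path and an error path, so two inner classes are defined, one for each.
    • Error case (errback)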
 final class PutErrback implements Callback<Boolean, Exception> {
        public Boolean call(final Exception arg) {
          handleStorageException(tsdb, dp, arg);
          hbase_errors.incrementAndGet();
          
          if (show_details) {
            details.add(getHttpDetails("Storage exception: " 
                + arg.getMessage(), dp));
          }
          return false;
        }
        public String toString() {
          return "HTTP Put Exception CB";
        }
      }
  • Success case
      /** Simply marks the put as successful */
      final class SuccessCB implements Callback<Boolean, Object> {
        @Override
        public Boolean call(final Object obj) {
          return true;
        }
        public String toString() {
          return "HTTP Put success CB";
        }
      }
  • Next, a series of if blocks checks whether each IncomingDataPoint dp is valid:
        // 01. first check the metric
        // the nested if (show_details) records a detail entry only when the user asked for details
        if (dp.getMetric() == null || dp.getMetric().isEmpty()) {
          if (show_details) {
            details.add(this.getHttpDetails("Metric name was empty", dp));
          }
          LOG.warn("Metric name was empty: " + dp);
          illegal_arguments.incrementAndGet();
          continue; // note: continue, not break, so the remaining points are still processed
        }

        // 02. then check the timestamp
        if (dp.getTimestamp() <= 0) {
          if (show_details) {
            details.add(this.getHttpDetails("Invalid timestamp", dp));
          }
          LOG.warn("Invalid timestamp: " + dp);
          illegal_arguments.incrementAndGet();
          continue;
        }

        // 03. then check the value
        if (dp.getValue() == null || dp.getValue().isEmpty()) {
          if (show_details) {
            details.add(this.getHttpDetails("Empty value", dp));
          }
          LOG.warn("Empty value: " + dp);
          invalid_values.incrementAndGet();
          continue;
        }

        // 04. finally check the tags
        if (dp.getTags() == null || dp.getTags().size() < 1) {
          if (show_details) {
            details.add(this.getHttpDetails("Missing tags", dp));
          }
          LOG.warn("Missing tags: " + dp);
          illegal_arguments.incrementAndGet();
          continue;
        }
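  • Next a Deferred is declared:
final Deferred<Object> deferred;

This deferred is a key part of OpenTSDB's design: it is what makes the write path asynchronous. addPoint() returns a Deferred right away, and the callbacks attached to it run later, once the HBase write completes or fails.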

        if (Tags.looksLikeInteger(dp.getValue())) {
          // addPoint() returns a Deferred
          // 01. the addPoint() method
          deferred = tsdb.addPoint(
              dp.getMetric(),
              dp.getTimestamp(),
              Tags.parseLong(dp.getValue()), dp.getTags());
        } else {
          deferred = tsdb.addPoint(dp.getMetric(), dp.getTimestamp(), 
              Float.parseFloat(dp.getValue()), dp.getTags());
        }
        if (synchronous) {
          deferreds.add(deferred.addCallback(new SuccessCB()));
        }
        deferred.addErrback(new PutErrback());
        ++queued;

The value in dp is checked: if it looks like an integer,
public Deferred<Object> addPoint(final String metric, final long timestamp, final long value, final Map<String, String> tags) is called; otherwise public Deferred<Object> addPoint(final String metric, final long timestamp, final float value, final Map<String, String> tags) is called. Note that both return a Deferred<Object>. queued counts the data points that were handed off for writing to HBase.
deferreds.add(deferred.addCallback(new SuccessCB())); (only when sync is requested) and deferred.addErrback(new PutErrback()); attach callbacks to the deferred's callback chain.
Inside addPoint(), the code is:

 public Deferred<Object> addPoint(final String metric,
                                   final long timestamp,
                                   final long value,
                                   final Map<String, String> tags) {
    // the byte array v receives the encoded value
    final byte[] v;

    // first check whether the value fits in a byte
    if (Byte.MIN_VALUE <= value && value <= Byte.MAX_VALUE) {
      v = new byte[] { (byte) value };
    } else if (Short.MIN_VALUE <= value && value <= Short.MAX_VALUE) { // then whether it fits in a short
      v = Bytes.fromShort((short) value);
    } else if (Integer.MIN_VALUE <= value && value <= Integer.MAX_VALUE) { // then whether it fits in an int
      v = Bytes.fromInt((int) value);
    } else { // otherwise encode the full long as a byte array
      v = Bytes.fromLong(value);
    }

    final short flags = (short) (v.length - 1);  // Just the length.
    return addPointInternal(metric, timestamp, v, tags, flags);
  }

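This method "Adds a single integer value data point in the TSDB." and returns a Deferred representing the completion of the request. Its main steps are:

  • Declare a byte[] array.
  • Check which type range (byte, short, int or long) the value falls into and pick the smallest one => this reduces the bytes sent over the network (and stored).
  • Convert value to that byte[].
  • Finally call addPointInternal(...), with flags = v.length - 1.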
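The width selection in addPoint() can be reproduced with java.nio.ByteBuffer, which writes big-endian like asynchbase's Bytes helpers. A minimal sketch, not OpenTSDB code; flags() mirrors the (short) (v.length - 1) line for integer values.

```java
import java.nio.ByteBuffer;

final class IntEncodingSketch {
  /** Smallest big-endian encoding of value: 1, 2, 4 or 8 bytes, as in addPoint(). */
  static byte[] encode(long value) {
    if (Byte.MIN_VALUE <= value && value <= Byte.MAX_VALUE) {
      return new byte[] {(byte) value};
    } else if (Short.MIN_VALUE <= value && value <= Short.MAX_VALUE) {
      return ByteBuffer.allocate(2).putShort((short) value).array();
    } else if (Integer.MIN_VALUE <= value && value <= Integer.MAX_VALUE) {
      return ByteBuffer.allocate(4).putInt((int) value).array();
    }
    return ByteBuffer.allocate(8).putLong(value).array();
  }

  /** flags for an integer value: just the encoded length minus one. */
  static short flags(byte[] v) {
    return (short) (v.length - 1);
  }

  public static void main(String[] args) {
    for (long value : new long[] {42L, 300L, 70_000L, 1L << 40}) {
      byte[] v = encode(value);
      System.out.println(value + " -> " + v.length + " byte(s), flags=" + flags(v));
    }
  }
}
```

So 42 needs 1 byte (flags 0), 300 needs 2 (flags 1), 70000 needs 4 (flags 3), and 2^40 needs all 8 (flags 7).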

addPointInternal(...) looks like this:

// reached after execute() has checked that the metric and tag k-v pairs look valid
  private Deferred<Object> addPointInternal(final String metric,
                                            final long timestamp,
                                            final byte[] value,
                                            final Map<String, String> tags,
                                            final short flags) {
    // we only accept positive unix epoch timestamps in seconds or milliseconds
    // validate the timestamp
    if (timestamp < 0 || ((timestamp & Const.SECOND_MASK) != 0 && 
        timestamp > 9999999999999L)) {
      throw new IllegalArgumentException((timestamp < 0 ? "negative " : "bad")
          + " timestamp=" + timestamp
          + " when trying to add value=" + Arrays.toString(value) + '/' + flags
          + " to metric=" + metric + ", tags=" + tags);
    }

    IncomingDataPoints.checkMetricAndTags(metric, tags);
    final byte[] row = IncomingDataPoints.rowKeyTemplate(this, metric, tags);
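    // base_time: the timestamp in seconds rounded down to the hour (MAX_TIMESPAN);
    // it is written into the row key and passed to scheduleForCompaction() below
    final long base_time;

    // note the value of qualifier here
    final byte[] qualifier = Internal.buildQualifier(timestamp, flags);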
    
    if ((timestamp & Const.SECOND_MASK) != 0) {
      // drop the ms timestamp to seconds to calculate the base timestamp
      base_time = ((timestamp / 1000) - 
          ((timestamp / 1000) % Const.MAX_TIMESPAN));
    } else {
      base_time = (timestamp - (timestamp % Const.MAX_TIMESPAN));
    }
    
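    /** Callback executed for chaining filter calls to see if the value
     * should be written or not.
     *
     * Note that WriteCB implements the Callback interface; as its name
     * suggests, it is the callback that performs the write.
     *
     * 01. Is WriteCB actually called once we get here, and if so, when?
     * It is attached at the end of this method. With no ts_filter it is added
     * to Deferred.fromResult(true), which already holds a result, so it runs
     * immediately; with a filter it runs once allowDataPoint() completes.
     * */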
    final class WriteCB implements Callback<Deferred<Object>, Boolean> {

        // call() is the function invoked as the callback
        // its argument is allowed
      @Override
      public Deferred<Object> call(final Boolean allowed) throws Exception {
        if (!allowed) {
          rejected_dps.incrementAndGet(); // the write was rejected: count it atomically
          return Deferred.fromResult(null);
        }

        // setInt(): writes big-endian
        // write base_time into the row key, right after the salt and the metric UID
        Bytes.setInt(row, (int) base_time, metrics.width() + Const.SALT_WIDTH());
        RowKey.prefixKeyWithSalt(row); // add the salt prefix to row


        // result receives the Deferred returned by the write
        Deferred<Object> result = null;
        if (config.enable_appends()) { // if appends are enabled
          final AppendDataPoints kv = new AppendDataPoints(qualifier, value);
          final AppendRequest point = new AppendRequest(table, row, FAMILY, 
              AppendDataPoints.APPEND_COLUMN_QUALIFIER, kv.getBytes());
          result = client.append(point);
        } else {
          // base_time is presumably what compaction later uses to decide whether the row is old enough
          scheduleForCompaction(row, (int) base_time);

          final PutRequest point = new PutRequest(table, row, FAMILY, qualifier, value);

          // put: store data in HBase
          result = client.put(point); // this is where the data is actually written
        }

       ···
        return result;
      }
      @Override
      public String toString() {
        return "addPointInternal Write Callback";
      }
    }

    // ts_filter is normally null here (it is in most setups)
    if (ts_filter != null && ts_filter.filterDataPoints()) {
      return ts_filter.allowDataPoint(metric, timestamp, value, tags, flags)
          .addCallbackDeferring(new WriteCB());
    }

    return Deferred.fromResult(true).addCallbackDeferring(new WriteCB());
  }
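To make the base_time arithmetic at the top of this excerpt concrete, here is a minimal standalone sketch. It assumes Const.MAX_TIMESPAN is 3600 seconds (one HBase row per hour, which as far as I know is the OpenTSDB 2.x value), and it replaces the original's millisecond check, which is not shown in this excerpt, with a hypothetical isMillis flag. The class and method names are made up for illustration.

```java
// Hypothetical demo class; not part of OpenTSDB.
class BaseTimeDemo {
    // Assumption: each HBase row covers one hour (Const.MAX_TIMESPAN = 3600).
    static final long MAX_TIMESPAN = 3600;

    /**
     * Rounds a timestamp down to the start of its row's time span, in seconds.
     * isMillis is a hypothetical flag standing in for the original's
     * millisecond check, which is not shown in this excerpt.
     */
    static long baseTime(long timestamp, boolean isMillis) {
        if (isMillis) {
            // drop the ms timestamp to seconds first, as in the code above
            final long seconds = timestamp / 1000;
            return seconds - (seconds % MAX_TIMESPAN);
        }
        return timestamp - (timestamp % MAX_TIMESPAN);
    }

    public static void main(String[] args) {
        // 1356999634 is 1234 seconds after the hour boundary 1356998400
        System.out.println(baseTime(1356999634L, false));   // 1356998400
        System.out.println(baseTime(1356999634000L, true)); // 1356998400
    }
}
```

Both a second and a millisecond timestamp inside the same hour map to the same base_time, which is why they end up in the same row.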
  • Check whether the data point's timestamp is valid.
  • Use checkMetricAndTags(metric, tags) to check whether the metric and tags are valid.
    It is implemented as follows:
  static void checkMetricAndTags(final String metric,
      final Map<String, String> tags) {
    if (tags.size() <= 0) {// first, there must be at least one tag
      throw new IllegalArgumentException("Need at least one tag (metric="
          + metric + ", tags=" + tags + ')');
    } else if (tags.size() > Const.MAX_NUM_TAGS()) {// and no more than the maximum allowed
      throw new IllegalArgumentException("Too many tags: " + tags.size()
          + " maximum allowed: " + Const.MAX_NUM_TAGS() + ", tags: " + tags);
    }

    // validate the metric, then each tag key/value in turn (tags is a map, so every entry has to be checked)
    Tags.validateString("metric name", metric);
    //entrySet():a set view of the mappings contained in this map
    for (final Map.Entry<String, String> tag : tags.entrySet()) {
      Tags.validateString("tag name", tag.getKey());
      Tags.validateString("tag value", tag.getValue());
    }
  }

The implementation is straightforward, so I won't go through it here.

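  • Next, a byte array row[] is defined => final byte[] row = IncomingDataPoints.rowKeyTemplate(this, metric, tags);. Pay attention here: row[] is very important, because it becomes the row key later.
  • Then a byte array qualifier[] is defined => final byte[] qualifier = Internal.buildQualifier(timestamp, flags);. This is another key value written to HBase (it becomes the column qualifier).
  • Here is (part of) the Const class: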
/** Constants used in various places.
 *  (constants shared across different classes)
 * */
public final class Const {...}
  /** Maximum number of tags allowed per data point.
   *  (default value: 8)
   *  01. It is a static variable.
   *  02. It has a dedicated getter, MAX_NUM_TAGS().
   * */
  private static short MAX_NUM_TAGS = 8;
  public static short MAX_NUM_TAGS() {
    return MAX_NUM_TAGS;
  }
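The qualifier from the second bullet above can be illustrated for second-resolution timestamps. This sketch assumes that, for such timestamps, Internal.buildQualifier stores the offset from base_time shifted left by 4 bits (an assumed FLAG_BITS of 4) and ORs in the flags, giving a 2-byte qualifier; millisecond timestamps use a different 4-byte layout that is not covered here. The flag value 0x7 (assumed to mean "8-byte integer value"), the class name and the method name are all hypothetical.

```java
import java.nio.ByteBuffer;

// Hypothetical demo class; the constants are assumptions, not read from OpenTSDB.
class QualifierDemo {
    static final long MAX_TIMESPAN = 3600; // assumed seconds per row
    static final int FLAG_BITS = 4;        // assumed: low 4 bits hold the flags

    /** Builds a 2-byte qualifier for a second-resolution timestamp. */
    static byte[] buildSecondQualifier(long timestamp, short flags) {
        final long baseTime = timestamp - (timestamp % MAX_TIMESPAN);
        // offset within the row, shifted left to make room for the flags
        final short qual = (short) (((timestamp - baseTime) << FLAG_BITS) | flags);
        // ByteBuffer writes big-endian by default
        return ByteBuffer.allocate(2).putShort(qual).array();
    }

    public static void main(String[] args) {
        // offset 1234 s, assumed flags 0x7 (8-byte integer value)
        byte[] q = buildSecondQualifier(1356999634L, (short) 0x7);
        System.out.printf("%02X %02X%n", q[0], q[1]); // 4D 27
    }
}
```

Here (1234 << 4) | 0x7 = 19751 = 0x4D27, so the offset inside the hour and the value type share a single 2-byte column name.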
  • Next, look at validateString():
  /**
   * Ensures that a given string is a valid metric name or tag name/value.
   *
   * @param what A human readable description of what's being validated.
   *             For example, when checking the metric, what = "metric name";
   *             when checking a tag name, what = "tag name".
   * @param s The string to validate.
   * @throws IllegalArgumentException if the string isn't valid.
   */
  public static void validateString(final String what, final String s) {
    if (s == null) {// first check whether s is null
      throw new IllegalArgumentException("Invalid " + what + ": null");
    } else if ("".equals(s)) {// if s is "", reject it as an empty string
      throw new IllegalArgumentException("Invalid " + what + ": empty string");
    }
    final int n = s.length();// length of s
    for (int i = 0; i < n; i++) {
      final char c = s.charAt(i);

      // This condition checks whether each character of the metric / tag k-v is allowed.
      //Character.isLetter(c): Determines if the specified character is a letter.
      // It is enough for any one of the conditions below to be true.
      if(!(
              ('a' <= c && c <= 'z')
              || ('A' <= c && c <= 'Z')
              || ('0' <= c && c <= '9')
              || c == '-'
              || c == '_'
              || c == '.'
              || c == '/'
              || Character.isLetter(c)
              || isAllowSpecialChars(c) // usually returns false
      )) {
        throw new IllegalArgumentException("Invalid " + what
            + " (\"" + s + "\"): illegal character: " + c);
      }
    }
  }
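Putting checkMetricAndTags() and validateString() together, the validation can be tried out in isolation with the sketch below. It copies the character rules shown above, treats isAllowSpecialChars(c) as always false (the usual case noted in the comment), and hard-codes the default limit of 8 tags; the class name and the isValid() helper are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical standalone copy of the checks shown above (not OpenTSDB code).
class DataPointValidator {
    // Default limit from the Const excerpt; configurable in the real code.
    static final int MAX_NUM_TAGS = 8;

    static void validateString(final String what, final String s) {
        if (s == null) {
            throw new IllegalArgumentException("Invalid " + what + ": null");
        } else if (s.isEmpty()) {
            throw new IllegalArgumentException("Invalid " + what + ": empty string");
        }
        for (int i = 0; i < s.length(); i++) {
            final char c = s.charAt(i);
            // same character rules as validateString(); isAllowSpecialChars(c)
            // is assumed to return false and is therefore omitted
            final boolean ok = ('a' <= c && c <= 'z') || ('A' <= c && c <= 'Z')
                || ('0' <= c && c <= '9')
                || c == '-' || c == '_' || c == '.' || c == '/'
                || Character.isLetter(c);
            if (!ok) {
                throw new IllegalArgumentException("Invalid " + what
                    + " (\"" + s + "\"): illegal character: " + c);
            }
        }
    }

    static void checkMetricAndTags(final String metric, final Map<String, String> tags) {
        if (tags.isEmpty()) {
            throw new IllegalArgumentException("Need at least one tag (metric=" + metric + ')');
        } else if (tags.size() > MAX_NUM_TAGS) {
            throw new IllegalArgumentException("Too many tags: " + tags.size());
        }
        validateString("metric name", metric);
        for (final Map.Entry<String, String> tag : tags.entrySet()) {
            validateString("tag name", tag.getKey());
            validateString("tag value", tag.getValue());
        }
    }

    /** Hypothetical helper: true if the data point would pass validation. */
    static boolean isValid(final String metric, final Map<String, String> tags) {
        try {
            checkMetricAndTags(metric, tags);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        final Map<String, String> tags = new TreeMap<>();
        tags.put("host", "web01");
        System.out.println(isValid("sys.cpu.user", tags));            // true
        System.out.println(isValid("sys cpu", tags));                 // false (space)
        System.out.println(isValid("sys.cpu.user", new TreeMap<>())); // false (no tags)
    }
}
```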
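  • The inner class WriteCB that follows is also crucial. It implements the Callback interface, and its key design points are:
    • Bytes.setInt(row, (int) base_time, metrics.width() + Const.SALT_WIDTH()); writes base_time into the pre-sized row[] template, at the offset right after the salt and the metric UID. [That is why the offset is computed from metrics.width() and the salt width: the space for base_time was reserved there.]
    • RowKey.prefixKeyWithSalt(row); adds the salt prefix to row [the salt is there to spread writes evenly across HBase].
    • The following decides whether the data is written as an append [read it together with the enable_appends setting in config]: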
        if (config.enable_appends()) {// if appends are enabled
          final AppendDataPoints kv = new AppendDataPoints(qualifier, value);
          final AppendRequest point = new AppendRequest(table, row, FAMILY, 
              AppendDataPoints.APPEND_COLUMN_QUALIFIER, kv.getBytes());
          result = client.append(point);
        }

Otherwise, the following code runs:

        else {
          // base_time is the marker that compaction later uses to check whether the row is old:
          // if it is old, the row is compacted; otherwise it is not
          scheduleForCompaction(row, (int) base_time);
          final PutRequest point = new PutRequest(table, row, FAMILY, qualifier, value);

          //put:Store data in hbase
          result = client.put(point);// this is where the data is actually put
        }
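  • Next, a PutRequest object is created; it holds the data that will be sent over the network to HBase. PutRequest is documented as "Puts some data into HBase."; its constructor arguments are, in order: table name, row key, column family, column qualifier, and value.
  • Finally, client.put(point) is called.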
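The row key layout produced by the WriteCB steps can be sketched as follows. Everything here is an assumption for illustration: 3-byte metric and tag UIDs, a 4-byte base_time, a 1-byte salt with 20 buckets (as far as I know salting is off by default in OpenTSDB, i.e. a salt width of 0), and a salt computed as a hash of the metric and tag bytes (timestamp excluded) modulo the bucket count. It mimics Bytes.setInt() with a big-endian ByteBuffer write at offset SALT_WIDTH + METRIC_WIDTH; it is not RowKey.prefixKeyWithSalt() itself, and the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical sketch of the row key layout; widths and salting scheme are assumptions.
class RowKeyDemo {
    static final int SALT_WIDTH = 1;      // assumed 1-byte salt (salting enabled)
    static final int METRIC_WIDTH = 3;    // assumed 3-byte metric UID
    static final int TIMESTAMP_BYTES = 4; // base_time stored as a 4-byte int
    static final int SALT_BUCKETS = 20;   // assumed number of salt buckets

    /** Layout: [salt][metric UID][base_time][tagk UID][tagv UID]... */
    static byte[] buildRowKey(byte[] metricUid, int baseTime, byte[]... tagUids) {
        int tagsLen = 0;
        for (byte[] t : tagUids) {
            tagsLen += t.length;
        }
        final int tagsStart = SALT_WIDTH + METRIC_WIDTH + TIMESTAMP_BYTES;
        final byte[] row = new byte[tagsStart + tagsLen];

        System.arraycopy(metricUid, 0, row, SALT_WIDTH, METRIC_WIDTH);
        // like Bytes.setInt(row, base_time, metrics.width() + SALT_WIDTH): big-endian write
        ByteBuffer.wrap(row).putInt(SALT_WIDTH + METRIC_WIDTH, baseTime);
        int off = tagsStart;
        for (byte[] t : tagUids) {
            System.arraycopy(t, 0, row, off, t.length);
            off += t.length;
        }

        // salt: hash of metric + tags (timestamp excluded), modulo the bucket count
        final byte[] saltBase = new byte[METRIC_WIDTH + tagsLen];
        System.arraycopy(row, SALT_WIDTH, saltBase, 0, METRIC_WIDTH);
        System.arraycopy(row, tagsStart, saltBase, METRIC_WIDTH, tagsLen);
        row[0] = (byte) Math.floorMod(Arrays.hashCode(saltBase), SALT_BUCKETS);
        return row;
    }

    public static void main(String[] args) {
        byte[] row = buildRowKey(new byte[] {0, 0, 1}, 1356998400,
                                 new byte[] {0, 0, 1}, new byte[] {0, 0, 2});
        System.out.println(row.length); // 14
        System.out.println(Arrays.toString(row));
    }
}
```

Because the timestamp is excluded from the hash in this sketch, all hourly rows of one series share the same salt byte, while different series are spread over the buckets.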

At this point the whole handleHttpQuery() flow is complete.

3. Open questions

  • Is query a JSON object, or some other kind of object? Its value is:
HttpQuery{start_time=2502348635391380, request=DefaultHttpRequest(chunked: false)
POST /api/put?summary HTTP/1.1
Content-Length: 86
Content-Type: text/plain; charset=ISO-8859-1
Host: 192.168.211.2:4399
Connection: Keep-Alive
User-Agent: Apache-HttpClient/4.5.2 (Java/1.8.0_77)
Accept-Encoding: gzip,deflate, chan=[id: 0x69793482, /192.168.211.2:4104 => /192.168.211.2:4399], querystring={summary=[]}}
  • I don't fully understand the Deferred class or how the callbacks get invoked.
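On the Deferred question, a rough analogy may help. The sketch below uses the JDK's CompletableFuture, not the Deferred API, to mimic how addPointInternal chains WriteCB: thenCompose plays the role of addCallbackDeferring, so the callback receives the Boolean from the previous step and returns a new future. Because the starting future is already completed (like Deferred.fromResult(true)), the callback runs immediately in the calling thread. The names writeCb and addPoint and the "stored" result are hypothetical stand-ins for WriteCB and client.put().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Hypothetical analogy using CompletableFuture; not the Deferred API.
class CallbackChainDemo {
    // plays the role of rejected_dps
    static final AtomicLong rejected = new AtomicLong();

    // Plays the role of WriteCB.call(allowed): it returns another future,
    // much like a Callback<Deferred<Object>, Boolean> returns a Deferred.
    static final Function<Boolean, CompletableFuture<Object>> writeCb = allowed -> {
        if (!allowed) {
            rejected.incrementAndGet();
            return CompletableFuture.<Object>completedFuture(null);
        }
        // stand-in for client.put(point), which completes asynchronously
        return CompletableFuture.<Object>supplyAsync(() -> "stored");
    };

    static CompletableFuture<Object> addPoint(boolean filterAllows) {
        // like Deferred.fromResult(allowed).addCallbackDeferring(new WriteCB())
        return CompletableFuture.completedFuture(filterAllows).thenCompose(writeCb);
    }

    public static void main(String[] args) {
        System.out.println(addPoint(true).join());  // stored
        System.out.println(addPoint(false).join()); // null
        System.out.println(rejected.get());         // 1
    }
}
```

So WriteCB is not called "later by someone else" in the no-filter path: attaching it to an already-resolved result triggers it right away, and only the put itself completes asynchronously.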
