【问题标题】:How do I make an async call to Hive in Java?如何在 Java 中对 Hive 进行异步调用?
【发布时间】:2011-01-11 23:50:28
【问题描述】:

我想以异步方式在服务器上执行 Hive 查询。 Hive 查询可能需要很长时间才能完成,所以我不想阻止调用。我目前正在使用 Thirft 进行阻塞调用(client.execute() 上的阻塞),但我还没有看到如何进行非阻塞调用的示例。这是阻塞代码:

        TSocket transport = new TSocket("hive.example.com", 10000);
        transport.setTimeout(999999999);
        TBinaryProtocol protocol = new TBinaryProtocol(transport);
        Client client = new ThriftHive.Client(protocol);
        transport.open();
        client.execute(hql);  // Omitted HQL

        List<String> rows;
        while ((rows = client.fetchN(1000)) != null) {
            for (String row : rows) {
                // Do stuff with row
            }
        }

        transport.close();

上面的代码缺少 try/catch 块以保持简短。

有人知道如何进行异步调用吗? Hive/Thrift 可以支持吗?有没有更好的办法?

谢谢!

【问题讨论】:

  • 我现在对 Thrift 还不是很了解,但是你不能把它包装在一个 runnable 中并创建一个新的 Thread 吗?
  • 是的,很明显我可以自己完成这项工作,但有些东西让我觉得它已经内置在 Thrift 中,比如 TNonblockingSocket。我找不到任何关于如何使用它的例子,或者即使 Hive 支持它。

标签: java asynchronous rpc thrift hive


【解决方案1】:

安装此补丁后,现在可以在 Java thrift 客户端中进行异步调用: https://issues.apache.org/jira/browse/THRIFT-768

使用新的 thrift 生成异步 java 客户端并按如下方式初始化您的客户端:

TNonblockingTransport transport = new TNonblockingSocket("127.0.0.1", 9160);
TAsyncClientManager clientManager = new TAsyncClientManager();
TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
Hive.AsyncClient client = new Hive.AsyncClient(protocolFactory, clientManager, transport);

现在您可以像在同步接口上一样在此客户端上执行方法。唯一的变化是所有方法都采用了一个额外的回调参数。

【讨论】:

    【解决方案2】:

    AFAIK,在撰写本文时,Thrift 不会生成异步客户端。此链接here(搜索“异步”文本)中解释的原因是 Thrift 是为假设延迟较低的数据中心设计的。

    不幸的是,您知道调用和结果之间的延迟并不总是由网络引起的,而是由正在执行的逻辑引起的!我们在从 Java 应用程序服务器调用 Cassandra 数据库时遇到了这个问题,我们希望限制总线程数。

    总结:目前您所能做的就是确保您有足够的资源来处理所需数量的阻塞并发线程并等待更有效的实现。

    【讨论】:

      【解决方案3】:

      与 Hive 邮件列表交谈后,Hive 不支持使用 Thirft 的异步调用。

      【讨论】:

        【解决方案4】:

        我们触发对AWS Elastic MapReduce 的异步调用。 AWS MapReduce 可以通过调用 AWS MapReduce Web 服务在 Amazon 的云上运行 hadoop/hive 作业。

        您还可以监控作业的状态,并在作业完成后从 S3 中获取结果。

        由于对 Web 服务的调用本质上是异步的,因此我们从不阻塞其他操作。我们继续在单独的线程中监控我们的作业状态,并在作业完成时获取结果。

        【讨论】:

          【解决方案5】:

          我对 Hive 一无所知,但作为最后的手段,你可以使用 Java 的并发库:

           Callable<SomeResult> c = new Callable<SomeResult>(){public SomeResult call(){
          
              // your Hive code here
          
           }};
          
           Future<SomeResult> result = executorService.submit(c);
          
           // when you need the result, this will block
           result.get();
          

          或者,如果您不需要等待结果,请使用 Runnable 而不是 Callable

          【讨论】:

            【解决方案6】:

            我不特别了解 Hive,但 任何 阻塞调用都可以通过生成新线程并使用回调在异步调用中打开。您可以查看java.util.concurrent.FutureTask,它旨在允许轻松处理此类异步操作。

            【讨论】:

              猜你喜欢
              • 1970-01-01
              • 2018-07-07
              • 2014-11-13
              • 1970-01-01
              • 1970-01-01
              • 2014-05-08
              • 1970-01-01
              • 2019-08-07
              • 1970-01-01
              相关资源
              最近更新 更多