【问题标题】:ElasticSearch and Apache HttpAsyncClientElasticSearch 和 Apache HttpAsyncClient
【发布时间】:2016-04-07 04:42:09
【问题描述】:

我正在尝试将 ElasticSearch REST API 与 Java Apache HttpAsyncClient 库一起使用。我想使用持久流水线连接。下面是一些测试代码(输出为 cmets):

@Test
public void testEsPipeliningClient() throws IOException, ExecutionException, InterruptedException
{
    testPost(HttpAsyncClients.createDefault());
    //201: {"_index":"test_index","_type":"test_type","_id":"AVIHYGnqdqqg_TAHm4ix","_version":1,"_shards":{"total":2,"successful":1,"failed":0},"created":true}
    testPost(HttpAsyncClients.createPipelining());
    //400: No handler found for uri [http://127.0.0.1:9200/test_index/test_type] and method [POST]
}

private void testPost(CloseableHttpAsyncClient client) throws ExecutionException, InterruptedException, IOException
{
    client.start();
    HttpPost request = new HttpPost("http://127.0.0.1:9200/test_index/test_type");
    request.setEntity(new StringEntity("{\"some_field\": \"some_value\"}"));
    Future<HttpResponse> responseFuture = client.execute(request, null);
    HttpResponse response = responseFuture.get();
    System.err.println(response.getStatusLine().getStatusCode() + ": " + EntityUtils.toString(response.getEntity()));
}

我不明白,为什么它适用于 HttpAsyncClients.createDefault() 客户端,但不适用于 HttpAsyncClients.createPipelining()。我也无法理解这两种创建方法之间的区别。

为什么我使用createPipelining()时会收到错误响应?

我尝试查看与 https://httpbin.org/post 的区别,但它向我展示了两个选项的相同结果。我使用默认的 ElasticSearch 设置。

谢谢!


UPD1

我尝试使用 PUT 文档 (PUT http://127.0.0.1/test_index/test_type/&lt;doc id&gt;) 请求得到相同的结果 - 它适用于 createDefault() 但我在使用 createPipelining() 时遇到类似的错误 - 找不到处理程序 <...> .

但是当我尝试执行创建索引的请求 (PUT http://127.0.0.1/&lt;index name&gt;) 时出现另一个错误。请看下面的代码:

@Test
public void testEsPipeliningClient() throws IOException, ExecutionException, InterruptedException
{
    testCreateIndex(HttpAsyncClients.createDefault());
    //200: {"acknowledged":true}
    testCreateIndex(HttpAsyncClients.createPipelining());
    //400: {"error":{"root_cause":[{"type":"mapper_parsing_exception","reason":"failed to parse, document is empty"}],"type":"mapper_parsing_exception","reason":"failed to parse, document is empty"},"status":400}
}

private void testCreateIndex(CloseableHttpAsyncClient client) throws ExecutionException, InterruptedException, IOException
{
    client.start();
    HttpPut request = new HttpPut("http://127.0.0.1:9200/" + RandomStringUtils.randomAlphabetic(8).toLowerCase());
    Future<HttpResponse> responseFuture = client.execute(request, null);
    HttpResponse response = responseFuture.get();
    System.err.println(response.getStatusLine().getStatusCode() + ": " + EntityUtils.toString(response.getEntity()));
}

正如我在this documentation page 看到的那样,ElasticSearch 默认支持 HTTP 流水线。也许我需要在 ES 设置中更改什么?


UPD2

以下是 UPD1 部分中具有不同日志记录设置的代码的一些线路日志:

Dorg.apache.commons.logging.simplelog.log.org.apache.http=DEBUG -Dorg.apache.commons.logging.simplelog.log.org.apache.http.wire=INFO

http://pastebin.com/v29uvgbj

-Dorg.apache.commons.logging.simplelog.log.org.apache.http.impl.conn=DEBUG -Dorg.apache.commons.logging.simplelog.log.org.apache.http.impl.client=DEBUG -Dorg.apache.commons.logging.simplelog.log.org.apache.http.client=DEBUG -Dorg.apache.commons.logging.simplelog.log.org.apache.http.wire=DEBUG

http://pastebin.com/G9ij15d6


UPD3

我只是尝试用 createMinimal() 替换 createDefault(),它导致了与 createPipelining() 相同的错误。 MinimalHttpAsyncClient 中的任何想法可能会导致此问题?也许有一种方法可以让我手动创建流水线客户端(使用构建器类)而不会出现这个问题?

【问题讨论】:

  • 你用的是哪个版本的ES?
  • 今天的最后一个,2.1.1
  • 请发布两个会话的电报日志
  • @oleg 有什么方法可以让 ES 记录所有请求吗?或者我需要手动嗅探我的流量?另外,我已经用一些新信息更新了这个问题,也许它会有用

标签: java http elasticsearch apache-httpclient-4.x apache-httpasyncclient


【解决方案1】:

实际上,您只需要从 URL 中提取主机,并仅使用绝对路径创建一个 HttpPost 对象。请参阅下面第二、第三和第五行的更改:

client.start();
HttpHost targetHost = new HttpHost("127.0.0.1", 9200);
HttpPost request = new HttpPost("/test_index/test_type");
request.setEntity(new StringEntity("{\"some_field\": \"some_value\"}"));
Future<HttpResponse> responseFuture = client.execute(targetHost, request, null);
HttpResponse response = responseFuture.get();
System.out.println(response.getStatusLine().getStatusCode() + ": " + EntityUtils.toString(response.getEntity()));

执行这三个更改并再次运行代码将产生以下结果:

201: {"_index":"test_index","_type":"test_type","_id":"AVISSimIZHOoPG8ibOyF","_version":1,"created":true}
201: {"_index":"test_index","_type":"test_type","_id":"AVISSimjZHOoPG8ibOyG","_version":1,"created":true}

【讨论】:

  • 似乎不可能在 Stack Overflow 设置第二个赏金 :( 但无论如何非常感谢!
【解决方案2】:

服务器必须在请求行中的绝对请求 URI 上阻塞

[DEBUG] wire - http-outgoing-1 >> "PUT http://127.0.0.1:9200/ydiwdsid HTTP/1.1[\r][\n]"

流水线模式下的HttpAsyncClient采用最小的协议处理链。它不会尝试重写请求对象的请求 URI。

对于您的特定情况,请求流水线似乎没有多大意义。更不用说,除非您分批提交请求,否则您甚至不会使用流水线执行。

【讨论】:

  • 如何检查它是否真的与绝对请求 URI 有关?关于流水线执行:createPipelining() 客户端每次通过该客户端执行请求(对同一主机)时只使用一个连接,对吗?关于批次-您能解释一下我该如何尝试吗?我的真正目的是一次又一次地处理新请求而不等待响应(我有时想接收它们,但又一次 - 我不希望响应等待阻止新请求)。我应该使用什么客户端?
  • 我意识到当我使用MinimalHttpAsyncClient 执行我的代码时,它会以某种方式在 ElasticSearch 中创建名为http: 的索引。当我尝试执行以下命令时,同样的事情:nc 127.0.0.1 9200 &lt; absolute.http 其中 absolute.http 是:pastebin.com/zgfSHcNG 。但是当我尝试 nc 127.0.0.1 9200 &lt; relative.http 其中 relative.http 是:pastebin.com/b0yFCAB3 时,它可以正常工作。我无法管理如何使用nc 接收响应或如何使用curl 做同样的事情,但我认为absolute.http 的响应将是400(就像问题中一样)。跨度>
  • 我设法解决了这个问题(参见 Val 的回答)。但是我仍然想知道如果我想使用 HTTP 管道,如何正确编码所有内容,并避免等待响应变慢的因素?
  • 需要批量提交请求才能利用请求管道。见hc.apache.org/httpcomponents-asyncclient-4.1.x/httpasyncclient/…
  • 谢谢,我想我会的。但是我不能完全理解,为什么我不使用请求批处理而不利用请求管道?如果我执行一个请求,为它接收Future&lt;HttpResponse&gt;,然后执行另一个请求,则不必等到Future 变为isDone(),对吧?另外,在这种情况下只打开一个连接是对的吗?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-01-27
相关资源
最近更新 更多