【发布时间】:2016-04-21 08:22:31
【问题描述】:
我正在尝试使用 JEST 将 Elasticsearch HTTP 访问添加到 Titan ES client。 Titan-es 仅支持 ES 的本地和传输 (TCP) 模式。但我想支持通过 ES 的 HTTP 接口进行通信。这将允许像titan-es 这样的客户端库使用AWS Elasticsearch 作为只提供HTTP(S) 接口的索引后端。请参阅this post 了解更多信息。
我正在寻找有关我目前正在考虑的方法的一些反馈:
- 创建一个实现
org.elasticache.client.Client接口的新类ElasticsearchHttpClient。新类将使用JestClient作为其内部客户端。这样它将通过 HTTP 与 ES 通信。新类可能会扩展 ES 的AbstractClient,以减少必须实现的方法:admin()、settings()、execute()、threadPool()和close()。 - 将新枚举
HTTP_CLIENT添加到ElasticSearchSetup - 确保
HTTP_CLIENT上的connect()方法返回Connection的实例,其中包含node和client的正确值。client成员将是新的ElasticsearchHttpClient类的一个实例。 - 如果
INTERFACE配置为HTTP_CLIENT,请确保ElasticSearchIndex.interfaceConfiguration()方法检索Connection的正确实例(包含新的ElasticsearchHttpClient)。从那时起,其余代码应该继续在新协议上工作。
听起来应该可行吗?第一步是我最关心的问题 - 我不相信我可以使用 JestClient 实现所有客户端方法。
package com.thinkaurelius.titan.diskstorage.es;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.JestResult;
import io.searchbox.client.config.HttpClientConfig;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.*;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
public class ElasticsearchHttpClient extends AbstractClient {
private final JestClient internalClient;
private final ThreadPool pool;
public ElasticsearchHttpClient(String hostname, int port) {
JestClientFactory factory = new JestClientFactory();
factory.setHttpClientConfig(new HttpClientConfig
.Builder(String.format("http://%s:%d", hostname, port))
.multiThreaded(true)
.build());
JestClient client = factory.getObject();
this.pool = new ThreadPool("jest");
this.internalClient = client;
}
@Override
public AdminClient admin() {
return null;
}
@Override
public Settings settings() {
return null;
}
@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder, Client> action, Request request) {
try {
JestResult response = internalClient.execute(convertRequest(action, request));
return convertResponse(response);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> void execute(Action<Request, Response, RequestBuilder, Client> action, Request request, ActionListener<Response> listener) {
execute(action, request);
}
private <Response extends ActionResponse> ActionFuture<Response> convertResponse(JestResult result) {
// TODO How to convert a JestResult a Elasticsearch ActionResponse/ActionFuture?
return null;
}
private <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> io.searchbox.action.Action<JestResult> convertRequest(Action<Request, Response, RequestBuilder, Client> action, Request request) {
// TODO How to convert an Elasticsearch Action<..> and Request to a Jest Action<JestResult>?
return null;
}
@Override
public ThreadPool threadPool() {
return pool;
}
@Override
public void close() throws ElasticsearchException {
pool.shutdownNow();
}
}
[我也在Titan mailing list和Elasticsearch forum上问过这个问题。]
【问题讨论】:
标签: java elasticsearch titan amazon-elasticsearch elasticsearch-jest