【问题标题】:ElasticSearch Java API Client - Send already serialized data and avoid serializationElasticSearch Java API 客户端 - 发送已经序列化的数据并避免序列化
【发布时间】:2022-06-15 06:44:33
【问题描述】:

我有一个带有 JSON 数据的 Kafka 主题。现在我尝试使用新的“Java API 客户端”(https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/7.17/index.html)将这些 JSON 字符串发送到 ES 主题,但我遇到了解析器异常:

co.elastic.clients.elasticsearch._types.ElasticsearchException: [es/index] failed: [mapper_parsing_exception] failed to parse
at co.elastic.clients.transport.rest_client.RestClientTransport.getHighLevelResponse(RestClientTransport.java:281)
at co.elastic.clients.transport.rest_client.RestClientTransport.performRequest(RestClientTransport.java:147)
at co.elastic.clients.elasticsearch.ElasticsearchClient.index(ElasticsearchClient.java:953)

此异常发生在以下代码的最后一行:

final IndexRequest<String> request =
          new IndexRequest.Builder<String>()
              .index("myIndex")
              .id(String.valueOf(UUID.randomUUID()))
              .document(consumerRecord.value()) //already serialized json data
              .build();
elasticsearchClient.index(request);

据我了解,发生此异常是因为 ES 客户端尝试序列化我提供的数据,该数据已经序列化,导致 JSON 字符串格式错误。

有没有办法解决这个问题,只发送简单的 JSON 字符串?我也相信早期的“低级Java库”可以做到这一点,对吧?是的,我知道有一些方法可以让 Kafka 和 ES 之间进行通信,而无需编写 Consumer。

感谢任何提示。

【问题讨论】:

    标签: java elasticsearch elasticsearch-client


    【解决方案1】:

    我通过将“Java API Client”(https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/current/introduction.html) 替换为“Java Low Level Rest Client”(https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/current/java-rest-low.html) 解决了这个问题。

    这个库允许向 ES 发送任意 JSON 字符串:

      final Request request = new Request("POST", "/twitter/_doc");
      request.setJsonEntity(record.value());
      restClient.performRequest(request);
    

    【讨论】:

      【解决方案2】:

      如果您在创建ElasticsearchTransport 时使用JacksonJsonpMapper,则可以使用Jackson 的SerializedString 类发送已经序列化的JSON。

      ElasticsearchTransport transport = new RestClientTransport(
          createLowLevelRestClient(), // supply your own!
          new JacksonJsonpMapper()
      );
      
      ElasticsearchClient client = new ElasticsearchClient(transport);
      
      IndexResponse response = client.index(indexReq -> indexReq
          .index("my-index")
          .id("docId")
          .document(new SerializedString("{\"foo\":\"bar\"}"))
      );
      System.out.println(response);
      

      如果预序列化的 JSON 存储在字节数组中,则此解决方案需要先将其转换为 String。这并不理想,但它比反序列化和重新序列化 JSON 更有效。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2021-01-03
        • 2014-04-12
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2022-09-29
        • 1970-01-01
        相关资源
        最近更新 更多