【问题标题】:Go elasticsearch bulk insert去弹性搜索批量插入
【发布时间】:2022-06-16 20:39:57
【问题描述】:

我这几天都无法用elasticsearch Bulk方法解决这个问题,因为我Go语言不强,不久前开始学习它,同时执行代码:

package main

import (
    "bytes"
    json "encoding/json"
    "fmt"
    "io/ioutil"
    "log"
    "net/http"
)

type BulkInsertMetaData struct {
    Index []BulkInsertIndex `json:"index"`
}
type BulkInsertIndex struct {
    Index string `json:"_index"`
    ID    string `json:"_id"`
}

type BulInsertData struct {
    Url string `json:"url"`
}

func main() {
    dataMeta := BulkInsertMetaData{
        Index: []BulkInsertIndex{
            {
                Index: "Test",
                ID:    "1234567890",
            },
        },
    }
    data := BulInsertData{
        Url: "http://XXXX.XX",
    }
    TojsBulInsertData, _ := json.Marshal(data)
    TojsBulkInsertMetaData, _ := json.Marshal(dataMeta)
    BulkMetaData := bytes.NewBuffer(append(TojsBulkInsertMetaData, []byte("\n")...))
    BulkData := bytes.NewBuffer(append(TojsBulInsertData, []byte("\n")...))
    log.Println(BulkMetaData)
    log.Println(BulkData)
    respMetaData, err := http.Post("http://127.0.0.1:9200/_bulk", "application/json", BulkMetaData)
    if err != nil {
        log.Println(err)
    }
    body, err := ioutil.ReadAll(respMetaData.Body)
    if err != nil {
        log.Println(err)
    }
    fmt.Println(string(body))
    respBulkData, err := http.Post("http://127.0.0.1:9200/_bulk", "application/json", BulkData)
    if err != nil {
        log.Println(err)
    }
    body2, err := ioutil.ReadAll(respBulkData.Body)
    if err != nil {
        log.Println(err)
    }
    fmt.Println(string(body2))
}

但我得到一个错误:

2022/02/09 14:37:02 {"index":[{"_index":"Test","_id":"1234567890"}]}

2022/02/09 14:37:02 {"url":"http://XXXX.XX"}

{"error":{"root_cause":[{"type":"illegal_argument_exception","reason":"Malformed action/metadata line [1], expected START_OBJECT or END_OBJECT but found [START_ARRAY]"}],"type":"illegal_argument_exception","reason":"Malformed action/metadata line [1], expected START_OBJECT or END_OBJECT but found [START_ARRAY]"},"status":400}

请帮助并解释我做错了什么,我在互联网上搜索了我的问题的答案,但没有找到 我在使用 REST 客户端通过时测试插入没有问题

【问题讨论】:

  • 看看:stackoverflow.com/questions/45792309/…。 Bulk API 不需要换行符(最后一行除外)。文档提到:The REST API endpoint is /_bulk, and it expects the following newline delimited JSON (NDJSON) structure
  • 查看我上面的评论,如果您可以分享 JSON 文件的摘录,它将更容易回答问题:)
  • 我没有使用 json 文件,正如您在代码中看到的那样,我以编程方式形成 json,然后输出发生的情况,然后尝试插入到 elastick 中。根据文档中的描述,给出了一个例子: action_and_meta_data\n optional_source\n 。还是我错过了什么?

标签: go elasticsearch


【解决方案1】:

BulkMetaData 应该是{"index":{"_index":"Test","_id":"1234567890"}}(没有[])并且应该与BulkData 一起发送到/_bulk,作为单个有效负载:

{"index":{"_index":"Test","_id":"1234567890"}}
{"url":"http://XXXX.XX"}

【讨论】:

    【解决方案2】:

    抱歉,有点坏死,但我最近还需要在我们的代码库中设计一个 Bulk 连接器,而网络上没有 NDJSON 编码器/解码器的事实令人震惊。这是我的实现:

    func ParseToNDJson(data []map[string]interface{}, dst *bytes.Buffer) error {
        for _, element := range data {
            if err := json.NewEncoder(dst).Encode(element); err != nil {
                if err != io.EOF {
                    return fmt.Errorf("failed to parse NDJSON: %v", err)
                }
                break
            }
        }
        return nil
    }
    

    要测试的驱动代码:

    func main() {
        var b bytes.Buffer
        var data []map[string]interface{}
        // pointless data generation...
        for i, name := range []string{"greg", "sergey", "alex"} {
            data = append(data, map[string]interface{}{name: i})
        }
        if err := ParseToNDJson(query, &body); err != nil {
            return nil, fmt.Errorf("error encoding request: %s", err)
        }
        res, err := esapi.BulkRequest{
            Index:   "tasks",
            Body:    strings.NewReader(body.String()),
        }.Do(ctx, q.es)
    

    希望这对某人有所帮助

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-01-01
      • 2019-06-03
      • 1970-01-01
      • 1970-01-01
      • 2018-03-10
      • 1970-01-01
      • 1970-01-01
      • 2016-10-30
      相关资源
      最近更新 更多