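【Question title】: Elasticsearch partial bulk update
【Posted】: 2018-05-25 08:59:46
【Question】:

I have 6k records in Elasticsearch that need updating, and I have to use PHP. I searched the documentation and found Bulk Indexing, but that does not preserve the previous data.

I have this structure: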

[
  {
    'name': 'Jonatahn',
    'age' : 21
  }
]

My code snippet for the update:

$params =[
    "index" => "customer",
    "type" => "doc",
    "body" => [
        [
            "index" => [
                "_index" => "customer",
                "_type" => "doc",
                "_id" => "09310451939"
            ]
        ],
        [
            "name" => "Jonathan"
        ]
    ]
];

$client->bulk($params);

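When I send ['name' => 'Jonathan'], I expect name to be updated and age to be kept, but age gets deleted. Of course, I could still update the records one by one, but that takes a long time. Is there a better way to do this?

【Comments】:

    Tags: php json elasticsearch bigdata bulk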


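    【Solution 1】:

    $batch_elastics is the result array. I unset these two values from each row every time, because I don't need them in the document that gets inserted or updated.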

    unset($batch_row['type']);
    
    unset($batch_row['diamonds_id']);
    

    The code starts here...

        if(count($batch_elastics)){
            // echo 'hi';die;
            $params = array();                
            $params = ['body' => []]; 
            $i=1;       
            foreach($batch_elastics as $batch_row){
                $type=$batch_row['type'];
                $id=$batch_row['diamonds_id'];
                unset($batch_row['type']);
                unset($batch_row['diamonds_id']); 
                if($type=="create"){                                    
                    $params["body"][]= [
                            "create" => [
                                "_index" => 'diamonds',                                                        
                                "_id" => $id,
                            ]
                        ];        
                        $params["body"][]= $batch_row;                             
                    if (count($params["body"]) >= 2000) { // 1000 actions, two body entries each
                        $responses = $client->bulk($params);                                
                        $params = ['body' => []];                                
                        unset($responses);
                    }
                } 
                $i=$i+1;
            }
            
            // Send the last batch if it exists
            if (!empty($params['body'])) {
                $responses = $client->bulk($params);
            }
            $params = array();                
            $params = ['body' => []]; 
            $i=1; 
            foreach($batch_elastics as $batch_row){
                $type=$batch_row['type'];
                $id=$batch_row['diamonds_id'];
                unset($batch_row['type']);
                unset($batch_row['diamonds_id']); 
                if($type=="update"){                                    
                    $params["body"][]= [
                            "update" => [
                                "_index" => 'diamonds',                                                        
                                "_id" => $id,
                            ]
                        ];        
                    $params["body"][]= [
                        "doc"=>$batch_row
                    ];                           
                    if (count($params["body"]) >= 2000) { // 1000 actions, two body entries each
                        $responses = $client->bulk($params);                                
                        $params = ['body' => []];                                
                        unset($responses);
                    }
                } 
                $i=$i+1;
            }
            
            // Send the last batch if it exists
            if (!empty($params['body'])) {
                $responses = $client->bulk($params);
            }
        }
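The two passes above can also be folded into one. The following is only a sketch under stated assumptions, not the answerer's code: `buildBulkBodies` is a hypothetical helper, the `carat` and `price` fields are made-up sample data, and creates and updates are sent in row order rather than creates first. It uses `array_chunk` so that no batch exceeds the chosen size.

```php
<?php
// Hypothetical helper (not from the original answer): turns rows into
// bulk request bodies holding at most $batchSize actions each.
function buildBulkBodies(array $rows, string $index, int $batchSize = 1000): array
{
    $bodies = [];
    foreach (array_chunk($rows, $batchSize) as $chunk) {
        $body = [];
        foreach ($chunk as $row) {
            $type = $row['type'];
            $id   = $row['diamonds_id'];
            unset($row['type'], $row['diamonds_id']);

            // Rows with any other type are skipped, as in the loops above.
            if ($type !== 'create' && $type !== 'update') {
                continue;
            }

            $body[] = [$type => ['_index' => $index, '_id' => $id]];
            // "create" takes the document itself; "update" wraps it in "doc".
            $body[] = ($type === 'update') ? ['doc' => $row] : $row;
        }
        if (!empty($body)) {
            $bodies[] = $body;
        }
    }
    return $bodies;
}

// Made-up sample rows for illustration.
$rows = [
    ['type' => 'create', 'diamonds_id' => 1, 'carat' => 0.5],
    ['type' => 'update', 'diamonds_id' => 2, 'carat' => 0.7],
    ['type' => 'update', 'diamonds_id' => 3, 'price' => 900],
];
$bodies = buildBulkBodies($rows, 'diamonds', 2);
```

Each element of `$bodies` would then be sent with `$client->bulk(['body' => $body])`, assuming the same `$client` as above.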
    

    【Comments】:

      【Solution 2】:

      This is my final code.

      <?php
      
      require_once('../elasticsearch.php');
      
      //initialize elasticsearch
      $params = array();
      
      $params['index'] = $elastcsearch_index;
      $params['type']  = $elastcsearch_type;
      
      ///////////////////////////////////////////////////
      //update seeders n leechers in elasticsearch 
      
      //get updated records
      $get_updated_records = mysqli_query($conn, "SELECT content_id, seeders, leechers FROM content WHERE is_updated = '1' order by seeders DESC") ;
      
      //create blank array
      $results = array();
      
      while($row = mysqli_fetch_assoc($get_updated_records)){
          //put all results in array
          $results[] = $row;
      
      }   
      
      //from https://www.elastic.co/guide/en/elasticsearch/client/php-api/current/_indexing_documents.html
      
      $params = ['body' => []];
      
      for($i = 0; $i < count($results); $i++) {
      
          $params["body"][]= [
                  "update" => [
                      "_index" => $elastcsearch_index,
                      "_type" => $elastcsearch_type,
                      "_id" => $results[$i]['content_id']
                  ]
              ];
      
          $params["body"][]= [
                  "doc" => [
                      "seeders" => intval($results[$i]['seeders']) ,
                      "leechers" => intval($results[$i]['leechers']) ,
                  ]
              ];
      
          // Every 1000 documents stop and send the bulk request
          if (($i + 1) % 1000 == 0) { // $i starts at 0, so test $i + 1 to avoid sending the first document alone
              $responses = $elasticsearch->bulk($params);
      
              // erase the old bulk request
              $params = ['body' => []];
      
              // unset the bulk response when you are done to save memory
              unset($responses);
          } 
      }
      
      // Send the last batch if it exists
      if (!empty($params['body'])) {
          $responses = $elasticsearch->bulk($params);
      }
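A bulk request can succeed as a whole while individual actions fail, for example an update for an id that does not exist. The Bulk API reports this through a top-level `errors` flag and a per-action entry in `items`. The sketch below assumes the client returns the response as a PHP array; `collectBulkFailures` is a hypothetical helper and `$sample` is a hand-written response, not real server output.

```php
<?php
// Hypothetical helper: collects the failed items from a bulk response array.
function collectBulkFailures(array $response): array
{
    // "errors" is false when every action in the request succeeded.
    if (empty($response['errors'])) {
        return [];
    }

    $failures = [];
    foreach ($response['items'] as $item) {
        // Each item has a single key naming the action (index, create, update, delete).
        $action = array_key_first($item);
        $result = $item[$action];
        if (isset($result['error'])) {
            $failures[] = [
                'action' => $action,
                'id'     => $result['_id'] ?? null,
                'status' => $result['status'] ?? null,
                'reason' => $result['error']['reason'] ?? null,
            ];
        }
    }
    return $failures;
}

// Hand-written sample response for illustration only.
$sample = [
    'took'   => 5,
    'errors' => true,
    'items'  => [
        ['update' => ['_id' => '1', 'status' => 200]],
        ['update' => ['_id' => '2', 'status' => 404,
            'error' => ['type' => 'document_missing_exception', 'reason' => 'document missing']]],
    ],
];
$failures = collectBulkFailures($sample);
```

In the loop above, this could be called on `$responses` before it is unset, so that failed ids can be logged or retried.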
      

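      【Comments】:

      • Actually, _type is now deprecated. You no longer need to add it; the default document type will be used.
      【Solution 3】:

      According to the docs, the possible bulk API actions are index, create, delete, and update. The update action expects the partial doc, upsert, and script, together with their options, to be specified on the next line.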

      POST _bulk
      { "update" : {"_id" : "1", "_type" : "_doc", "_index" : "test"} }
      { "doc" : {"field2" : "value2"} }
      

      【Comments】:

      【Solution 4】:

      My mistake was using "index"; the correct action is "update".

      The final code is:

      $params =[
      "index" => "customer",
      "type" => "doc",
      "body" => [
          [
              "update" => [
          //   ^^^^^^ Here I change from index to update
                  "_index" => "customer",
                  "_type" => "doc",
                  "_id" => "09310451939"
              ]
          ],
          [
              "doc" => [
                  "name" => "Jonathan"
              ]
          ]
      ]
      ];
      
      $client->bulk($params);
      

      With the code above, the previous data is kept and only the fields I pass in the parameters are updated.

      Response:

      Array
      (
          [took] => 7
          [timed_out] =>
          [_shards] => Array
              (
                  [total] => 5
                  [successful] => 5
                  [skipped] => 0
                  [failed] => 0
              )
      
          [hits] => Array
              (
                  [total] => 1
                  [max_score] => 1
                  [hits] => Array
                      (
                          [0] => Array
                              (
                                  [_index] => customer
                                  [_type] => doc
                                  [_id] => 09310451939
                                  [_score] => 1
                                  [_source] => Array
                                      (
                                          [name] => Jonathan
                                          [age] => 23
                                      )
      
                              )
      
                      )
      
              )
      
      )
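To make the difference between the two actions concrete, the sketch below imitates locally what each one does to a stored flat document. It does not talk to Elasticsearch; `applyIndex` and `applyUpdate` are hypothetical names used only for illustration, and the sample document is the one from the question.

```php
<?php
// Local illustration only: imitates the effect of the two bulk actions
// on a flat document, without contacting Elasticsearch.
function applyIndex(array $stored, array $incoming): array
{
    // "index" replaces the whole stored document with the incoming source.
    return $incoming;
}

function applyUpdate(array $stored, array $partial): array
{
    // "update" with "doc" merges the partial document into the stored one.
    return array_replace($stored, $partial);
}

$stored      = ['name' => 'Jonatahn', 'age' => 21];
$afterIndex  = applyIndex($stored, ['name' => 'Jonathan']);
$afterUpdate = applyUpdate($stored, ['name' => 'Jonathan']);

var_dump($afterIndex, $afterUpdate);
```

With `index`, `age` is gone because the whole document was replaced; with `update`, it survives. Elasticsearch merges nested objects recursively, which this flat sketch does not model.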
      

      【Comments】:
