【问题标题】:symfony2 FOSElasticaBundle : how to postpone document updating when elasticsearch service is down?symfony2 FOSElasticaBundle:弹性搜索服务关闭时如何推迟文档更新?
【发布时间】:2015-04-18 09:00:51
【问题描述】:

我正在使用 symfony2 和 FOSElasticaBundle。

我的 elasticsearch 服务经常因为未知原因而被终止或失败。我已将 systemctl 与 restart always 作为临时修复。

不过,如果关闭,当学说更新实体时执行索引更新的 elasticsearch 侦听器会给我一个错误 52:

无法连接到主机,Elasticsearch 关闭?

因此,如果还使用更新最后用户连接日期的 FOSUserBundle,则会在记录时发生这种情况。对弹性搜索有如此依赖真是太烦人了。我已经为此错误设置了一个异常侦听器,但我希望捆绑包在以后服务再次可用时保留更新。

查看捆绑文件,我发现:

供应商/friendsofsymfony/elastica-bundle/Persister/ObjectPersister.php

public function replaceMany(array $objects)
{
    $documents = array();
    foreach ($objects as $object) {
        $document = $this->transformToElasticaDocument($object);
        $document->setDocAsUpsert(true);
        $documents[] = $document;
    }

    try {
        $this->type->updateDocuments($documents);
    } catch (BulkException $e) {
        $this->log($e);
    }
}

这是一项服务,我加入的可能会被如下覆盖,但它是另一个类继承的类,子类被实例化而不是作为服务调用,所以我不知道如何覆盖它。我怎么可能?

    try {
        $this->type->updateDocuments($documents);
    } catch (\Exception $e) {
        if ($e instanceof BulkException)
        {
            $this->log($e);
        }
        elseif ($e->getMessage() != "Couldn't connect to host, Elasticsearch down?")
        {
            throw $e;
        }
    }

那么,如何确保下次服务可用时更新文档?

编辑:

我得到错误时的跟踪:

Stack Trace
in vendor/ruflin/elastica/lib/Elastica/Transport/Http.php at line 153   -
        }
        if ($errorNumber > 0) {
            throw new HttpException($errorNumber, $request, $response);
        }
        return $response;
at Http ->exec (object(Request), array('connection' => array('config' => array('headers' => array()), 'host' => 'localhost', 'port' => '9200', 'logger' => 'fos_elastica.logger', 'enabled' => true))) 
in vendor/ruflin/elastica/lib/Elastica/Request.php at line 167   + 
at Request ->send () 
in vendor/ruflin/elastica/lib/Elastica/Client.php at line 587   + 
at Client ->request ('_bulk', 'PUT', '{"update":{"_index":"foodmeup","_type":"user","_id":4}} {"doc":{"firstName":"Dominique","lastName":"Descamps","content":null,"username":"ddescamps","email":"ddescamps@ebp-paris.com","jobSeeker":{"skills":[],"experiences":[],"trainings":[]}},"doc_as_upsert":true} ', array()) 
in vendor/friendsofsymfony/elastica-bundle/Elastica/Client.php at line 47   + 
at Client ->request ('_bulk', 'PUT', '{"update":{"_index":"foodmeup","_type":"user","_id":4}} {"doc":{"firstName":"Dominique","lastName":"Descamps","content":null,"username":"ddescamps","email":"ddescamps@ebp-paris.com","jobSeeker":{"skills":[],"experiences":[],"trainings":[]}},"doc_as_upsert":true} ', array()) 
in vendor/ruflin/elastica/lib/Elastica/Bulk.php at line 342   + 
at Bulk ->send () 
in vendor/ruflin/elastica/lib/Elastica/Client.php at line 270   + 
at Client ->updateDocuments (array(object(Document))) 
in vendor/ruflin/elastica/lib/Elastica/Index.php at line 131   + 
at Index ->updateDocuments (array(object(Document))) 
in vendor/ruflin/elastica/lib/Elastica/Type.php at line 174   + 
at Type ->updateDocuments (array(object(Document))) 
in vendor/friendsofsymfony/elastica-bundle/Persister/ObjectPersister.php at line 144   + 
at ObjectPersister ->replaceMany (array(object(User))) 
in vendor/friendsofsymfony/elastica-bundle/Doctrine/Listener.php at line 151   + 
at Listener ->persistScheduled () 
in vendor/friendsofsymfony/elastica-bundle/Doctrine/Listener.php at line 182   + 
at Listener ->postFlush (object(PostFlushEventArgs)) 
in vendor/symfony/symfony/src/Symfony/Bridge/Doctrine/ContainerAwareEventManager.php at line 63   + 
at ContainerAwareEventManager ->dispatchEvent ('postFlush', object(PostFlushEventArgs)) 
in vendor/doctrine/orm/lib/Doctrine/ORM/UnitOfWork.php at line 3318   + 
at UnitOfWork ->dispatchPostFlushEvent () 
in vendor/doctrine/orm/lib/Doctrine/ORM/UnitOfWork.php at line 428   + 
at UnitOfWork ->commit (null) 
in vendor/doctrine/orm/lib/Doctrine/ORM/EntityManager.php at line 357   + 
at EntityManager ->flush (null) 
in src/AppBundle/Model/Classes/CustomBaseController.php at line 61   + 
at CustomBaseController ->flush () 
in src/AppBundle/Controller/Core/VoteController.php at line 68   + 
at VoteController ->voteAction (object(Request), 'up', 'Post', 'permettre-le-partage-de-documents-avec-les-equipes') 
at call_user_func_array (array(object(VoteController), 'voteAction'), array(object(Request), 'up', 'Post', 'permettre-le-partage-de-documents-avec-les-equipes')) 
in app/bootstrap.php.cache at line 3029   + 
at HttpKernel ->handleRaw (object(Request), '1') 
in app/bootstrap.php.cache at line 2991   + 
at HttpKernel ->handle (object(Request), '1', true) 
in app/bootstrap.php.cache at line 3140   + 
at ContainerAwareHttpKernel ->handle (object(Request), '1', true) 
in app/bootstrap.php.cache at line 2384   + 
at Kernel ->handle (object(Request)) 
in web/app_dev.php at line 36   + 

【问题讨论】:

  • 你在 app_dev 上吗?我以前遇到过 symfonys 自定义错误处理程序的问题,由于某种我不记得的原因,我无法捕获异常。如果不是:您确定异常是 BulkException 类型吗?
  • arf,就是这样,我没有看到try catch只是过滤批量异常。
  • 顺便说一句,如果你使用ES进行日志记录,那么使用logstash。记录应该尽可能快,因此异步是首选方式。你可以使用独白。

标签: php symfony doctrine-orm elasticsearch foselasticabundle


【解决方案1】:

消息队列完全符合您的要求。每当您的模型更新时,您都会向 MQ 发送一条消息。这就是网络进程。然后,您有一个工作人员池,它们使用来自 MQ 的消息并尝试更新 ES 索引。如果 ES 现在宕机了,就会出现异常,worker 死掉,消息返回队列。所以消息仍然在 MQ 中,一旦 ES 在线工作人员就可以完成他们的工作。

相同的模式不仅可以用于 ES,还可以用于任何其他 3rd 方服务。例如,您想发送一封非常重要的电子邮件,但邮件服务器已关闭,您不能等待,现在您必须向客户发送回复。所以把它放到 MQ 中,让代理和工作人员完成他们的工作。

这是如何使用enqueue MQ 库来完成的代码。安装和配置是pretty easy to do所以我就跳过了。

标准监听器必须替换为发送消息的监听器:

<?php
use Enqueue\Client\ProducerInterface;

class ElasticaUpdateIndexListener
{
    private $producer;

    public function __construct(ProducerInterface $producer)
    {
        $this->producer = $producer;
    }

    public function postPersist(LifecycleEventArgs $eventArgs)
    {
        $entity = $eventArgs->getObject();

        $this->producer->sendCommand('elastica_index_entity', [
            'entity' => $entity->getId(),
            'type' => 'insert'
        ]);
    }

    public function postUpdate(LifecycleEventArgs $eventArgs)
    {
        $entity = $eventArgs->getObject();

        $this->producer->sendCommand('elastica_index_entity', [
            'entity' => $entity->getId(),
            'type' => 'update'
        ]);
    }

    public function preRemove(LifecycleEventArgs $eventArgs)
    {
        $entity = $eventArgs->getObject();

        $this->producer->sendCommand('elastica_index_entity', [
            'entity' => $entity->getId(),
            'type' => 'delete'
        ]);
    }
}

此消息的处理器如下所示:

<?php

class ElasticaUpdateIndexProcessor implements PsrProcessor, CommandSubscriberInterface
{
    private $doctrine;

    protected $objectPersister;

    protected $propertyAccessor;

    private $indexable;

    public function __construct(Registry $doctrine, ObjectPersisterInterface $objectPersister, IndexableInterface $indexable)
    {
        $this->indexable = $indexable;
        $this->objectPersister = $objectPersister;
        $this->propertyAccessor = PropertyAccess::createPropertyAccessor();
        $this->doctrine = $doctrine;
    }

    public function process(PsrMessage $message, PsrContext $context)
    {
        $data = JSON::encode($message->getBody());

        if ($data['type'] == 'delete') {
            $this->objectPersister->deleteManyByIdentifiers([$data['entityId']]);

            return self::ACK;
        } 

        if (false == $entity = $this->doctrine->getManagerForClass($data['entityClass'])->find($data['entityId'])) {
            return self::REJECT;
        }

        if (false == ($this->objectPersister->handlesObject($entity) && $this->isObjectIndexable($entity))) {
            return self::ACK;
        }

        if ($data['type'] == 'insert') {
            $this->objectPersister->insertMany([$this->scheduledForInsertion]);

            return self::ACK;
        }

        if ($data['type'] == 'update') {
            $this->objectPersister->replaceMany([$this->scheduledForInsertion]);

            return self::ACK;
        }

        return self::REJECT;
    }

    private function isObjectIndexable($object)
    {
        return $this->indexable->isObjectIndexable(
            $this->config['indexName'],
            $this->config['typeName'],
            $object
        );
    }

    public static function getSubscribedCommand()
    {
        return 'elastica_index_entity';
    }
}

并运行一些工人:

./bin/console enqueue:consume --setup-broker -vvv 

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-04-10
    • 2017-03-25
    • 2015-12-18
    • 1970-01-01
    相关资源
    最近更新 更多