【问题标题】:Event Sourcing With an Event Store and an ORM使用事件存储和 ORM 进行事件溯源
【发布时间】:2019-02-05 22:24:55
【问题描述】:

我正在使用 Django 为微服务架构构建身份验证服务。它更像是一个实验而不是真实世界的实现,以了解更多关于事件溯源的信息。

我确实了解 Django 有一个与框架非常耦合的 ORM,使用 Flask 将是一种更简单的方法,但我正在尝试找出解决方法。

用例非常简单。用户注册他会收到一封电子邮件来激活他的帐户。用户再次被激活我会发送一封电子邮件通知他他的帐户处于活动状态。

据我了解,在事件源系统中。例如,事件被触发和存储,我们可以获取最新状态并将其存储在数据库中。在我的情况下,它将是使用 Django ORM 的 Postgres。

我使用 django 中的信号将事件发布到 kafka。 pre_save() 信号。然后模型将保存对象。

如果更新尚未在下面实施。我只会发布更新的字段。并更新 Django 中的对象。

有没有人看到这种方法的任何警告,或者在模型的保存方法中实现它会更好吗?

我很想听听您对此的反馈。

# app/services.py

class KafkaService:

    def __init__(self):
        try:
            self.producer = KafkaProducer(bootstrap_servers=settings.KAFKA_BROKERS,
                                          value_serializer=lambda m: json.dumps(m).encode('ascii'))
        except KafkaError as ke:
            logger.exception(f"Something went wrong creating a kafka producer: {ke}")
            self.producer = None
        except Exception as ex:
            logger.exception(f"Something went wrong creating a kafka producer: {ex}")
            self.producer = None

    def publish(self, topic, key, data):
        if not self.producer:
            logger.error(f"Kafka Producer could not establish a connection")
            pass
        try:
            self.producer.send(topic, key=bytes(key, encoding='utf-8'), value=data)
            self.producer.flush()
            logger.info("Message has been published to Kafka")
        except Exception as ex:
            logger.info(f"Errored while publishing to Kafka: {ex}")
# app/events.py

class UserEvent:

    def __init__(self, event_store):  # event store here is Kafka
        """ A user event class which is an injectable. I am using here.
        I need a key for kafka to place the correct event in the correct partition.

        :parameters:
            - event_store: a class form example :class:`KafkaService` which publishes data
        """
        self.event_store = event_store

    def user_created(self, email, data):
        """ Publish an event to the event store when a user is created

        :param email: string
        :param data: string - json
        """
        self.event_store.publish('user-created', email, data)

    def user_activated(self, email, data):
        """ Publish an event to the event store when a user is activated """
        self.event_store.publish('user-activated', email, data)
# app/signals.py

kafka_service = KafkaService()

user_event = UserEvent(kafka_service)

def user_added_event(sender, instance, created, **kwargs):
    if created:
        from users.api.v1.serializers import UserSerializer  # Avoid (Apps Not Read)

        value = UserSerializer(instance).data
        user_event.user_created(instance.email, value)

    else:
        logger.info("User Updated")

【问题讨论】:

    标签: django orm apache-kafka cqrs event-sourcing


    【解决方案1】:

    有没有人看到这种方法的任何警告,或者在模型的保存方法中实现它会更好吗?

    通常的设计将域模型(也称为业务逻辑)与持久性问题分开。

    如果事件序列是您的事实来源,那么以下两件事:首先,您要确保在事件发布之前成功存储事件。其次,您要确保您的写入语义是“先写者获胜”。

    如果你做对了,并且每个人都明白数据库中的模型可以及时“落后”于事件存储中的事件,那么你的状态就很好。

    最终一致的系统使“读取您自己的写入”的期望具有挑战性。所以你可能有一些额外的工作。

    【讨论】:

      猜你喜欢
      • 2020-05-18
      • 1970-01-01
      • 2018-08-31
      • 2021-10-01
      • 1970-01-01
      • 2020-05-27
      • 1970-01-01
      • 2021-07-25
      • 1970-01-01
      相关资源
      最近更新 更多