【问题标题】:@Tailable(spring-data-reactive-mongodb) equivalent in spring-data-r2dbc@Tailable(spring-data-reactive-mongodb) 等效于 spring-data-r2dbc
【发布时间】:2020-07-04 07:03:54
【问题描述】:

我正在尝试使用 spring-data-r2dbc。我在 Postgresql 上试试这个。我之前尝试过 spring-data-mongodb-reactive 。我忍不住比较了两者。

我发现查询派生还不支持。但我想知道@Tailable 是否有等价物。这样我就可以实时收到数据库更改的通知。任何人都可以分享与此相关的任何代码示例。

我了解底层数据库应该支持这一点。我相信 Postgresql 确实支持这种使用逻辑解码的东西(如果我在这里错了,请纠正我)。

spring-data-r2dbc 中是否有 @Tailable 等效项?

【问题讨论】:

    标签: postgresql spring-boot spring-data spring-data-r2dbc


    【解决方案1】:

    我遇到了同样的问题,不确定您是否找到了解决方案,但我能够通过执行以下操作来完成类似的事情。首先,我在表中添加了触发器

    CREATE TRIGGER trigger_name
        AFTER INSERT OR DELETE OR UPDATE 
        ON table_name
        FOR EACH ROW
        EXECUTE PROCEDURE trigger_function_name;
    

    每当更新、删除或插入行时,这都会在表上设置触发器。然后它会调用我设置的触发函数,看起来像这样:

    CREATE FUNCTION trigger_function_name
    RETURNS trigger
    LANGUAGE 'plpgsql'
    COST 100
    VOLATILE NOT LEAKPROOF
    AS 
    $BODY$
    DECLARE
        payload JSON;
    BEGIN
        payload = row_to_json(NEW);
        PERFORM pg_notify('notification_name', payload::text);
        RETURN NULL;
    END;
    $BODY$;
    

    这将允许我从我的 Spring Boot 项目中“收听”任何这些更新,并将整个行作为有效负载发送。 接下来,在我的 Spring Boot 项目中,我配置了与我的数据库的连接。

    @Configuration
    @EnableR2dbcRepositories("com.(point to wherever repository is)")
    public class R2DBCConfig extends AbstractR2dbcConfiguration {
        @Override
        @Bean
        public ConnectionFactory connectionFactory() {
            return new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
                    .host("host")
                    .database("db")
                    .port(port)
                    .username("username")
                    .password("password")
                    .schema("schema")
                    .connectTimeout(Duration.ofMinutes(2))
                    .build());
        }
    }
    

    我将其自动装配(依赖注入)到我的服务类的构造函数中,并将其转换为 r2dbc PostgressqlConnection 类,如下所示:

    this.postgresqlConnection = Mono.from(connectionFactory.create()).cast(PostgresqlConnection.class).block();
    

    现在我们想“监听”我们的表并在对我们的表执行一些更新时得到通知。为此,我们使用 @PostContruct 注释设置了一个在依赖注入后执行的初始化方法

    @PostConstruct
    private void postConstruct() {
        postgresqlConnection.createStatement("LISTEN notification_name").execute()
                .flatMap(PostgresqlResult::getRowsUpdated).subscribe();
    }
    

    请注意,我们会监听我们在 pg_notify 方法中放置的任何名称。我们还想设置一个方法来在 bean 即将被扔掉时关闭连接,如下所示:

    @PreDestroy
    private void preDestroy() {
        postgresqlConnection.close().subscribe();
    }
    

    现在我只需创建一个方法,该方法返回当前表中任何内容的 Flux,并将它与通知合并,正如我在通知以 json 形式出现之前所说的那样,所以我必须反序列化它,然后我决定使用 ObjectMapper。所以,它看起来像这样:

    private Flux<YourClass> getUpdatedRows() {
        return postgresqlConnection.getNotifications().map(notification -> {
            try {
                //deserialize json
                return objectMapper.readValue(notification.getParameter(), YourClass.class);
            } catch (IOException e) {
                //handle exception
            }
        });
    }
    
    public Flux<YourClass> getDocuments() {
        return documentRepository.findAll().share().concatWith(getUpdatedRows());
    }
    

    希望这会有所帮助。 干杯!

    【讨论】:

      猜你喜欢
      • 2020-11-29
      • 2020-06-21
      • 2019-08-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-09-22
      • 1970-01-01
      • 2022-11-07
      相关资源
      最近更新 更多