【发布时间】:2023-03-23 10:33:01
【问题描述】:
我有一个简单的代码,它将一些数据保存在数据库中并返回保存的值
public Flux<Person> savePersons(Flux<Person> persons){
Mono<Integer> lastPersonId = getLastPersonId();
AtomicInteger idNum = new AtomicInteger();
lastPersonId.log().subscribe(idNum::set);
System.out.println("id num "+idNum.get()); //Obviously zero since the data is not available yet from lastPersonId Mono
Flux<Person> personsWithId = persons.map(person->{
person.setId(idNum.getAndIncrement());
return person;
});
StringBuilder builder = new StringBuilder();
//Ignore the Sql Injection, was left as such for brevity.
personsWithId.subscribe(person-> builder.append("insert into person VALUES("+person.getId()+",\""+person.getName()+"\""));
return dbClient.execute(builder.toString()).fetch().rowsUpdated();//This fails as well, the builder has insert statements albeit with null Ids. since Id is not set yet.
}
在保存数据之前,我试图检索最后一个 ID 号并将其用于插入语句。
如何确保这些Publisher 被链接。
P.S:我了解 SQL 注入的风险。我将这些查询格式发送到外部实用程序以清理输入。为简洁起见,我省略了那段代码。我没有使用批处理,因为 mssql-r2dbc 似乎不支持批处理。
更新:就有人从 Imperatve 编码转换而言,这个问题更像是一个新手问题,应该被忽略。 在一些 cmets 的帮助下,在 SO 中回答,在写了一些代码之后,我写了一些我能理解的东西。
public Mono<Integer> savePersons(Flux<Person> persons){
AtomicInteger idNum = new AtomicInteger();
StringBuilder builder = new StringBuilder();
return getLastPersonId() //returns a Mono<Integer>
.map(AtomicInteger::new)
.flatMapMany(val->assignIds(val, persons))
.map(person->Sqllib.escapedSql(builder, person))
.last()
.map(val->dbClient.execute(builder.toString()))
.fetch()
.rowsUpdated();
}
private Flux<Person> assignIds(AtomicInteger val, Flux<Person> persons) {
return persons.map(person-> {
person.setId(val.getAndIncrement());
return person;
});
}
【问题讨论】:
-
可以对
Flux<Person> persons&Mono<Integer> lastPersonId的操作进行ZIP操作 -
@AdityaRewari 您认为 ZIP 在这种情况下会有什么用处?
-
为什么要自己写sql?
-
@123 因为我使用的 r2dbc 客户端本身不支持批处理。
-
您为什么要在自己的应用程序中订阅。发起呼叫的人是订阅者,您的应用程序是生产者,客户端订阅,除非您的应用程序是发起呼叫的人并且您没有客户端。
标签: java spring-webflux project-reactor