【发布时间】:2020-03-07 08:17:00
【问题描述】:
我阅读了大量 Gary Russell 的答案和帖子,但没有找到以下序列同步常见用例的实际解决方案:
recieve from topic A => save to DB via Spring-data => send to topic B
据我了解:在这种情况下无法保证完全原子处理,我需要在客户端处理消息重复数据删除,但主要问题是 ChainedKafkaTransactionManager 不与 JpaTransactionManager 同步强>(见下文@KafkaListener)
卡夫卡配置:
@Production
@EnableKafka
@Configuration
@EnableTransactionManagement
public class KafkaConfig {
private static final Logger log = LoggerFactory.getLogger(KafkaConfig.class);
@Bean
public ConsumerFactory<String, byte[]> commonConsumerFactory(@Value("${kafka.broker}") String bootstrapServer) {
Map<String, Object> props = new HashMap<>();
props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
props.put(AUTO_OFFSET_RESET_CONFIG, 'earliest');
props.put(SESSION_TIMEOUT_MS_CONFIG, 10000);
props.put(ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(MAX_POLL_RECORDS_CONFIG, 10);
props.put(MAX_POLL_INTERVAL_MS_CONFIG, 17000);
props.put(FETCH_MIN_BYTES_CONFIG, 1048576);
props.put(FETCH_MAX_WAIT_MS_CONFIG, 1000);
props.put(ISOLATION_LEVEL_CONFIG, 'read_committed');
props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerContainerFactory(
@Qualifier("commonConsumerFactory") ConsumerFactory<String, byte[]> consumerFactory,
@Qualifier("chainedKafkaTM") ChainedKafkaTransactionManager chainedKafkaTM,
@Qualifier("kafkaTemplate") KafkaTemplate<String, byte[]> kafkaTemplate,
@Value("${kafka.concurrency:#{T(java.lang.Runtime).getRuntime().availableProcessors()}}") Integer concurrency
) {
ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setMissingTopicsFatal(false);
factory.getContainerProperties().setTransactionManager(chainedKafkaTM);
factory.setConsumerFactory(consumerFactory);
factory.setBatchListener(true);
var arbp = new DefaultAfterRollbackProcessor<String, byte[]>(new FixedBackOff(1000L, 3));
arbp.setCommitRecovered(true);
arbp.setKafkaTemplate(kafkaTemplate);
factory.setAfterRollbackProcessor(arbp);
factory.setConcurrency(concurrency);
factory.afterPropertiesSet();
return factory;
}
@Bean
public ProducerFactory<String, byte[]> producerFactory(@Value("${kafka.broker}") String bootstrapServer) {
Map<String, Object> configProps = new HashMap<>();
configProps.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
configProps.put(BATCH_SIZE_CONFIG, 16384);
configProps.put(ENABLE_IDEMPOTENCE_CONFIG, true);
configProps.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
var kafkaProducerFactory = new DefaultKafkaProducerFactory<String, byte[]>(configProps);
kafkaProducerFactory.setTransactionIdPrefix('kafka-tx-');
return kafkaProducerFactory;
}
@Bean
public KafkaTemplate<String, byte[]> kafkaTemplate(@Qualifier("producerFactory") ProducerFactory<String, byte[]> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
@Bean
public KafkaTransactionManager kafkaTransactionManager(@Qualifier("producerFactory") ProducerFactory<String, byte[]> producerFactory) {
KafkaTransactionManager ktm = new KafkaTransactionManager<>(producerFactory);
ktm.setTransactionSynchronization(SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
return ktm;
}
@Bean
public ChainedKafkaTransactionManager chainedKafkaTM(JpaTransactionManager jpaTransactionManager,
KafkaTransactionManager kafkaTransactionManager) {
return new ChainedKafkaTransactionManager(kafkaTransactionManager, jpaTransactionManager);
}
@Bean(name = "transactionManager")
public JpaTransactionManager transactionManager(EntityManagerFactory em) {
return new JpaTransactionManager(em);
}
}
卡夫卡监听器:
@KafkaListener(groupId = "${group.id}", idIsGroup = false, topics = "${topic.name.import}")
public void consume(List<byte[]> records, @Header(KafkaHeaders.OFFSET) Long offset) {
for (byte[] record : records) {
// cause infinity rollback (perhaps due to batch listener)
if (true)
throw new RuntimeExcetion("foo");
// spring-data storage with @Transactional("chainedKafkaTM"), since Spring-data can't determine TM among transactionManager, chainedKafkaTM, kafkaTransactionManager
var result = storageService.persist(record);
kafkaTemplate.send(result);
}
}
Spring-kafka 版本:2.3.3 春季启动版本:2.2.1
实现这种用例的正确方法是什么? Spring-kafka 文档仅限于小型/特定示例。
P.s.当我在 @KafkaListener 方法上使用 @Transactional(transactionManager = "chainedKafkaTM", rollbackFor = Exception.class) 时,我面临无限循环回滚,但是设置了 FixedBackOff(1000L, 3L)。
编辑:我计划通过可配置的重试次数在侦听器、生产者和数据库之间实现最大负担得起的同步。
编辑: 上面的代码 sn-ps 已根据建议的配置进行了编辑。使用 ARBP 并不能解决我的无限回滚循环,因为第一条语句的谓词始终为 false (SeekUtils.doSeeks):
DefaultAfterRollbackProcessor
...
@Override
public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer, Exception exception,
boolean recoverable) {
if (SeekUtils.doSeeks(((List) records), consumer, exception, recoverable,
getSkipPredicate((List) records, exception), LOGGER)
&& isCommitRecovered() && this.kafkaTemplate != null && this.kafkaTemplate.isTransactional()) {
ConsumerRecord<K, V> skipped = records.get(0);
this.kafkaTemplate.sendOffsetsToTransaction(
Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()),
new OffsetAndMetadata(skipped.offset() + 1)));
}
}
值得一提的是,Kafka Consumer方法(TransactionSynchronizationManager.isActualTransactionActive())中没有活跃的事务。
【问题讨论】:
标签: java spring-boot apache-kafka spring-data spring-kafka