【问题标题】:How do I configure spring-kafka to ignore messages in the wrong format?如何配置 spring-kafka 以忽略格式错误的消息?
【发布时间】:2021-09-08 17:37:12
【问题描述】:

我们的一个 Kafka 主题存在问题：我们按 here 所描述的方式，将 DefaultKafkaConsumerFactory 与 ConcurrentMessageListenerContainer 组合使用，并在工厂上配置了 JsonDeserializer。不幸的是，有人有点过于热情，在该主题上发布了一些无效消息。spring-kafka 似乎在处理这些消息中的第一条时就静默失败了。是否可以让 spring-kafka 记录错误并继续？从记录的错误消息来看，Apache kafka-clients 库似乎应该能处理这样的情况：在迭代一批消息时，其中一条或多条消息可能无法解析？

以下代码是说明此问题的示例测试用例:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasKey;
import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasValue;

/**
 * @author jfreedman
 */
/**
 * Demonstrates that a non-JSON ("garbage") record on a topic consumed via
 * {@code JsonDeserializer} halts message processing: the listener silently stops
 * receiving records once the first unparseable message is reached, so the final
 * {@code sendAndVerifyMessage} call in this test times out.
 *
 * @author jfreedman
 */
public class TestSpringKafka {
    private static final String TOPIC1 = "spring.kafka.1.t";

    /** Embedded single-broker Kafka cluster exposing TOPIC1 with one partition. */
    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 1, TOPIC1);

    @Test
    public void submitMessageThenGarbageThenAnotherMessage() throws Exception {
        final BlockingQueue<ConsumerRecord<String, JsonObject>> records = createListener(TOPIC1);
        final KafkaTemplate<String, JsonObject> objectTemplate = createPublisher("json", new JsonSerializer<JsonObject>());

        // A well-formed message is received normally at offset 0.
        sendAndVerifyMessage(records, objectTemplate, "foo", new JsonObject("foo"), 0L);

        // Push some garbage text to Kafka which cannot be unmarshalled as JsonObject;
        // ideally this should not interrupt processing of subsequent records.
        final KafkaTemplate<String, String> garbageTemplate = createPublisher("garbage", new StringSerializer());
        final SendResult<String, String> garbageResult = garbageTemplate.send(TOPIC1, "bar", "bar").get(5, TimeUnit.SECONDS);
        assertEquals(1L, garbageResult.getRecordMetadata().offset());

        // This record lands at offset 2 but is never delivered to the listener,
        // because the consumer fails while deserializing offset 1.
        sendAndVerifyMessage(records, objectTemplate, "baz", new JsonObject("baz"), 2L);
    }

    /**
     * Builds a {@link KafkaTemplate} with String keys and {@code T} values.
     *
     * @param label      suffix making the producer client-id unique per publisher
     * @param serializer value serializer; the instance itself (not just its class)
     *                   is installed on the factory so stateful serializers work
     */
    private <T> KafkaTemplate<String, T> createPublisher(final String label, final Serializer<T> serializer) {
        final Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "TestPublisher-" + label);
        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
        producerProps.put(ProducerConfig.RETRIES_CONFIG, 2);
        // With retries enabled, cap in-flight requests at 1 to preserve ordering.
        producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
        producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializer.getClass());
        final DefaultKafkaProducerFactory<String, T> pf = new DefaultKafkaProducerFactory<>(producerProps);
        pf.setValueSerializer(serializer);
        return new KafkaTemplate<>(pf);
    }

    /**
     * Starts a listener container on {@code topic}, funnelling every received record
     * into the returned queue. Blocks until partition assignment has completed so
     * callers may publish immediately afterwards.
     */
    private BlockingQueue<ConsumerRecord<String, JsonObject>> createListener(final String topic) throws Exception {
        final Map<String, Object> consumerProps = new HashMap<>();
        // FIX: use ConsumerConfig (not ProducerConfig) for consumer properties; the
        // constant values happen to be identical strings, but referencing the
        // producer class here was misleading and fragile.
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "TestConsumer");
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        consumerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        final DefaultKafkaConsumerFactory<String, JsonObject> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
        // Install a concrete deserializer instance bound to JsonObject.
        cf.setValueDeserializer(new JsonDeserializer<>(JsonObject.class));
        final KafkaMessageListenerContainer<String, JsonObject> container = new KafkaMessageListenerContainer<>(cf, new ContainerProperties(topic));
        final BlockingQueue<ConsumerRecord<String, JsonObject>> records = new LinkedBlockingQueue<>();
        container.setupMessageListener((MessageListener<String, JsonObject>) records::add);
        container.setBeanName("TestListener");
        container.start();
        ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
        return records;
    }

    /**
     * Sends {@code key}/{@code value}, waits up to 5s for the listener to deliver it,
     * and asserts key, value and the record's broker-assigned offset.
     */
    private void sendAndVerifyMessage(final BlockingQueue<ConsumerRecord<String, JsonObject>> records,
                                      final KafkaTemplate<String, JsonObject> template,
                                      final String key, final JsonObject value,
                                      final long expectedOffset) throws InterruptedException, ExecutionException, TimeoutException {
        final ListenableFuture<SendResult<String, JsonObject>> future = template.send(TOPIC1, key, value);
        // poll() returns null on timeout; fail fast with a clear message instead of
        // letting the hamcrest matchers produce a confusing mismatch on null.
        final ConsumerRecord<String, JsonObject> record = records.poll(5, TimeUnit.SECONDS);
        assertNotNull("no record received within 5 seconds", record);
        assertThat(record, hasKey(key));
        assertThat(record, hasValue(value));
        assertEquals(expectedOffset, future.get(5, TimeUnit.SECONDS).getRecordMetadata().offset());
    }

    /** Simple JSON-serializable value object with a single {@code value} property. */
    public static final class JsonObject {
        private String value;

        /** No-arg constructor required by Jackson for deserialization. */
        public JsonObject() {}

        JsonObject(final String value) {
            this.value = value;
        }

        public String getValue() {
            return value;
        }

        public void setValue(final String value) {
            this.value = value;
        }

        @Override
        public boolean equals(final Object o) {
            if (this == o) { return true; }
            if (o == null || getClass() != o.getClass()) { return false; }
            final JsonObject that = (JsonObject) o;
            return Objects.equals(value, that.value);
        }

        @Override
        public int hashCode() {
            return Objects.hash(value);
        }

        @Override
        public String toString() {
            return "JsonObject{" +
                    "value='" + value + '\'' +
                    '}';
        }
    }
}

【问题讨论】:

  • 为此目的,Confluent 平台具有模式注册表:github.com/confluentinc/schema-registry 它包含每个主题的模式,当您生成\消费消息时,它将根据特定模式验证消息。您是否有意避免使用它?
  • 哦,根据这个链接:cwiki.apache.org/confluence/display/KAFKA/Schema+based+topics 这只有在使用 Apache Avro 作为序列化器\反序列化器时才有可能。
  • 是的，我们不使用 Avro 或 Protobuf，只使用普通 JSON
  • 为什么ErrorHandler 对你来说还不够?
  • 你能说明如何使用ErrorHandler吗?

标签: error-handling apache-kafka spring-kafka


【解决方案1】:

我有一个解决方案,但我不知道它是否是最好的,我将 JsonDeserializer 扩展如下,这导致 spring-kafka 消耗了 null 值,并且需要进行必要的下游更改来处理这种情况.

// Subclass of spring-kafka's JsonDeserializer that never throws from deserialize():
// any parse failure is logged and mapped to null, so the container's poll loop keeps
// consuming subsequent records.
// NOTE(review): returning null means downstream listeners must tolerate null values;
// on compacted topics null already carries delete semantics — confirm this is acceptable.
class SafeJsonDeserializer[A >: Null](targetType: Class[A], objectMapper: ObjectMapper) extends JsonDeserializer[A](targetType, objectMapper) with Logging {
  // Delegate to the standard JSON deserialization, swallowing any exception.
  override def deserialize(topic: String, data: Array[Byte]): A = try {
    super.deserialize(topic, data)
  } catch {
    case e: Exception =>
      // Log the raw payload for diagnosis, then fall back to null instead of failing the batch.
      logger.error("Failed to deserialize data [%s] from topic [%s]".format(new String(data), topic), e)
      null
  }
}

【讨论】:

  • 你的解决方案是正确的;这不是一个真正的 Spring 问题,因为如果在反序列化器中失败,Spring 不会看到消息。我想我们可以更改反序列化器以执行与您类似的操作,但 null 可能不是要返回的正确“对象”(因为在使用压缩主题时,这在 Kafka 中有意义)。随时提交 PR。
【解决方案2】:

从 spring-kafka-2.x.x 开始，我们现在可以轻松地在配置文件中为接口KafkaListenerErrorHandler 声明bean，实现类似于

// Error handler bean for use as @KafkaListener(errorHandler = "listen3ErrorHandler").
// On a listener failure it records the exception and seeks the consumer back to the
// failed record's offset so that record is redelivered on the next poll.
// NOTE(review): this handler only sees listener-level errors; a SerializationException
// thrown inside the deserializer never reaches it (see the discussion below) — confirm
// it fits your failure mode before relying on it.
@Bean
public ConsumerAwareListenerErrorHandler listen3ErrorHandler() {
    return (m, e, c) -> {
        this.listen3Exception = e;
        MessageHeaders headers = m.getHeaders();
        // Re-seek to the failing record using topic/partition/offset from the message headers.
        c.seek(new org.apache.kafka.common.TopicPartition(
                headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
                headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
                headers.get(KafkaHeaders.OFFSET, Long.class));
        return null;
    };
}

更多资源可以在https://docs.spring.io/spring-kafka/reference/htmlsingle/#annotation-error-handling找到。还有另外两个关于类似问题的链接：Spring Kafka error handling - v1.1.x 以及 How to handle SerializationException after deserialization

【讨论】:

  • 你试过了吗?在SerializationException 的情况下,它看起来甚至没有进入这个错误处理程序,只到达容器错误处理程序。
  • 在 SerializationException 上它没有遇到上述异常。对我有用的是 ErrorHandler() bean。创建后,将其设置在工厂中。 factory.getContainerProperties().setErrorHandler(errorBean());
【解决方案3】:

使用 ErrorHandlingDeserializer2。这是一个委托键/值反序列化器,它捕获异常,在标头中将它们作为序列化的 java 对象返回。

在消费者配置下,添加/更新以下行:

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
// Wrap both the key and value deserializers in ErrorHandlingDeserializer2 so that a
// deserialization failure is captured (serialized into record headers) instead of
// repeatedly failing the consumer poll loop.
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
classOf[ErrorHandlingDeserializer2[JsonDeserializer]].getName)
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ErrorHandlingDeserializer2[StringDeserializer]].getName)
// Tell the wrapper which actual deserializers to delegate to for key and value.
configProps.put(ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS, classOf[StringDeserializer].getName)
configProps.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, classOf[JsonDeserializer].getName)

【讨论】:

    猜你喜欢
    • 2011-07-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-03-07
    • 1970-01-01
    • 1970-01-01
    • 2017-10-31
    相关资源
    最近更新 更多