【问题标题】:push avro message to kafka topic将 avro 消息推送到 kafka 主题
【发布时间】:2020-08-14 14:15:05
【问题描述】:

我正在尝试使用 kafka-node-avro

将数据推送到现有主题的库。 我已经使用 curl 和主题将模式添加到 SchemaRegistry 中。 我收到以下错误:

Error: Topic name or id is needed to build Schema
    at new Shema (/work/node_modules/kafka-node-avro/lib/schema.js:7:41)
    at Object.Pool.getByName (/work/node_modules/kafka-node-avro/lib/schemaPool.js:70:10)
    at new Promise (<anonymous>)

我的代码 sn-p 如下:

const Settings = {
    "kafka" : {
        "kafkaHost" : config.KafkaHost
    },
    "schema": {
        "registry" : config.KafkaRegistry
    }
};

console.log("settings registry: ", config.KafkaRegistry);
console.log("settings kafkaHost: ", config.KafkaHost)
KafkaAvro.init(Settings).then( kafka => {
const producer = kafka.addProducer();
let payloads = [
    {
        topic: 'classifier-response-test',
        messages: JSON.stringify(kafkaData)
    }
];
producer.send(payloads).then( success => {
// Message was sent encoded with Avro Schema
    console.log("message sent ! Awesome ! :) ")
}, error => {
        // Something wrong happen
        console.log("There seems that there is a mistake ! try Again ;) ")
        console.log(error)
    });
} , error => {
    // something wrong happen
    console.log("There seems that there is a global mistake ! try Again ;) ")
    console.log(error)
});

【问题讨论】:

    标签: node.js apache-kafka kafka-node


    【解决方案1】:

    该库尚未准备好send 消息列表,这是它尝试在发送机制中动态地将 Schema 添加到 Schema Pool 时报错的原因。最简单的解决方案是一次发送 1 条消息,在您的代码示例中可能类似于

    const Settings = {
      "kafka" : {
        "kafkaHost" : config.KafkaHost
      },
      "schema": {
        "registry" : config.KafkaRegistry
      }
    };
    
    KafkaAvro.init(Settings).then( kafka => {
      kafka.send({
        topic: 'classifier-response-test',
        messages: JSON.stringify(kafkaData)
      }).then( success => {
        // Message was sent encoded with Avro Schema
        console.log("message sent ! Awesome ! :) ")
      }, error => {
        // Something wrong happen
        console.log("There seems that there is a mistake ! try Again ;) ")
        console.log(error)
      });
    } , error => {
      // something wrong happen
      console.log("There seems that there is a global mistake ! try Again ;) ")
      console.log(error)
    });
    

    感谢您使用 kafka-node-avro

    【讨论】:

      【解决方案2】:

      问题是我们需要将主题列表放入架构设置中,以放置主题和架构注册表之间的链接。 我们可以输入模式 id 或主题名称。

      const kafkaSettings  = {
          "kafka" : {
            "kafkaHost" : config.KafkaHost
          },
          "schema": {
            "registry" : config.KafkaRegistry,
            "topics": [
              {"name": "classifier-response-test" }
          ]
          }
        };
      

      【讨论】:

      • 如果您添加这样的主题,您将收到连接错误。
      • 它对我有用
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-05-15
      • 2018-01-12
      • 2019-01-10
      • 2019-10-09
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多