【问题标题】:org.apache.kafka.connect.errors.DataException: Struct schemas do not matchorg.apache.kafka.connect.errors.DataException:结构模式不匹配
【发布时间】:2019-09-10 14:43:08
【问题描述】:

尝试使用 java 中的 kafka.connect.data.Schema 在特定对象类型的数组模式中创建和填充字符串类型的数组模式。但是出现错误

org.apache.kafka.connect.errors.DataException:结构模式不匹配。 在 org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:239) 在 org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:245) 在 org.apache.kafka.connect.data.Struct.put(Struct.java:215) 在 org.apache.kafka.connect.data.Struct.put(Struct.java:204) 在 com.sintec.ra.fixture.BookProgramFixture.createAiringStruct(BookProgramFixture.java:60) 在 com.sintec.ra.handlers.BookProgramTopicHandlerTest.mockAiringStruct(BookProgramTopicHandlerTest.java:132) 在 com.sintec.ra.handlers.BookProgramTopicHandlerTest.init(BookProgramTopicHandlerTest.java:107) 在 com.sintec.ra.handlers.BookProgramTopicHandlerTest.(BookProgramTopicHandlerTest.java:76) 在 sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 在 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) 在 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) 在 java.lang.reflect.Constructor.newInstance(Constructor.java:423) 在 org.junit.runners.BlockJUnit4ClassRunner.createTest(BlockJUnit4ClassRunner.java:217) 在 org.junit.runners.BlockJUnit4ClassRunner$1.runReflectiveCall(BlockJUnit4ClassRunner.java:266) 在 org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) 在 org.junit.runners.BlockJUnit4ClassRunner.methodBlock(BlockJUnit4ClassRunner.java:263) 在 org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) 在 org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) 在 org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) 在 org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) 在 org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) 在 org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) 在 org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) 在 org.junit.runners.ParentRunner.run(ParentRunner.java:363) 在 org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86) 在 org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) 在 org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:538) 在 org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:760) 在 org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:460) 在 org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:206) 比如

"airingTo" : [ {
      "dow" : [ "SATURDAY" ]
      }]

public static Struct createAiringStruct() 
{
    Schema valueSchema = createValueSchemaForProgram();
    Struct valueStruct = new Struct(valueSchema).put("airingTo", 
    getAiringList());
    return valueStruct;
}

private static Schema createValueSchemaForProgram() 
{
    return 
    SchemaBuilder.struct().name("PROGRAM").field("airingTo", 
    SchemaBuilder.array(createAiringListSchema())).build();
}

private static Schema createAiringListSchema() 
{
   return SchemaBuilder.struct().name("airingTo").field("dow", 
   SchemaBuilder.array(Schema.STRING_SCHEMA)).build();
}

private static List<Struct> getAiringList() 
{
    Struct valueStruct = new 
    Struct(createAiringListSchema()).put("dow", 
    Arrays.asList("SATURDAY"));
    List<Struct> dayPartList = new ArrayList<>();
    dayPartList.add(valueStruct);
    return dayPartList;
}

【问题讨论】:

  • 如果您将整个堆栈跟踪添加到您的问题中会更容易调查
  • 当然可以。

标签: java apache-kafka apache-kafka-connect


【解决方案1】:

不确定你从哪里得到.name("PROGRAM"),但这似乎对我有用。

您似乎将结构名称与字段混合在一起。

    Schema dowSchema = SchemaBuilder.array(Schema.STRING_SCHEMA).build();
    Schema airingToItemSchema = SchemaBuilder.struct()
            .field("dow", dowSchema)
            .build();
    Schema airingToSchema = SchemaBuilder.array(airingToItemSchema).build();
    Schema rootSchema = SchemaBuilder.struct()
            .field("airingTo", airingToSchema);
            .build();

    Struct item = new Struct(airingToItemSchema)
            .put("dow", Collections.singletonList("SATURDAY"));
    Struct rootStruct = new Struct(rootSchema)
            .put("airingTo", Collections.singletonList(item));

    System.out.println(rootStruct);

输出

Struct{airingTo=[Struct{dow=[SATURDAY]}]}

【讨论】:

    猜你喜欢
    • 2021-05-04
    • 1970-01-01
    • 2015-08-10
    • 2023-02-09
    • 1970-01-01
    • 2023-04-04
    • 1970-01-01
    • 1970-01-01
    • 2015-03-14
    相关资源
    最近更新 更多