【发布时间】:2019-09-10 14:43:08
【问题描述】:
尝试使用 java 中的 kafka.connect.data.Schema 在特定对象类型的数组模式中创建和填充字符串类型的数组模式。但是出现错误
org.apache.kafka.connect.errors.DataException:结构模式不匹配。 在 org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:239) 在 org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:245) 在 org.apache.kafka.connect.data.Struct.put(Struct.java:215) 在 org.apache.kafka.connect.data.Struct.put(Struct.java:204) 在 com.sintec.ra.fixture.BookProgramFixture.createAiringStruct(BookProgramFixture.java:60) 在 com.sintec.ra.handlers.BookProgramTopicHandlerTest.mockAiringStruct(BookProgramTopicHandlerTest.java:132) 在 com.sintec.ra.handlers.BookProgramTopicHandlerTest.init(BookProgramTopicHandlerTest.java:107) 在 com.sintec.ra.handlers.BookProgramTopicHandlerTest.(BookProgramTopicHandlerTest.java:76) 在 sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 在 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) 在 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) 在 java.lang.reflect.Constructor.newInstance(Constructor.java:423) 在 org.junit.runners.BlockJUnit4ClassRunner.createTest(BlockJUnit4ClassRunner.java:217) 在 org.junit.runners.BlockJUnit4ClassRunner$1.runReflectiveCall(BlockJUnit4ClassRunner.java:266) 在 org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) 在 org.junit.runners.BlockJUnit4ClassRunner.methodBlock(BlockJUnit4ClassRunner.java:263) 在 org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) 在 org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) 在 org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) 在 org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) 在 org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) 在 org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) 在 org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) 在 org.junit.runners.ParentRunner.run(ParentRunner.java:363) 在 org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86) 在 org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) 在 org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:538) 在 org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:760) 在 org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:460) 在 org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:206) 比如
"airingTo" : [ {
"dow" : [ "SATURDAY" ]
}]
public static Struct createAiringStruct()
{
Schema valueSchema = createValueSchemaForProgram();
Struct valueStruct = new Struct(valueSchema).put("airingTo",
getAiringList());
return valueStruct;
}
private static Schema createValueSchemaForProgram()
{
return
SchemaBuilder.struct().name("PROGRAM").field("airingTo",
SchemaBuilder.array(createAiringListSchema())).build();
}
private static Schema createAiringListSchema()
{
return SchemaBuilder.struct().name("airingTo").field("dow",
SchemaBuilder.array(Schema.STRING_SCHEMA)).build();
}
private static List<Struct> getAiringList()
{
Struct valueStruct = new
Struct(createAiringListSchema()).put("dow",
Arrays.asList("SATURDAY"));
List<Struct> dayPartList = new ArrayList<>();
dayPartList.add(valueStruct);
return dayPartList;
}
【问题讨论】:
-
如果您将整个堆栈跟踪添加到您的问题中会更容易调查
-
当然可以。
标签: java apache-kafka apache-kafka-connect