【问题标题】:How to convert message body to pojo in AggregationStrategy如何在 AggregationStrategy 中将消息正文转换为 pojo
【发布时间】:2017-01-30 05:41:06
【问题描述】:

我有一个路由(camel 2.17.3),它使用丰富的 DSL 调用休息服务并将结果聚合到消息正文中。不过,我遇到了序列化问题。这是我正在尝试的。我的路线如下所示:

rest("myService").produces("application/json")
            .get("test")
            .param().name("text").required(true).type(RestParamType.query).description("The text to be processed.").endParam()
                .to("direct:StepA")
    ;

    from("direct:StepA")
            .removeHeader("CamelHttpQuery")
            .removeHeader("CamelHttpRawQuery")
            .enrich("netty4-http://myOtherService:8080/endpoint?input=${header.text}", new MyAggregator())
            .to("direct:StepB")
    ;

    from("direct:StepB")
            // not really implemented yet
            .transform().simple("Query: ${header.text}\nBody: ${body}")
    ;

如您所见,我想使用enrich() DSL 调用一些现有服务并将这些结果聚合到一个新的消息体中以供进一步处理。我需要将休息调用的结果从 Json 转换为 MyResponse。我想用这个:

.unmarshal().json(JsonLibrary.Jackson, MyResponse.class)

但我需要它已经在我的 AggregationStrategy 中解组,如下所示:

public class MyAggregator implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        final MyResponse intermediateResponse = oldExchange.getIn().getBody(MyResponse.class);

        final MyAggregate response = new MyAggregate(oldExchange.getIn().getHeader("text", String.class), intermediateResponse);

        newExchange.getIn().setBody(response);
        return newExchange;
}

然而,当我说 getBody(..) 时,它返回 null。所以我了解了 TypeConverters 并尝试使用它,以便系统可以自动转换为我的类型:

public class MyConverter extends TypeConverterSupport {

    private static final ObjectMapper MAPPER =
        new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT);

    @Override
    public <T> T convertTo(Class<T> type, Exchange exchange, Object value) throws TypeConversionException {
        try {
            return (T)MAPPER.readValue(value.toString(), MyResponse.class);
        } catch (IOException e) {
           throw new TypeConversionException(value, type, e);
        }
}

然后在我的主要:

    public static void main(String... args) throws Exception {
        Main main = new Main();
        main.setApplicationContextUri("classpath:app-ctx.xml");

        for (final CamelContext ctx : main.getCamelContexts()) {
           ctx.getTypeConverterRegistry().addTypeConverter(MyResponse.class, String.class, new MyConverter());
        }

        main.run(args);
    }

但这似乎并没有改变结果。在我的聚合器中,MyResponse 仍然为空。我错过了一些关于如何设置这种类型转换的内容,希望有人能指出我正确的方向。

【问题讨论】:

    标签: java apache-camel


    【解决方案1】:

    好的,当我意识到我可以丰富一个执行 unmarshall 调用的子路由,然后聚合它时,我最终解决了这个问题......

    from("direct:StepA")
            .removeHeader("CamelHttpQuery")
            .removeHeader("CamelHttpRawQuery")
            .enrich("direct:StepAEnricher", new MyAggregator())
            .to("direct:StepB")
    ;
    
    from("direct:StepAEnricher")
        .toD("netty4-http://myOtherService:8080/endpoint?input=${header.text}")
        .unmarshal().json(JsonLibrary.Jackson, MyResponse.class)
    ;
    

    这很完美!我很好奇一个更有经验的骆驼开发者会如何看待这个问题。拥有这种“额外”的路线来处理这样的解组响应是否成本高昂?我仍然很好奇如何克服自定义类型转换器方法。

    【讨论】:

      猜你喜欢
      • 2014-02-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-09-05
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多