【问题标题】:Implementing a Custom Process Strategy with Apache Camel File Component使用 Apache Camel 文件组件实现自定义流程策略
【发布时间】:2011-03-10 20:18:59
【问题描述】:

问题背景

我目前正在开发一个基于骆驼的 ETL 应用程序,该应用程序处理出现在日期目录中的文件组。这些文件需要作为一个由文件名开头确定的组一起处理。只有将完成的文件(“.flag”)写入目录后,才能处理这些文件。我知道骆驼文件组件有一个完成文件选项,但这只允许您检索与完成文件同名的文件。应用程序需要持续运行并在日期滚动时开始轮询第二天的目录。

示例目录结构:

 /process-directory
      /03-09-2011
      /03-10-2011
           /GROUPNAME_ID1_staticfilename.xml
           /GROUPNAME_staticfilename2.xml
           /GROUPNAME.flag
           /GROUPNAME2_ID1_staticfilename.xml
           /GROUPNAME2_staticfilename2.xml
           /GROUPNAME2_staticfilename3.xml
           /GROUPNAME2.flag


迄今为止的尝试

我有以下启动处理的路线(名称被混淆):

@Override
public void configure() throws Exception 
{
    getContext().addEndpoint("processShare", createProcessShareEndpoint());

    from("processShare")
        .process(new InputFileRouter())
        .choice()
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE1 + "'")
                .to("seda://type1?size=1")
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE2 + "'")
                .to("seda://type2?size=1")
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE3 + "'")
                .to("seda://type3?size=1")
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE4 + "'")
                .to("seda://type4?size=1")
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE5 + "'")
                .to("seda://type5?size=1")
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE6 + "'")
                .to("seda://type6?size=1")
            .when()
                .simple("${header.processorName} == '" + InputFileType.TYPE7 + "'")
                .to("seda://type7?size=1")
            .otherwise()
                .log(LoggingLevel.FATAL, "Unknown file type encountered during processing! --> ${body}");
}


我的问题是如何配置文件端点。我目前正在尝试以编程方式配置端点,但运气不佳。到目前为止,我在骆驼方面的经验主要是使用 Spring DSL 而不是 Java DSL。

我尝试实例化 FileEndpoint 对象的路线,但每当路线构建时,我都会收到一条错误消息,指出文件属性为空。我相信这是因为我应该创建一个 FileComponent 而不是端点。我不会在不使用 uri 的情况下创建端点,因为我无法使用 uri 在目录名称中指定动态日期。

private FileEndpoint createProcessShareEndpoint() throws ConfigurationException
    {
        FileEndpoint endpoint = new FileEndpoint();

        //Custom directory "ready to process" implementation.
        endpoint.setProcessStrategy(getContext().getRegistry().lookup(
                "inputFileProcessStrategy", MyFileInputProcessStrategy.class));

        try 
        {
            //Controls the number of files returned per directory poll.
            endpoint.setMaxMessagesPerPoll(Integer.parseInt(
                    PropertiesUtil.getProperty(
                            AdapterConstants.OUTDIR_MAXFILES, "1")));
        } 
        catch (NumberFormatException e) 
        {
            throw new ConfigurationException(String.format(
                    "Property %s is required to be an integer.", 
                    AdapterConstants.OUTDIR_MAXFILES), e);
        }

        Map<String, Object> consumerPropertiesMap = new HashMap<String, Object>();

        //Controls the delay between directory polls.
        consumerPropertiesMap.put("delay", PropertiesUtil.getProperty(
                AdapterConstants.OUTDIR_POLLING_MILLIS));

        //Controls which files are included in directory polls.
        //Regex that matches file extensions (eg. {SOME_FILE}.flag)
        consumerPropertiesMap.put("include", "^.*(." + PropertiesUtil.getProperty(
                AdapterConstants.OUTDIR_FLAGFILE_EXTENSION, "flag") + ")");

        endpoint.setConsumerProperties(consumerPropertiesMap);

        GenericFileConfiguration configuration = new GenericFileConfiguration();

        //Controls the directory to be polled by the endpoint.
        if(CommandLineOptions.getInstance().getInputDirectory() != null)
        {
            configuration.setDirectory(CommandLineOptions.getInstance().getInputDirectory());
        }
        else
        {
            SimpleDateFormat dateFormat = new SimpleDateFormat(PropertiesUtil.getProperty(AdapterConstants.OUTDIR_DATE_FORMAT, "MM-dd-yyyy"));

            configuration.setDirectory(
                    PropertiesUtil.getProperty(AdapterConstants.OUTDIR_ROOT) + "\\" +
                    dateFormat.format(new Date()));
        }

        endpoint.setConfiguration(configuration);

        return endpoint;


问题

  1. 在这种情况下实施 GenericFileProcessingStrategy 是否正确?如果是这样,在某个地方有这样的例子吗?我查看了骆驼文件单元测试,并没有看到任何突然出现在我身上的东西。

  2. 我在配置端点时做错了什么?我觉得清理这个烂摊子的答案与问题 3 相关。

  3. 您能否将文件端点配置为在轮询和日期更改时滚动过时的文件夹?

一如既往地感谢您的帮助。

【问题讨论】:

    标签: apache-camel


    【解决方案1】:

    您可以使用 processStrategy 选项从端点 uri 引用自定义 ProcessStrategy,例如 file:xxxx?processStrategy=#myProcess。请注意我们如何在值前面加上 # 表示它应该从注册表中查找它。所以在 Spring XML 中你只需添加一个 标签

    在 Java 中,从 CamelContext API 获取端点可能更容易:

    FileEndpoint file = context.getEndpoint("file:xxx?aaa=123&bbb=456", FileEndpoint.class);
    

    这允许您预先配置端点。当然,之后您可以使用 FileEndpoint 上的 API 来设置其他配置。

    【讨论】:

      【解决方案2】:

      在 java 中,这是如何使用 GenericFileProcessingStrategy :

      @Component
      public class CustomGenericFileProcessingStrategy<T> extends GenericFileProcessStrategySupport<T> {
      public CustomFileReadyToCopyProcessStrategy() {
      }
      
      public boolean begin(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
          super.begin(operations, endpoint, exchange, file);
          ...
      }
      
      public void commit(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
          super.commit(operations, endpoint, exchange, file);
          ...
      }
      
      public void rollback(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint, Exchange exchange, GenericFile<T> file) throws Exception {
         super.rollback(operations, endpoint, exchange, file);
         ...
         }
      }
      

      然后为你创建路由 Builer 类:

      public class myRoutes() extends RouteBuilder {
          
          private final static CustomGenericFileProcessingStrategy  customGenericFileProcessingStrategy; 
      
          public myRoutes(CustomGenericFileProcessingStrategy 
       customGenericFileProcessingStrategy) {   
              this.customGenericFileProcessingStrategy  = customGenericFileProcessingStrategy ; }
          
      @Override public void configure() throws Exception {
          
          FileEndpoint fileEndPoint= camelContext.getEndpoint("file://mySourceDirectory"); 
          fileEndPoint.setProcessStrategy(myCustomGenericFileProcessingStrategy ); 
          from(fileEndPoint).setBody(...)process(...).toD(...);
          ...
      }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2015-10-06
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多