[Title]: Reading HAR file from DistributedCache in mapreduce
[Posted]: 2013-03-04 12:51:09
[Question]:

I wrote an Oozie workflow that creates a HAR archive and then runs an MR job that has to read data from that archive.

1. The archive is created.
2. When the job runs, the mapper does see the archive in the distributed cache.
3. ??? How can I read this archive? What is the API for reading data from it line by line (my HAR is a batch of several newline-delimited text files)?

Note: everything works fine when I use an ordinary file (not a HAR archive) stored in the DistributedCache. The problem appears only when I try to read data from the HAR.
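For the ordinary-file case that already works, the cached file is localized to the task's local disk, so it can be read with plain line-by-line I/O. A minimal sketch of that read, assuming the localized path is known; `CachedFileReader` and `readLines` are hypothetical names:

```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class CachedFileReader {
    // Read a newline-delimited text file from the local disk, the way a
    // localized (non-HAR) DistributedCache file is normally consumed.
    public static List<String> readLines(String localPath) throws IOException {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new FileReader(localPath))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }
}
```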

Here is a code snippet:

    InputStream inputStream;
    String cachedDatafileName = System.getProperty(DIST_CACHE_FILE_NAME);
    LOG.info(String.format("Looking for[%s]=[%s] in DistributedCache",DIST_CACHE_FILE_NAME, cachedDatafileName));

    URI[] uris = DistributedCache.getCacheArchives(getContext().getConfiguration());
    URI uriToCachedDatafile = null;
    for(URI uri : uris){
        if(uri.toString().endsWith(cachedDatafileName)){
            uriToCachedDatafile = uri;
            break;
        }
    }
    if(uriToCachedDatafile == null){
        throw new RuntimeConfigurationException(String.format("Looking for[%s]=[%s] in DistributedCache failed. There is no such file",
                DIST_CACHE_FILE_NAME, cachedDatafileName));
    }

    Path pathToFile = new Path(uriToCachedDatafile);
    LOG.info(String.format("[%s] has been found. Uri is: [%s]. The path is:[%s]",cachedDatafileName, uriToCachedDatafile, pathToFile));

    FileSystem fileSystem =  pathToFile.getFileSystem(getContext().getConfiguration());
    HarFileSystem harFileSystem = new HarFileSystem(fileSystem);
    // harFileSystem.initialize(...) is never called, so the HAR index is not loaded
    inputStream = harFileSystem.open(pathToFile); //NULL POINTER EXCEPTION IS HERE!
    return inputStream;

[Comments]:

    Tags: mapreduce hdfs cloudera distributed-cache


    [Solution 1]:
    protected InputStream getInputStreamToDistCacheFile() throws IOException{
            InputStream inputStream;
            String cachedDatafileName = System.getProperty(DIST_CACHE_FILE_NAME);
            LOG.info(String.format("Looking for[%s]=[%s] in DistributedCache",DIST_CACHE_FILE_NAME, cachedDatafileName));

            URI[] uris = DistributedCache.getCacheArchives(getContext().getConfiguration());
            URI uriToCachedDatafile = null;
            for(URI uri : uris){
                if(uri.toString().endsWith(cachedDatafileName)){
                    uriToCachedDatafile = uri;
                    break;
                }
            }
            if(uriToCachedDatafile == null){
                throw new RuntimeConfigurationException(String.format("Looking for[%s]=[%s] in DistributedCache failed. There is no such file",
                        DIST_CACHE_FILE_NAME, cachedDatafileName));
            }

            // The path must use the "har" scheme and end with the file inside the archive.
            Path pathToFile = new Path("har:///"+"home/ssa/devel/megalabs/kyc-solution/kyc-mrjob/target/test-classes/GSMCellSubscriberHomeIntersectionJobDescriptionClusterMRTest/in/gsm_cell_location_stf.har" +"/stf/db_bts_stf.txt");

            LOG.info(String.format("[%s] has been found. Uri is: [%s]. The path is:[%s]",cachedDatafileName, uriToCachedDatafile, pathToFile));
            // Asking the "har" path for its filesystem yields an already-initialized HarFileSystem.
            FileSystem harFileSystem = pathToFile.getFileSystem(context.getConfiguration());
            FSDataInputStream fin = harFileSystem.open(pathToFile);
            LOG.info("fin: " + fin);

            // Listing the archive root shows the entries the HAR index knows about.
            FileStatus[] statuses = harFileSystem.listStatus(new Path("har:///"+"home/ssa/devel/megalabs/kyc-solution/kyc-mrjob/target/test-classes/GSMCellSubscriberHomeIntersectionJobDescriptionClusterMRTest/in/gsm_cell_location_stf.har"));
            for(FileStatus fileStatus : statuses){
                LOG.info("fileStatus isDir: "+fileStatus.isDirectory() +" len: " + fileStatus.getLen());
            }

            return fin;
        }

    

    As you can see, this is ugly. You end up manually reading the index metadata stored in the archive and reconstructing the path from it. If you know the exact name of the file stored inside the archive (as in my example), you can build the path by hand.
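    The string gymnastics can at least be isolated in a small helper. A sketch of the path construction only, assuming the archive sits on the default filesystem (hence the empty authority in `har:///`); `HarPaths` and `insideArchive` are made-up names:

```java
public class HarPaths {
    // Build a "har"-scheme path pointing at a file inside an archive on the
    // default filesystem: har:///<path-to-archive.har>/<file-inside-archive>.
    public static String insideArchive(String archivePath, String fileInArchive) {
        String base = archivePath.startsWith("/") ? archivePath : "/" + archivePath;
        String inner = fileInArchive.startsWith("/") ? fileInArchive : "/" + fileInArchive;
        return "har://" + base + inner;
    }
}
```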

    This is not very convenient. I was expecting something like Zip -> ZipEntry, where you can iterate over the archive's entries without knowing its structure in advance.
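    For comparison, `java.util.zip` already offers exactly that iteration style: every entry can be enumerated without any prior knowledge of the layout. A self-contained sketch (`ZipWalk` is a hypothetical name) of what the answer wishes existed for HAR:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

public class ZipWalk {
    // List every entry name in a zip archive without knowing its structure,
    // the iteration style the answer wants for HAR archives.
    public static List<String> entryNames(String zipPath) throws IOException {
        List<String> names = new ArrayList<>();
        try (ZipFile zip = new ZipFile(zipPath)) {
            Enumeration<? extends ZipEntry> entries = zip.entries();
            while (entries.hasMoreElements()) {
                names.add(entries.nextElement().getName());
            }
        }
        return names;
    }
}
```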

    [Discussion]:
