【问题标题】:How to use File lock in Spring batch-integration?如何在 Spring 批处理集成中使用文件锁?
【发布时间】:2016-05-10 23:24:52
【问题描述】:

在我的 spring-batch-integration 应用程序中,文件轮询为每个文件调用批处理作业,并且该应用程序可以在多个服务器(节点)上运行,但它们都应该读取一个公共目录。现在,我编写了一个自定义储物柜锁定文件,这样任何其他实例都无法处理同一个文件。代码如下

public class MyFileLocker extends AbstractFileLockerFilter{

private final ConcurrentMap<File, FileLock> lockCache = new ConcurrentHashMap<File, FileLock>();
private final ConcurrentMap<File, FileChannel> ChannelCache = new ConcurrentHashMap<File, FileChannel>();

@Override
public boolean lock(File fileToLock) {
    FileChannel channel;
    FileLock lock;
    try {
        channel = new RandomAccessFile(fileToLock, "rw").getChannel();
        lock = channel.tryLock();

        if (lock == null || !lock.isValid()) {  
            System.out.println(" Problem in acquiring lock!!" + fileToLock);
            return false;
        }
        lockCache.put(fileToLock, lock);
        ChannelCache.put(fileToLock, channel);

    } catch (FileNotFoundException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

    return true;
}

@Override
public boolean isLockable(File file) {

    return file.canWrite();
}

@Override
public void unlock(File fileToUnlock) {
    FileLock lock = lockCache.get(fileToUnlock);
    try {
    if(lock!=null){
        lock.release();
        ChannelCache.get(fileToUnlock).close();
    }
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

}

}

现在,当我调用我的 Spring 批处理并尝试使用 flatfileitemreader 读取该文件时,它给了我

org.springframework.batch.item.file.NonTransientFlatFileException

我相信这是因为文件被锁定了。我做了一些谷歌搜索,发现 NIOLocker 以一种即使当前线程也无法读取文件的方式锁定文件。我找到了link,它显示了如何读取锁定的文件,但它们正在使用缓冲区。 如何让我的 FlatfileItemReader 可以访问我的文件。

请提出建议。

【问题讨论】:

    标签: spring spring-integration spring-batch


    【解决方案1】:

    是的,您确实只能通过ByteBuffer 访问锁定的文件内容:

    FileChannel fileChannel = channelCache.get(lockedFile);
    ByteBuffer byteBuffer = ByteBuffer.allocate((int) fileChannel.size());
    fileChannel.read(byteBuffer);
    System.out.println("Read File " + lockedFile.getName() + " with content: " + new String(byteBuffer.array()));
    

    哦!是的。你真的指向了我的回购 :-)。

    因此,对于locker,您别无选择,除非在FlatfileItemReader 之前复制/粘贴文件byte[],或者只是将一些自定义BufferedReaderFactory 注入相同的FlatfileItemReader,这将转换锁定的文件到相应的BufferedReader:

    new BufferedReader(new CharArrayReader(byteBuffer.asCharBuffer().array()));
    

    【讨论】:

    • 在实际生产场景中,我正在读取的文件的大小可能高达 GB,并且可能有数千个这样的文件。你建议创建这么大的缓冲区吗?
    • 是的...我对ByteBuffers 不太熟悉,也无法说明它们在内部是如何工作的。另外,我也只能找到读取锁定文件的方法。可能人们只是以其他方式进行锁定,并且可能不认为这种FileChannel 方法有用。您可以考虑将数据从锁定文件转移到临时文件。并且已经在FlatfileItemReader 中逐行阅读了最后一行。 FileChannel.transferTo() 就是为此目的。
    【解决方案2】:

    根据您分享的link,您可以尝试以下方法。

    //This is from your link (except the size variable)
    FileChannel fileChannel = channelCache.get(lockedFile);
    int size = (int) fileChannel.size();
    ByteBuffer byteBuffer = ByteBuffer.allocate(size);
    fileChannel.read(byteBuffer);
    
    //Additional code that you could try
    byte[] bArray = new byte[size];
    
    //Write data to the byte array
    byteBuffer.get(bArray);
    
    FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
    flatFileItemReader.setResource(new ByteArrayResource(bArray));
    
    //Next you can try reading from your flatFileItemReader as usual
    ...
    

    如果它没有解决您的问题,请告诉我。

    【讨论】:

    • 既然你说过,你的 prod 实现可能包含高达多个 GB 的文件,你可以在单独的线程中读取锁定的文件并继续将其写入缓冲区,该缓冲区最终将由 FlatFileItemReader 在在InputStreamResource的帮助下分离线程
    • 我已经为此发布了答案。你看到我必须处理的任何问题吗??
    • @VarunMaurya ,我无法在那里发表评论,所以在这里写评论。您的解决方案对我来说看起来不错,但它是否解决了海量数据的问题?如果它正在流式传输数据以写入输出而不是将所有数据加载到内存中,那很好。否则应该采用其他一些流机制,如 BufferedInputStream。你有机会测试它吗?
    【解决方案3】:

    解决方案:我正在创建一个包含锁定文件内容的临时文件并对其进行处理。处理完成后,我存档该文件并删除锁定和临时的两个文件。这里的关键是创建一个具有锁定文件内容的新文件。以下代码如下:

    File tmpFile = new File(inputFile.getAbsolutePath() + ".lck"); FileChannel fileChannel = MyFileLocker.getChannelCache().get(new File(inputFile.getAbsolutePath())); InputStream inputStream = Channels.newInputStream(fileChannel); ByteStreams.copy(inputStream, Files.newOutputStreamSupplier(tmpFile));

    这里的 inputFile 是我的锁定文件,而 tmp 文件是具有锁定文件内容的新文件。我还在我的储物柜类中创建了一个方法来解锁和删除

        public void unlockAndDelete(File fileToUnlockandDelete) {
        FileLock lock = lockCache.get(fileToUnlockandDelete);
        String fileName = fileToUnlockandDelete.getName();
        try {
        if(lock!=null){
            lock.release();
            channelCache.get(fileToUnlockandDelete).close();
            //remove from cache
            lockCache.remove(fileToUnlockandDelete);
            channelCache.remove(fileToUnlockandDelete);
            boolean isFiledeleted = fileToUnlockandDelete.delete();
            if(isFiledeleted){
                System.out.println("File deleted successfully" + fileName);
            }else{
                System.out.println("File is not deleted."+fileName);
            }
        }
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    
    }
    

    【讨论】:

    • @Artem Bilan 你看到这里有什么问题吗,我应该担心。
    • M-m-m。一切看起来都很好。除非我建议您通过锁定文件的通道复制字节。所以,我的回答仍然可以接受:-)。 OTOH 你有没有试过只依赖`Channels.newInputStream(fileChannel);? Looking into its source code I'd say it is good way to go. It still uses the same ByteBuffer`,但在InputStream .read() 期间有一个小的。所以,你仍然可以使用 FlatfileItemReader,但仍然需要自定义 BufferedReaderFactory 来围绕锁定文件执行逻辑!
    • 为什么我需要'BufferedReaderFactory'?在我的 FlatfileItemReader 中,我提供了我的 'tmpFile' 作为资源。我在这里错过了什么吗?
    • 您错过了完全可以避免使用 tmp 文件的事实。感谢自定义BufferedReaderFactory 中的Channels.newInputStream(fileChannel);,我们可以正常流式传输锁定的文件!因此我在 tmp 文件中看不到原因。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2011-05-13
    • 2018-07-01
    • 2022-07-04
    • 1970-01-01
    • 2021-02-14
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多